Compare commits
2 Commits
4f7d2f018b
...
315f1470ec
Author | SHA1 | Date |
---|---|---|
TANGWY | 315f1470ec | 4 months ago |
TANGWY | b97f549d89 | 4 months ago |
@ -1,38 +1,150 @@ |
|||||||
{ |
{ |
||||||
"white_list": { |
"white_list": { |
||||||
"ip": [ |
"ip": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"account": [ |
"account": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"interface": [ |
"interface": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"menu": [ |
"menu": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
] |
] |
||||||
}, |
}, |
||||||
"grey_list": { |
"grey_list": { |
||||||
"ip": [ |
"ip": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"account": [ |
"account": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"interface": [ |
"interface": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
], |
], |
||||||
"menu": [ |
"menu": [ |
||||||
400000, |
510400, |
||||||
400001 |
510401, |
||||||
|
510402, |
||||||
|
510405, |
||||||
|
510406, |
||||||
|
510407, |
||||||
|
510404, |
||||||
|
510403, |
||||||
|
510030, |
||||||
|
510031, |
||||||
|
510009, |
||||||
|
510008, |
||||||
|
510004, |
||||||
|
510408, |
||||||
|
510410, |
||||||
|
510409 |
||||||
] |
] |
||||||
} |
} |
||||||
} |
} |
@ -1,78 +0,0 @@ |
|||||||
# coding=utf-8 |
|
||||||
""" |
|
||||||
@Author: fu-zhe |
|
||||||
@FileName: user_cron.py |
|
||||||
@DateTime: 2024/5/23 14:19 |
|
||||||
@Description: 用户相关的定时任务 全量获取,对已有数据进行先删除后写入 周期一天 |
|
||||||
""" |
|
||||||
from __future__ import unicode_literals |
|
||||||
|
|
||||||
import random,string |
|
||||||
import traceback |
|
||||||
import time |
|
||||||
from datetime import datetime, timedelta |
|
||||||
import calendar |
|
||||||
from uebaMetricsAnalysis.utils.ext_logging import logger |
|
||||||
from commandCyberRange.utils.db2json import DBUtils, DBType |
|
||||||
from uebaMetricsAnalysis.utils.base_dataclean import entry |
|
||||||
|
|
||||||
JOB_STATUS ={ |
|
||||||
"RUNNING":1, |
|
||||||
"FINISH":2, |
|
||||||
"ERROR":3 |
|
||||||
} |
|
||||||
|
|
||||||
class UserCron: |
|
||||||
def generate_job_id(): |
|
||||||
timestamp = int(time.time() * 1000) |
|
||||||
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7)) |
|
||||||
return str(timestamp) + random_letters |
|
||||||
|
|
||||||
#获取 job的执行时间 开始时间-结束时间 |
|
||||||
def get_job_period(self): |
|
||||||
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1" |
|
||||||
fields=["job_id", "end_time"] |
|
||||||
data = DBUtils.transition(fields, sql, DBType.LIST) |
|
||||||
start_time = '' |
|
||||||
end_time = '' |
|
||||||
if len(data)==0: |
|
||||||
start_time = datetime.datetime.now() - timedelta(minutes=5) |
|
||||||
end_time = datetime.datetime.now() |
|
||||||
if len(data)>0: |
|
||||||
start_time = data[0].get('end_time') |
|
||||||
end_time = data[0].get('end_time') + timedelta(minutes=5) |
|
||||||
if end_time > datetime.datetime.now(): |
|
||||||
return None,None |
|
||||||
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time) |
|
||||||
return start_time,end_time |
|
||||||
|
|
||||||
#处理跨天的场景 |
|
||||||
def adjust_end_time_if_cross_day(self,start_time, end_time): |
|
||||||
if start_time.date() != end_time.date(): |
|
||||||
end_time = datetime.datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999)) |
|
||||||
|
|
||||||
return start_time, end_time |
|
||||||
#每5分钟执行一次 |
|
||||||
def processing(self): |
|
||||||
try: |
|
||||||
logger.info("job:开始执行") |
|
||||||
start,end=self.get_job_period() |
|
||||||
job_id =self.generate_job_id() |
|
||||||
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING")) |
|
||||||
logger.info("job:运行参数:{}".format(start,end)) |
|
||||||
if start is None or end is None: |
|
||||||
logger.info("job:结束时间大于服务器时间不执行") |
|
||||||
return |
|
||||||
|
|
||||||
logger.info("job:"+"准备获取es数据") |
|
||||||
#entry(start,end) |
|
||||||
logger.info("job:"+"执行完成") |
|
||||||
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"") |
|
||||||
except Exception ,e: |
|
||||||
err_info="定时任务执行失败:".format(str(e), traceback.format_exc()) |
|
||||||
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info) |
|
||||||
logger.error(err_info) |
|
||||||
raise |
|
||||||
|
|
||||||
if __name__ == '__main__': |
|
||||||
UserCron().processing() |
|
@ -1,56 +0,0 @@ |
|||||||
# coding=utf-8 |
|
||||||
""" |
|
||||||
@Author: tangwy |
|
||||||
@FileName: user_cron_pg.py |
|
||||||
@DateTime: 2024/7/09 14:19 |
|
||||||
@Description: 定时清洗es数据 |
|
||||||
""" |
|
||||||
from __future__ import unicode_literals |
|
||||||
|
|
||||||
import random,string |
|
||||||
import traceback,json |
|
||||||
import time |
|
||||||
from datetime import datetime,timedelta |
|
||||||
import calendar |
|
||||||
from uebaMetricsAnalysis.utils.ext_logging import logger |
|
||||||
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType |
|
||||||
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry |
|
||||||
|
|
||||||
JOB_STATUS ={ |
|
||||||
"RUNNING":1, |
|
||||||
"FINISH":2, |
|
||||||
"ERROR":3 |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
class UserCron: |
|
||||||
def generate_job_id(self): |
|
||||||
timestamp = int(time.time() * 1000) |
|
||||||
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7)) |
|
||||||
return str(timestamp) + random_letters |
|
||||||
|
|
||||||
#每5分钟执行一次 |
|
||||||
def processing(self): |
|
||||||
job_id =self.generate_job_id() |
|
||||||
try: |
|
||||||
logger.info("job:开始执行") |
|
||||||
start,end= DBUtils.get_job_period() |
|
||||||
if start is None or end is None: |
|
||||||
logger.info("job:结束时间大于服务器时间不执行") |
|
||||||
return |
|
||||||
|
|
||||||
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING")) |
|
||||||
logger.info("job:运行参数:{}".format(start,end)) |
|
||||||
|
|
||||||
logger.info("job:"+"准备获取es数据") |
|
||||||
entry(start,end) |
|
||||||
logger.info("job:"+"执行完成") |
|
||||||
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"") |
|
||||||
except Exception ,e: |
|
||||||
err_info="定时任务执行失败:".format(str(e), traceback.format_exc()) |
|
||||||
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info) |
|
||||||
logger.error(err_info) |
|
||||||
raise |
|
||||||
|
|
||||||
if __name__ == '__main__': |
|
||||||
UserCron().processing() |
|
@ -0,0 +1,142 @@ |
|||||||
|
# coding:utf-8 |
||||||
|
|
||||||
|
import sys |
||||||
|
import uuid |
||||||
|
import json |
||||||
|
import time |
||||||
|
import random |
||||||
|
|
||||||
|
path = str(sys.path[0]) |
||||||
|
home_path = path.split("isop_uebaapiData")[0] |
||||||
|
sys.path.append(home_path) |
||||||
|
from isop_uebaapiData.util import send_logs |
||||||
|
|
||||||
|
def alarm(cookies, api): |
||||||
|
"""2、HTTP日志""" |
||||||
|
inputstr = '''[{"msgtype":1,"hash":"8DE9-BDAB-F622-2FA8","dev_ip":"10.67.5.17","product":"uts"},{"sid":"6004744450036c44f815500016d00a5f5151105430a3ed","timestamp":1567673939,"sip":"10.67.0.52","sport":5624,"dip":"10.67.0.53","dport":80,"protocol":6,"app":3087428650795009,"app_proto":8,"direct":4,"app.detail":{"method":"GET","http_protocol":"1.1","ret_code":200,"host":"10.67.1.1","uri":"/webtest/uploadFile.php","referer":"http://[2222::65]/webtest/","content_type":" multipart/form-data; boundary=----WebKitFormBoundary2zcCUl4lQf1h7A7S","content_type_server":" text/html","server":"Apache/2.4.4 (Win32) OpenSSL/0.9.8y PHP/5.4.19","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36","link":"","cookies":"loginmainacctid=wangshiguang;operatorId=d2601586;com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;","content_encoding":"","location":"","content_length":70080,"content_length_server":200,"set_cookie":"","range":"","connection":"keep-alive","connection_server":"Keep-Alive","x_forwarded_for":"","post_data":"LS0tLS0tV2ViS2l0Rm9ybUJvdW5kYXJ5MnpjQ1VsNGxRZjFoN0E3Uw0KQ29udGVudC1EaXNwb3NpdGlvbjogZm9ybS1kYXRhOyBuYW1lPSJmaWxlIjsgZmlsZW5hbWU9IjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1Ig0=","response_body":"VXBsb2FkOiAwMDAxYWQ0NDFkY2IzODYyMThhNzY5OTJhY2Y4YjcwNTxiciAvPlR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTxiciAvPlNpemU6IDY4LjEyNzkyOTY4NzUgS2I8YnIgLz5UZW1wIGZpbGU6IEQ6XHhhbXBwXHRtcFxwaHA2ODI1LnRtcDxiciAvPjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1IGFscmVhZHkgZXhpc3RzLiA="}}]''' |
||||||
|
inputarr = json.loads(inputstr, strict=False) |
||||||
|
# 随机生成timestamp |
||||||
|
inputarr[1]["timestamp"] = int(time.time()) |
||||||
|
inputarr[1]["sid"] = str(uuid.uuid1()) |
||||||
|
# inputarr[1]["sip"] = "10.67.4.33" |
||||||
|
inputarr[1]["sip"] = generate_random_ip() |
||||||
|
inputarr[1]["dip"] = "10.67.1.1" |
||||||
|
inputarr[1]["dport"] = "8180" |
||||||
|
inputarr[1]["app.detail"]["uri"] = "/alarmtest.action?BMECID=352432757&BMETimestamp=1692788489260&queryNumber=158713459" |
||||||
|
inputarr[1]["app.detail"]["host"] = api |
||||||
|
inputarr[1]["app.detail"]["cookies"] = cookies |
||||||
|
return json.dumps(inputarr) |
||||||
|
|
||||||
|
def generate_random_ip(): |
||||||
|
# 固定前缀 "192.168." |
||||||
|
prefix = "192.168." |
||||||
|
# 生成随机的第三和第四段IP地址 |
||||||
|
third_octet = 1 |
||||||
|
fourth_octet = random.randint(0, 50) |
||||||
|
# 拼接IP地址 |
||||||
|
ip = "{}{}.{}".format(prefix, third_octet, fourth_octet) |
||||||
|
return ip |
||||||
|
|
||||||
|
def AbIDVisitAPINums510404(): |
||||||
|
datalist = {"TCP_5011": list()} |
||||||
|
ID2Area = { |
||||||
|
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"], |
||||||
|
"荆州": ["2001800", "2001801", "2001808"], |
||||||
|
"江汉": ["1801820", "1801810"], |
||||||
|
"省公司市场部": ["1002011", "1002012", "1002013"] |
||||||
|
} |
||||||
|
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com", |
||||||
|
"good.alarm.com"] |
||||||
|
info_list = [ |
||||||
|
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["武汉"][ |
||||||
|
0] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 60], |
||||||
|
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["荆州"][ |
||||||
|
2] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 120] |
||||||
|
] |
||||||
|
for i in range(len(info_list)): |
||||||
|
cookies = info_list[i][0] |
||||||
|
count = info_list[i][1] |
||||||
|
for j in range(count): |
||||||
|
api = random.choice(api_list) |
||||||
|
datalist["TCP_5011"].append(alarm(cookies, api)) |
||||||
|
for key in datalist.keys(): |
||||||
|
send_logs(datalist[key]) |
||||||
|
return "510405场景的告警数据已生成" |
||||||
|
|
||||||
|
def get_random_jobnum(): |
||||||
|
# 定义包含不同前缀的字符串数组 |
||||||
|
prefix_strings = [ |
||||||
|
['10243', '10895', '10134', '10781', '10962'], # 10打头的字符串示例 |
||||||
|
['11089', '11057', '11023', '11016', '11030'], # 110打头的字符串示例 |
||||||
|
['14076', '14049', '14098', '14032', '14061'], # 140打头的字符串示例 |
||||||
|
['26054', '26013', '26087', '26029', '26061'], # 260打头的字符串示例 |
||||||
|
['20083', '20015', '20072', '20096', '20048'], # 200打头的字符串示例 |
||||||
|
['19035', '19017', '19049', '19082', '19096'], # 190打头的字符串示例 |
||||||
|
['180237', '180276', '180204', '180295', '180219'] # 1802打头的字符串示例 |
||||||
|
] |
||||||
|
|
||||||
|
# 随机选择一个前缀数组 |
||||||
|
selected_prefix_array = random.choice(prefix_strings) |
||||||
|
# 随机选择一个具体的字符串 |
||||||
|
selected_string = random.choice(selected_prefix_array) |
||||||
|
return selected_string |
||||||
|
|
||||||
|
def get_random_person(): |
||||||
|
people_list = [ |
||||||
|
"Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Henry", "Isabel", "Jack", |
||||||
|
"Kate", "Liam", "Mia", "Noah", "Olivia" |
||||||
|
# 继续添加更多的名称... |
||||||
|
] |
||||||
|
|
||||||
|
random_person = random.choice(people_list) |
||||||
|
return random_person |
||||||
|
|
||||||
|
def get_random_menu(): |
||||||
|
# 定义系统菜单列表 |
||||||
|
system_menu = [ |
||||||
|
"主页", "设置", "个人资料", "消息", "通知", "帮助", "帐户", "关于", "联系我们", "服务", |
||||||
|
"购物车", "订单", "支付", "地址", "密码" |
||||||
|
] |
||||||
|
|
||||||
|
# 随机选择一个菜单项 |
||||||
|
random_menu_item = random.choice(system_menu) |
||||||
|
return random_menu_item |
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
datalist = {"TCP_5011": list()} |
||||||
|
ID2Area = { |
||||||
|
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"], |
||||||
|
"荆州": ["2001800", "2001801", "2001808"], |
||||||
|
"江汉": ["1801820", "1801810"], |
||||||
|
"省公司市场部": ["1002011", "1002012", "1002013"] |
||||||
|
} |
||||||
|
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com", "good.alarm.com","baidu.com","sohu.com","xinlang.com","erpx.com"] |
||||||
|
info_list = [ |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000], |
||||||
|
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000] |
||||||
|
] |
||||||
|
|
||||||
|
|
||||||
|
for i in range(len(info_list)): |
||||||
|
cookies = info_list[i][0] |
||||||
|
count = info_list[i][1] |
||||||
|
for j in range(count): |
||||||
|
api = random.choice(api_list) |
||||||
|
datalist["TCP_5011"].append(alarm(cookies, api)) |
||||||
|
for key in datalist.keys(): |
||||||
|
send_logs(datalist[key]) |
||||||
|
print "510405场景的告警数据已生成" |
@ -0,0 +1,37 @@ |
|||||||
|
# coding=utf-8 |
||||||
|
""" |
||||||
|
@Author: fu-zhe |
||||||
|
@FileName: user_cron.py |
||||||
|
@DateTime: 2024/5/23 14:19 |
||||||
|
@Description: 用户相关的定时任务 全量获取,对已有数据进行先删除后写入 周期一天 |
||||||
|
""" |
||||||
|
from __future__ import unicode_literals |
||||||
|
|
||||||
|
import random,string |
||||||
|
import traceback |
||||||
|
import time |
||||||
|
from datetime import datetime, timedelta |
||||||
|
import calendar |
||||||
|
from uebaMetricsAnalysis.utils.ext_logging import logger_cron |
||||||
|
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType |
||||||
|
from uebaMetricsAnalysis.utils.file_to_pg import entry |
||||||
|
|
||||||
|
JOB_STATUS ={ |
||||||
|
"RUNNING":1, |
||||||
|
"FINISH":2, |
||||||
|
"ERROR":3 |
||||||
|
} |
||||||
|
|
||||||
|
class UserCron: |
||||||
|
#每5分钟执行一次 |
||||||
|
def processing(self): |
||||||
|
try: |
||||||
|
logger_cron.info("INSERT:开始执行") |
||||||
|
entry() |
||||||
|
logger_cron.info("INSERT:"+"执行完成") |
||||||
|
except Exception ,e: |
||||||
|
err_info=traceback.format_exc() |
||||||
|
logger_cron.error("INSERT:"+"执行失败,"+err_info) |
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
UserCron().processing() |
@ -0,0 +1,73 @@ |
|||||||
|
# coding=utf-8 |
||||||
|
""" |
||||||
|
@Author: tangwy |
||||||
|
@FileName: user_cron_pg.py |
||||||
|
@DateTime: 2024/7/09 14:19 |
||||||
|
@Description: 定时清洗es数据 |
||||||
|
""" |
||||||
|
from __future__ import unicode_literals |
||||||
|
|
||||||
|
import random,string |
||||||
|
import traceback,json |
||||||
|
import time |
||||||
|
from datetime import datetime,timedelta |
||||||
|
import calendar |
||||||
|
from uebaMetricsAnalysis.utils.ext_logging import logger_cron |
||||||
|
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType |
||||||
|
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry |
||||||
|
|
||||||
|
JOB_STATUS ={ |
||||||
|
"RUNNING":1, |
||||||
|
"FINISH":2, |
||||||
|
"ERROR":3 |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
class UserCron: |
||||||
|
#生成job_id |
||||||
|
def generate_job_id(self): |
||||||
|
timestamp = int(time.time() * 1000) |
||||||
|
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7)) |
||||||
|
return str(timestamp) + random_letters |
||||||
|
|
||||||
|
#每5分钟执行一次 |
||||||
|
def processing(self): |
||||||
|
logger_cron.info("JOB:接收到执行指令") |
||||||
|
job_id =self.generate_job_id() |
||||||
|
task_run_count =0 |
||||||
|
try: |
||||||
|
start,end,status,run_count,jobid= DBUtils.get_job_period() |
||||||
|
if jobid !="": |
||||||
|
job_id=jobid |
||||||
|
logger_cron.info("JOB:"+job_id+"开始执行") |
||||||
|
if status ==1: |
||||||
|
logger_cron.info("JOB:"+job_id+"正在运行中不执行") |
||||||
|
return |
||||||
|
if status==3 and run_count >3: |
||||||
|
logger_cron.info("JOB:"+job_id+"失败次数大于3不执行") |
||||||
|
return |
||||||
|
|
||||||
|
if start is None or end is None: |
||||||
|
logger_cron.info("JOB:"+job_id+"结束时间大于(服务器时间-15分钟)不执行") |
||||||
|
return |
||||||
|
|
||||||
|
task_run_count = run_count+1 |
||||||
|
logger_cron.info("JOB:"+job_id+"运行参数:{},{}".format(start,end)) |
||||||
|
logger_cron.info("JOB:"+job_id+"准备将job写入job表") |
||||||
|
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING")) |
||||||
|
logger_cron.info("JOB:"+job_id+"完成job表写入") |
||||||
|
|
||||||
|
logger_cron.info("JOB:"+job_id+"准备获取es数据") |
||||||
|
entry(start,end,job_id) |
||||||
|
logger_cron.info("JOB:"+job_id+"完成es数据获取") |
||||||
|
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"",task_run_count) |
||||||
|
logger_cron.info("JOB:"+job_id+"更新job表状态完成") |
||||||
|
|
||||||
|
except Exception ,e: |
||||||
|
err_info=traceback.format_exc() |
||||||
|
logger_cron.error("JOB:"+job_id+"执行失败:"+err_info) |
||||||
|
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info,task_run_count) |
||||||
|
raise |
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
UserCron().processing() |
@ -1,44 +0,0 @@ |
|||||||
{ |
|
||||||
"ip":[ |
|
||||||
{ |
|
||||||
"ip":"192.168.1.1", |
|
||||||
"jobnum":"1122222", |
|
||||||
"count":212 |
|
||||||
}, |
|
||||||
{ |
|
||||||
"ip":"192.168.2.1", |
|
||||||
"jobnum":"1122222", |
|
||||||
"count":212 |
|
||||||
} |
|
||||||
], |
|
||||||
"account":[ |
|
||||||
{ |
|
||||||
"account":"zhangs", |
|
||||||
"jobnum":"1122222", |
|
||||||
"count":212 |
|
||||||
}, |
|
||||||
{ |
|
||||||
"account":"zhang3", |
|
||||||
"jobnum":"112222", |
|
||||||
"count":211 |
|
||||||
} |
|
||||||
], |
|
||||||
"interface":[ |
|
||||||
{ |
|
||||||
"interface":"www.baidu.com/user", |
|
||||||
"jobnum":"1122222", |
|
||||||
"account":"zhangs", |
|
||||||
"ip":"192.168.1.1", |
|
||||||
"count":212 |
|
||||||
} |
|
||||||
], |
|
||||||
"menu":[ |
|
||||||
{ |
|
||||||
"menu":"菜单1", |
|
||||||
"jobnum":"1122222", |
|
||||||
"account":"zhangs", |
|
||||||
"ip":"192.168.1.1", |
|
||||||
"count":212 |
|
||||||
} |
|
||||||
] |
|
||||||
} |
|
@ -1,13 +1,24 @@ |
|||||||
[ |
[ |
||||||
{ |
{ |
||||||
"task_name": "ueba_corn", |
"task_name": "ueba_cron", |
||||||
"task_type": 1, |
"task_type": 1, |
||||||
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/corn/ueba_corn.py", |
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_pg.py", |
||||||
"task_owner": "uebaMetricsAnalysis", |
"task_owner": "uebaMetricsAnalysis", |
||||||
"run_mode": 1, |
"run_mode": 1, |
||||||
"duration_args": "*/2 * * * * *", |
"duration_args": "0 */1 * * * ?", |
||||||
"retry_nums": 5, |
"retry_nums": 0, |
||||||
"is_enable": 1, |
"is_enable": 1, |
||||||
"task_description": "每天执行一次 清洗数据到es-ueba索引" |
"task_description": "每分钟执行一次数据清洗" |
||||||
|
}, |
||||||
|
{ |
||||||
|
"task_name": "ueba_cron_data_insert", |
||||||
|
"task_type": 1, |
||||||
|
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_data_insert.py", |
||||||
|
"task_owner": "uebaMetricsAnalysis", |
||||||
|
"run_mode": 1, |
||||||
|
"duration_args": "0 0 3 * * ?", |
||||||
|
"retry_nums": 0, |
||||||
|
"is_enable": 1, |
||||||
|
"task_description": "每天执行一次 将汇总数据写入pg" |
||||||
} |
} |
||||||
] |
] |
@ -0,0 +1,221 @@ |
|||||||
|
{ |
||||||
|
"summary": { |
||||||
|
"ip": [ |
||||||
|
{ |
||||||
|
"company": "孝感分公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"ip_count": 323, |
||||||
|
"ip_rate": 0.3, |
||||||
|
"ip_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"company": "宜昌分公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"ip_count": 323, |
||||||
|
"ip_rate": 0.3, |
||||||
|
"ip_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"company": "随州分公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"ip_count": 323, |
||||||
|
"ip_rate": 0.3, |
||||||
|
"ip_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"company": "黄冈分公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"ip_count": 323, |
||||||
|
"ip_rate": 0.3, |
||||||
|
"ip_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"company": "省公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"ip_count": 323, |
||||||
|
"ip_rate": 0.3, |
||||||
|
"ip_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
} |
||||||
|
], |
||||||
|
"account": [ |
||||||
|
{ |
||||||
|
"company": "湖北公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"account_count": 323, |
||||||
|
"account_rate": 0.3, |
||||||
|
"account_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"company": "宜昌公司", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"account_count": 323, |
||||||
|
"account_rate": 0.3, |
||||||
|
"account_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
} |
||||||
|
], |
||||||
|
"interface": [ |
||||||
|
{ |
||||||
|
"interface_addr": "/getuser", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"frequency_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"interface_addr": "/getcpminfo", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"frequency_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
} |
||||||
|
], |
||||||
|
"menu": [ |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"frequency_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"frequency_rate": 0.2, |
||||||
|
"frequency_avg": 0.43, |
||||||
|
"trend": 0.3 |
||||||
|
} |
||||||
|
] |
||||||
|
}, |
||||||
|
"detail": { |
||||||
|
"ip": { |
||||||
|
"湖北公司": [ |
||||||
|
{ |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_frequency": 22 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"req_ip": "xx1x.xx.xx.x", |
||||||
|
"req_frequency": 21 |
||||||
|
} |
||||||
|
], |
||||||
|
"宜昌公司": [ |
||||||
|
{ |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_frequency": 22 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"req_ip": "xx1x.xx.xx.x", |
||||||
|
"req_frequency": 21 |
||||||
|
} |
||||||
|
] |
||||||
|
}, |
||||||
|
"account": { |
||||||
|
"湖北公司": [ |
||||||
|
{ |
||||||
|
"req_account": "admin", |
||||||
|
"req_frequency": 22, |
||||||
|
"req_jobnum": 98799 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"req_account": "admin", |
||||||
|
"req_frequency": 22, |
||||||
|
"req_jobnum": 98799 |
||||||
|
} |
||||||
|
], |
||||||
|
"宜昌公司": [ |
||||||
|
{ |
||||||
|
"req_account": "admin", |
||||||
|
"req_frequency": 22, |
||||||
|
"req_jobnum": 98799 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"req_account": "admin", |
||||||
|
"req_frequency": 22, |
||||||
|
"req_jobnum": 98799 |
||||||
|
} |
||||||
|
] |
||||||
|
}, |
||||||
|
"interface": { |
||||||
|
"接口1": [ |
||||||
|
{ |
||||||
|
"interface_addr": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"interface_addr": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
} |
||||||
|
], |
||||||
|
"接口2": [ |
||||||
|
{ |
||||||
|
"interface_addr": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"interface_addr": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
} |
||||||
|
] |
||||||
|
}, |
||||||
|
"menu": { |
||||||
|
"菜单1": [ |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
} |
||||||
|
], |
||||||
|
"菜单2": [ |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
}, |
||||||
|
{ |
||||||
|
"menu_name": "接口地址", |
||||||
|
"req_frequency": 122, |
||||||
|
"req_ip": "xxx.xx.xx.x", |
||||||
|
"req_account": 0.2, |
||||||
|
"req_jobnum": 0.2 |
||||||
|
} |
||||||
|
] |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -1,18 +0,0 @@ |
|||||||
#encoding=utf-8 |
|
||||||
import json |
|
||||||
|
|
||||||
from isoc.utils.esUtil import EsUtil |
|
||||||
|
|
||||||
|
|
||||||
def createIndex(): |
|
||||||
map={ |
|
||||||
"field1": "text", |
|
||||||
"field2": "text" |
|
||||||
} |
|
||||||
es_util_instance = EsUtil() |
|
||||||
res = es_util_instance.create_index_simple("bsa_traffic*",3,scroll_search) |
|
||||||
return res |
|
||||||
|
|
||||||
res = createIndex() |
|
||||||
|
|
||||||
print(res) |
|
@ -1,292 +0,0 @@ |
|||||||
#encoding=utf-8 |
|
||||||
import json |
|
||||||
import time,datetime |
|
||||||
import traceback |
|
||||||
from datetime import datetime, timedelta |
|
||||||
import calendar |
|
||||||
from esUtil import EsUtil |
|
||||||
import pytz |
|
||||||
|
|
||||||
size = 1000# 可以根据实际情况调整 |
|
||||||
##01 创建索引 |
|
||||||
def createIndex(index): |
|
||||||
map={ |
|
||||||
"data_type":"keyword", |
|
||||||
"req_account":"keyword", |
|
||||||
"req_frequency":"integer", |
|
||||||
"req_jobnum":"keyword", |
|
||||||
"interface_addr":"keyword", |
|
||||||
"req_ip":"ip", |
|
||||||
"menu_name":"keyword", |
|
||||||
"date_time":"date" |
|
||||||
} |
|
||||||
es_util_instance = EsUtil() |
|
||||||
reqs = es_util_instance.is_index_exist(index) |
|
||||||
if reqs =="false": |
|
||||||
try: |
|
||||||
res = es_util_instance.create_index_simple(index,map) |
|
||||||
except Exception,e: |
|
||||||
print e.message |
|
||||||
## IP维度 |
|
||||||
def get_ip_group_data(index,startTime,endTime): |
|
||||||
try: |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"range": {"timestamp": {"gte": startTime,"lte": endTime}} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size": size, |
|
||||||
"sources": [ |
|
||||||
{"sip": { "terms": {"field": "sip"} }}, |
|
||||||
{"trojan_type": { "terms": { "field": "trojan_type"}}} |
|
||||||
] |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas=[] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
try: |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
except Exception,e: |
|
||||||
print "err" |
|
||||||
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: |
|
||||||
data = { |
|
||||||
"data_type": "ip", |
|
||||||
"req_account": "", |
|
||||||
"req_frequency": bucket['doc_count'], |
|
||||||
"req_jobnum": bucket['key']['trojan_type'] , |
|
||||||
"interface_addr": "", |
|
||||||
"req_ip":bucket['key']['sip'] , |
|
||||||
"menu_name": "", |
|
||||||
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
|
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
|
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
except Exception,e: |
|
||||||
print "x_err:"+e.message |
|
||||||
return datas |
|
||||||
|
|
||||||
|
|
||||||
## 账号维度 |
|
||||||
def get_account_group_data(index,startTime,endTime): |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"range": {"timestamp": {"gte": startTime,"lte": endTime}} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size": size, |
|
||||||
"sources": [ |
|
||||||
{"account": { "terms": {"field": "account"} }}, |
|
||||||
{"trojan_type": { "terms": { "field": "trojan_type"}}} |
|
||||||
] |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas=[] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: |
|
||||||
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) |
|
||||||
data = { |
|
||||||
"data_type": "account", |
|
||||||
"req_account": bucket['key']['account'], |
|
||||||
"req_frequency": bucket['doc_count'], |
|
||||||
"req_jobnum": bucket['key']['trojan_type'] , |
|
||||||
"interface_addr": "", |
|
||||||
"req_ip":"0.0.0.0" , |
|
||||||
"menu_name": "", |
|
||||||
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
|
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
|
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
|
|
||||||
return datas |
|
||||||
|
|
||||||
## 接口维度 |
|
||||||
def get_interface_group_data(index,startTime,endTime): |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"range": {"timestamp": {"gte": startTime,"lte": endTime}} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size": size, |
|
||||||
"sources": [ |
|
||||||
{"interface": { "terms": {"field": "interface"} }}, |
|
||||||
{"sip": { "terms": { "field": "sip"}}}, |
|
||||||
{"account": { "terms": { "field": "account"}}}, |
|
||||||
{"trojan_type": { "terms": { "field": "trojan_type"}}}, |
|
||||||
] |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas=[] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: |
|
||||||
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) |
|
||||||
data = { |
|
||||||
"data_type": "interface", |
|
||||||
"req_account": bucket['key']['account'], |
|
||||||
"req_frequency": bucket['doc_count'], |
|
||||||
"req_jobnum": bucket['key']['trojan_type'] , |
|
||||||
"interface_addr": bucket['key']['interface'] , |
|
||||||
"req_ip":bucket['key']['sip'], |
|
||||||
"menu_name": "", |
|
||||||
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
|
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
|
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
|
|
||||||
return datas |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## 菜单维度 |
|
||||||
def get_menu_group_data(index,startTime,endTime): |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"range": {"timestamp": {"gte": startTime,"lte": endTime}} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size": size, |
|
||||||
"sources": [ |
|
||||||
{"worm_family": { "terms": {"field": "worm_family"} }}, |
|
||||||
{"sip": { "terms": { "field": "sip"}}}, |
|
||||||
{"account": { "terms": { "field": "account"}}}, |
|
||||||
{"trojan_type": { "terms": { "field": "trojan_type"}}}, |
|
||||||
] |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas=[] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: |
|
||||||
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) |
|
||||||
data = { |
|
||||||
"data_type": "menu", |
|
||||||
"req_account": bucket['key']['account'], |
|
||||||
"req_frequency": bucket['doc_count'], |
|
||||||
"req_jobnum": bucket['key']['trojan_type'] , |
|
||||||
"interface_addr": "" , |
|
||||||
"req_ip":bucket['key']['sip'], |
|
||||||
"menu_name": bucket['key']['worm_family'], |
|
||||||
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
|
|
||||||
return datas |
|
||||||
|
|
||||||
##03 数据写入 |
|
||||||
def data_insert(index,data): |
|
||||||
es_util_instance = EsUtil() |
|
||||||
response = es_util_instance.bulk_insert(index,data) |
|
||||||
return response |
|
||||||
|
|
||||||
def clean_data(write_index,read_index,start,end): |
|
||||||
data_ip = get_ip_group_data(read_index,start,end) |
|
||||||
print "data_ip:"+str(len(data_ip)) |
|
||||||
data_account = get_account_group_data(read_index,start,end) |
|
||||||
print "data_ip:"+str(len(data_account)) |
|
||||||
data_interface = get_interface_group_data(read_index,start,end) |
|
||||||
print "data_ip:"+str(len(data_interface)) |
|
||||||
data_menu = get_menu_group_data(read_index,start,end) |
|
||||||
print "data_ip:"+str(len(data_menu)) |
|
||||||
res_data = data_ip+data_account+data_interface+data_menu |
|
||||||
response = data_insert(write_index,res_data) |
|
||||||
print json.dumps(response) |
|
||||||
|
|
||||||
#入口 |
|
||||||
def entry(write_index,read_index,start,end): |
|
||||||
createIndex(write_index) |
|
||||||
clean_data(write_index,read_index,start,end) |
|
||||||
|
|
||||||
|
|
||||||
#前一天的0点0分0秒 |
|
||||||
def get_start_end_time(hour,minute,second): |
|
||||||
# 获取当前日期时间 |
|
||||||
now = datetime.now() |
|
||||||
|
|
||||||
# 计算昨天的日期时间 |
|
||||||
yesterday = now - timedelta(days=1) |
|
||||||
|
|
||||||
# 将时间部分设为 00:00:00 |
|
||||||
yesterday_midnight = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0) |
|
||||||
|
|
||||||
# 使用 pytz 来获取 UTC 时区对象 |
|
||||||
utc = pytz.utc |
|
||||||
# 将时间对象本地化为 UTC 时区 |
|
||||||
yesterday_midnight_utc = utc.localize(yesterday_midnight) |
|
||||||
|
|
||||||
# 格式化为带时区的字符串(ISO 8601格式) |
|
||||||
formatted_date = yesterday_midnight_utc.strftime("%Y-%m-%dT%H:%M:%SZ") |
|
||||||
return formatted_date |
|
||||||
|
|
||||||
def index(): |
|
||||||
try: |
|
||||||
#写入的索引 按月创建,注意跨天的场景 |
|
||||||
write_index= "b_ueba_2024_07" |
|
||||||
read_index ="bsa_traffic*" |
|
||||||
#任务执行时间是每天 凌晨12点 |
|
||||||
#查询的范围 开始时间前一天的0点0分0秒,结束时间是 前一天的23.59.59秒 |
|
||||||
|
|
||||||
start = "2024-06-02T00:00:00Z"#get_start_end_time(0,0,0) |
|
||||||
end = get_start_end_time(23,59,59) |
|
||||||
print start +":"+ end |
|
||||||
entry(write_index,read_index,start,end) |
|
||||||
except Exception ,e: |
|
||||||
print "定时任务执行失败:"+traceback.format_exc() |
|
||||||
# logger.error("定时任务执行失败:".format(str(e), traceback.format_exc())) |
|
||||||
|
|
||||||
index() |
|
@ -1,281 +0,0 @@ |
|||||||
#!/usr/bin/python |
|
||||||
#encoding=utf-8 |
|
||||||
# author: tangwy |
|
||||||
|
|
||||||
import json |
|
||||||
import os,re |
|
||||||
import codecs |
|
||||||
import traceback |
|
||||||
from isoc.utils.esUtil import EsUtil |
|
||||||
from dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \ |
|
||||||
interface_summary_data_format, menu_summary_data_format |
|
||||||
from ext_logging import logger |
|
||||||
## IP维度 |
|
||||||
def es_get_ip_group_data(index,startTime,endTime): |
|
||||||
page_size = 9000 #可以根据实际情况调整 |
|
||||||
query_body={ |
|
||||||
"query": { |
|
||||||
"bool": { |
|
||||||
"filter": [ |
|
||||||
{ "term": { "data_type": "ip" } }, |
|
||||||
{"range":{ |
|
||||||
"date_time": { |
|
||||||
"gte": startTime, |
|
||||||
"lte": endTime |
|
||||||
} |
|
||||||
}} |
|
||||||
] |
|
||||||
} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size" : page_size, |
|
||||||
"sources": [ |
|
||||||
{ "req_ip": { "terms": { "field": "req_ip" } } }, |
|
||||||
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } } |
|
||||||
] |
|
||||||
}, |
|
||||||
"aggregations": { |
|
||||||
"total_count": { |
|
||||||
"sum": { |
|
||||||
"field": "req_frequency" |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas = [] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", []) |
|
||||||
for bucket in buckets: |
|
||||||
data= { |
|
||||||
"ip":bucket['key']['req_ip'], |
|
||||||
"jobnum":bucket['key']['req_jobnum'], |
|
||||||
"count":bucket['total_count']['value'] |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
return datas |
|
||||||
|
|
||||||
|
|
||||||
## 账号维度 |
|
||||||
def es_get_account_group_data(index,startTime,endTime): |
|
||||||
page_size = 9000 #可以根据实际情况调整 |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"bool": { |
|
||||||
"filter": [ |
|
||||||
{ "term": { "data_type": "account" } }, |
|
||||||
{"range":{ |
|
||||||
"date_time": { |
|
||||||
"gte": startTime, |
|
||||||
"lte": endTime |
|
||||||
} |
|
||||||
}} |
|
||||||
] |
|
||||||
} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size" : page_size, |
|
||||||
"sources": [ |
|
||||||
{ "req_account": { "terms": { "field": "req_account" } } }, |
|
||||||
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } } |
|
||||||
] |
|
||||||
}, |
|
||||||
"aggregations": { |
|
||||||
"total_count": { |
|
||||||
"sum": { |
|
||||||
"field": "req_frequency" |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas = [] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", []) |
|
||||||
for bucket in buckets: |
|
||||||
data= { |
|
||||||
"account":bucket['key']['req_account'], |
|
||||||
"jobnum":bucket['key']['req_jobnum'], |
|
||||||
"count":bucket['total_count']['value'] |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
return datas |
|
||||||
|
|
||||||
|
|
||||||
## 菜单维度 |
|
||||||
def es_get_menu_group_data(index,startTime,endTime): |
|
||||||
page_size = 9000 #可以根据实际情况调整 |
|
||||||
query_body={ |
|
||||||
"size": 0, |
|
||||||
"query": { |
|
||||||
"bool": { |
|
||||||
"filter": [ |
|
||||||
{ "term": { "data_type": "menu" } }, |
|
||||||
{"range":{ |
|
||||||
"date_time": { |
|
||||||
"gte": startTime, |
|
||||||
"lte": endTime |
|
||||||
} |
|
||||||
}} |
|
||||||
] |
|
||||||
} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"composite_buckets": { |
|
||||||
"composite": { |
|
||||||
"size" : page_size, |
|
||||||
"sources": [ |
|
||||||
{ "menu_name": { "terms": { "field": "menu_name" } } }, |
|
||||||
{ "req_account": { "terms": { "field": "req_account" } } }, |
|
||||||
{ "req_ip": { "terms": { "field": "req_ip" } } }, |
|
||||||
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } } |
|
||||||
] |
|
||||||
}, |
|
||||||
"aggregations": { |
|
||||||
"total_count": { |
|
||||||
"sum": { |
|
||||||
"field": "req_frequency" |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas = [] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", []) |
|
||||||
for bucket in buckets: |
|
||||||
data= { |
|
||||||
"menu":bucket['key']['menu_name'], |
|
||||||
"ip":bucket['key']['req_ip'], |
|
||||||
"account":bucket['key']['req_account'], |
|
||||||
"jobnum":bucket['key']['req_jobnum'], |
|
||||||
"count":bucket['total_count']['value'] |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
return datas |
|
||||||
|
|
||||||
|
|
||||||
## 接口维度 |
|
||||||
def es_get_interface_group_data(index,startTime,endTime): |
|
||||||
page_size = 9999 #可以根据实际情况调整 |
|
||||||
query_body={ |
|
||||||
"query": { |
|
||||||
"bool": { |
|
||||||
"filter": [ |
|
||||||
{ "term": { "data_type": "interface" } }, |
|
||||||
{"range":{ |
|
||||||
"date_time": { |
|
||||||
"gte": startTime, |
|
||||||
"lte": endTime |
|
||||||
} |
|
||||||
}} |
|
||||||
] |
|
||||||
} |
|
||||||
}, |
|
||||||
"aggs": { |
|
||||||
"group_by_menu": { |
|
||||||
"composite": { |
|
||||||
"size" : page_size, |
|
||||||
"sources": [ |
|
||||||
{ "interface_addr": { "terms": { "field": "interface_addr" } } }, |
|
||||||
{ "req_account": { "terms": { "field": "req_account" } } }, |
|
||||||
{ "req_ip": { "terms": { "field": "req_ip" } } }, |
|
||||||
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } } |
|
||||||
] |
|
||||||
}, |
|
||||||
"aggregations": { |
|
||||||
"total_count": { |
|
||||||
"sum": { |
|
||||||
"field": "req_frequency" |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
after_key = None |
|
||||||
es_util_instance = EsUtil() |
|
||||||
datas = [] |
|
||||||
while True: |
|
||||||
if after_key: |
|
||||||
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key |
|
||||||
response = es_util_instance.search(index,query_body) |
|
||||||
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", []) |
|
||||||
for bucket in buckets: |
|
||||||
data= { |
|
||||||
"interface":bucket['key']['interface_addr'], |
|
||||||
"ip":bucket['key']['req_ip'], |
|
||||||
"account":bucket['key']['req_account'], |
|
||||||
"jobnum":bucket['key']['req_jobnum'], |
|
||||||
"count":bucket['total_count']['value'] |
|
||||||
} |
|
||||||
datas.append(data) |
|
||||||
after_key = bucket["key"] |
|
||||||
if not response["aggregations"]["composite_buckets"].get("after_key"): |
|
||||||
break |
|
||||||
after_key = response["aggregations"]["composite_buckets"]["after_key"] |
|
||||||
return datas |
|
||||||
|
|
||||||
def entry(data_type,start,end): |
|
||||||
base_index = 'c_ueba_001' |
|
||||||
# es_util_instance = EsUtil() |
|
||||||
# res=es_util_instance.get_available_index_name(start,end,base_index) |
|
||||||
# if len(res)==0: |
|
||||||
# return |
|
||||||
# index =",".join(res) |
|
||||||
|
|
||||||
index=base_index |
|
||||||
|
|
||||||
try: |
|
||||||
data = {} |
|
||||||
if data_type == "1": |
|
||||||
ip_summary_data = es_get_ip_group_data(index, start, end) |
|
||||||
data = ip_summary_data_format(ip_summary_data) |
|
||||||
if data_type == "2": |
|
||||||
account_summary_data = es_get_account_group_data(index, start, end) |
|
||||||
data = account_summary_data_format(account_summary_data) |
|
||||||
if data_type == "3": |
|
||||||
interface_summary_data = es_get_interface_group_data(index, start, end) |
|
||||||
data = interface_summary_data_format(interface_summary_data) |
|
||||||
if data_type == "4": |
|
||||||
menu_summary_data = es_get_menu_group_data(index, start, end) |
|
||||||
data = menu_summary_data_format(menu_summary_data) |
|
||||||
return data |
|
||||||
except Exception, e: |
|
||||||
logger.error(traceback.format_exc()) |
|
||||||
raise e |
|
@ -1,90 +0,0 @@ |
|||||||
#!/usr/bin/python |
|
||||||
#encoding=utf-8 |
|
||||||
# author: tangwy |
|
||||||
|
|
||||||
import json |
|
||||||
import os,re |
|
||||||
import codecs |
|
||||||
import csv |
|
||||||
import ConfigParser |
|
||||||
from isoc.utils.esUtil import EsUtil |
|
||||||
|
|
||||||
|
|
||||||
print json.dumps(es_host_list) |
|
||||||
# conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf') |
|
||||||
# ini_path = os.path.join(conf_path, 'conf.ini') |
|
||||||
# config = ConfigParser.ConfigParser() |
|
||||||
# config.read(ini_path) |
|
||||||
|
|
||||||
# ES_HOST = config.get('COMMON', 'es_host') |
|
||||||
# ES_PER_COUNT = config.get('COMMON', 'es_per_count') |
|
||||||
# ES_INDEX_NAME = config.get('COMMON', 'es_index_name') |
|
||||||
# CSV_FILE_PATH = config.get('COMMON', 'csv_file_path') |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def createIndex(): |
|
||||||
es = Elasticsearch(es_host_list) |
|
||||||
es.create(index="urba_analyse_2024_06", ignore=400) |
|
||||||
|
|
||||||
map={ |
|
||||||
"ip1": "text", |
|
||||||
"ip2": "text", |
|
||||||
"ip3": "text", |
|
||||||
"ip4": "text", |
|
||||||
} |
|
||||||
es_instance = EsUtil() |
|
||||||
res = es_instance.create_index_simple("urba_analyse_2024_06") |
|
||||||
return res |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# def generate_ip_range(start_ip, end_ip): |
|
||||||
# start_parts = list(map(int, start_ip.split('.'))) |
|
||||||
# end_parts = list(map(int, end_ip.split('.'))) |
|
||||||
# ip_range = [] |
|
||||||
|
|
||||||
# while start_parts < end_parts: |
|
||||||
# ip_range.append('.'.join(map(str, start_parts))) |
|
||||||
# start_parts[3] += 1 |
|
||||||
# for i in range(3, 0, -1): |
|
||||||
# if start_parts[i] == 256: |
|
||||||
# start_parts[i] = 0 |
|
||||||
# start_parts[i-1] += 1 |
|
||||||
|
|
||||||
# ip_range.append('.'.join(map(str, start_parts))) # 添加结束IP地址 |
|
||||||
# return ip_range |
|
||||||
|
|
||||||
# # scroll查询数据 |
|
||||||
# def get_ip_summary_data(start_time,end_time,query_body): |
|
||||||
# es = Elasticsearch(ES_HOST) |
|
||||||
# msg = es.search(index=ES_INDEX_NAME,scroll="3m",size=ES_PER_COUNT,_source_includes= ["cookies","url","sip","dip"], query=query_body) |
|
||||||
|
|
||||||
# result = msg['hits']['hits'] |
|
||||||
# total = msg['hits']['total'] |
|
||||||
# scroll_id = msg['_scroll_id'] |
|
||||||
|
|
||||||
# for i in range(0,int(total["value"]/ES_PER_COUNT)+1): |
|
||||||
# query_scroll = es.scroll(scroll_id=scroll_id, scroll='3m')["hits"]["hits"] |
|
||||||
# result += query_scroll |
|
||||||
# return result |
|
||||||
|
|
||||||
# # 读取csv文件 获取ip归属地 |
|
||||||
# def get_ip_area_relation(csv_file_path): |
|
||||||
# iprange_map = {} |
|
||||||
# with codecs.open(csv_file_path, mode='r',encoding='utf-8') as file: |
|
||||||
# csv_reader = csv.reader(file) |
|
||||||
# for row in csv_reader: |
|
||||||
# headers = next(csv_reader) |
|
||||||
# ip_start = headers[0] |
|
||||||
# ip_end = headers[1] |
|
||||||
# ip_range = generate_ip_range(ip_start, ip_end) |
|
||||||
# ip_area = headers[5] |
|
||||||
# print (ip_area) |
|
||||||
# for ip in ip_range: |
|
||||||
# iprange_map[ip] = ip_area |
|
||||||
# return iprange_map |
|
||||||
|
|
||||||
# get_ip_area_relation("/tmp/data/ip_area_relation.csv") |
|
@ -0,0 +1,205 @@ |
|||||||
|
#!/usr/bin/python |
||||||
|
#encoding=utf-8 |
||||||
|
# author: tangwy |
||||||
|
import re,os,json |
||||||
|
import codecs |
||||||
|
from db2json import DBUtils |
||||||
|
from datetime import datetime, timedelta |
||||||
|
from ext_logging import logger_cron,get_clean_file_path |
||||||
|
from dataInterface.functions import CFunction |
||||||
|
from dataInterface.db.params import CPgSqlParam |
||||||
|
|
||||||
|
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.json$') |
||||||
|
|
||||||
|
LOG_TABLE_NAME = "ueba_analysis_schema.logs" |
||||||
|
|
||||||
|
DATA_TYPE = { |
||||||
|
"IP": 1, |
||||||
|
"ACCOUNT": 2, |
||||||
|
"INTERFACE": 3, |
||||||
|
"MENU": 4, |
||||||
|
} |
||||||
|
|
||||||
|
# 获取当前日期并格式化为"年-月" |
||||||
|
def get_current_year_month(): |
||||||
|
now = datetime.now() |
||||||
|
return now.strftime("%Y_%m") |
||||||
|
|
||||||
|
# 获取当前月份的第一天并格式化为"年-月-日" |
||||||
|
def get_first_day_of_current_month(): |
||||||
|
now = datetime.now() |
||||||
|
first_day = now.replace(day=1) |
||||||
|
return first_day.strftime("%Y-%m-%d") |
||||||
|
|
||||||
|
# 获取当前日期,然后计算下个月的第一天 |
||||||
|
def get_first_day_of_next_month(): |
||||||
|
now = datetime.now() |
||||||
|
if now.month == 12: |
||||||
|
next_month = now.replace(year=now.year+1, month=1, day=1) |
||||||
|
else: |
||||||
|
next_month = now.replace(month=now.month+1, day=1) |
||||||
|
return next_month.strftime("%Y-%m-%d") |
||||||
|
|
||||||
|
#获取表名 |
||||||
|
def get_table_name(): |
||||||
|
year_month = get_current_year_month() |
||||||
|
return LOG_TABLE_NAME+'_'+ year_month |
||||||
|
|
||||||
|
#获取表区间 |
||||||
|
def get_table_data_range(): |
||||||
|
start= get_first_day_of_current_month() |
||||||
|
end = get_first_day_of_next_month() |
||||||
|
return start,end |
||||||
|
|
||||||
|
#创建分区表 |
||||||
|
def create_fq_table(): |
||||||
|
table_name = get_table_name() |
||||||
|
start,end = get_table_data_range() |
||||||
|
logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end)) |
||||||
|
sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs |
||||||
|
FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name,START = start,END=end) |
||||||
|
CFunction.execute(CPgSqlParam(sql)) |
||||||
|
logger_cron.info("INSERT:创建分区表完成") |
||||||
|
|
||||||
|
#读取大文件 |
||||||
|
def read_large_json_file(filename, chunk_size=1024*1024*5): # 每次读取5MB的数据 |
||||||
|
json_object = '' |
||||||
|
with codecs.open(filename, 'r', encoding='utf-8') as f: |
||||||
|
while True: |
||||||
|
chunk = f.read(chunk_size) |
||||||
|
if not chunk: |
||||||
|
break |
||||||
|
json_object += chunk |
||||||
|
|
||||||
|
data = json.loads(json_object) |
||||||
|
return data |
||||||
|
|
||||||
|
def get_all_files(path): |
||||||
|
# 列出所有包含匹配模式的文件名 |
||||||
|
files = [] |
||||||
|
for filename in os.listdir(path): |
||||||
|
if date_pattern.search(filename): |
||||||
|
if datetime.now().strftime("%Y-%m-%d")+".json" != filename: |
||||||
|
files.append({"filename": filename, "path": os.path.join(path,filename)}) |
||||||
|
return files |
||||||
|
|
||||||
|
def insert_data(files): |
||||||
|
for itemFile in files: |
||||||
|
if os.path.exists(itemFile.get("path",'')): |
||||||
|
data =read_large_json_file(itemFile.get("path",'')) |
||||||
|
logger_cron.info("INSERT: 准备读取聚合文件:"+itemFile.get('path','')) |
||||||
|
logger_cron.info("INSERT: 读取聚合文件完成") |
||||||
|
ip_list = data.get('ip', []) |
||||||
|
account_list = data.get('account', []) |
||||||
|
interface_list = data.get('interface', []) |
||||||
|
menu_list = data.get('menu', []) |
||||||
|
|
||||||
|
logger_cron.info("INSERT: IP维度 " +str(len(ip_list))) |
||||||
|
logger_cron.info("INSERT: ACCOUNT维度 " +str(len(account_list))) |
||||||
|
logger_cron.info("INSERT: INTERFACE维度 " +str(len(interface_list))) |
||||||
|
logger_cron.info("INSERT: MENU维度 " +str(len(menu_list))) |
||||||
|
|
||||||
|
basename, extension = os.path.splitext(itemFile.get('filename', '')) |
||||||
|
log_date = basename |
||||||
|
print ("filename:"+log_date) |
||||||
|
records = [] |
||||||
|
for item in ip_list: |
||||||
|
menu = item.get('menu', '') |
||||||
|
ip = item.get('ip', '0.0.0.0') |
||||||
|
account = item.get('account', '') |
||||||
|
jobnum = item.get('jobnum', '') |
||||||
|
count = item.get('count', 0) |
||||||
|
logdate = log_date |
||||||
|
datatype = DATA_TYPE.get("IP",1) |
||||||
|
interface = item.get('interface', '') |
||||||
|
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface}) |
||||||
|
for item in account_list: |
||||||
|
menu = item.get('menu', '') |
||||||
|
ip = item.get('ip', '0.0.0.0') |
||||||
|
account = item.get('account', '') |
||||||
|
jobnum = item.get('jobnum', '') |
||||||
|
count = item.get('count', 0) |
||||||
|
logdate = log_date |
||||||
|
datatype = DATA_TYPE.get("ACCOUNT",2) |
||||||
|
interface = item.get('interface', '') |
||||||
|
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface}) |
||||||
|
for item in interface_list: |
||||||
|
menu = item.get('menu', '') |
||||||
|
ip = item.get('ip', '0.0.0.0') |
||||||
|
account = item.get('account', '') |
||||||
|
jobnum = item.get('jobnum', '') |
||||||
|
count = item.get('count', 0) |
||||||
|
logdate = log_date |
||||||
|
datatype = DATA_TYPE.get("INTERFACE",3) |
||||||
|
interface = item.get('interface', '') |
||||||
|
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface}) |
||||||
|
for item in menu_list: |
||||||
|
menu = item.get('menu', '') |
||||||
|
ip = item.get('ip', '0.0.0.0') |
||||||
|
account = item.get('account', '') |
||||||
|
jobnum = item.get('jobnum', '') |
||||||
|
count = item.get('count', 0) |
||||||
|
logdate = log_date |
||||||
|
datatype = DATA_TYPE.get("MENU",4) |
||||||
|
interface = item.get('interface', '') |
||||||
|
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface}) |
||||||
|
|
||||||
|
data_list=[] |
||||||
|
for item in records: |
||||||
|
data_list.append(item.get('menu', '')) |
||||||
|
data_list.append(item.get('ip', '')) |
||||||
|
data_list.append(item.get('account', '')) |
||||||
|
data_list.append(item.get('jobnum', '')) |
||||||
|
data_list.append(item.get('count', '')) |
||||||
|
data_list.append(item.get('logdate', '')) |
||||||
|
data_list.append(item.get('datatype', '')) |
||||||
|
data_list.append(item.get('interface', '')) |
||||||
|
|
||||||
|
sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface) |
||||||
|
VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for x in records])) |
||||||
|
CFunction.execute(CPgSqlParam(sql, params=data_list)) |
||||||
|
logger_cron.info("INSERT: 数据插入完成") |
||||||
|
|
||||||
|
#重命名文件 |
||||||
|
logger_cron.info(itemFile.get('path','')) |
||||||
|
logger_cron.info("done_"+itemFile.get('filename', '')) |
||||||
|
os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', '')) |
||||||
|
logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', '')) |
||||||
|
|
||||||
|
def delete_files(directory_path): |
||||||
|
""" |
||||||
|
删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件, |
||||||
|
其中日期部分早于10天以前。 |
||||||
|
|
||||||
|
:param directory_path: 要检查的目录的绝对路径 |
||||||
|
""" |
||||||
|
# 计算10天前的日期 |
||||||
|
ten_days_ago = datetime.now() - timedelta(days=10) |
||||||
|
ten_days_ago_str = ten_days_ago.strftime('%Y-%m-%d') |
||||||
|
|
||||||
|
# 正则表达式模式,匹配形如 YYYY-MM-DD 的文件名 |
||||||
|
date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})') |
||||||
|
|
||||||
|
# 遍历目录中的文件 |
||||||
|
for filename in os.listdir(directory_path): |
||||||
|
match = date_pattern.search(filename) |
||||||
|
if match: |
||||||
|
file_date_str = match.group(1) |
||||||
|
file_date = datetime.strptime(file_date_str, '%Y-%m-%d') |
||||||
|
|
||||||
|
# 检查文件日期是否在10天前 |
||||||
|
if file_date <= ten_days_ago: |
||||||
|
file_path = os.path.join(directory_path, filename) |
||||||
|
os.remove(file_path) |
||||||
|
logger_cron.info("INSERT: 删除文件"+file_path) |
||||||
|
|
||||||
|
def entry(): |
||||||
|
base_path = get_clean_file_path() |
||||||
|
files = get_all_files(base_path) |
||||||
|
logger_cron.info("INSERT:获取文件数量"+str(len(files))) |
||||||
|
#创建分区表 |
||||||
|
create_fq_table() |
||||||
|
#数据入库 |
||||||
|
insert_data(files) |
||||||
|
#删除文件 |
||||||
|
delete_files(base_path) |
Loading…
Reference in new issue