You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
hbyd_ueba/utils/base_dataclean_pg.py

439 lines
14 KiB

#encoding=utf-8
import json
import codecs,os
import time,datetime
import traceback
from datetime import datetime, timedelta
import calendar
import codecs
from esUtil import EsUtil
from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
size = 9999#根据实际情况调整
DATA_TYPE = {
"IP": 1,
"ACCOUNT": 2,
"INTERFACE": 3,
"MENU": 4,
}
## IP维度
def get_ip_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"sip": { "terms": {"field": "sip"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": DATA_TYPE.get("IP"),
"count": bucket['doc_count'],
"jobnum": bucket['key']['trojan_type'] ,
"ip":bucket['key']['sip']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 账号维度
def get_account_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"account": { "terms": {"field": "account"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": DATA_TYPE.get("ACCOUNT"),
"account": bucket['key']['account'],
"count": bucket['doc_count'],
"jobnum": bucket['key']['trojan_type']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 接口维度
def get_interface_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": DATA_TYPE.get("INTERFACE"),
"account": bucket['key']['account'],
"count": bucket['doc_count'],
"jobnum": bucket['key']['trojan_type'] ,
"interface": bucket['key']['interface'] ,
"ip":bucket['key']['sip']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 菜单维度
def get_menu_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": DATA_TYPE.get("MENU"),
"account": bucket['key']['account'],
"count": bucket['doc_count'],
"jobnum": bucket['key']['trojan_type'] ,
"ip":bucket['key']['sip'],
"menu": bucket['key']['worm_family'],
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
def datetime_to_timestamp(dt):
dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end,jobid):
data_ip = get_ip_group_data(read_index,start,end)
data_account = get_account_group_data(read_index,start,end)
data_interface = get_interface_group_data(read_index,start,end)
data_menu = get_menu_group_data(read_index,start,end)
if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并")
return
logger_cron.info("JOB:"+jobid+",ip维度获取到 "+str(len(data_ip))+" 条数据")
logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
#todo 读取上一次5分钟的文件,与这5分钟的文件合并
#合并完成后 写文件
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
#读取大文件
def read_large_json_file(filename, chunk_size=1024*1024*5): # 每次读取5MB的数据
json_object = ''
with codecs.open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
json_object += chunk
data = json.loads(json_object)
return data
#写入大文件
def write_large_file(filename, data_list, chunk_size=1024*1024*5):
with codecs.open(filename, 'w', encoding='utf-8') as f:
for i in range(0, len(data_list), chunk_size):
chunk = data_list[i:i + chunk_size]
f.write(chunk)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr)
accountGroupStr = "account,jobnum"
accountGroup = group_and_sum(data_account, accountGroupStr)
interfaceGroupStr = "interface,ip,account,jobnum"
interfaceGroup = group_and_sum(data_interface, interfaceGroupStr)
menuGroupStr = "menu,ip,account,jobnum"
menuGroup = group_and_sum(data_menu, menuGroupStr)
data = {}
data["ip"] = ipGroup
data["account"] = accountGroup
data["interface"] = interfaceGroup
data["menu"] = menuGroup
# 获取当前工作目录
# current_dir = os.getcwd()
base_path = get_clean_file_path()
logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
date_time = convert_utc_to_local_time(start)
date_str = time.strftime('%Y-%m-%d', date_time)
file_path = os.path.join(base_path,date_str + '.json')
logger_cron.info("JOB:"+jobid+", 写入文件路径"+file_path)
all_data = [data]
logger_cron.info("JOB: "+jobid+",准备读取已有文件")
if os.path.exists(file_path):
# 打开文件并读取内容
old_json_data =read_large_json_file(file_path)
all_data = [data, old_json_data]
logger_cron.info("JOB:"+jobid+", 读取已有文件完成")
merged_data = merge_data(all_data)
json_data = json.dumps(merged_data)
#写入文件
write_large_file(file_path,json_data)
logger_cron.info("JOB: "+jobid+",写入文件完成")
def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表
by_fields_list = by_fields.split(',')
aggregated_data = {}
# 遍历数据,进行聚合
for record in data:
# 确保所有字段都存在于记录中
if all(field in record for field in by_fields_list):
# 构建聚合键
key = tuple(record[field] for field in by_fields_list)
# 如果键不存在,则初始化计数器
if key not in aggregated_data:
aggregated_data[key] = 0
# 累加count值
aggregated_data[key] += record['count']
# 将结果转换为期望的输出格式
output = [
dict({field: value for (field, value) in zip(by_fields_list, key)}, count=count)
for key, count in aggregated_data.items()
]
return output
def merge_data(datasets):
# 初始化一个空的字典来保存合并后的数据
merged_data = {
"ip": [],
"account": [],
"interface": [],
"menu": []
}
# 遍历所有数据集
for dataset in datasets:
# 遍历数据集中的每个类别
for category, items in dataset.items():
# 将当前数据集的项目添加到合并数据的相应类别中
merged_data[category].extend(items)
# 定义一个字典来存储聚合后的数据
aggregated_data = {
"ip": [],
"account": [],
"interface": [],
"menu": []
}
# 遍历所有类别
for category in aggregated_data:
# 创建一个字典来存储每个类别的聚合数据
category_data = {}
# 如果当前类别存在于merged_data中
if category in merged_data:
for item in merged_data[category]:
# 确定非计数字段
keys_to_use = [k for k in item if k != 'count']
# 使用元组作为键,包含所有非计数字段
key_tuple = tuple(item[k] for k in keys_to_use)
if key_tuple not in category_data:
category_data[key_tuple] = item['count']
else:
category_data[key_tuple] += item['count']
# 将聚合后的数据转换回原始格式
aggregated_data[category] = [
dict(zip(keys_to_use, key_tuple) + [('count', count)])
for key_tuple, count in category_data.items()
]
return aggregated_data
def convert_utc_to_local_time(timestamp):
# 将毫秒时间戳转换为秒
seconds_since_epoch = long(timestamp) / 1000.0
# 使用 gmtime() 得到 UTC 时间结构体
time_struct_utc = time.gmtime(seconds_since_epoch)
# 将 UTC 时间转换为北京时间(东八区),即加 8 小时
time_struct_beijing = time.mktime(time_struct_utc) + (8 * 60 * 60)
# 将时间戳转换回 struct_time
time_struct_beijing = time.localtime(time_struct_beijing)
return time_struct_beijing
#入口
def entry(start,end,jobid):
base_index ="bsa_traffic*"
es_util_instance = EsUtil()
start = datetime_to_timestamp(start)
end = datetime_to_timestamp(end)
logger_cron.info("JOB:"+jobid+",start为"+str(start))
logger_cron.info("JOB:"+jobid+",end为"+str(end))
res=es_util_instance.get_available_index_name(start,end,base_index)
logger_cron.info("JOB:"+jobid+",index为"+json.dumps(res))
if len(res)==0:
return
index =",".join(res)
clean_data(index,start,end,jobid)
# start = '2024-07-18 15:20:31'
# end = '2024-07-18 15:25:31'
# date_format = "%Y-%m-%d %H:%M:%S"
# date_str = datetime.strptime(start, date_format)
# end_str = datetime.strptime(end, date_format)
# # # 将 datetime 对象转换为秒级时间戳
# # timestamp_seconds = time.mktime(dt.timetuple())
# # # 获取微秒数
# # microseconds = dt.microsecond
# # # 转换为毫秒级时间戳
# # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
# entry(date_str,end_str,"xxxxxxxxxxxxxxxxxxx")