diff --git a/utils/base_dataclean_pg.py b/utils/base_dataclean_pg.py
index 3908544..43bc208 100644
--- a/utils/base_dataclean_pg.py
+++ b/utils/base_dataclean_pg.py
@@ -1,5 +1,6 @@
 #encoding=utf-8
 import json
+import codecs,os
 import time,datetime
 import traceback
 from datetime import datetime, timedelta
@@ -159,24 +160,24 @@ def get_interface_group_data(index,startTime,endTime):
 ## Menu dimension
 def get_menu_group_data(index,startTime,endTime):
     query_body={
-            "size": 0,
-            "query": {
-                "range": {"timestamp": {"gte": startTime,"lte": endTime}}
-            },
-            "aggs": {
-                "composite_buckets": {
-                    "composite": {
-                        "size": size,
-                        "sources": [
-                            {"worm_family": { "terms": {"field": "worm_family"} }},
-                            {"sip": { "terms": { "field": "sip"}}},
-                            {"account": { "terms": { "field": "account"}}},
-                            {"trojan_type": { "terms": { "field": "trojan_type"}}},
-                        ]
+        "size": 0,
+        "query": {
+            "range": {"timestamp": {"gte": startTime,"lte": endTime}}
+        },
+        "aggs": {
+            "composite_buckets": {
+                "composite": {
+                    "size": size,
+                    "sources": [
+                        {"worm_family": { "terms": {"field": "worm_family"} }},
+                        {"sip": { "terms": { "field": "sip"}}},
+                        {"account": { "terms": { "field": "account"}}},
+                        {"trojan_type": { "terms": { "field": "trojan_type"}}},
+                    ]
+                }
             }
         }
     }
-}
     after_key = None
     es_util_instance = EsUtil()
     datas=[]
@@ -192,7 +193,7 @@ def get_menu_group_data(index,startTime,endTime):
                 "count": bucket['doc_count'],
                 "jobnum": bucket['key']['trojan_type'] ,
                 "ip":bucket['key']['sip'],
-                "menu_name": bucket['key']['worm_family'],
+                "menu": bucket['key']['worm_family'],
             }
             datas.append(data)
             after_key = bucket["key"]
@@ -206,17 +207,145 @@ def datetime_to_timestamp(dt):
     return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000)
 
 def clean_data(read_index,start,end):
     data_ip = get_ip_group_data(read_index,start,end)
-    print "data_ip:"+str(len(data_ip))
+    # print "data_ip:"+str(len(data_ip))
     data_account = get_account_group_data(read_index,start,end)
-    print "data_ip:"+str(len(data_account))
+    # print "data_account:"+str(len(data_account))
     data_interface = get_interface_group_data(read_index,start,end)
-    print "data_ip:"+str(len(data_interface))
+    # print "data_interface:"+str(len(data_interface))
     data_menu = get_menu_group_data(read_index,start,end)
-    print "data_ip:"+str(len(data_menu))
+    # print "data_menu:"+str(len(data_menu))
     res_data = data_ip+data_account+data_interface+data_menu
+    print ("resdata:"+json.dumps(res_data))
+    # TODO: read the file from the previous 5-minute window, merge it with this window's data, then write the merged result back out
+    group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
 
+def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
+    ipGroupStr = "ip,jobnum"
+    ipGroup = group_and_sum(data_ip, ipGroupStr)
+    accountGroupStr = "account,jobnum"
+    accountGroup = group_and_sum(data_account, accountGroupStr)
+    interfaceGroupStr = "interface,ip,account,jobnum"
+    interfaceGroup = group_and_sum(data_interface, interfaceGroupStr)
+    menuGroupStr = "menu,ip,account,jobnum"
+    menuGroup = group_and_sum(data_menu, menuGroupStr)
+
+    data = {}
+    data["ip"] = ipGroup
+    data["account"] = accountGroup
+    data["interface"] = interfaceGroup
+    data["menu"] = menuGroup
+
+    # Get the current working directory
+    current_dir = os.getcwd()
+    time_struct = time.gmtime(long(start) / 1000.0)  # UTC time
+    date_time = time.strftime('%Y-%m-%d', time_struct)
+
+    file_path = os.path.join(current_dir, 'files/' + date_time + '.json')
+
+    all_data = [data]
+
+    if os.path.exists(file_path):
+        # Open the existing file and read its contents
+        with codecs.open(file_path, 'r', encoding='utf-8') as file:
+            content = file.read()
+            old_json_data = json.loads(content)
+            all_data = [data, old_json_data]
+
+    merged_data = merge_data(all_data)
+
+    # Open the output file with UTF-8 encoding via the codecs module
+    f = codecs.open(file_path, 'w', encoding='utf-8')
+
+    json_data = json.dumps(merged_data)
+
+    # Write the serialized string
+    f.write(json_data)
+
+    # Close the file
+    f.close()
+
+def group_and_sum(data, by_fields="ip,jobnum"):
+    # Turn the comma-separated by_fields string into a list of field names
+    by_fields_list = by_fields.split(',')
+
+    aggregated_data = {}
+
+    # Walk the records and aggregate the counts
+    for record in data:
+        # Only aggregate records that contain every grouping field
+        if all(field in record for field in by_fields_list):
+            # Build the aggregation key
+            key = tuple(record[field] for field in by_fields_list)
+
+            # Initialise the counter the first time this key is seen
+            if key not in aggregated_data:
+                aggregated_data[key] = 0
+
+            # Accumulate the count value
+            aggregated_data[key] += record['count']
+
+    # Convert the result into the expected output format
+    output = [
+        dict({field: value for (field, value) in zip(by_fields_list, key)}, count=count)
+        for key, count in aggregated_data.items()
+    ]
+
+    return output
+
+def merge_data(datasets):
+    # Empty dict to collect the merged data
+    merged_data = {
+        "ip": [],
+        "account": [],
+        "interface": [],
+        "menu": []
+    }
+
+    # Walk every dataset
+    for dataset in datasets:
+        # Walk every category in the dataset
+        for category, items in dataset.items():
+            # Append this dataset's items to the matching category
+            merged_data[category].extend(items)
+
+    # Dict to hold the aggregated data
+    aggregated_data = {
+        "ip": [],
+        "account": [],
+        "interface": [],
+        "menu": []
+    }
+
+    # Walk every category
+    for category in aggregated_data:
+        # Per-category aggregation buckets
+        category_data = {}
+
+        # Only aggregate categories present in merged_data
+        if category in merged_data:
+            for item in merged_data[category]:
+                # Determine the non-count fields
+                keys_to_use = [k for k in item if k != 'count']
+                # Use a tuple of all non-count fields as the key
+                key_tuple = tuple(item[k] for k in keys_to_use)
+
+                if key_tuple not in category_data:
+                    category_data[key_tuple] = item['count']
+                else:
+                    category_data[key_tuple] += item['count']
+
+            # Convert the aggregated data back to the original record format
+            aggregated_data[category] = [
+                dict(zip(keys_to_use, key_tuple) + [('count', count)])
+                for key_tuple, count in category_data.items()
+            ]
+
+    return aggregated_data
+
+
 # Entry point
 def entry(start,end):
     base_index ="bsa_traffic*"
@@ -224,6 +353,7 @@ def entry(start,end):
     # start = datetime_to_timestamp(start)
     # end = datetime_to_timestamp(end)
     res=es_util_instance.get_available_index_name(start,end,base_index)
+    print "xxxx:"+str(len(res))
     if len(res)==0:
         return
     index =",".join(res)
@@ -231,8 +361,8 @@



-start = 1720775286000
-end = 1720775586000
+start = 1720772586000
+end = 1720776186000
 # # Convert the datetime object to a seconds-level timestamp
 # timestamp_seconds = time.mktime(dt.timetuple())
 # # Get the microseconds
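
A minimal sanity-check sketch of the two new helpers, outside the patch itself. It assumes utils.base_dataclean_pg can be imported without side effects, uses made-up sample records, and targets Python 2 (the module relies on print statements and long()).

# Hypothetical usage of group_and_sum / merge_data from the patched module.
from utils.base_dataclean_pg import group_and_sum, merge_data

# Two records for the same (ip, jobnum) pair inside one 5-minute window.
current_ip = [
    {"ip": "10.0.0.1", "jobnum": "A001", "count": 3},
    {"ip": "10.0.0.1", "jobnum": "A001", "count": 2},
]
# The same pair as it was written to the previous window's file.
previous_window = {"ip": [{"ip": "10.0.0.1", "jobnum": "A001", "count": 5}],
                   "account": [], "interface": [], "menu": []}

# group_and_sum collapses duplicates within one window: count becomes 5.
grouped = group_and_sum(current_ip, "ip,jobnum")
print grouped

# merge_data folds several windows together: the ip entry ends up with count 10.
current_window = {"ip": grouped, "account": [], "interface": [], "menu": []}
merged = merge_data([current_window, previous_window])
print merged["ip"]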