es查询出结果分组聚合,写入文件

dev
Yang 4 months ago
parent bfaa289153
commit bde11596e4
  1. 139
      utils/base_dataclean_pg.py

@ -1,5 +1,6 @@
#encoding=utf-8
import json
import codecs,os
import time,datetime
import traceback
from datetime import datetime, timedelta
@ -192,7 +193,7 @@ def get_menu_group_data(index,startTime,endTime):
"count": bucket['doc_count'],
"jobnum": bucket['key']['trojan_type'] ,
"ip":bucket['key']['sip'],
"menu_name": bucket['key']['worm_family'],
"menu": bucket['key']['worm_family'],
}
datas.append(data)
after_key = bucket["key"]
@ -204,18 +205,146 @@ def get_menu_group_data(index,startTime,endTime):
def clean_data(read_index, start, end):
    """Run the four ES group aggregations for [start, end), then merge and persist them.

    :param read_index: ES index (pattern) to read from, e.g. "bsa_traffic*"
    :param start: window start, epoch milliseconds
    :param end: window end, epoch milliseconds
    """
    data_ip = get_ip_group_data(read_index, start, end)
    print("data_ip:" + str(len(data_ip)))
    data_account = get_account_group_data(read_index, start, end)
    print("data_account:" + str(len(data_account)))
    data_interface = get_interface_group_data(read_index, start, end)
    print("data_interface:" + str(len(data_interface)))
    data_menu = get_menu_group_data(read_index, start, end)
    print("data_menu:" + str(len(data_menu)))
    # Merging with the previous window's file and writing the result is
    # handled inside group_and_write_to_file.
    group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
    """Aggregate the four category datasets, merge them with the day's existing
    JSON file (if any), and write the merged result back.

    :param data_ip/data_account/data_interface/data_menu: lists of record dicts,
        each carrying a 'count' plus its grouping fields
    :param start: window start, epoch milliseconds; selects the UTC day file
    """
    data = {
        "ip": group_and_sum(data_ip, "ip,jobnum"),
        "account": group_and_sum(data_account, "account,jobnum"),
        "interface": group_and_sum(data_interface, "interface,ip,account,jobnum"),
        "menu": group_and_sum(data_menu, "menu,ip,account,jobnum"),
    }
    # One file per UTC day under <cwd>/files/YYYY-MM-DD.json.
    current_dir = os.getcwd()
    time_struct = time.gmtime(int(start) / 1000.0)  # UTC; start is in milliseconds
    date_time = time.strftime('%Y-%m-%d', time_struct)
    file_path = os.path.join(current_dir, 'files/' + date_time + '.json')
    all_data = [data]
    if os.path.exists(file_path):
        # Pull in what was written earlier today so counts accumulate.
        with codecs.open(file_path, 'r', encoding='utf-8') as f:
            old_json_data = json.loads(f.read())
        all_data = [data, old_json_data]
    merged_data = merge_data(all_data)
    # 'with' guarantees the handle is closed even if the dump/write raises.
    with codecs.open(file_path, 'w', encoding='utf-8') as f:
        f.write(json.dumps(merged_data))
def group_and_sum(data, by_fields="ip,jobnum"):
    """Sum the 'count' of records that share the same values for *by_fields*.

    Records missing any of the grouping fields are ignored.

    :param data: list of dicts, each carrying a 'count' plus the grouping fields
    :param by_fields: comma-separated field names to group by
    :return: list of dicts, one per distinct group, each with its summed 'count'
    """
    fields = by_fields.split(',')
    totals = {}
    for rec in data:
        # Guard clause: a record lacking any grouping field contributes nothing.
        if any(f not in rec for f in fields):
            continue
        grp = tuple(rec[f] for f in fields)
        totals[grp] = totals.get(grp, 0) + rec['count']
    result = []
    for grp, total in totals.items():
        row = dict(zip(fields, grp))
        row['count'] = total
        result.append(row)
    return result
def merge_data(datasets):
    """Merge several category->records datasets, summing 'count' for records
    whose non-count fields match exactly.

    Fixes two defects in the previous version: ``dict(zip(...) + [...])`` only
    works on Python 2 (``zip`` returns an iterator on Python 3), and the field
    names used to rebuild records were taken from the *last* item iterated, so
    items with differing key sets could be mislabeled. The accumulator is now
    keyed by the sorted (field, value) pairs themselves, so each output record
    is rebuilt from its own fields.

    :param datasets: iterable of dicts shaped {"ip": [...], "account": [...],
        "interface": [...], "menu": [...]} whose records carry a 'count'
    :return: one dict of the same shape with per-group summed counts
    """
    # Concatenate every dataset's records per category.
    merged_data = {
        "ip": [],
        "account": [],
        "interface": [],
        "menu": []
    }
    for dataset in datasets:
        for category, items in dataset.items():
            merged_data[category].extend(items)
    aggregated_data = {
        "ip": [],
        "account": [],
        "interface": [],
        "menu": []
    }
    for category in aggregated_data:
        # Key each record by all its non-count (field, value) pairs, sorted
        # so the key is stable regardless of dict iteration order.
        category_counts = {}
        for item in merged_data[category]:
            group_key = tuple(sorted((k, v) for k, v in item.items() if k != 'count'))
            category_counts[group_key] = category_counts.get(group_key, 0) + item['count']
        aggregated_data[category] = [
            dict(list(group_key) + [('count', total)])
            for group_key, total in category_counts.items()
        ]
    return aggregated_data
#入口
def entry(start,end):
base_index ="bsa_traffic*"

Loading…
Cancel
Save