|
|
@ -1,5 +1,6 @@ |
|
|
|
#encoding=utf-8 |
|
|
|
#encoding=utf-8 |
|
|
|
import json |
|
|
|
import json |
|
|
|
|
|
|
|
import codecs,os |
|
|
|
import time,datetime |
|
|
|
import time,datetime |
|
|
|
import traceback |
|
|
|
import traceback |
|
|
|
from datetime import datetime, timedelta |
|
|
|
from datetime import datetime, timedelta |
|
|
@ -192,7 +193,7 @@ def get_menu_group_data(index,startTime,endTime): |
|
|
|
"count": bucket['doc_count'], |
|
|
|
"count": bucket['doc_count'], |
|
|
|
"jobnum": bucket['key']['trojan_type'] , |
|
|
|
"jobnum": bucket['key']['trojan_type'] , |
|
|
|
"ip":bucket['key']['sip'], |
|
|
|
"ip":bucket['key']['sip'], |
|
|
|
"menu_name": bucket['key']['worm_family'], |
|
|
|
"menu": bucket['key']['worm_family'], |
|
|
|
} |
|
|
|
} |
|
|
|
datas.append(data) |
|
|
|
datas.append(data) |
|
|
|
after_key = bucket["key"] |
|
|
|
after_key = bucket["key"] |
|
|
@ -206,17 +207,145 @@ def datetime_to_timestamp(dt): |
|
|
|
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000) |
|
|
|
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000) |
|
|
|
def clean_data(read_index,start,end): |
|
|
|
def clean_data(read_index,start,end): |
|
|
|
data_ip = get_ip_group_data(read_index,start,end) |
|
|
|
data_ip = get_ip_group_data(read_index,start,end) |
|
|
|
print "data_ip:"+str(len(data_ip)) |
|
|
|
# print "data_ip:"+str(len(data_ip)) |
|
|
|
data_account = get_account_group_data(read_index,start,end) |
|
|
|
data_account = get_account_group_data(read_index,start,end) |
|
|
|
print "data_ip:"+str(len(data_account)) |
|
|
|
# print "data_account:"+str(len(data_account)) |
|
|
|
data_interface = get_interface_group_data(read_index,start,end) |
|
|
|
data_interface = get_interface_group_data(read_index,start,end) |
|
|
|
print "data_ip:"+str(len(data_interface)) |
|
|
|
# print "data_interface:"+str(len(data_interface)) |
|
|
|
data_menu = get_menu_group_data(read_index,start,end) |
|
|
|
data_menu = get_menu_group_data(read_index,start,end) |
|
|
|
print "data_ip:"+str(len(data_menu)) |
|
|
|
# print "data_menu:"+str(len(data_menu)) |
|
|
|
res_data = data_ip+data_account+data_interface+data_menu |
|
|
|
res_data = data_ip+data_account+data_interface+data_menu |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print ("resdata:"+json.dumps(res_data)) |
|
|
|
|
|
|
|
|
|
|
|
#todo 读取上一次5分钟的文件,与这5分钟的文件合并 |
|
|
|
#todo 读取上一次5分钟的文件,与这5分钟的文件合并 |
|
|
|
#合并完成后 写文件 |
|
|
|
#合并完成后 写文件 |
|
|
|
|
|
|
|
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start) |
|
|
|
|
|
|
|
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start): |
|
|
|
|
|
|
|
ipGroupStr = "ip,jobnum" |
|
|
|
|
|
|
|
ipGroup = group_and_sum(data_ip, ipGroupStr) |
|
|
|
|
|
|
|
accountGroupStr = "account,jobnum" |
|
|
|
|
|
|
|
accountGroup = group_and_sum(data_account, accountGroupStr) |
|
|
|
|
|
|
|
interfaceGroupStr = "interface,ip,account,jobnum" |
|
|
|
|
|
|
|
interfaceGroup = group_and_sum(data_interface, interfaceGroupStr) |
|
|
|
|
|
|
|
menuGroupStr = "menu,ip,account,jobnum" |
|
|
|
|
|
|
|
menuGroup = group_and_sum(data_menu, menuGroupStr) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
data = {} |
|
|
|
|
|
|
|
data["ip"] = ipGroup |
|
|
|
|
|
|
|
data["account"] = accountGroup |
|
|
|
|
|
|
|
data["interface"] = interfaceGroup |
|
|
|
|
|
|
|
data["menu"] = menuGroup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 获取当前工作目录 |
|
|
|
|
|
|
|
current_dir = os.getcwd() |
|
|
|
|
|
|
|
time_struct = time.gmtime(long(start) / 1000.0) # UTC时间 |
|
|
|
|
|
|
|
date_time = time.strftime('%Y-%m-%d', time_struct) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
file_path = os.path.join(current_dir, 'files/' + date_time + '.json') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
all_data = [data] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if os.path.exists(file_path): |
|
|
|
|
|
|
|
# 打开文件并读取内容 |
|
|
|
|
|
|
|
with codecs.open(file_path, 'r', encoding='utf-8') as file: |
|
|
|
|
|
|
|
content = file.read() |
|
|
|
|
|
|
|
old_json_data = json.loads(content) |
|
|
|
|
|
|
|
all_data = [data, old_json_data] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
merged_data = merge_data(all_data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 使用codecs模块以UTF-8编码打开文件 |
|
|
|
|
|
|
|
f = codecs.open(file_path, 'w', encoding='utf-8') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
json_data = json.dumps(merged_data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 写入Unicode字符串 |
|
|
|
|
|
|
|
f.write(json_data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 关闭文件 |
|
|
|
|
|
|
|
f.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def group_and_sum(data, by_fields="ip,jobnum"): |
|
|
|
|
|
|
|
# 将by_fields转换为列表 |
|
|
|
|
|
|
|
by_fields_list = by_fields.split(',') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
aggregated_data = {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 遍历数据,进行聚合 |
|
|
|
|
|
|
|
for record in data: |
|
|
|
|
|
|
|
# 确保所有字段都存在于记录中 |
|
|
|
|
|
|
|
if all(field in record for field in by_fields_list): |
|
|
|
|
|
|
|
# 构建聚合键 |
|
|
|
|
|
|
|
key = tuple(record[field] for field in by_fields_list) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 如果键不存在,则初始化计数器 |
|
|
|
|
|
|
|
if key not in aggregated_data: |
|
|
|
|
|
|
|
aggregated_data[key] = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 累加count值 |
|
|
|
|
|
|
|
aggregated_data[key] += record['count'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 将结果转换为期望的输出格式 |
|
|
|
|
|
|
|
output = [ |
|
|
|
|
|
|
|
dict({field: value for (field, value) in zip(by_fields_list, key)}, count=count) |
|
|
|
|
|
|
|
for key, count in aggregated_data.items() |
|
|
|
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return output |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def merge_data(datasets): |
|
|
|
|
|
|
|
# 初始化一个空的字典来保存合并后的数据 |
|
|
|
|
|
|
|
merged_data = { |
|
|
|
|
|
|
|
"ip": [], |
|
|
|
|
|
|
|
"account": [], |
|
|
|
|
|
|
|
"interface": [], |
|
|
|
|
|
|
|
"menu": [] |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 遍历所有数据集 |
|
|
|
|
|
|
|
for dataset in datasets: |
|
|
|
|
|
|
|
# 遍历数据集中的每个类别 |
|
|
|
|
|
|
|
for category, items in dataset.items(): |
|
|
|
|
|
|
|
# 将当前数据集的项目添加到合并数据的相应类别中 |
|
|
|
|
|
|
|
merged_data[category].extend(items) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 定义一个字典来存储聚合后的数据 |
|
|
|
|
|
|
|
aggregated_data = { |
|
|
|
|
|
|
|
"ip": [], |
|
|
|
|
|
|
|
"account": [], |
|
|
|
|
|
|
|
"interface": [], |
|
|
|
|
|
|
|
"menu": [] |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 遍历所有类别 |
|
|
|
|
|
|
|
for category in aggregated_data: |
|
|
|
|
|
|
|
# 创建一个字典来存储每个类别的聚合数据 |
|
|
|
|
|
|
|
category_data = {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 如果当前类别存在于merged_data中 |
|
|
|
|
|
|
|
if category in merged_data: |
|
|
|
|
|
|
|
for item in merged_data[category]: |
|
|
|
|
|
|
|
# 确定非计数字段 |
|
|
|
|
|
|
|
keys_to_use = [k for k in item if k != 'count'] |
|
|
|
|
|
|
|
# 使用元组作为键,包含所有非计数字段 |
|
|
|
|
|
|
|
key_tuple = tuple(item[k] for k in keys_to_use) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if key_tuple not in category_data: |
|
|
|
|
|
|
|
category_data[key_tuple] = item['count'] |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
category_data[key_tuple] += item['count'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 将聚合后的数据转换回原始格式 |
|
|
|
|
|
|
|
aggregated_data[category] = [ |
|
|
|
|
|
|
|
dict(zip(keys_to_use, key_tuple) + [('count', count)]) |
|
|
|
|
|
|
|
for key_tuple, count in category_data.items() |
|
|
|
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return aggregated_data |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#入口 |
|
|
|
#入口 |
|
|
|
def entry(start,end): |
|
|
|
def entry(start,end): |
|
|
|
base_index ="bsa_traffic*" |
|
|
|
base_index ="bsa_traffic*" |
|
|
@ -224,6 +353,7 @@ def entry(start,end): |
|
|
|
# start = datetime_to_timestamp(start) |
|
|
|
# start = datetime_to_timestamp(start) |
|
|
|
# end = datetime_to_timestamp(end) |
|
|
|
# end = datetime_to_timestamp(end) |
|
|
|
res=es_util_instance.get_available_index_name(start,end,base_index) |
|
|
|
res=es_util_instance.get_available_index_name(start,end,base_index) |
|
|
|
|
|
|
|
print "xxxx:"+str(len(res)) |
|
|
|
if len(res)==0: |
|
|
|
if len(res)==0: |
|
|
|
return |
|
|
|
return |
|
|
|
index =",".join(res) |
|
|
|
index =",".join(res) |
|
|
@ -231,8 +361,8 @@ def entry(start,end): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
start = 1720775286000 |
|
|
|
start = 1720772586000 |
|
|
|
end = 1720775586000 |
|
|
|
end = 1720776186000 |
|
|
|
# # 将 datetime 对象转换为秒级时间戳 |
|
|
|
# # 将 datetime 对象转换为秒级时间戳 |
|
|
|
# timestamp_seconds = time.mktime(dt.timetuple()) |
|
|
|
# timestamp_seconds = time.mktime(dt.timetuple()) |
|
|
|
# # 获取微秒数 |
|
|
|
# # 获取微秒数 |
|
|
|