You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
356 lines
12 KiB
356 lines
12 KiB
#encoding=utf-8
|
|
import json
|
|
import codecs,os
|
|
import time,datetime
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
import calendar
|
|
from esUtil import EsUtil
|
|
import pytz
|
|
|
|
size = 1000  # composite-aggregation page size; adjust to the workload as needed

# Numeric code identifying which dimension an output record was aggregated on.
DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}
|
|
|
|
## IP dimension
def get_ip_group_data(index, startTime, endTime):
    """Aggregate events by (sip, trojan_type) over [startTime, endTime].

    Pages through an Elasticsearch composite aggregation on `index` and
    returns a list of dicts: {"data_type", "count", "jobnum", "ip"}.
    On any error the exception is logged and whatever was collected so
    far is returned.
    """
    # Initialized before the try so the final `return datas` cannot raise
    # NameError when the query fails early (the original defined it inside).
    datas = []
    try:
        query_body = {
            "size": 0,
            "query": {
                "range": {"timestamp": {"gte": startTime, "lte": endTime}}
            },
            "aggs": {
                "composite_buckets": {
                    "composite": {
                        "size": size,
                        "sources": [
                            {"sip": {"terms": {"field": "sip"}}},
                            {"trojan_type": {"terms": {"field": "trojan_type"}}}
                        ]
                    }
                }
            }
        }
        after_key = None
        es_util_instance = EsUtil()
        while True:
            if after_key:
                query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
            # A failed search must abort pagination: the original swallowed
            # the error, printed "err", then dereferenced an undefined
            # `response`.  Let it propagate to the outer handler instead.
            response = es_util_instance.search(index, query_body)
            for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
                datas.append({
                    "data_type": DATA_TYPE.get("IP"),
                    "count": bucket['doc_count'],
                    "jobnum": bucket['key']['trojan_type'],
                    "ip": bucket['key']['sip']
                })
            # ES omits `after_key` on the last page of a composite agg.
            after_key = response["aggregations"]["composite_buckets"].get("after_key")
            if not after_key:
                break
    except Exception as e:
        # str(e) works on every exception; `e.message` is Python-2-only
        # and unset for many exception types.
        print("x_err:" + str(e))
    return datas
|
|
|
|
## Account dimension
def get_account_group_data(index, startTime, endTime):
    """Page an ES composite aggregation grouped by (account, trojan_type)
    over [startTime, endTime] and return one record per bucket."""
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    es = EsUtil()
    results = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        resp = es.search(index, body)
        agg = resp["aggregations"]["composite_buckets"]
        for b in agg["buckets"]:
            results.append({
                "data_type": DATA_TYPE.get("ACCOUNT"),
                "account": b['key']['account'],
                "count": b['doc_count'],
                "jobnum": b['key']['trojan_type']
            })
        # Composite aggs omit `after_key` on the final page.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return results
|
|
|
|
## Interface dimension
def get_interface_group_data(index, startTime, endTime):
    """Page an ES composite aggregation grouped by
    (interface, sip, account, trojan_type) over [startTime, endTime]."""
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"interface": {"terms": {"field": "interface"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es = EsUtil()
    results = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        resp = es.search(index, body)
        agg = resp["aggregations"]["composite_buckets"]
        for b in agg["buckets"]:
            key = b['key']
            results.append({
                "data_type": DATA_TYPE.get("INTERFACE"),
                "account": key['account'],
                "count": b['doc_count'],
                "jobnum": key['trojan_type'],
                "interface": key['interface'],
                "ip": key['sip']
            })
        # Composite aggs omit `after_key` on the final page.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return results
|
|
|
|
## Menu dimension
def get_menu_group_data(index, startTime, endTime):
    """Page an ES composite aggregation grouped by
    (worm_family, sip, account, trojan_type) over [startTime, endTime].

    NOTE(review): the `worm_family` field is surfaced as "menu" in the
    output — presumably a repurposed field; confirm against the index
    mapping.
    """
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"worm_family": {"terms": {"field": "worm_family"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es = EsUtil()
    results = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        resp = es.search(index, body)
        agg = resp["aggregations"]["composite_buckets"]
        for b in agg["buckets"]:
            key = b['key']
            results.append({
                "data_type": DATA_TYPE.get("MENU"),
                "account": key['account'],
                "count": b['doc_count'],
                "jobnum": key['trojan_type'],
                "ip": key['sip'],
                "menu": key['worm_family'],
            })
        # Composite aggs omit `after_key` on the final page.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return results
|
|
|
|
def clean_data(read_index, start, end):
    """Aggregate the window [start, end] along all four dimensions and
    merge the results into the day's JSON file.

    `read_index` is the comma-joined list of ES indices to query.
    """
    data_ip = get_ip_group_data(read_index, start, end)
    data_account = get_account_group_data(read_index, start, end)
    data_interface = get_interface_group_data(read_index, start, end)
    data_menu = get_menu_group_data(read_index, start, end)
    # Group each dimension, merge with the existing file for this window's
    # day, and write the result back.  (The original also built an unused
    # concatenation `res_data` of all four lists; removed as dead code.)
    group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
|
|
|
|
|
|
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
    """Group each dimension's records, merge them with the day's existing
    JSON file (if any), and rewrite that file.

    `start` is an epoch timestamp in milliseconds; it selects which UTC
    day's file (files/YYYY-MM-DD.json, under the current working
    directory) is updated.
    """
    data = {
        "ip": group_and_sum(data_ip, "ip,jobnum"),
        "account": group_and_sum(data_account, "account,jobnum"),
        "interface": group_and_sum(data_interface, "interface,ip,account,jobnum"),
        "menu": group_and_sum(data_menu, "menu,ip,account,jobnum"),
    }

    current_dir = os.getcwd()
    # int() handles arbitrarily large values on both Python 2 and 3; the
    # original used the Python-2-only long().
    time_struct = time.gmtime(int(start) / 1000.0)  # UTC
    date_time = time.strftime('%Y-%m-%d', time_struct)
    file_path = os.path.join(current_dir, 'files/' + date_time + '.json')

    all_data = [data]
    if os.path.exists(file_path):
        # Merge with the data already accumulated for this day.
        with codecs.open(file_path, 'r', encoding='utf-8') as file:
            old_json_data = json.loads(file.read())
        all_data = [data, old_json_data]

    merged_data = merge_data(all_data)

    # `with` guarantees the handle is closed even if dumps/write raises;
    # the original leaked the handle on error.
    with codecs.open(file_path, 'w', encoding='utf-8') as f:
        f.write(json.dumps(merged_data))
|
|
|
|
|
|
|
|
def group_and_sum(data, by_fields="ip,jobnum"):
    """Sum the `count` of records that share the same values for every
    field named in `by_fields` (a comma-separated field list).

    Records missing any of the group fields are skipped.  Returns a list
    of dicts, each carrying the group fields plus the summed `count`.
    """
    fields = by_fields.split(',')

    # (value tuple in field order) -> running count
    totals = {}
    for rec in data:
        if not all(f in rec for f in fields):
            continue  # incomplete record: ignore, as before
        key = tuple(rec[f] for f in fields)
        totals[key] = totals.get(key, 0) + rec['count']

    # Expand each key tuple back into a flat record.
    output = []
    for key, total in totals.items():
        row = dict(zip(fields, key))
        row['count'] = total
        output.append(row)
    return output
|
|
|
|
def merge_data(datasets):
    """Merge a list of {"ip": [...], "account": [...], "interface": [...],
    "menu": [...]} dicts into one, re-summing `count` for records whose
    non-count fields are identical.

    A dataset containing a category outside the four known ones raises
    KeyError, matching the original behavior.
    """
    categories = ("ip", "account", "interface", "menu")

    # 1) Concatenate every dataset's records per category.
    merged_data = {c: [] for c in categories}
    for dataset in datasets:
        for category, items in dataset.items():
            merged_data[category].extend(items)

    # 2) Re-aggregate within each category.
    aggregated_data = {c: [] for c in categories}
    for category in categories:
        category_data = {}  # (non-count value tuple) -> summed count
        field_names = {}    # value tuple -> the field names it came from
        for item in merged_data[category]:
            keys_to_use = [k for k in item if k != 'count']
            key_tuple = tuple(item[k] for k in keys_to_use)
            # Remember each tuple's own field names: the original leaked
            # `keys_to_use` from the loop and rebuilt every record with the
            # LAST item's field names.
            field_names[key_tuple] = keys_to_use
            category_data[key_tuple] = category_data.get(key_tuple, 0) + item['count']

        # Rebuild flat records.  The original did
        # dict(zip(...) + [('count', count)]), which fails on Python 3
        # where zip() returns an iterator, not a list.
        aggregated_data[category] = [
            dict(zip(field_names[key_tuple], key_tuple), count=count)
            for key_tuple, count in category_data.items()
        ]

    return aggregated_data
|
|
|
|
# Entry point
def entry(start, end):
    """Resolve the readable bsa_traffic* indices covering [start, end]
    and run the cleaning pipeline over them.  No-op when none exist."""
    base_index = "bsa_traffic*"
    indices = EsUtil().get_available_index_name(start, end, base_index)
    if not indices:
        return
    clean_data(",".join(indices), start, end)