#encoding=utf-8
import json
import time
import datetime
import traceback
from datetime import datetime, timedelta
import calendar
from esUtil import EsUtil
import pytz

# Number of buckets requested per composite-aggregation page; tune as needed.
size = 1000

# Numeric tag written to the "data_type" field of every output row.
DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}


def _build_composite_query(startTime, endTime, fields):
    """Build the search body for a composite aggregation over *fields*.

    The query selects documents whose ``timestamp`` lies in
    [startTime, endTime] and groups them with one ``terms`` source per entry
    of *fields*, in the given order.  No hits are requested (``size: 0``);
    only the aggregation buckets are of interest.
    """
    sources = [{field: {"terms": {"field": field}}} for field in fields]
    return {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": sources,
                }
            }
        },
    }


def _iter_composite_buckets(index, startTime, endTime, fields):
    """Yield every bucket of the composite aggregation, page by page.

    Each response is expected to carry its buckets under
    ``aggregations.composite_buckets.buckets``.  Paging continues for as long
    as the response also carries an ``after_key``, which is sent back as the
    ``after`` parameter of the next request.

    Any exception raised by ``EsUtil()`` or ``EsUtil.search`` propagates to
    the consumer at the point of iteration; buckets yielded before the
    failure have already been delivered.
    """
    query_body = _build_composite_query(startTime, endTime, fields)
    composite = query_body["aggs"]["composite_buckets"]["composite"]
    es_util_instance = EsUtil()
    while True:
        response = es_util_instance.search(index, query_body)
        page = response["aggregations"]["composite_buckets"]
        for bucket in page["buckets"]:
            yield bucket
        after_key = page.get("after_key")
        if not after_key:
            break
        composite["after"] = after_key


# IP dimension
def get_ip_group_data(index, startTime, endTime):
    """Return document counts grouped by (sip, trojan_type).

    Each row is a dict with the keys ``data_type``, ``count``, ``jobnum``
    (taken from ``trojan_type``) and ``ip`` (taken from ``sip``).

    This function is best-effort: if the search fails, the error is printed
    and the rows collected so far (possibly none) are returned instead of
    raising.
    """
    # Bound before the try block so the final return can never hit an
    # unassigned name, whatever fails inside.
    datas = []
    try:
        buckets = _iter_composite_buckets(
            index, startTime, endTime, ["sip", "trojan_type"])
        for bucket in buckets:
            key = bucket["key"]
            datas.append({
                "data_type": DATA_TYPE.get("IP"),
                "count": bucket["doc_count"],
                "jobnum": key["trojan_type"],
                "ip": key["sip"],
            })
    except Exception as e:
        # Stop paging on the first failure; carrying on with a stale response
        # would re-append the previous page over and over.
        print("x_err:" + str(e))
    return datas


# Account dimension
def get_account_group_data(index, startTime, endTime):
    """Return document counts grouped by (account, trojan_type).

    Each row is a dict with the keys ``data_type``, ``account``, ``count``
    and ``jobnum`` (taken from ``trojan_type``).  Search errors propagate to
    the caller.
    """
    datas = []
    buckets = _iter_composite_buckets(
        index, startTime, endTime, ["account", "trojan_type"])
    for bucket in buckets:
        key = bucket["key"]
        datas.append({
            "data_type": DATA_TYPE.get("ACCOUNT"),
            "account": key["account"],
            "count": bucket["doc_count"],
            "jobnum": key["trojan_type"],
        })
    return datas


# Interface dimension
def get_interface_group_data(index, startTime, endTime):
    """Return document counts grouped by (interface, sip, account, trojan_type).

    Each row is a dict with the keys ``data_type``, ``account``, ``count``,
    ``jobnum`` (taken from ``trojan_type``), ``interface`` and ``ip`` (taken
    from ``sip``).  Search errors propagate to the caller.
    """
    datas = []
    buckets = _iter_composite_buckets(
        index, startTime, endTime,
        ["interface", "sip", "account", "trojan_type"])
    for bucket in buckets:
        key = bucket["key"]
        datas.append({
            "data_type": DATA_TYPE.get("INTERFACE"),
            "account": key["account"],
            "count": bucket["doc_count"],
            "jobnum": key["trojan_type"],
            "interface": key["interface"],
            "ip": key["sip"],
        })
    return datas


# Menu dimension
def get_menu_group_data(index, startTime, endTime):
    """Return document counts grouped by (worm_family, sip, account, trojan_type).

    Each row is a dict with the keys ``data_type``, ``account``, ``count``,
    ``jobnum`` (taken from ``trojan_type``), ``ip`` (taken from ``sip``) and
    ``menu_name`` (taken from ``worm_family``).  Search errors propagate to
    the caller.
    """
    datas = []
    buckets = _iter_composite_buckets(
        index, startTime, endTime,
        ["worm_family", "sip", "account", "trojan_type"])
    for bucket in buckets:
        key = bucket["key"]
        datas.append({
            "data_type": DATA_TYPE.get("MENU"),
            "account": key["account"],
            "count": bucket["doc_count"],
            "jobnum": key["trojan_type"],
            "ip": key["sip"],
            "menu_name": key["worm_family"],
        })
    return datas


def clean_data(read_index, start, end):
    """Collect the rows of all four dimensions for the window [start, end].

    Prints the number of rows found per dimension and returns the
    concatenation IP + account + interface + menu as a single list.
    """
    data_ip = get_ip_group_data(read_index, start, end)
    print("data_ip:" + str(len(data_ip)))
    data_account = get_account_group_data(read_index, start, end)
    print("data_account:" + str(len(data_account)))
    data_interface = get_interface_group_data(read_index, start, end)
    print("data_interface:" + str(len(data_interface)))
    data_menu = get_menu_group_data(read_index, start, end)
    print("data_menu:" + str(len(data_menu)))

    res_data = data_ip + data_account + data_interface + data_menu
    # TODO: read the file produced for the previous 5-minute window and merge
    # it with this window's data, then write the merged result to file.
    return res_data


# Entry point
def entry(start, end):
    """Resolve the indices available for [start, end] and process them.

    Asks ``EsUtil.get_available_index_name`` for the index names matching the
    ``bsa_traffic*`` pattern; does nothing when none are returned, otherwise
    passes them to ``clean_data`` as one comma-joined index expression.
    """
    base_index = "bsa_traffic*"
    es_util_instance = EsUtil()
    res = es_util_instance.get_available_index_name(start, end, base_index)
    if not res:
        return
    index = ",".join(res)
    clean_data(index, start, end)