#encoding=utf-8 import json import time,datetime import traceback from datetime import datetime, timedelta import calendar from esUtil import EsUtil import pytz size = 1000# 可以根据实际情况调整 ##01 创建索引 def createIndex(index): map={ "data_type":"keyword", "req_account":"keyword", "req_frequency":"integer", "req_jobnum":"keyword", "interface_addr":"keyword", "req_ip":"ip", "menu_name":"keyword", "date_time":"date" } es_util_instance = EsUtil() reqs = es_util_instance.is_index_exist(index) if reqs =="false": try: res = es_util_instance.create_index_simple(index,map) except Exception,e: print e.message ## IP维度 def get_ip_group_data(index,startTime,endTime): try: query_body={ "size": 0, "query": { "range": {"timestamp": {"gte": startTime,"lte": endTime}} }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"sip": { "terms": {"field": "sip"} }}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key try: response = es_util_instance.search(index,query_body) except Exception,e: print "err" for bucket in response["aggregations"]["composite_buckets"]["buckets"]: data = { "data_type": "ip", "req_account": "", "req_frequency": bucket['doc_count'], "req_jobnum": bucket['key']['trojan_type'] , "interface_addr": "", "req_ip":bucket['key']['sip'] , "menu_name": "", "date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] except Exception,e: print "x_err:"+e.message return datas ## 账号维度 def get_account_group_data(index,startTime,endTime): query_body={ "size": 0, "query": { "range": {"timestamp": {"gte": startTime,"lte": endTime}} }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"account": { "terms": {"field": "account"} }}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: #print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) data = { "data_type": "account", "req_account": bucket['key']['account'], "req_frequency": bucket['doc_count'], "req_jobnum": bucket['key']['trojan_type'] , "interface_addr": "", "req_ip":"0.0.0.0" , "menu_name": "", "date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ## 接口维度 def get_interface_group_data(index,startTime,endTime): query_body={ "size": 0, "query": { "range": {"timestamp": {"gte": startTime,"lte": endTime}} }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"interface": { "terms": {"field": "interface"} }}, {"sip": { "terms": { "field": "sip"}}}, {"account": { "terms": { "field": "account"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}, ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: #print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) data = { "data_type": "interface", "req_account": bucket['key']['account'], "req_frequency": bucket['doc_count'], "req_jobnum": bucket['key']['trojan_type'] , "interface_addr": bucket['key']['interface'] , "req_ip":bucket['key']['sip'], "menu_name": "", "date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ## 菜单维度 def get_menu_group_data(index,startTime,endTime): query_body={ "size": 0, "query": { "range": {"timestamp": {"gte": startTime,"lte": endTime}} }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"worm_family": { "terms": {"field": "worm_family"} }}, {"sip": { "terms": { "field": "sip"}}}, {"account": { "terms": { "field": "account"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}, ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: #print(bucket['key']['sip'] + ":" + str(bucket['doc_count'])) data = { "data_type": "menu", "req_account": bucket['key']['account'], "req_frequency": bucket['doc_count'], "req_jobnum": bucket['key']['trojan_type'] , "interface_addr": "" , "req_ip":bucket['key']['sip'], "menu_name": bucket['key']['worm_family'], "date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化 } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ##03 数据写入 def data_insert(index,data): es_util_instance = EsUtil() response = es_util_instance.bulk_insert(index,data) return response def clean_data(write_index,read_index,start,end): data_ip = get_ip_group_data(read_index,start,end) print "data_ip:"+str(len(data_ip)) data_account = get_account_group_data(read_index,start,end) print "data_ip:"+str(len(data_account)) data_interface = get_interface_group_data(read_index,start,end) print "data_ip:"+str(len(data_interface)) data_menu = get_menu_group_data(read_index,start,end) print "data_ip:"+str(len(data_menu)) res_data = data_ip+data_account+data_interface+data_menu response = data_insert(write_index,res_data) print json.dumps(response) #入口 def entry(write_index,read_index,start,end): createIndex(write_index) clean_data(write_index,read_index,start,end) #前一天的0点0分0秒 def get_start_end_time(hour,minute,second): # 获取当前日期时间 now = datetime.now() # 计算昨天的日期时间 yesterday = now - timedelta(days=1) # 将时间部分设为 00:00:00 yesterday_midnight = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0) # 使用 pytz 来获取 UTC 时区对象 utc = pytz.utc # 将时间对象本地化为 UTC 时区 yesterday_midnight_utc = utc.localize(yesterday_midnight) # 格式化为带时区的字符串(ISO 8601格式) formatted_date = yesterday_midnight_utc.strftime("%Y-%m-%dT%H:%M:%SZ") return formatted_date def index(): try: #写入的索引 按月创建,注意跨天的场景 write_index= "b_ueba_2024_07" read_index ="bsa_traffic*" #任务执行时间是每天 凌晨12点 #查询的范围 开始时间前一天的0点0分0秒,结束时间是 前一天的23.59.59秒 start = "2024-06-02T00:00:00Z"#get_start_end_time(0,0,0) end = get_start_end_time(23,59,59) print start +":"+ end entry(write_index,read_index,start,end) except Exception ,e: print "定时任务执行失败:"+traceback.format_exc() # logger.error("定时任务执行失败:".format(str(e), traceback.format_exc())) index()