#encoding=utf-8 import json import codecs,os import time,datetime import traceback from datetime import datetime, timedelta import calendar import codecs from esUtil import EsUtil from config import read_json_config from file_helper import write_large_file,get_file_content,TRACE_PATH from dashboard_data_conversion import find_region_by_code from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path from collections import defaultdict from appsUtils import env from ext_logging import logger_trace,APPFOLDERNAME size = 9999#根据实际情况调整 DATA_TYPE = { "IP": 1, "ACCOUNT": 2, "INTERFACE": 3, "MENU": 4, } ## IP维度 def get_ip_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, {"terms": {"dip": diplist}} ] } }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"sip": { "terms": {"field": "sip"} }}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: data = { "data_type": DATA_TYPE.get("IP"), "count": bucket['doc_count'], "jobnum": bucket['key']['trojan_type'] , "ip":bucket['key']['sip'] } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ## 账号维度 def get_account_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, {"terms": {"dip": diplist}} ] } }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"account": { "terms": {"field": "account"} }}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: data = { "data_type": DATA_TYPE.get("ACCOUNT"), "account": bucket['key']['account'], "count": bucket['doc_count'], "jobnum": bucket['key']['trojan_type'] } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ## 接口维度 def get_interface_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, {"terms": {"dip": diplist}} ] } }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"service_name": { "terms": {"field": "service_name"} }}, {"sip": { "terms": { "field": "sip"}}}, {"account": { "terms": { "field": "account"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: data = { "data_type": DATA_TYPE.get("INTERFACE"), "account": bucket['key']['account'], "count": bucket['doc_count'], "jobnum": bucket['key']['trojan_type'] , "interface": bucket['key']['service_name'] , "ip":bucket['key']['sip'] } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas ## 菜单维度 def get_menu_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, {"terms": {"dip": diplist}} ] } }, "aggs": { "composite_buckets": { "composite": { "size": size, "sources": [ {"worm_family": { "terms": {"field": "worm_family"} }}, {"sip": { "terms": { "field": "sip"}}}, {"account": { "terms": { "field": "account"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}} ] } } } } after_key = None es_util_instance = EsUtil() datas=[] while True: if after_key: query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]: data = { "data_type": DATA_TYPE.get("MENU"), "account": bucket['key']['account'], "count": bucket['doc_count'], "jobnum": bucket['key']['trojan_type'] , "ip":bucket['key']['sip'], "menu": bucket['key']['worm_family'], } datas.append(data) after_key = bucket["key"] if not response["aggregations"]["composite_buckets"].get("after_key"): break after_key = response["aggregations"]["composite_buckets"]["after_key"] return datas def datetime_to_timestamp(dt): dtstr=dt.strftime("%Y-%m-%d %H:%M:%S") return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000) #获取扩展名称 def get_file_extension(filename): base_name, extension = os.path.splitext(filename) return extension def clean_data(read_index,start,end,jobid): APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME config_path = os.path.normpath(APPHOME + "/conf/sys_config.json") rule_data = read_json_config(config_path) dips=rule_data["dip"] static_ext = rule_data["static_ext"] region_dict = rule_data["region_dict"] logger_cron.info("JOB:dip "+json.dumps(dips)) data_ip = get_ip_group_data(read_index,start,end,dips) data_account = get_account_group_data(read_index,start,end,dips) data_interface = get_interface_group_data(read_index,start,end,dips) data_menu = get_menu_group_data(read_index,start,end,dips) if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0: logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并") return logger_cron.info("JOB:"+jobid+",ip维度获取到 "+str(len(data_ip))+" 条数据") logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据") logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据") logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据") new_interface_date=[] logger_cron.info("JOB:"+jobid+","+json.dumps(static_ext)) for item in data_interface: if get_file_extension(item.get('interface', '')) in static_ext: continue new_interface_date.append(item) logger_cron.info("JOB:"+jobid+",接口维度实际数据量 "+str(len(new_interface_date))+" 条数据") #todo 读取上一次5分钟的文件,与这5分钟的文件合并 #合并完成后 写文件 group_and_write_to_file(data_ip, data_account, new_interface_date, data_menu, start,jobid,region_dict) def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid,region_dict): # 获取当前工作目录 base_path = get_clean_file_path() logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path) date_time = convert_utc_to_local_time(start) #临时文件 临时文件格式:20240720-1630_tmp.json tmp_file_name = time.strftime("%Y%m%d-%H%M_tmp.json", date_time) tmp_file_path = os.path.join(base_path,tmp_file_name) #正式文件 正式文件格式:20240720-1630.json file_name = time.strftime("%Y%m%d-%H%M.json", date_time) file_path = os.path.join(base_path,file_name) logger_cron.info("JOB:"+jobid+", tmpfilepath"+tmp_file_path) #(datatype,menu,ip,account,jobnum,interface) count jobnum_max_length = 20 records = {} for item in data_ip: menu = remove_commas(item.get('menu', '')) ip = item.get('ip', '0.0.0.0') account = remove_commas(item.get('account', '')) jobnum = item.get('jobnum', '') if jobnum == "" or len(jobnum) >jobnum_max_length: continue count = item.get('count', 0) datatype = DATA_TYPE.get("IP",1) interface = remove_commas(item.get('interface', '')) company = find_region_by_code(jobnum,region_dict) records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count #日志追踪 if not os.path.exists(TRACE_PATH): write_large_file(TRACE_PATH, ",".join([str(datatype), menu, ip,account,jobnum,interface,company])) for item in data_account: menu = remove_commas(item.get('menu', '')) ip = item.get('ip', '0.0.0.0') account = remove_commas(item.get('account', '')) jobnum = item.get('jobnum', '') if jobnum == "" or len(jobnum) >jobnum_max_length: continue count = item.get('count', 0) datatype = DATA_TYPE.get("ACCOUNT",2) interface = remove_commas(item.get('interface', '')) company = find_region_by_code(jobnum,region_dict) records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count for item in data_interface: menu = remove_commas(item.get('menu', '')) ip = item.get('ip', '0.0.0.0') account = remove_commas(item.get('account', '')) jobnum = item.get('jobnum', '') if jobnum == "" or len(jobnum) >jobnum_max_length: continue count = item.get('count', 0) datatype = DATA_TYPE.get("INTERFACE",3) interface = remove_commas(item.get('interface', '')) company = find_region_by_code(jobnum,region_dict) records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count for item in data_menu: menu = remove_commas(item.get('menu', '')) ip = item.get('ip', '0.0.0.0') account = remove_commas(item.get('account', '')) jobnum = item.get('jobnum', '') if menu == "" or menu == "null" or len(jobnum) >jobnum_max_length: continue count = item.get('count', 0) datatype = DATA_TYPE.get("MENU",4) interface = remove_commas(item.get('interface', '')) company = find_region_by_code(jobnum,region_dict) records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count json_data = json.dumps(records) ########问题排查################# key=get_file_content() if key in records: logger_trace.info("baseclean:"+jobid+file_path+":"+str(records[key])) #写入文件 logger_cron.info("JOB: "+jobid+",准备写入文件") write_large_file(tmp_file_path,json_data) #重命名文件 os.rename(tmp_file_path, file_path) logger_cron.info("JOB: "+jobid+",写入文件完成") #原始数据去掉逗号 def remove_commas(record): return ''.join(c for c in record if c != ',') def group_and_sum(data, by_fields="ip,jobnum"): # 将by_fields转换为列表 by_fields_list = by_fields.split(',') aggregated_data = {} # 遍历数据,进行聚合 for record in data: # 确保所有字段都存在于记录中 if all(field in record for field in by_fields_list): # 构建聚合键 key = tuple(record[field] for field in by_fields_list) # 如果键不存在,则初始化计数器 if key not in aggregated_data: aggregated_data[key] = 0 # 累加count值 aggregated_data[key] += record['count'] # 将结果转换为期望的输出格式 output = [ dict({field: value for (field, value) in zip(by_fields_list, key)}, count=count) for key, count in aggregated_data.items() ] return output def convert_utc_to_local_time(timestamp): # 将毫秒时间戳转换为秒 seconds_since_epoch = long(timestamp) / 1000.0 # 使用 gmtime() 得到 UTC 时间结构体 time_struct_utc = time.gmtime(seconds_since_epoch) # 将 UTC 时间转换为北京时间(东八区),即加 8 小时 time_struct_beijing = time.mktime(time_struct_utc) + (8 * 60 * 60) # 将时间戳转换回 struct_time time_struct_beijing = time.localtime(time_struct_beijing) return time_struct_beijing #入口 def entry(start,end,jobid): base_index ="bsa_traffic*" es_util_instance = EsUtil() start = datetime_to_timestamp(start) end = datetime_to_timestamp(end) logger_cron.info("JOB:"+jobid+",start为"+str(start)) logger_cron.info("JOB:"+jobid+",end为"+str(end)) res=es_util_instance.get_available_index_name(start,end,base_index) logger_cron.info("JOB:"+jobid+",index为"+json.dumps(res)) if len(res)==0: return index =",".join(res) clean_data(index,start,end,jobid) # start = '2024-07-18 15:20:31' # end = '2024-07-18 15:25:31' # date_format = "%Y-%m-%d %H:%M:%S" # date_str = datetime.strptime(start, date_format) # end_str = datetime.strptime(end, date_format) # # # 将 datetime 对象转换为秒级时间戳 # # timestamp_seconds = time.mktime(dt.timetuple()) # # # 获取微秒数 # # microseconds = dt.microsecond # # # 转换为毫秒级时间戳 # # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0) # entry(date_str,end_str,"xxxxxxxxxxxxxxxxxxx")