# hbyd_ueba/utils/base_dataclean_pg.py
#encoding=utf-8
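"""Scheduled data-clean job: query Elasticsearch composite aggregations for
the IP / account / interface / menu dimensions, then merge the counts into a
per-5-minute JSON file that the later dashboard aggregation step consumes."""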
import json
import codecs, os
import time
import traceback
import calendar
from datetime import datetime, timedelta
from esUtil import EsUtil
from config import read_json_config
from file_helper import write_large_file, get_file_content, TRACE_PATH
from dashboard_data_conversion import find_region_by_code
from uebaMetricsAnalysis.utils.ext_logging import logger, logger_cron, get_clean_file_path
from collections import defaultdict
from appsUtils import env
from ext_logging import logger_trace, APPFOLDERNAME

size = 9999  # composite-aggregation page size; adjust to the actual data volume
DATA_TYPE = {
"IP": 1,
"ACCOUNT": 2,
"INTERFACE": 3,
"MENU": 4,
}
## IP dimension
def get_ip_group_data(index, startTime, endTime, diplist):
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"timestamp": {"gt": startTime, "lte": endTime}}},
                    {"terms": {"dip": diplist}}
                ]
            }
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"sip": {"terms": {"field": "sip"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    after_key = None
    es_util_instance = EsUtil()
    datas = []
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
            data = {
                "data_type": DATA_TYPE.get("IP"),
                "count": bucket['doc_count'],
                "jobnum": bucket['key']['trojan_type'],
                "ip": bucket['key']['sip']
            }
            datas.append(data)
        # page through the composite aggregation via after_key
        if not response["aggregations"]["composite_buckets"].get("after_key"):
            break
        after_key = response["aggregations"]["composite_buckets"]["after_key"]
    return datas
## Account dimension
def get_account_group_data(index, startTime, endTime, diplist):
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"timestamp": {"gt": startTime, "lte": endTime}}},
                    {"terms": {"dip": diplist}}
                ]
            }
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    after_key = None
    es_util_instance = EsUtil()
    datas = []
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
            data = {
                "data_type": DATA_TYPE.get("ACCOUNT"),
                "account": bucket['key']['account'],
                "count": bucket['doc_count'],
                "jobnum": bucket['key']['trojan_type']
            }
            datas.append(data)
        if not response["aggregations"]["composite_buckets"].get("after_key"):
            break
        after_key = response["aggregations"]["composite_buckets"]["after_key"]
    return datas
## Interface dimension
def get_interface_group_data(index, startTime, endTime, diplist):
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"timestamp": {"gt": startTime, "lte": endTime}}},
                    {"terms": {"dip": diplist}}
                ]
            }
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"service_name": {"terms": {"field": "service_name"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    after_key = None
    es_util_instance = EsUtil()
    datas = []
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
            data = {
                "data_type": DATA_TYPE.get("INTERFACE"),
                "account": bucket['key']['account'],
                "count": bucket['doc_count'],
                "jobnum": bucket['key']['trojan_type'],
                "interface": bucket['key']['service_name'],
                "ip": bucket['key']['sip']
            }
            datas.append(data)
        if not response["aggregations"]["composite_buckets"].get("after_key"):
            break
        after_key = response["aggregations"]["composite_buckets"]["after_key"]
    return datas
## Menu dimension
def get_menu_group_data(index, startTime, endTime, diplist):
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"timestamp": {"gt": startTime, "lte": endTime}}},
                    {"terms": {"dip": diplist}}
                ]
            }
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"worm_family": {"terms": {"field": "worm_family"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    after_key = None
    es_util_instance = EsUtil()
    datas = []
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
            data = {
                "data_type": DATA_TYPE.get("MENU"),
                "account": bucket['key']['account'],
                "count": bucket['doc_count'],
                "jobnum": bucket['key']['trojan_type'],
                "ip": bucket['key']['sip'],
                "menu": bucket['key']['worm_family'],
            }
            datas.append(data)
        if not response["aggregations"]["composite_buckets"].get("after_key"):
            break
        after_key = response["aggregations"]["composite_buckets"]["after_key"]
    return datas
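
# NOTE: the four get_*_group_data functions above repeat the same
# composite-aggregation pagination loop. A minimal sketch of a shared helper
# they could delegate to (not wired in; assumes EsUtil.search(index, body)
# returns the raw ES response dict, as used above):
def _iter_composite_buckets(index, query_body):
    # yields every bucket of the "composite_buckets" aggregation,
    # paging with after_key until ES stops returning one
    es_util_instance = EsUtil()
    after_key = None
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        agg = response["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            yield bucket
        if not agg.get("after_key"):
            break
        after_key = agg["after_key"]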

def datetime_to_timestamp(dt):
    dtstr = dt.strftime("%Y-%m-%d %H:%M:%S")
    return int(time.mktime(time.strptime(dtstr, "%Y-%m-%d %H:%M:%S")) * 1000)

# Get the file extension of a filename
def get_file_extension(filename):
    base_name, extension = os.path.splitext(filename)
    return extension

def clean_data(read_index, start, end, jobid):
    APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME
    config_path = os.path.normpath(APPHOME + "/conf/sys_config.json")
    rule_data = read_json_config(config_path)
    dips = rule_data["dip"]
    static_ext = rule_data["static_ext"]
    region_dict = rule_data["region_dict"]

    logger_cron.info("JOB:dip " + json.dumps(dips))
    data_ip = get_ip_group_data(read_index, start, end, dips)
    data_account = get_account_group_data(read_index, start, end, dips)
    data_interface = get_interface_group_data(read_index, start, end, dips)
    data_menu = get_menu_group_data(read_index, start, end, dips)

    if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
        logger_cron.info("JOB:" + jobid + ", ES returned no data, nothing to merge")
        return
    logger_cron.info("JOB:" + jobid + ", IP dimension returned " + str(len(data_ip)) + " records")
    logger_cron.info("JOB:" + jobid + ", account dimension returned " + str(len(data_account)) + " records")
    logger_cron.info("JOB:" + jobid + ", interface dimension returned " + str(len(data_interface)) + " records")
    logger_cron.info("JOB:" + jobid + ", menu dimension returned " + str(len(data_menu)) + " records")

    # drop interface records whose file extension marks a static resource
    new_interface_data = []
    logger_cron.info("JOB:" + jobid + "," + json.dumps(static_ext))
    for item in data_interface:
        if get_file_extension(item.get('interface', '')) in static_ext:
            continue
        new_interface_data.append(item)
    logger_cron.info("JOB:" + jobid + ", interface dimension kept " + str(len(new_interface_data)) + " records")

    # TODO: read the previous 5-minute file, merge it with this window's
    # data, then write the merged result to file
    group_and_write_to_file(data_ip, data_account, new_interface_data, data_menu, start, jobid, region_dict)

def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start, jobid, region_dict):
    # base directory for the cleaned data files
    base_path = get_clean_file_path()
    logger_cron.info("JOB: " + jobid + ", output base path " + base_path)
    date_time = convert_utc_to_local_time(start)

    # temporary file, named like 20240720-1630_tmp.json
    tmp_file_name = time.strftime("%Y%m%d-%H%M_tmp.json", date_time)
    tmp_file_path = os.path.join(base_path, tmp_file_name)
    # final file, named like 20240720-1630.json
    file_name = time.strftime("%Y%m%d-%H%M.json", date_time)
    file_path = os.path.join(base_path, file_name)
    logger_cron.info("JOB:" + jobid + ", tmpfilepath " + tmp_file_path)

    # record key: (datatype,menu,ip,account,jobnum,interface,company) -> count
    jobnum_max_length = 20
    records = {}
    for item in data_ip:
        menu = remove_commas(item.get('menu', ''))
        ip = item.get('ip', '0.0.0.0')
        account = remove_commas(item.get('account', ''))
        jobnum = item.get('jobnum', '')
        if jobnum == "" or len(jobnum) > jobnum_max_length:
            continue
        count = item.get('count', 0)
        datatype = DATA_TYPE.get("IP", 1)
        interface = remove_commas(item.get('interface', ''))
        company = find_region_by_code(jobnum, region_dict)
        records[",".join([str(datatype), menu, ip, account, jobnum, interface, company])] = count
        # trace logging: persist one sample key for troubleshooting
        if not os.path.exists(TRACE_PATH):
            write_large_file(TRACE_PATH, ",".join([str(datatype), menu, ip, account, jobnum, interface, company]))
    for item in data_account:
        menu = remove_commas(item.get('menu', ''))
        ip = item.get('ip', '0.0.0.0')
        account = remove_commas(item.get('account', ''))
        jobnum = item.get('jobnum', '')
        if jobnum == "" or len(jobnum) > jobnum_max_length:
            continue
        count = item.get('count', 0)
        datatype = DATA_TYPE.get("ACCOUNT", 2)
        interface = remove_commas(item.get('interface', ''))
        company = find_region_by_code(jobnum, region_dict)
        records[",".join([str(datatype), menu, ip, account, jobnum, interface, company])] = count
    for item in data_interface:
        menu = remove_commas(item.get('menu', ''))
        ip = item.get('ip', '0.0.0.0')
        account = remove_commas(item.get('account', ''))
        jobnum = item.get('jobnum', '')
        if jobnum == "" or len(jobnum) > jobnum_max_length:
            continue
        count = item.get('count', 0)
        datatype = DATA_TYPE.get("INTERFACE", 3)
        interface = remove_commas(item.get('interface', ''))
        company = find_region_by_code(jobnum, region_dict)
        records[",".join([str(datatype), menu, ip, account, jobnum, interface, company])] = count
    for item in data_menu:
        menu = remove_commas(item.get('menu', ''))
        ip = item.get('ip', '0.0.0.0')
        account = remove_commas(item.get('account', ''))
        jobnum = item.get('jobnum', '')
        # the menu dimension also skips empty or "null" menu values
        if menu == "" or menu == "null" or len(jobnum) > jobnum_max_length:
            continue
        count = item.get('count', 0)
        datatype = DATA_TYPE.get("MENU", 4)
        interface = remove_commas(item.get('interface', ''))
        company = find_region_by_code(jobnum, region_dict)
        records[",".join([str(datatype), menu, ip, account, jobnum, interface, company])] = count

    json_data = json.dumps(records)

    ######## troubleshooting ########
    key = get_file_content()
    if key in records:
        logger_trace.info("baseclean:" + jobid + file_path + ":" + str(records[key]))

    # write to the temporary file first, then rename it to the final name,
    # so consumers never see a half-written file
    logger_cron.info("JOB: " + jobid + ", about to write file")
    write_large_file(tmp_file_path, json_data)
    os.rename(tmp_file_path, file_path)
    logger_cron.info("JOB: " + jobid + ", file write complete")

# Strip commas from raw field values (the comma is the record-key separator)
def remove_commas(record):
    return ''.join(c for c in record if c != ',')
def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表
by_fields_list = by_fields.split(',')
12 months ago
aggregated_data = {}
# 遍历数据,进行聚合
for record in data:
# 确保所有字段都存在于记录中
if all(field in record for field in by_fields_list):
# 构建聚合键
key = tuple(record[field] for field in by_fields_list)
# 如果键不存在,则初始化计数器
if key not in aggregated_data:
aggregated_data[key] = 0
# 累加count值
aggregated_data[key] += record['count']
# 将结果转换为期望的输出格式
output = [
dict({field: value for (field, value) in zip(by_fields_list, key)}, count=count)
for key, count in aggregated_data.items()
]
return output
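
# Example (hypothetical records):
#   group_and_sum([{"ip": "1.1.1.1", "jobnum": "A01", "count": 2},
#                  {"ip": "1.1.1.1", "jobnum": "A01", "count": 3}])
#   -> [{"ip": "1.1.1.1", "jobnum": "A01", "count": 5}]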

def convert_utc_to_local_time(timestamp):
    # convert the millisecond timestamp to seconds
    # (int() rather than Python 2's long() so this also runs on Python 3)
    seconds_since_epoch = int(timestamp) / 1000.0
    # gmtime() yields the UTC struct_time
    time_struct_utc = time.gmtime(seconds_since_epoch)
    # shift UTC to Beijing time (UTC+8) by adding 8 hours
    time_struct_beijing = time.mktime(time_struct_utc) + (8 * 60 * 60)
    # convert the shifted timestamp back into a struct_time
    time_struct_beijing = time.localtime(time_struct_beijing)
    return time_struct_beijing
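
# Example: convert_utc_to_local_time(1721287231000) yields the struct_time
# for 2024-07-18 15:20:31 Beijing time (2024-07-18 07:20:31 UTC + 8 hours).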

# Entry point
def entry(start, end, jobid):
    base_index = "bsa_traffic*"
    es_util_instance = EsUtil()

    start = datetime_to_timestamp(start)
    end = datetime_to_timestamp(end)
    logger_cron.info("JOB:" + jobid + ", start=" + str(start))
    logger_cron.info("JOB:" + jobid + ", end=" + str(end))

    res = es_util_instance.get_available_index_name(start, end, base_index)
    logger_cron.info("JOB:" + jobid + ", index=" + json.dumps(res))
    if len(res) == 0:
        return
    index = ",".join(res)
    clean_data(index, start, end, jobid)

# Local test (kept commented out):
# start = '2024-07-18 15:20:31'
# end = '2024-07-18 15:25:31'
# date_format = "%Y-%m-%d %H:%M:%S"
# date_str = datetime.strptime(start, date_format)
# end_str = datetime.strptime(end, date_format)
#
# # convert the datetime object to a second-level timestamp
# # timestamp_seconds = time.mktime(dt.timetuple())
# # get the microseconds
# # microseconds = dt.microsecond
# # convert to a millisecond timestamp
# # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
#
# entry(date_str, end_str, "xxxxxxxxxxxxxxxxxxx")