You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
hbyd_ueba/utils/base_dataclean.py

292 lines
9.9 KiB

#encoding=utf-8
import json
import time,datetime
import traceback
from datetime import datetime, timedelta
import calendar
from esUtil import EsUtil
import pytz
size = 1000  # Composite-aggregation page size (buckets requested per search); adjust as needed
## 01 Create the index
def createIndex(index):
    """Create the Elasticsearch index *index* with a fixed field mapping if it is missing.

    index: name of the index to create.

    Returns None. An error raised while creating the index is printed rather
    than raised (best-effort); an error raised by the existence check itself
    still propagates to the caller.
    """
    # Field name -> Elasticsearch field type for the aggregated records.
    field_types = {
        "data_type": "keyword",
        "req_account": "keyword",
        "req_frequency": "integer",
        "req_jobnum": "keyword",
        "interface_addr": "keyword",
        "req_ip": "ip",
        "menu_name": "keyword",
        "date_time": "date",
    }
    es_util_instance = EsUtil()
    # NOTE(review): the existence check is compared against the *string*
    # "false"; presumably EsUtil.is_index_exist returns "true"/"false" strings.
    # Confirm -- with a real boolean this branch would never run.
    if es_util_instance.is_index_exist(index) == "false":
        try:
            es_util_instance.create_index_simple(index, field_types)
        except Exception as e:
            # str(e) instead of the deprecated e.message attribute.
            print(str(e))
## IP dimension
def get_ip_group_data(index, startTime, endTime):
    """Count documents per (sip, trojan_type) pair and return one record per bucket.

    Pages through a composite aggregation over the documents of *index* whose
    ``timestamp`` lies in the inclusive range [startTime, endTime].

    index: index name or pattern to search.
    startTime, endTime: bounds passed to the ``timestamp`` range query.

    Returns a list of record dicts with ``data_type`` set to "ip". Errors are
    printed, not raised: whatever was collected before the failure (possibly
    an empty list) is returned.
    """
    # Bound before the try so that the final return can never hit an
    # undefined name, even when setup below fails.
    datas = []
    try:
        query_body = {
            "size": 0,
            "query": {
                "range": {"timestamp": {"gte": startTime, "lte": endTime}}
            },
            "aggs": {
                "composite_buckets": {
                    "composite": {
                        "size": size,
                        "sources": [
                            {"sip": {"terms": {"field": "sip"}}},
                            {"trojan_type": {"terms": {"field": "trojan_type"}}}
                        ]
                    }
                }
            }
        }
        es_util_instance = EsUtil()
        after_key = None
        while True:
            if after_key:
                # Resume after the last bucket of the previous page.
                query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
            try:
                response = es_util_instance.search(index, query_body)
            except Exception:
                print("err")
                # Stop paginating: there is no fresh response to read, and
                # re-processing the previous page would append duplicates
                # and loop forever.
                break
            composite = response["aggregations"]["composite_buckets"]
            for bucket in composite["buckets"]:
                key = bucket["key"]
                datas.append({
                    "data_type": "ip",
                    "req_account": "",
                    "req_frequency": bucket["doc_count"],
                    "req_jobnum": key["trojan_type"],
                    "interface_addr": "",
                    "req_ip": key["sip"],
                    "menu_name": "",
                    # Current time as epoch milliseconds.
                    "date_time": int(time.time() * 1000),
                })
            # No after_key in the response means this was the last page.
            after_key = composite.get("after_key")
            if not after_key:
                break
    except Exception as e:
        print("x_err:" + str(e))
    return datas
## Account dimension
def get_account_group_data(index, startTime, endTime):
    """Count documents per (account, trojan_type) pair and return one record per bucket.

    Pages through a composite aggregation over the documents of *index* whose
    ``timestamp`` lies in the inclusive range [startTime, endTime].

    index: index name or pattern to search.
    startTime, endTime: bounds passed to the ``timestamp`` range query.

    Returns a list of record dicts with ``data_type`` set to "account" and
    ``req_ip`` fixed to "0.0.0.0". Exceptions raised by EsUtil or by an
    unexpected response shape propagate to the caller.
    """
    query_body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    es_util_instance = EsUtil()
    datas = []
    after_key = None
    while True:
        if after_key:
            # Resume after the last bucket of the previous page.
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        composite = response["aggregations"]["composite_buckets"]
        for bucket in composite["buckets"]:
            key = bucket["key"]
            datas.append({
                "data_type": "account",
                "req_account": key["account"],
                "req_frequency": bucket["doc_count"],
                "req_jobnum": key["trojan_type"],
                "interface_addr": "",
                "req_ip": "0.0.0.0",
                "menu_name": "",
                # Current time as epoch milliseconds.
                "date_time": int(time.time() * 1000),
            })
        # No after_key in the response means this was the last page.
        after_key = composite.get("after_key")
        if not after_key:
            break
    return datas
## Interface dimension
def get_interface_group_data(index, startTime, endTime):
    """Count documents per (interface, sip, account, trojan_type) and return one record per bucket.

    Pages through a composite aggregation over the documents of *index* whose
    ``timestamp`` lies in the inclusive range [startTime, endTime].

    index: index name or pattern to search.
    startTime, endTime: bounds passed to the ``timestamp`` range query.

    Returns a list of record dicts with ``data_type`` set to "interface".
    Exceptions raised by EsUtil or by an unexpected response shape propagate
    to the caller.
    """
    query_body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"interface": {"terms": {"field": "interface"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es_util_instance = EsUtil()
    datas = []
    after_key = None
    while True:
        if after_key:
            # Resume after the last bucket of the previous page.
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        composite = response["aggregations"]["composite_buckets"]
        for bucket in composite["buckets"]:
            key = bucket["key"]
            datas.append({
                "data_type": "interface",
                "req_account": key["account"],
                "req_frequency": bucket["doc_count"],
                "req_jobnum": key["trojan_type"],
                "interface_addr": key["interface"],
                "req_ip": key["sip"],
                "menu_name": "",
                # Current time as epoch milliseconds.
                "date_time": int(time.time() * 1000),
            })
        # No after_key in the response means this was the last page.
        after_key = composite.get("after_key")
        if not after_key:
            break
    return datas
## Menu dimension
def get_menu_group_data(index, startTime, endTime):
    """Count documents per (worm_family, sip, account, trojan_type) and return one record per bucket.

    Pages through a composite aggregation over the documents of *index* whose
    ``timestamp`` lies in the inclusive range [startTime, endTime]. The
    ``worm_family`` bucket key is stored as the record's ``menu_name``.

    index: index name or pattern to search.
    startTime, endTime: bounds passed to the ``timestamp`` range query.

    Returns a list of record dicts with ``data_type`` set to "menu".
    Exceptions raised by EsUtil or by an unexpected response shape propagate
    to the caller.
    """
    query_body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"worm_family": {"terms": {"field": "worm_family"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es_util_instance = EsUtil()
    datas = []
    after_key = None
    while True:
        if after_key:
            # Resume after the last bucket of the previous page.
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es_util_instance.search(index, query_body)
        composite = response["aggregations"]["composite_buckets"]
        for bucket in composite["buckets"]:
            key = bucket["key"]
            datas.append({
                "data_type": "menu",
                "req_account": key["account"],
                "req_frequency": bucket["doc_count"],
                "req_jobnum": key["trojan_type"],
                "interface_addr": "",
                "req_ip": key["sip"],
                "menu_name": key["worm_family"],
                # Current time as epoch milliseconds.
                "date_time": int(time.time() * 1000),
            })
        # No after_key in the response means this was the last page.
        after_key = composite.get("after_key")
        if not after_key:
            break
    return datas
## 03 Write the data
def data_insert(index, data):
    """Bulk-insert *data* into *index* through EsUtil and return its response.

    index: name of the target index.
    data: the records to insert, handed to ``EsUtil.bulk_insert`` unchanged.
    """
    return EsUtil().bulk_insert(index, data)
def clean_data(write_index, read_index, start, end):
    """Aggregate *read_index* along all four dimensions and bulk-insert the result.

    write_index: index that receives the aggregated records.
    read_index: index name or pattern the aggregations are run against.
    start, end: bounds of the ``timestamp`` range to aggregate.

    Prints the number of records produced per dimension and the bulk-insert
    response as JSON. Returns None.
    """
    data_ip = get_ip_group_data(read_index, start, end)
    print("data_ip:" + str(len(data_ip)))
    data_account = get_account_group_data(read_index, start, end)
    # Each count is labelled with its own dimension (all four used to be
    # printed as "data_ip:", making the output indistinguishable).
    print("data_account:" + str(len(data_account)))
    data_interface = get_interface_group_data(read_index, start, end)
    print("data_interface:" + str(len(data_interface)))
    data_menu = get_menu_group_data(read_index, start, end)
    print("data_menu:" + str(len(data_menu)))
    res_data = data_ip + data_account + data_interface + data_menu
    response = data_insert(write_index, res_data)
    print(json.dumps(response))
# Entry point
def entry(write_index, read_index, start, end):
    """Make sure *write_index* exists, then aggregate *read_index* into it.

    write_index: index that receives the aggregated records.
    read_index: index name or pattern to aggregate.
    start, end: bounds of the ``timestamp`` range to aggregate.
    """
    createIndex(write_index)
    clean_data(write_index, read_index, start, end)
# Yesterday's date at a given time of day
def get_start_end_time(hour, minute, second):
    """Return yesterday's date at hour:minute:second as "YYYY-MM-DDTHH:MM:SSZ".

    hour, minute, second: time of day to place on yesterday's date, e.g.
    (0, 0, 0) for the start of the day and (23, 59, 59) for its end.

    The date and time are taken from the local clock (``datetime.now()``) and
    the trailing "Z" is a literal suffix; no timezone conversion is performed.
    The earlier pytz "localize to UTC" step was dropped because it only
    attached tzinfo and never changed the formatted text.
    NOTE(review): on a host whose local timezone is not UTC the returned
    string is local wall-clock time labelled as UTC -- confirm this matches
    how ``timestamp`` is stored in the queried index.
    """
    yesterday = datetime.now() - timedelta(days=1)
    moment = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")
def index():
    """Run the daily job: aggregate yesterday's traffic into the monthly index.

    Meant to be run once a day around midnight. Takes no arguments and
    returns None; any failure is printed with its traceback instead of being
    raised.
    """
    try:
        read_index = "bsa_traffic*"
        # Query window: yesterday 00:00:00 through yesterday 23:59:59.
        # (A fixed start of "2024-06-02T00:00:00Z" was hard-coded here, which
        # made every run re-aggregate everything since that date.)
        start = get_start_end_time(0, 0, 0)
        end = get_start_end_time(23, 59, 59)
        # The write index is created per month (b_ueba_YYYY_MM). Derive it
        # from the queried day, not from today, so a run on the 1st files
        # yesterday's data under the previous month.
        write_index = "b_ueba_" + start[0:4] + "_" + start[5:7]
        print(start + ":" + end)
        entry(write_index, read_index, start, end)
    except Exception:
        print("定时任务执行失败:" + traceback.format_exc())
# Module-level call: the job runs as soon as this file is executed or imported.
index()