You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
292 lines
9.9 KiB
292 lines
9.9 KiB
5 months ago
|
#encoding=utf-8
|
||
|
import json
|
||
|
import time,datetime
|
||
|
import traceback
|
||
|
from datetime import datetime, timedelta
|
||
|
import calendar
|
||
|
from esUtil import EsUtil
|
||
|
import pytz
|
||
|
|
||
|
size = 1000  # ES composite-aggregation page size; tune to fit the actual data volume
|
||
|
##01 Create the target index
def createIndex(index):
    """Create the UEBA statistics index with an explicit field mapping.

    The index is only created when it does not already exist. Creation
    errors are logged and swallowed (best-effort, as in the original).

    :param index: name of the index to create
    """
    # Field mapping for the aggregated statistics documents written by
    # the get_*_group_data functions below.
    mapping = {
        "data_type": "keyword",
        "req_account": "keyword",
        "req_frequency": "integer",
        "req_jobnum": "keyword",
        "interface_addr": "keyword",
        "req_ip": "ip",
        "menu_name": "keyword",
        "date_time": "date"
    }
    es_util_instance = EsUtil()
    exists = es_util_instance.is_index_exist(index)
    # NOTE(review): the original only compared against the string "false";
    # also accept a real boolean False in case EsUtil returns one — confirm
    # against EsUtil.is_index_exist.
    if exists == "false" or exists is False:
        try:
            es_util_instance.create_index_simple(index, mapping)
        except Exception as e:
            # Best-effort: report and continue (e.g. index created concurrently).
            print(str(e))
|
||
|
## IP dimension
def get_ip_group_data(index, startTime, endTime):
    """Aggregate request counts per (source IP, trojan_type) pair.

    Pages through an Elasticsearch composite aggregation and returns
    one stats row per bucket, shaped for the UEBA write index.

    :param index: index name or pattern to read from
    :param startTime: range start (ISO-8601 string, inclusive)
    :param endTime: range end (ISO-8601 string, inclusive)
    :return: list of result dicts; empty list on failure
    """
    # Defined before the try so the final `return datas` can never
    # NameError (the original defined it inside the try block).
    datas = []
    try:
        query_body = {
            "size": 0,
            "query": {
                "range": {"timestamp": {"gte": startTime, "lte": endTime}}
            },
            "aggs": {
                "composite_buckets": {
                    "composite": {
                        "size": size,
                        "sources": [
                            {"sip": {"terms": {"field": "sip"}}},
                            {"trojan_type": {"terms": {"field": "trojan_type"}}}
                        ]
                    }
                }
            }
        }
        after_key = None
        es_util_instance = EsUtil()
        while True:
            if after_key:
                query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
            try:
                response = es_util_instance.search(index, query_body)
            except Exception as e:
                # Stop paging on search failure instead of falling through and
                # reusing an undefined/stale `response` (bug in the original).
                print("err:" + str(e))
                break
            for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
                datas.append({
                    "data_type": "ip",
                    "req_account": "",
                    "req_frequency": bucket['doc_count'],
                    "req_jobnum": bucket['key']['trojan_type'],
                    "interface_addr": "",
                    "req_ip": bucket['key']['sip'],
                    "menu_name": "",
                    "date_time": int(time.time() * 1000)  # now, epoch millis
                })
            # Composite pagination: ES omits after_key on the last page.
            if not response["aggregations"]["composite_buckets"].get("after_key"):
                break
            after_key = response["aggregations"]["composite_buckets"]["after_key"]
    except Exception as e:
        print("x_err:" + str(e))
    return datas
|
||
|
|
||
|
|
||
|
## Account dimension
def get_account_group_data(index, startTime, endTime):
    """Aggregate request counts per (account, trojan_type) pair.

    Walks an Elasticsearch composite aggregation page by page and
    returns the collected stats rows for the UEBA write index.

    :param index: index name or pattern to read from
    :param startTime: range start (ISO-8601 string, inclusive)
    :param endTime: range end (ISO-8601 string, inclusive)
    :return: list of result dicts
    """
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}}
                    ]
                }
            }
        }
    }
    es = EsUtil()
    rows = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        agg = es.search(index, body)["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            key = bucket['key']
            rows.append({
                "data_type": "account",
                "req_account": key['account'],
                "req_frequency": bucket['doc_count'],
                "req_jobnum": key['trojan_type'],
                "interface_addr": "",
                "req_ip": "0.0.0.0",
                "menu_name": "",
                "date_time": int(time.time() * 1000)  # now, epoch millis
            })
        # ES omits after_key on the final page of a composite aggregation.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return rows
|
||
|
|
||
|
## Interface dimension
def get_interface_group_data(index, startTime, endTime):
    """Aggregate request counts per (interface, sip, account, trojan_type).

    Walks an Elasticsearch composite aggregation page by page and
    returns the collected stats rows for the UEBA write index.

    :param index: index name or pattern to read from
    :param startTime: range start (ISO-8601 string, inclusive)
    :param endTime: range end (ISO-8601 string, inclusive)
    :return: list of result dicts
    """
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"interface": {"terms": {"field": "interface"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es = EsUtil()
    rows = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        agg = es.search(index, body)["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            key = bucket['key']
            rows.append({
                "data_type": "interface",
                "req_account": key['account'],
                "req_frequency": bucket['doc_count'],
                "req_jobnum": key['trojan_type'],
                "interface_addr": key['interface'],
                "req_ip": key['sip'],
                "menu_name": "",
                "date_time": int(time.time() * 1000)  # now, epoch millis
            })
        # ES omits after_key on the final page of a composite aggregation.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return rows
|
||
|
|
||
|
|
||
|
|
||
|
## Menu dimension
def get_menu_group_data(index, startTime, endTime):
    """Aggregate request counts per (worm_family, sip, account, trojan_type).

    Walks an Elasticsearch composite aggregation page by page and
    returns the collected stats rows for the UEBA write index; the
    worm_family bucket key is emitted as menu_name.

    :param index: index name or pattern to read from
    :param startTime: range start (ISO-8601 string, inclusive)
    :param endTime: range end (ISO-8601 string, inclusive)
    :return: list of result dicts
    """
    body = {
        "size": 0,
        "query": {
            "range": {"timestamp": {"gte": startTime, "lte": endTime}}
        },
        "aggs": {
            "composite_buckets": {
                "composite": {
                    "size": size,
                    "sources": [
                        {"worm_family": {"terms": {"field": "worm_family"}}},
                        {"sip": {"terms": {"field": "sip"}}},
                        {"account": {"terms": {"field": "account"}}},
                        {"trojan_type": {"terms": {"field": "trojan_type"}}},
                    ]
                }
            }
        }
    }
    es = EsUtil()
    rows = []
    cursor = None
    while True:
        if cursor:
            body["aggs"]["composite_buckets"]["composite"]["after"] = cursor
        agg = es.search(index, body)["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            key = bucket['key']
            rows.append({
                "data_type": "menu",
                "req_account": key['account'],
                "req_frequency": bucket['doc_count'],
                "req_jobnum": key['trojan_type'],
                "interface_addr": "",
                "req_ip": key['sip'],
                "menu_name": key['worm_family'],
                "date_time": int(time.time() * 1000)  # now, epoch millis
            })
        # ES omits after_key on the final page of a composite aggregation.
        cursor = agg.get("after_key")
        if not cursor:
            break
    return rows
|
||
|
|
||
|
##03 Data write
def data_insert(index, data):
    """Bulk-insert *data* into *index* and return the ES response."""
    return EsUtil().bulk_insert(index, data)
|
||
|
|
||
|
def clean_data(write_index, read_index, start, end):
    """Aggregate all four dimensions from *read_index* over [start, end]
    and bulk-write the combined rows into *write_index*.

    :param write_index: index that receives the aggregated rows
    :param read_index: index/pattern the aggregations read from
    :param start: range start (ISO-8601 string, inclusive)
    :param end: range end (ISO-8601 string, inclusive)
    """
    data_ip = get_ip_group_data(read_index, start, end)
    print("data_ip:" + str(len(data_ip)))
    data_account = get_account_group_data(read_index, start, end)
    # Fixed copy-paste bug: the three labels below all said "data_ip".
    print("data_account:" + str(len(data_account)))
    data_interface = get_interface_group_data(read_index, start, end)
    print("data_interface:" + str(len(data_interface)))
    data_menu = get_menu_group_data(read_index, start, end)
    print("data_menu:" + str(len(data_menu)))
    res_data = data_ip + data_account + data_interface + data_menu
    response = data_insert(write_index, res_data)
    print(json.dumps(response))
|
||
|
|
||
|
#Entry
def entry(write_index, read_index, start, end):
    """Ensure the write index exists, then run the aggregation pipeline."""
    createIndex(write_index)
    clean_data(write_index, read_index, start, end)
|
||
|
|
||
|
|
||
|
#Yesterday at the given wall-clock time (e.g. 0,0,0 = previous day's midnight)
def get_start_end_time(hour, minute, second):
    """Return yesterday's date at the given wall-clock time, formatted as
    "%Y-%m-%dT%H:%M:%SZ" (ISO-8601 with a literal Z suffix).

    NOTE(review): the original localized the naive local datetime with
    pytz.utc before formatting; utc.localize() does not shift the
    wall-clock value and the format string appends a literal "Z", so the
    pytz round-trip was a no-op and is dropped — output is byte-identical.
    The result is the *local* wall-clock time labelled as UTC; confirm
    that is the intended semantics.

    :param hour: hour to set (0-23)
    :param minute: minute to set (0-59)
    :param second: second to set (0-59)
    :return: formatted timestamp string, e.g. "2024-06-01T00:00:00Z"
    """
    yesterday = datetime.now() - timedelta(days=1)
    stamp = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0)
    return stamp.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
|
||
|
def index():
    """Scheduled job entry: aggregate yesterday's traffic into the
    monthly UEBA index.

    Runs daily (around midnight); the query window is yesterday
    00:00:00 .. 23:59:59. Failures are logged with a full traceback
    and swallowed so the scheduler keeps running.
    """
    try:
        yesterday = datetime.now() - timedelta(days=1)
        # Write index is created per month (derived from the processed day,
        # so the day after a month boundary still lands in the right index).
        # The original hard-coded "b_ueba_2024_07" despite its own comment.
        write_index = "b_ueba_" + yesterday.strftime("%Y_%m")
        read_index = "bsa_traffic*"
        # Window: yesterday 00:00:00 .. 23:59:59. The original left a
        # hard-coded debug start ("2024-06-02T00:00:00Z") with the real
        # call commented out; restored here.
        start = get_start_end_time(0, 0, 0)
        end = get_start_end_time(23, 59, 59)
        print(start + ":" + end)
        entry(write_index, read_index, start, end)
    except Exception:
        print("定时任务执行失败:" + traceback.format_exc())
        # logger.error("定时任务执行失败:".format(str(e), traceback.format_exc()))


index()
|