Compare commits

..

2 Commits

  1. 78
      corn/ueba_corn_data_insert.py
  2. 46
      corn/ueba_corn_pg.py
  3. 26
      utils/base_dataclean_pg.py
  4. 11
      utils/dashboard_data_conversion.py
  5. 107
      utils/dashboard_data_pg.py
  6. 55
      utils/db2json.py
  7. 7
      utils/ext_logging.py
  8. 12
      views/dashboard_views.py

@ -0,0 +1,78 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id():
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
#每5分钟执行一次
def processing(self):
try:
logger.info("job:开始执行")
start,end=self.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
logger.info("job:"+"准备获取es数据")
#entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()

@ -1,20 +1,20 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
@Author: tangwy
@FileName: user_cron_pg.py
@DateTime: 2024/7/09 14:19
@Description: 定时清洗es数据
"""
from __future__ import unicode_literals
import collections
import random,string
import time,datetime
import traceback,json
import time
from datetime import datetime,timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={
"RUNNING":1,
@ -24,48 +24,32 @@ JOB_STATUS ={
class UserCron:
def generate_job_id():
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_clean_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
return start_time,end_time
#每5分钟执行一次
def processing(self):
job_id =self.generate_job_id()
try:
logger.info("job:开始执行")
start,end=self.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
start,end= DBUtils.get_job_period()
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
logger.info("job:"+"准备获取es数据")
entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
logger.error(err_info)
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':

@ -203,6 +203,8 @@ def get_menu_group_data(index,startTime,endTime):
return datas
def datetime_to_timestamp(dt):
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end):
data_ip = get_ip_group_data(read_index,start,end)
# print "data_ip:"+str(len(data_ip))
@ -214,11 +216,11 @@ def clean_data(read_index,start,end):
# print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
print ("resdata:"+json.dumps(res_data))
#todo 读取上一次5分钟的文件,与这5分钟的文件合并
#合并完成后 写文件
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr)
@ -265,8 +267,6 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
# 关闭文件
f.close()
def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表
by_fields_list = by_fields.split(',')
@ -345,12 +345,30 @@ def merge_data(datasets):
return aggregated_data
#入口
def entry(start,end):
base_index ="bsa_traffic*"
es_util_instance = EsUtil()
# start = datetime_to_timestamp(start)
# end = datetime_to_timestamp(end)
res=es_util_instance.get_available_index_name(start,end,base_index)
print "xxxx:"+str(len(res))
if len(res)==0:
return
index =",".join(res)
clean_data(index,start,end)
start = 1720772586000
end = 1720776186000
# # 将 datetime 对象转换为秒级时间戳
# timestamp_seconds = time.mktime(dt.timetuple())
# # 获取微秒数
# microseconds = dt.microsecond
# # 转换为毫秒级时间戳
# timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
entry(start,end)

@ -1,6 +1,7 @@
# coding=utf-8
from __future__ import division
import json
from datetime import timedelta,datetime
from collections import defaultdict
jobnum_region_dict = {
@ -268,3 +269,13 @@ def menu_summary_data_format(menu_summary_data):
return result
#调整时间
def adjust_times(start_time, end_time):
# 计算开始时间和结束时间之间的天数差
delta_days = (end_time - start_time).days
# 从开始和结束时间各自减去这个天数差
adjusted_start_time = start_time - timedelta(days=delta_days)
adjusted_end_time = end_time - timedelta(days=delta_days)
return adjusted_start_time, adjusted_end_time

@ -6,11 +6,10 @@ import json
import os, re
import codecs
import traceback
from datetime import datetime, timedelta
from collections import defaultdict
from isoc.utils.esUtil import EsUtil
from isoc.utils.dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
interface_summary_data_format, menu_summary_data_format, calculate_time_difference, summary_data_reqs_format
from dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
@ -34,15 +33,15 @@ def pg_get_ip_group_data(startTime, endTime):
"""
result = []
sql = """ select ip, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
where logdate >= %s and logdate <= %s and data_type = %s
group by ip, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"])))
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"]))))
if res:
for item in res:
result.append({
"ip": item[0],
"jobnum": item[2],
"count": item[3],
"jobnum": item[1],
"count": item[2],
})
return result
@ -55,15 +54,17 @@ def pg_get_account_group_data(startTime, endTime):
"""
result = []
sql = """ select account, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
where logdate >= %s and logdate <= %s and data_type = %s
group by account, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"])))
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"]))))
if res:
for item in res:
print("items:"+json.dumps(item))
print("pg_get_account_group_data维度data:"+json.dumps(item[0]))
result.append({
"account": item[0],
"jobnum": item[2],
"count": item[3],
"jobnum": item[1],
"count": item[2],
})
return result
@ -75,17 +76,18 @@ def pg_get_interface_group_data(startTime, endTime):
:param endTime: 结束时间,
"""
result = []
sql = """ select interface, sip, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
group by interface, ip, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"])))
sql = """ select interface, ip, jobnum,account, sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by interface, ip, jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
if res:
for item in res:
result.append({
"interface": item[0],
"ip": item[1],
"jobnum": item[2],
"count": item[3],
"account": item[3],
"count": item[4],
})
return result
@ -97,17 +99,18 @@ def pg_get_menu_group_data(startTime, endTime):
:param endTime: 结束时间,
"""
result = []
sql = """ select menu, sip, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
group by menu, ip, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"])))
sql = """ select menu, ip,jobnum,account,sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by menu, ip, jobnum ,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res:
for item in res:
result.append({
"menu": item[0],
"ip": item[1],
"jobnum": item[2],
"count": item[3],
"account": item[3],
"count": item[4],
})
return result
@ -123,16 +126,15 @@ def pg_get_previous_company_count(startTime, endTime, data_type):
if data_type in DATA_TYPE:
data_type = DATA_TYPE[data_type]
sql = """ select jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
where logdate >= %s and logdate <= %s and data_type = %s
group by jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type)))
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
if res:
for item in res:
company = find_region_by_code(item[0], jobnum_region_dict)
result[company] += item[1]
return result
def pg_get_previous_interface_count(startTime, endTime):
"""
接口维度查询请求总次数
@ -141,9 +143,9 @@ def pg_get_previous_interface_count(startTime, endTime):
"""
result = defaultdict(int)
sql = """ select interface, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
where logdate >= %s and logdate <= %s and data_type = %s
group by interface""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"])))
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
if res:
for item in res:
result[item[0]] += item[1]
@ -158,27 +160,31 @@ def pg_get_previous_menu_count(startTime, endTime):
"""
result = defaultdict(int)
sql = """ select menu, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s
where logdate >= %s and logdate <= %s and data_type = %s
group by menu""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"])))
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res:
for item in res:
result[item[0]] += item[1]
return result
def entry(data_type, start, end):
# 前一段开始时间
previous_time = calculate_time_difference(start, end)
date_format = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime(start, date_format)
end = datetime.strptime(end, date_format)
old_start,old_end = adjust_times(start, end)
try:
data = {}
if data_type == "1":
ip_summary_data = pg_get_ip_group_data(start, end)
data = ip_summary_data_format(ip_summary_data)
previous_company_dict = pg_get_previous_company_count(previous_time, start, "IP")
for d in data["summary"]["account"]:
previous_company_dict = pg_get_previous_company_count(old_start, start, "IP")
for d in data["summary"]["ip"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round(
(d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
d["company"], 0), 4)
@ -187,8 +193,11 @@ def entry(data_type, start, end):
account_summary_data = pg_get_account_group_data(start, end)
data = account_summary_data_format(account_summary_data)
previous_company_dict = pg_get_previous_company_count(previous_time, start, "ACCOUNT")
previous_company_dict = pg_get_previous_company_count(old_start, start, "ACCOUNT")
for d in data["summary"]["account"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round(
(d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
d["company"], 0), 4)
@ -197,22 +206,26 @@ def entry(data_type, start, end):
interface_summary_data = pg_get_interface_group_data(start, end)
data = interface_summary_data_format(interface_summary_data)
previous_interface_dict = pg_get_previous_interface_count(previous_time, start)
for d in data["summary"]["account"]:
previous_interface_dict = pg_get_previous_interface_count(old_start, start)
for d in data["summary"]["interface"]:
if previous_interface_dict.get(d["interface_addr"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round(
(d["req_frequency"] - previous_interface_dict.get(d["company"], 0)) / previous_interface_dict.get(
d["company"], 0), 4)
(d["req_frequency"] - previous_interface_dict.get(d["interface_addr"], 0)) / previous_interface_dict.get(
d["interface_addr"], 0), 4)
if data_type == "4":
menu_summary_data = pg_get_menu_group_data(start, end)
data = menu_summary_data_format(menu_summary_data)
previous_menu_dict = pg_get_previous_menu_count(previous_time, start)
for d in data["summary"]["account"]:
previous_menu_dict = pg_get_previous_menu_count(old_start, start)
for d in data["summary"]["menu"]:
if previous_menu_dict.get(d["menu_name"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round(
(d["req_frequency"] - previous_menu_dict.get(d["company"], 0)) / previous_menu_dict.get(
d["company"], 0), 4)
(d["req_frequency"] - previous_menu_dict.get(d["menu_name"], 0)) / previous_menu_dict.get(
d["menu_name"], 0), 4)
return data
except Exception, e:
logger.error("分析结构获取失败, err: {}, traceback: {}".format(str(e), traceback.format_exc()))
raise e

@ -7,6 +7,10 @@
"""
import json
import traceback
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
@ -17,7 +21,7 @@ class DBType(object):
LIST = 'list'
DICT = 'dict'
JOB_TABLE_NAME = "ueba_clean_jobs"
JOB_TABLE_NAME = "ueba_jobs"
ANALYSIS_TABLE_NAME = "ueba_analysis_log"
class DBUtils(object):
@ -73,7 +77,7 @@ class DBUtils(object):
"""
try:
sql_list = CPgSqlParam(sql)
logger.info("execute sql :\n {}\n".format(sql))
logger.info("execute sql:"+sql)
data = CFunction.execute(sql_list)
logger.info("execute result : {}".format(data))
return json.loads(data)
@ -100,10 +104,55 @@ class DBUtils(object):
@classmethod
def write_job_status(self,job_id,status,err):
sql = """update {JOB_TABLE_NAME} set status=%s err=%s
#success
if status == 2:
sql = """update {JOB_TABLE_NAME} set status=%s ,complate_time = %s
where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(status,datetime.now(), job_id)))
#failed
if status == 3:
sql = """update {JOB_TABLE_NAME} set status=%s, err=%s
where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id)))
@classmethod
def insert_job_record(self,job_id,start_time,end_time,status):
sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status) values(%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status)))
#获取 job的执行时间 开始时间-结束时间
@classmethod
def get_job_period(self):
sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=())))
data = {}
if res:
data["job_id"]=res[0][0]
data["end_time"]=res[0][1]
fields=["job_id", "end_time"]
#data = DBUtils.transition(fields, sql, DBType.LIST)
if len(data)==0:
start_time = datetime.now() - timedelta(minutes=5)
end_time = datetime.now()
if len(data)>0:
start_time = data["end_time"]
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
end_time = data["end_time"]
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + timedelta(minutes=5)
if end_time > datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
@classmethod
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
# if __name__ == '__main__':
# DBUtils.get_job_period()

@ -13,6 +13,13 @@ from appsUtils import env
APPFOLDERNAME = 'uebaMetricsAnalysis'
def get_clean_files():
fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files"
if not os.path.exists(fileroot):
os.mkdir(fileroot)
def get_logger(logfile):
"""
获取日志句柄

@ -14,14 +14,14 @@ from rest_framework.decorators import list_route, detail_route
from uebaMetricsAnalysis.utils.ext_logging import logger
from uebaMetricsAnalysis.lib.result import Result
from uebaMetricsAnalysis.utils import config
from uebaMetricsAnalysis.utils.dashboard_data import entry
from uebaMetricsAnalysis.utils.dashboard_data_pg import entry
class DashboardViewSets(viewsets.GenericViewSet):
@list_route(methods=['GET'])
def get_summary_data_list(self,request):
try:
data_type = request.GET.get('type')
startTime = "2024-01-01T00:00:00Z"# request.GET.get('startDate')
endTime = "2024-07-11T00:00:00Z" #request.GET.get('endDate')
startTime = request.GET.get('startDate')
endTime = request.GET.get('endDate')
#1:ip,2:账号,3:接口,4:菜单
logger.info("获取分析结构数据:"+data_type+";" + startTime +";"+ endTime)
return Result.ok(entry(data_type,startTime,endTime))
@ -47,9 +47,3 @@ class DashboardViewSets(viewsets.GenericViewSet):
jsonfile_path = os.path.join(conf_path, 'defaultRule.json')
rule_data = config.read_json_config(jsonfile_path)
return Result.ok(rule_data)
@list_route(methods=['GET'])
def create_index(self,request):
res= es_operation.createIndex()
logger.error(json.dumps(res))
return Result.ok(json.dumps(res))
Loading…
Cancel
Save