Compare commits

...

2 Commits

  1. 78
      corn/ueba_corn_data_insert.py
  2. 48
      corn/ueba_corn_pg.py
  3. 28
      utils/base_dataclean_pg.py
  4. 11
      utils/dashboard_data_conversion.py
  5. 151
      utils/dashboard_data_pg.py
  6. 59
      utils/db2json.py
  7. 7
      utils/ext_logging.py
  8. 14
      views/dashboard_views.py

@ -0,0 +1,78 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id():
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
#每5分钟执行一次
def processing(self):
try:
logger.info("job:开始执行")
start,end=self.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
logger.info("job:"+"准备获取es数据")
#entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()

@ -1,20 +1,20 @@
# coding=utf-8 # coding=utf-8
""" """
@Author: fu-zhe @Author: tangwy
@FileName: user_cron.py @FileName: user_cron_pg.py
@DateTime: 2024/5/23 14:19 @DateTime: 2024/7/09 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天 @Description: 定时清洗es数据
""" """
from __future__ import unicode_literals from __future__ import unicode_literals
import collections
import random,string import random,string
import time,datetime import traceback,json
from datetime import datetime, timedelta import time
from datetime import datetime,timedelta
import calendar import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={ JOB_STATUS ={
"RUNNING":1, "RUNNING":1,
@ -24,48 +24,32 @@ JOB_STATUS ={
class UserCron: class UserCron:
def generate_job_id(): def generate_job_id(self):
timestamp = int(time.time() * 1000) timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7)) random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_clean_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
return start_time,end_time
#每5分钟执行一次 #每5分钟执行一次
def processing(self): def processing(self):
job_id =self.generate_job_id()
try: try:
logger.info("job:开始执行") logger.info("job:开始执行")
start,end=self.get_job_period() start,end= DBUtils.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None: if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行") logger.info("job:结束时间大于服务器时间不执行")
return return
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
logger.info("job:"+"准备获取es数据") logger.info("job:"+"准备获取es数据")
entry(start,end) entry(start,end)
logger.info("job:"+"执行完成") logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"") DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e: except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc()) err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
logger.error(err_info)
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info) DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise raise
if __name__ == '__main__': if __name__ == '__main__':

@ -203,6 +203,8 @@ def get_menu_group_data(index,startTime,endTime):
return datas return datas
def datetime_to_timestamp(dt):
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end): def clean_data(read_index,start,end):
data_ip = get_ip_group_data(read_index,start,end) data_ip = get_ip_group_data(read_index,start,end)
# print "data_ip:"+str(len(data_ip)) # print "data_ip:"+str(len(data_ip))
@ -214,11 +216,11 @@ def clean_data(read_index,start,end):
# print "data_menu:"+str(len(data_menu)) # print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu res_data = data_ip+data_account+data_interface+data_menu
print ("resdata:"+json.dumps(res_data))
#todo 读取上一次5分钟的文件,与这5分钟的文件合并 #todo 读取上一次5分钟的文件,与这5分钟的文件合并
#合并完成后 写文件 #合并完成后 写文件
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start) group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start): def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
ipGroupStr = "ip,jobnum" ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr) ipGroup = group_and_sum(data_ip, ipGroupStr)
@ -265,8 +267,6 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
# 关闭文件 # 关闭文件
f.close() f.close()
def group_and_sum(data, by_fields="ip,jobnum"): def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表 # 将by_fields转换为列表
by_fields_list = by_fields.split(',') by_fields_list = by_fields.split(',')
@ -345,12 +345,30 @@ def merge_data(datasets):
return aggregated_data return aggregated_data
#入口 #入口
def entry(start,end): def entry(start,end):
base_index ="bsa_traffic*" base_index ="bsa_traffic*"
es_util_instance = EsUtil() es_util_instance = EsUtil()
# start = datetime_to_timestamp(start)
# end = datetime_to_timestamp(end)
res=es_util_instance.get_available_index_name(start,end,base_index) res=es_util_instance.get_available_index_name(start,end,base_index)
print "xxxx:"+str(len(res))
if len(res)==0: if len(res)==0:
return return
index =",".join(res) index =",".join(res)
clean_data(index,start,end) clean_data(index,start,end)
start = 1720772586000
end = 1720776186000
# # 将 datetime 对象转换为秒级时间戳
# timestamp_seconds = time.mktime(dt.timetuple())
# # 获取微秒数
# microseconds = dt.microsecond
# # 转换为毫秒级时间戳
# timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
entry(start,end)

@ -1,6 +1,7 @@
# coding=utf-8 # coding=utf-8
from __future__ import division from __future__ import division
import json import json
from datetime import timedelta,datetime
from collections import defaultdict from collections import defaultdict
jobnum_region_dict = { jobnum_region_dict = {
@ -268,3 +269,13 @@ def menu_summary_data_format(menu_summary_data):
return result return result
#调整时间
def adjust_times(start_time, end_time):
# 计算开始时间和结束时间之间的天数差
delta_days = (end_time - start_time).days
# 从开始和结束时间各自减去这个天数差
adjusted_start_time = start_time - timedelta(days=delta_days)
adjusted_end_time = end_time - timedelta(days=delta_days)
return adjusted_start_time, adjusted_end_time

@ -6,11 +6,10 @@ import json
import os, re import os, re
import codecs import codecs
import traceback import traceback
from datetime import datetime, timedelta
from collections import defaultdict from collections import defaultdict
from dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
from isoc.utils.esUtil import EsUtil interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code
from isoc.utils.dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
interface_summary_data_format, menu_summary_data_format, calculate_time_difference, summary_data_reqs_format
from dataInterface.functions import CFunction from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam from dataInterface.db.params import CPgSqlParam
@ -34,15 +33,15 @@ def pg_get_ip_group_data(startTime, endTime):
""" """
result = [] result = []
sql = """ select ip, jobnum, sum(count) from {TABLE_NAME} sql = """ select ip, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by ip, jobnum""".format(TABLE_NAME=TABLE_NAME) group by ip, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"]))))
if res: if res:
for item in res: for item in res:
result.append({ result.append({
"ip": item[0], "ip": item[0],
"jobnum": item[2], "jobnum": item[1],
"count": item[3], "count": item[2],
}) })
return result return result
@ -55,15 +54,17 @@ def pg_get_account_group_data(startTime, endTime):
""" """
result = [] result = []
sql = """ select account, jobnum, sum(count) from {TABLE_NAME} sql = """ select account, jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by account, jobnum""".format(TABLE_NAME=TABLE_NAME) group by account, jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"]))))
if res: if res:
for item in res: for item in res:
print("items:"+json.dumps(item))
print("pg_get_account_group_data维度data:"+json.dumps(item[0]))
result.append({ result.append({
"account": item[0], "account": item[0],
"jobnum": item[2], "jobnum": item[1],
"count": item[3], "count": item[2],
}) })
return result return result
@ -75,17 +76,18 @@ def pg_get_interface_group_data(startTime, endTime):
:param endTime: 结束时间, :param endTime: 结束时间,
""" """
result = [] result = []
sql = """ select interface, sip, jobnum, sum(count) from {TABLE_NAME} sql = """ select interface, ip, jobnum,account, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by interface, ip, jobnum""".format(TABLE_NAME=TABLE_NAME) group by interface, ip, jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
if res: if res:
for item in res: for item in res:
result.append({ result.append({
"interface": item[0], "interface": item[0],
"ip": item[1], "ip": item[1],
"jobnum": item[2], "jobnum": item[2],
"count": item[3], "account": item[3],
"count": item[4],
}) })
return result return result
@ -97,17 +99,18 @@ def pg_get_menu_group_data(startTime, endTime):
:param endTime: 结束时间, :param endTime: 结束时间,
""" """
result = [] result = []
sql = """ select menu, sip, jobnum, sum(count) from {TABLE_NAME} sql = """ select menu, ip,jobnum,account,sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by menu, ip, jobnum""".format(TABLE_NAME=TABLE_NAME) group by menu, ip, jobnum ,account""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res: if res:
for item in res: for item in res:
result.append({ result.append({
"menu": item[0], "menu": item[0],
"ip": item[1], "ip": item[1],
"jobnum": item[2], "jobnum": item[2],
"count": item[3], "account": item[3],
"count": item[4],
}) })
return result return result
@ -123,16 +126,15 @@ def pg_get_previous_company_count(startTime, endTime, data_type):
if data_type in DATA_TYPE: if data_type in DATA_TYPE:
data_type = DATA_TYPE[data_type] data_type = DATA_TYPE[data_type]
sql = """ select jobnum, sum(count) from {TABLE_NAME} sql = """ select jobnum, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by jobnum""".format(TABLE_NAME=TABLE_NAME) group by jobnum""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
if res: if res:
for item in res: for item in res:
company = find_region_by_code(item[0], jobnum_region_dict) company = find_region_by_code(item[0], jobnum_region_dict)
result[company] += item[1] result[company] += item[1]
return result return result
def pg_get_previous_interface_count(startTime, endTime): def pg_get_previous_interface_count(startTime, endTime):
""" """
接口维度查询请求总次数 接口维度查询请求总次数
@ -141,9 +143,9 @@ def pg_get_previous_interface_count(startTime, endTime):
""" """
result = defaultdict(int) result = defaultdict(int)
sql = """ select interface, sum(count) from {TABLE_NAME} sql = """ select interface, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by interface""".format(TABLE_NAME=TABLE_NAME) group by interface""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
if res: if res:
for item in res: for item in res:
result[item[0]] += item[1] result[item[0]] += item[1]
@ -158,61 +160,72 @@ def pg_get_previous_menu_count(startTime, endTime):
""" """
result = defaultdict(int) result = defaultdict(int)
sql = """ select menu, sum(count) from {TABLE_NAME} sql = """ select menu, sum(count) from {TABLE_NAME}
where logdate >= %s and logate < %s and data_type = %s where logdate >= %s and logdate <= %s and data_type = %s
group by menu""".format(TABLE_NAME=TABLE_NAME) group by menu""".format(TABLE_NAME=TABLE_NAME)
res = CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res: if res:
for item in res: for item in res:
result[item[0]] += item[1] result[item[0]] += item[1]
return result return result
def entry(data_type, start, end): def entry(data_type, start, end):
# 前一段开始时间 # 前一段开始时间
previous_time = calculate_time_difference(start, end) date_format = "%Y-%m-%d %H:%M:%S"
try: start = datetime.strptime(start, date_format)
data = {} end = datetime.strptime(end, date_format)
if data_type == "1": old_start,old_end = adjust_times(start, end)
ip_summary_data = pg_get_ip_group_data(start, end)
data = ip_summary_data_format(ip_summary_data) data = {}
if data_type == "1":
previous_company_dict = pg_get_previous_company_count(previous_time, start, "IP") ip_summary_data = pg_get_ip_group_data(start, end)
for d in data["summary"]["account"]: data = ip_summary_data_format(ip_summary_data)
previous_company_dict = pg_get_previous_company_count(old_start, start, "IP")
for d in data["summary"]["ip"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round( d["trend"] = round(
(d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get( (d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
d["company"], 0), 4) d["company"], 0), 4)
if data_type == "2": if data_type == "2":
account_summary_data = pg_get_account_group_data(start, end) account_summary_data = pg_get_account_group_data(start, end)
data = account_summary_data_format(account_summary_data) data = account_summary_data_format(account_summary_data)
previous_company_dict = pg_get_previous_company_count(previous_time, start, "ACCOUNT") previous_company_dict = pg_get_previous_company_count(old_start, start, "ACCOUNT")
for d in data["summary"]["account"]: for d in data["summary"]["account"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round( d["trend"] = round(
(d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get( (d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
d["company"], 0), 4) d["company"], 0), 4)
if data_type == "3": if data_type == "3":
interface_summary_data = pg_get_interface_group_data(start, end) interface_summary_data = pg_get_interface_group_data(start, end)
data = interface_summary_data_format(interface_summary_data) data = interface_summary_data_format(interface_summary_data)
previous_interface_dict = pg_get_previous_interface_count(previous_time, start) previous_interface_dict = pg_get_previous_interface_count(old_start, start)
for d in data["summary"]["account"]: for d in data["summary"]["interface"]:
if previous_interface_dict.get(d["interface_addr"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round( d["trend"] = round(
(d["req_frequency"] - previous_interface_dict.get(d["company"], 0)) / previous_interface_dict.get( (d["req_frequency"] - previous_interface_dict.get(d["interface_addr"], 0)) / previous_interface_dict.get(
d["company"], 0), 4) d["interface_addr"], 0), 4)
if data_type == "4": if data_type == "4":
menu_summary_data = pg_get_menu_group_data(start, end) menu_summary_data = pg_get_menu_group_data(start, end)
data = menu_summary_data_format(menu_summary_data) data = menu_summary_data_format(menu_summary_data)
previous_menu_dict = pg_get_previous_menu_count(previous_time, start) previous_menu_dict = pg_get_previous_menu_count(old_start, start)
for d in data["summary"]["account"]: for d in data["summary"]["menu"]:
if previous_menu_dict.get(d["menu_name"], 0) == 0:
d["trend"] = 0
else:
d["trend"] = round( d["trend"] = round(
(d["req_frequency"] - previous_menu_dict.get(d["company"], 0)) / previous_menu_dict.get( (d["req_frequency"] - previous_menu_dict.get(d["menu_name"], 0)) / previous_menu_dict.get(
d["company"], 0), 4) d["menu_name"], 0), 4)
return data return data
except Exception, e:
logger.error("分析结构获取失败, err: {}, traceback: {}".format(str(e), traceback.format_exc()))
raise e

@ -7,6 +7,10 @@
""" """
import json import json
import traceback import traceback
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
from dataInterface.functions import CFunction from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam from dataInterface.db.params import CPgSqlParam
@ -17,7 +21,7 @@ class DBType(object):
LIST = 'list' LIST = 'list'
DICT = 'dict' DICT = 'dict'
JOB_TABLE_NAME = "ueba_clean_jobs" JOB_TABLE_NAME = "ueba_jobs"
ANALYSIS_TABLE_NAME = "ueba_analysis_log" ANALYSIS_TABLE_NAME = "ueba_analysis_log"
class DBUtils(object): class DBUtils(object):
@ -73,7 +77,7 @@ class DBUtils(object):
""" """
try: try:
sql_list = CPgSqlParam(sql) sql_list = CPgSqlParam(sql)
logger.info("execute sql :\n {}\n".format(sql)) logger.info("execute sql:"+sql)
data = CFunction.execute(sql_list) data = CFunction.execute(sql_list)
logger.info("execute result : {}".format(data)) logger.info("execute result : {}".format(data))
return json.loads(data) return json.loads(data)
@ -100,10 +104,55 @@ class DBUtils(object):
@classmethod @classmethod
def write_job_status(self,job_id,status,err): def write_job_status(self,job_id,status,err):
sql = """update {JOB_TABLE_NAME} set status=%s err=%s #success
if status == 2:
sql = """update {JOB_TABLE_NAME} set status=%s ,complate_time = %s
where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME) where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id))) CFunction.execute(CPgSqlParam(sql, params=(status,datetime.now(), job_id)))
#failed
if status == 3:
sql = """update {JOB_TABLE_NAME} set status=%s, err=%s
where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id)))
@classmethod @classmethod
def insert_job_record(self,job_id,start_time,end_time,status): def insert_job_record(self,job_id,start_time,end_time,status):
sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status) values(%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME) sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status) values(%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status))) CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status)))
#获取 job的执行时间 开始时间-结束时间
@classmethod
def get_job_period(self):
sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=())))
data = {}
if res:
data["job_id"]=res[0][0]
data["end_time"]=res[0][1]
fields=["job_id", "end_time"]
#data = DBUtils.transition(fields, sql, DBType.LIST)
if len(data)==0:
start_time = datetime.now() - timedelta(minutes=5)
end_time = datetime.now()
if len(data)>0:
start_time = data["end_time"]
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
end_time = data["end_time"]
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + timedelta(minutes=5)
if end_time > datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
@classmethod
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
# if __name__ == '__main__':
# DBUtils.get_job_period()

@ -13,6 +13,13 @@ from appsUtils import env
APPFOLDERNAME = 'uebaMetricsAnalysis' APPFOLDERNAME = 'uebaMetricsAnalysis'
def get_clean_files():
fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files"
if not os.path.exists(fileroot):
os.mkdir(fileroot)
def get_logger(logfile): def get_logger(logfile):
""" """
获取日志句柄 获取日志句柄

@ -14,14 +14,14 @@ from rest_framework.decorators import list_route, detail_route
from uebaMetricsAnalysis.utils.ext_logging import logger from uebaMetricsAnalysis.utils.ext_logging import logger
from uebaMetricsAnalysis.lib.result import Result from uebaMetricsAnalysis.lib.result import Result
from uebaMetricsAnalysis.utils import config from uebaMetricsAnalysis.utils import config
from uebaMetricsAnalysis.utils.dashboard_data import entry from uebaMetricsAnalysis.utils.dashboard_data_pg import entry
class DashboardViewSets(viewsets.GenericViewSet): class DashboardViewSets(viewsets.GenericViewSet):
@list_route(methods=['GET']) @list_route(methods=['GET'])
def get_summary_data_list(self,request): def get_summary_data_list(self,request):
try: try:
data_type = request.GET.get('type') data_type = request.GET.get('type')
startTime = "2024-01-01T00:00:00Z"# request.GET.get('startDate') startTime = request.GET.get('startDate')
endTime = "2024-07-11T00:00:00Z" #request.GET.get('endDate') endTime = request.GET.get('endDate')
#1:ip,2:账号,3:接口,4:菜单 #1:ip,2:账号,3:接口,4:菜单
logger.info("获取分析结构数据:"+data_type+";" + startTime +";"+ endTime) logger.info("获取分析结构数据:"+data_type+";" + startTime +";"+ endTime)
return Result.ok(entry(data_type,startTime,endTime)) return Result.ok(entry(data_type,startTime,endTime))
@ -46,10 +46,4 @@ class DashboardViewSets(viewsets.GenericViewSet):
conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf') conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf')
jsonfile_path = os.path.join(conf_path, 'defaultRule.json') jsonfile_path = os.path.join(conf_path, 'defaultRule.json')
rule_data = config.read_json_config(jsonfile_path) rule_data = config.read_json_config(jsonfile_path)
return Result.ok(rule_data) return Result.ok(rule_data)
@list_route(methods=['GET'])
def create_index(self,request):
res= es_operation.createIndex()
logger.error(json.dumps(res))
return Result.ok(json.dumps(res))
Loading…
Cancel
Save