From 216269d3cbb366cf0e1e39abe7cf6068eb51b474 Mon Sep 17 00:00:00 2001 From: TANGWY Date: Tue, 27 Aug 2024 10:08:07 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=BA=8C=E8=BD=AE=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/sys_config.json | 3 + sql/pg_struct.sql | 11 +- utils/base_dataclean_pg.py | 89 ++++++++---- utils/dashboard_data_pg.py | 1 + utils/dashboard_detail_data.py | 134 +++++++++++++++++ utils/dashboard_summary_data.py | 247 ++++++++++++++++++++++++++++++++ utils/ext_logging.py | 18 ++- utils/file_helper.py | 45 +++++- utils/file_merge.py | 151 +++++++++++++++++-- utils/file_to_pg.py | 192 ++++++++++++++----------- views/dashboard_views.py | 59 ++++++-- 11 files changed, 811 insertions(+), 139 deletions(-) create mode 100644 conf/sys_config.json create mode 100644 utils/dashboard_detail_data.py create mode 100644 utils/dashboard_summary_data.py diff --git a/conf/sys_config.json b/conf/sys_config.json new file mode 100644 index 0000000..745cdb2 --- /dev/null +++ b/conf/sys_config.json @@ -0,0 +1,3 @@ +{ + "search_limit": 15 +} \ No newline at end of file diff --git a/sql/pg_struct.sql b/sql/pg_struct.sql index 66a3108..2f72933 100644 --- a/sql/pg_struct.sql +++ b/sql/pg_struct.sql @@ -1,13 +1,14 @@ CREATE SCHEMA if not exists ueba_analysis_schema; CREATE TABLE if not EXISTS ueba_analysis_schema.logs ( id SERIAL, - menu VARCHAR(100), - interface VARCHAR(500), + menu VARCHAR(50), + interface VARCHAR(300), ip INET, - account VARCHAR(50), - jobnum VARCHAR(50), + account VARCHAR(30), + jobnum VARCHAR(30), count int, logdate date NOT NULL, + company VARCHAR(30), data_type int) PARTITION BY RANGE (logdate); CREATE TABLE if not EXISTS ueba_analysis_schema.jobs ( @@ -20,5 +21,5 @@ CREATE TABLE if not EXISTS ueba_analysis_schema.jobs ( complate_time TIMESTAMP, err text ); -CREATE INDEX if not exists idx_logdate_data_type ON ueba_analysis_schema.logs (data_type,logdate); +CREATE INDEX if not exists idx_logdate_data_type ON ueba_analysis_schema.logs (logdate,data_type); CREATE INDEX if not exists idx_job_id ON ueba_analysis_schema.jobs (job_id); diff --git a/utils/base_dataclean_pg.py b/utils/base_dataclean_pg.py index b6d8afe..51015ee 100644 --- a/utils/base_dataclean_pg.py +++ b/utils/base_dataclean_pg.py @@ -7,10 +7,11 @@ from datetime import datetime, timedelta import calendar import codecs from esUtil import EsUtil -from file_helper import write_large_file,merge_data -from file_helper import read_large_json_file,json_to_csv_data,write_csv +from file_helper import write_large_file,get_file_content,TRACE_PATH +from dashboard_data_conversion import find_region_by_code,jobnum_region_dict from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path from collections import defaultdict +from ext_logging import logger_trace size = 9999#根据实际情况调整 @@ -254,45 +255,85 @@ def clean_data(read_index,start,end,jobid): group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid) def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid): - ipGroupStr = "ip,jobnum" - ipGroup = group_and_sum(data_ip, ipGroupStr) - accountGroupStr = "account,jobnum" - accountGroup = group_and_sum(data_account, accountGroupStr) - interfaceGroupStr = "interface,ip,account,jobnum" - interfaceGroup = group_and_sum(data_interface, interfaceGroupStr) - menuGroupStr = "menu,ip,account,jobnum" - menuGroup = group_and_sum(data_menu, 
menuGroupStr) - - data = {} - data["ip"] = ipGroup - data["account"] = accountGroup - data["interface"] = interfaceGroup - data["menu"] = menuGroup - # 获取当前工作目录 base_path = get_clean_file_path() logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path) date_time = convert_utc_to_local_time(start) #临时文件 临时文件格式:20240720-1630_tmp.json - tmp_file_name = time.strftime("%Y%m%d-%H%M_tmp.csv", date_time) + tmp_file_name = time.strftime("%Y%m%d-%H%M_tmp.json", date_time) tmp_file_path = os.path.join(base_path,tmp_file_name) #正式文件 正式文件格式:20240720-1630.json - file_name = time.strftime("%Y%m%d-%H%M.csv", date_time) + file_name = time.strftime("%Y%m%d-%H%M.json", date_time) file_path = os.path.join(base_path,file_name) logger_cron.info("JOB:"+jobid+", tmpfilepath"+tmp_file_path) - all_data = [data] - merged_data = merge_data(all_data) - csv_data = json_to_csv_data(merged_data,"") + #(datatype,menu,ip,account,jobnum,interface) count + records = {} + for item in data_ip: + menu = remove_commas(item.get('menu', '')) + ip = item.get('ip', '0.0.0.0') + account = remove_commas(item.get('account', '')) + jobnum = item.get('jobnum', '') + count = item.get('count', 0) + datatype = DATA_TYPE.get("IP",1) + interface = remove_commas(item.get('interface', '')) + company = find_region_by_code(jobnum,jobnum_region_dict) + records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count + #日志追踪 + if not os.path.exists(TRACE_PATH): + write_large_file(TRACE_PATH, ",".join([str(datatype), menu, ip,account,jobnum,interface,company])) + + for item in data_account: + menu = remove_commas(item.get('menu', '')) + ip = item.get('ip', '0.0.0.0') + account = remove_commas(item.get('account', '')) + jobnum = item.get('jobnum', '') + count = item.get('count', 0) + datatype = DATA_TYPE.get("ACCOUNT",2) + interface = remove_commas(item.get('interface', '')) + company = find_region_by_code(jobnum,jobnum_region_dict) + records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count + for item in data_interface: + menu = remove_commas(item.get('menu', '')) + ip = item.get('ip', '0.0.0.0') + account = remove_commas(item.get('account', '')) + jobnum = item.get('jobnum', '') + count = item.get('count', 0) + datatype = DATA_TYPE.get("INTERFACE",3) + interface = remove_commas(item.get('interface', '')) + company = find_region_by_code(jobnum,jobnum_region_dict) + records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count + for item in data_menu: + menu = remove_commas(item.get('menu', '')) + ip = item.get('ip', '0.0.0.0') + account = remove_commas(item.get('account', '')) + jobnum = item.get('jobnum', '') + count = item.get('count', 0) + datatype = DATA_TYPE.get("MENU",4) + interface = remove_commas(item.get('interface', '')) + company = find_region_by_code(jobnum,jobnum_region_dict) + records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count + + json_data = json.dumps(records) + + ########问题排查################# + key=get_file_content() + if key in records: + logger_trace.info("baseclean:"+jobid+file_path+":"+str(records[key])) + #写入文件 logger_cron.info("JOB: "+jobid+",准备写入文件") - write_csv(csv_data, tmp_file_path) - + write_large_file(tmp_file_path,json_data) #重命名文件 os.rename(tmp_file_path, file_path) + logger_cron.info("JOB: "+jobid+",写入文件完成") +#原始数据去掉逗号 +def remove_commas(record): + return ''.join(c for c in record if c != ',') + def group_and_sum(data, by_fields="ip,jobnum"): # 将by_fields转换为列表 by_fields_list = by_fields.split(',') diff --git 
a/utils/dashboard_data_pg.py b/utils/dashboard_data_pg.py index 94b16de..20824e7 100644 --- a/utils/dashboard_data_pg.py +++ b/utils/dashboard_data_pg.py @@ -14,6 +14,7 @@ from dataInterface.functions import CFunction from dataInterface.db.params import CPgSqlParam from ext_logging import logger +TRACE_KEY = "" TABLE_NAME = "ueba_analysis_schema.logs" DATA_TYPE = { diff --git a/utils/dashboard_detail_data.py b/utils/dashboard_detail_data.py new file mode 100644 index 0000000..e11016e --- /dev/null +++ b/utils/dashboard_detail_data.py @@ -0,0 +1,134 @@ +#!/usr/bin/python +# encoding=utf-8 +# author: tangwy +from __future__ import division +import json +import os, re +import codecs +import traceback +from datetime import datetime, timedelta +from collections import defaultdict +from dataInterface.functions import CFunction +from dataInterface.db.params import CPgSqlParam +from ext_logging import logger + + +TABLE_NAME = "ueba_analysis_schema.logs" + +DATA_TYPE = { + "IP": 1, + "ACCOUNT": 2, + "INTERFACE": 3, + "MENU": 4, +} + +#安全除 +def safe_divide(numerator, denominator): + if denominator == 0: + return + else: + return numerator / denominator +#ip维度 +def get_ip_data(startTime, endTime,keyword): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = [] + sql = """ select ip,jobnum, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s and company = %s + group by ip,jobnum order by count desc limit 200""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"],keyword)))) + if res: + for item in res: + result.append({ + "req_ip": item[0], + "req_jobnum": item[1], + "req_frequency": item[2], + }) + return result +#账号维度 +def get_account_data(startTime, endTime,keyword): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = [] + sql = """ select account,jobnum, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s and company = %s + group by account,jobnum order by count desc limit 200""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"],keyword)))) + if res: + for item in res: + result.append({ + "req_account": item[0], + "req_jobnum": item[1], + "req_frequency": item[2], + }) + return result + +#接口维度 +def get_interface_data(startTime, endTime,keyword): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = [] + sql = """select ip,account,jobnum,sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s and interface = %s + group by ip,account,jobnum order by count desc limit 200""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"],keyword)))) + if res: + for item in res: + result.append({ + "req_ip": item[0], + "req_jobnum": item[2], + "req_account": item[1], + "req_frequency": item[3], + "interface_addr":keyword, + }) + + return result + +#菜单维度 +def get_menu_data(startTime, endTime,keyword): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = [] + sql = """select ip,jobnum,account,sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s and menu = %s + group by ip,jobnum,account order by count desc limit 200""".format(TABLE_NAME=TABLE_NAME) + logger.info(sql) + res = 
json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"],keyword)))) + if res: + logger.info(str(len(res))) + for item in res: + result.append({ + "req_ip": item[0], + "req_jobnum": item[1], + "req_account": item[2], + "req_frequency": item[3], + "menu_name":keyword, + }) + return result + +#入口 +def detail_data_entry(startTime, endTime,data_type,keyWord): + data = {} + if data_type == "1": + data=get_ip_data(startTime=startTime,endTime=endTime,keyword=keyWord) + if data_type == "2": + data=get_account_data(startTime=startTime,endTime=endTime,keyword=keyWord) + if data_type == "3": + data=get_interface_data(startTime=startTime,endTime=endTime,keyword=keyWord) + if data_type == "4": + data=get_menu_data(startTime=startTime,endTime=endTime,keyword=keyWord) + + return data + \ No newline at end of file diff --git a/utils/dashboard_summary_data.py b/utils/dashboard_summary_data.py new file mode 100644 index 0000000..1a7b5c4 --- /dev/null +++ b/utils/dashboard_summary_data.py @@ -0,0 +1,247 @@ +#!/usr/bin/python +# encoding=utf-8 +# author: tangwy +from __future__ import division +import json +import os, re +import codecs +import traceback +from datetime import datetime, timedelta +from collections import defaultdict +from dashboard_data_conversion import adjust_times +from dataInterface.functions import CFunction +from dataInterface.db.params import CPgSqlParam +from ext_logging import logger + +TABLE_NAME = "ueba_analysis_schema.logs" + +DATA_TYPE = { + "IP": 1, + "ACCOUNT": 2, + "INTERFACE": 3, + "MENU": 4, +} + +#安全除 +def safe_divide(numerator, denominator): + if denominator == 0: + return + else: + return numerator / denominator +#ip维度 +def get_ip_summary_data(startTime, endTime): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = {} + sql = """ select company, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by company""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result +#账号维度 +def get_account_summary_data(startTime, endTime): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = {} + sql = """ select company, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by company""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result + +#接口维度 +def get_interface_summary_data(startTime, endTime): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = {} + sql = """select interface, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by interface order by count desc limit 20""".format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result + +#菜单维度 +def get_menu_summary_data(startTime, endTime): + """ + IP维度查询 + :param startTime: 开始时间, + :param endTime: 结束时间, + """ + result = {} + sql = """select menu, sum(count) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by menu""".format(TABLE_NAME=TABLE_NAME) + res = 
json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result + +#获取IP count +def get_ip_count(startTime, endTime): + result = {} + sql = """select company, count(distinct ip) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by company """.format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result + +#获取account count +def get_account_count(startTime, endTime): + result = {} + sql = """select company ,count(distinct account) as count from {TABLE_NAME} + where logdate >= %s and logdate <= %s and data_type = %s + group by company """.format(TABLE_NAME=TABLE_NAME) + res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"])))) + if res: + for item in res: + result[item[0]]=item[1] + return result + +#获取前一个周期数据 +def get_pre_date(startTime,endTime): + date_format = "%Y-%m-%d %H:%M:%S" + start = datetime.strptime(startTime, date_format) + end = datetime.strptime(endTime, date_format) + start = start.strftime('%Y-%m-%d') + end = end.strftime('%Y-%m-%d') + old_start,old_end = adjust_times(start, end) + return old_start,old_end + +#ip维度汇总数据计算 +def ip_summary_calcule(startTime, endTime): + logger.info("begin") + old_start,old_end = get_pre_date(startTime,endTime) + pre_data = get_ip_summary_data(startTime=old_start,endTime=old_end) + logger.info("完成pre_data查询") + res_data = [] + data = get_ip_summary_data(startTime=startTime,endTime=endTime) + ip_count_data = get_ip_count(startTime, endTime) + total_ip_count = sum(ip_count_data.itervalues()) + total_frequency = sum(data.itervalues()) + + for key, value in data.iteritems(): + tmp={} + tmp["company"]=key + tmp["req_frequency"]=value + tmp["frequency_rate"]=round(safe_divide(value,total_frequency),4) + tmp["ip_rate"]=round(safe_divide(ip_count_data[key],total_ip_count),4) + tmp["ip_count"]=ip_count_data[key] + tmp["ip_avg"]=round(safe_divide(value,ip_count_data[key]),4) + if key in pre_data: + tmp["trend"]= round(safe_divide((value-pre_data[key]),pre_data[key]),4) + else: + tmp["trend"]=0 + res_data.append(tmp) + result = {"summary": {"ip": res_data}, "detail": {"ip": {}}} + return result + +#account维度汇总数据计算 +def account_summary_calcule(startTime, endTime): + old_start,old_end = get_pre_date(startTime,endTime) + pre_data = get_account_summary_data(startTime=old_start,endTime=old_end) + + res_data = [] + data = get_account_summary_data(startTime=startTime,endTime=endTime) + account_count_data = get_account_count(startTime, endTime) + total_account_count = sum(account_count_data.itervalues()) + total_frequency = sum(data.itervalues()) + + for key, value in data.iteritems(): + tmp={} + tmp["company"]=key + tmp["req_frequency"]=value + tmp["frequency_rate"]=round(safe_divide(value,total_frequency),4) + tmp["account_rate"]=round(safe_divide(account_count_data[key],total_account_count),4) + tmp["account_count"]=account_count_data[key] + tmp["account_avg"]=round(safe_divide(value,account_count_data[key]),4) + if key in pre_data: + tmp["trend"]= round(safe_divide((value-pre_data[key]),pre_data[key]),4) + else: + tmp["trend"]=0 + res_data.append(tmp) + result = {"summary": {"account": res_data}, "detail": {"account": {}}} + return result + +#接口维度汇总数据计算 +def interface_summary_calcule(startTime, endTime): + old_start,old_end = 
get_pre_date(startTime,endTime) + pre_data = get_interface_summary_data(startTime=old_start,endTime=old_end) + + res_data = [] + data = get_interface_summary_data(startTime=startTime,endTime=endTime) + total_frequency = sum(data.itervalues()) + for key, value in data.iteritems(): + tmp={} + tmp["interface_addr"]=key + tmp["req_frequency"]=value + tmp["frequency_rate"]=round(safe_divide(value,total_frequency),4) + tmp["frequency_avg"]=round(safe_divide(value,20),4) + if key in pre_data: + tmp["trend"]= round(safe_divide((value-pre_data[key]),pre_data[key]),4) + else: + tmp["trend"]=0 + res_data.append(tmp) + result = {"summary": {"interface": res_data}, "detail": {"interface": {}}} + return result + +#菜单维度汇总数据计算 +def menu_summary_calcule(startTime, endTime): + logger.info("begin") + old_start,old_end = get_pre_date(startTime,endTime) + pre_data = get_menu_summary_data(startTime=old_start,endTime=old_end) + logger.info("完成pre_data查询") + res_data = [] + data = get_menu_summary_data(startTime=startTime,endTime=endTime) + logger.info("完成data查询") + total_frequency = sum(data.itervalues()) + logger.info("完成合计计算") + for key, value in data.iteritems(): + tmp={} + tmp["menu_name"]=key + tmp["req_frequency"]=value + tmp["frequency_rate"]=round(safe_divide(value,total_frequency),4) + tmp["frequency_avg"]=round(safe_divide(value,len(data)),4) + if key in pre_data: + tmp["trend"]= round(safe_divide((value-pre_data[key]),pre_data[key]),4) + else: + tmp["trend"]=0 + res_data.append(tmp) + logger.info("完成数据处理") + result = {"summary": {"menu": res_data}, "detail": {"menu": {}}} + return result + +#入口 +def summary_data_entry(startTime, endTime,data_type): + data = {} + if data_type == "1": + data=ip_summary_calcule(startTime=startTime,endTime=endTime) + if data_type == "2": + data=account_summary_calcule(startTime=startTime,endTime=endTime) + if data_type == "3": + data=interface_summary_calcule(startTime=startTime,endTime=endTime) + if data_type == "4": + data=menu_summary_calcule(startTime=startTime,endTime=endTime) + + return data \ No newline at end of file diff --git a/utils/ext_logging.py b/utils/ext_logging.py index f10d513..8b78a52 100644 --- a/utils/ext_logging.py +++ b/utils/ext_logging.py @@ -10,16 +10,26 @@ import os from mlogging import TimedRotatingFileHandler_MP from appsUtils import env +#应用日志 APPFOLDERNAME = 'uebaMetricsAnalysis' - +#定时任务 APP_CRON_FOLDERNAME = 'uebaMetricsAnalysis_cron' - +#数据追踪 用于数据排查 +APP_TRACE_FOLDERNAME = 'uebaMetricsAnalysis_trace' +#审计日志 +APP_AUDIT_FOLDERNAME = 'uebaMetricsAnalysis_audit' def get_clean_file_path(): fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files" if not os.path.exists(fileroot): os.mkdir(fileroot) return fileroot +def merge_large_file_path(): + file_path = get_clean_file_path()+"/merge_files" + if not os.path.exists(file_path): + os.mkdir(file_path) + return file_path + def get_logger(logfile): """ 获取日志句柄 @@ -43,4 +53,6 @@ def get_logger(logfile): logger = get_logger(APPFOLDERNAME) -logger_cron = get_logger(APP_CRON_FOLDERNAME) \ No newline at end of file +logger_cron = get_logger(APP_CRON_FOLDERNAME) +logger_trace = get_logger(APP_TRACE_FOLDERNAME) +logger_audit = get_logger(APP_AUDIT_FOLDERNAME) \ No newline at end of file diff --git a/utils/file_helper.py b/utils/file_helper.py index 3d4303a..180fe2d 100644 --- a/utils/file_helper.py +++ b/utils/file_helper.py @@ -7,15 +7,26 @@ from db2json import DBUtils from datetime import datetime, timedelta from ext_logging import logger_cron,get_clean_file_path +TRACE_PATH = 
"/home/master/ISOP/apps/uebaMetricsAnalysis/logs/trace.cfg" + #写入大文件5M -def write_large_file(filename, data_list, chunk_size=1024*1024*5): +def write_large_file(filename, data_list, chunk_size=1024*1024*20): with codecs.open(filename, 'w', encoding='utf-8') as f: for i in range(0, len(data_list), chunk_size): chunk = data_list[i:i + chunk_size] f.write(chunk) +#判断文件是否大于500M +def is_file_larger_than_500mb(file_path): + file_size = os.path.getsize(file_path) + file_size_in_mb = file_size / (1024.0 * 1024) + if file_size_in_mb > 500: + return True + else: + return False + #读取大文件 -def read_large_json_file(filename, chunk_size=1024*1024*5): # 每次读取5MB的数据 +def read_large_json_file(filename, chunk_size=1024*1024*10): # 每次读取10MB的数据 json_object = '' with codecs.open(filename, 'r', encoding='utf-8') as f: while True: @@ -82,4 +93,32 @@ def merge_data(datasets): ] return aggregated_data - \ No newline at end of file + +def merge_data_new(datasets): + # 创建一个新的空字典来存储结果 + result = {} + + # 遍历列表中的每一个字典 + for d in datasets: + for key, value in d.iteritems(): # 遍历当前字典中的键值对 + if key in result: + # 如果键已经存在于结果中,则将值相加 + result[key] = str(int(result[key]) + int(value)) + else: + # 否则,直接添加键值对 + result[key] = value + + return result + +#获取文件内容不做jsonload处理 +def get_file_content(): + json_object = '' + if os.path.exists(TRACE_PATH): + with codecs.open(TRACE_PATH, 'r', encoding='utf-8') as f: + while True: + chunk = f.read(1024*1024*1) + if not chunk: + break + json_object += chunk + + return json_object \ No newline at end of file diff --git a/utils/file_merge.py b/utils/file_merge.py index a732ed0..5600d4e 100644 --- a/utils/file_merge.py +++ b/utils/file_merge.py @@ -1,15 +1,16 @@ #!/usr/bin/python #encoding=utf-8 # author: tangwy -import re,os,json +import re,os,json,time import codecs from db2json import DBUtils from datetime import datetime, timedelta -from ext_logging import logger_cron,get_clean_file_path -from file_helper import read_large_json_file,write_large_file,merge_data,delete_frile +from base_dataclean_pg import TRACE_KEY +from ext_logging import logger_cron,get_clean_file_path,merge_large_file_path,logger_trace +from file_helper import read_large_json_file,write_large_file,get_file_content,delete_frile,is_file_larger_than_500mb,merge_data_new from collections import defaultdict -date_pattern = re.compile(r'\d{8}-\d{4}\.json') +date_pattern = re.compile(r'^\d{8}-\d{4}\.json$') def get_all_files(path): # 列出所有包含匹配模式的文件名 @@ -30,8 +31,20 @@ def get_file_merge_array(filenames): file_dict = dict(file_dict) return file_dict -#合并所以文件 -def merge_all_files(file_dict,base_path): +# 将 2024-08-08.json 移动大文件目录下 等待入库前的合并【只移动前一天的文件】 +def mv_root_file_to_current(base_path,new_path): + new_file_name = (datetime.now()-timedelta(days=1)).strftime("%Y%m%d-2359.json") + old_file_name = (datetime.now()-timedelta(days=1)).strftime("%Y-%m-%d.json") + + old_full_path=os.path.join(base_path,old_file_name) + new_full_path = os.path.join(new_path,new_file_name) + if os.path.exists(old_full_path): + if os.path.exists(new_full_path): + logger_cron.error("MERGE_LARG_EFILE: 文件 "+new_full_path+"已经存在,任然移动会被覆盖") + os.rename(old_full_path,new_full_path) + +#合并大于>500M的文件 +def merge_large_files(file_dict,base_path): # 遍历字典中的每一个键值对 for date_str, files in file_dict.items(): #20240721 @@ -39,29 +52,126 @@ def merge_all_files(file_dict,base_path): full_root_file_path = os.path.join(base_path,root_file_path) if len(files)>0: file_objs=[] + file_full_path = [] + # 合并的数据存储变量 + merge_tmp_data = {} + for filename in files: + #20240721-0170.json + 
full_path = os.path.join(base_path,filename) + file_full_path.append(full_path) + logger_cron.info("INSERT: 准备读取文件做合并"+full_path) + tmp_data =read_large_json_file(full_path) + logger_cron.info("INSERT: 数据量"+str(len(tmp_data))) + + file_objs.append(tmp_data) + file_objs.append(merge_tmp_data) + merge_tmp_data = merge_data_new(file_objs) + logger_cron.info("INSERT: 合并完成"+full_path) + #移除已经合并的数据 + del file_objs[:] + + #判断files目录是否存在结果文件 (2024-08-08.json) if os.path.exists(full_root_file_path): + logger_cron.info("INSERT: 准备读取文件做合并"+full_root_file_path) root_data = read_large_json_file(full_root_file_path) + logger_cron.info("INSERT: 数据量"+str(len(root_data))) file_objs.append(root_data) + file_objs.append(merge_tmp_data) + merge_tmp_data = merge_data_new(file_objs) + logger_cron.info("INSERT: 合并完成"+full_root_file_path) + logger_cron.info("INSERT: 准备写入合并的文件") + + ######################问题排查 + key=get_file_content() + if key in merge_tmp_data: + logger_trace.info("largefilemerge:"+full_root_file_path+":"+key+":"+str(merge_tmp_data[key])) + + write_large_file(full_root_file_path,json.dumps(merge_tmp_data)) + logger_cron.info("INSERT: 写入合并文件完成") + #准备删除合并文件 + for del_file in file_full_path: + logger_cron.info("INSERT: 准备删除 "+del_file) + delete_frile(del_file) + #os.rename(del_file,del_file+".cmp") + logger_cron.info("INSERT: 完成删除 "+del_file) + +#2024-08-23.json 换成20240823-1410 时分为当前时间的时分 +def get_new_file_name(old_file_path): + file_name_with_ext = os.path.basename(old_file_path) + file_name, file_extension = os.path.splitext(file_name_with_ext) + sf=time.strftime("%H%M.json", time.localtime()) + new_name = file_name.replace("-","") + res_name = new_name+"-"+sf + + return res_name + +#合并所有文件 +def merge_all_files(file_dict,base_path): + # 遍历字典中的每一个键值对 + for date_str, files in file_dict.items(): + #20240721 + root_file_path = "{}-{}-{}.json".format(date_str[:4], date_str[4:6], date_str[6:]) + full_root_file_path = os.path.join(base_path,root_file_path) + if len(files)>0: + file_objs=[] + file_full_path = [] + # 合并的数据存储变量 + merge_tmp_data = {} file_full_path = [] for filename in files: #20240721-0170.json full_path = os.path.join(base_path,filename) file_full_path.append(full_path) - logger_cron.info("FILE_MERGE: 准备读取文件"+full_path) + logger_cron.info("FILE_MERGE: 准备读取文件做合并"+full_path) + tmp_data =read_large_json_file(full_path) file_objs.append(tmp_data) - - logger_cron.info("FILE_MERGE: 准备合并文件") - data = merge_data(file_objs) - logger_cron.info("FILE_MERGE: 准备写入合并的文件") - write_large_file(full_root_file_path,json.dumps(data)) + file_objs.append(merge_tmp_data) + merge_tmp_data = merge_data_new(file_objs) + logger_cron.info("FILE_MERGE: 文件合并完成"+full_path) + #移除已经合并的数据 + del file_objs[:] + + if os.path.exists(full_root_file_path): + flag=is_file_larger_than_500mb(full_root_file_path) + if flag: + logger_cron.info("FILE_MERGE: 文件超过500M需要移动到merge_file目录"+full_root_file_path) + large_file_root_path = merge_large_file_path() + + #新的文件名 + new_file_name = get_new_file_name(full_root_file_path) + logger_cron.info("FILE_MERGE: 新文件名 "+new_file_name) + large_file_path = os.path.join(large_file_root_path,new_file_name) + logger_cron.info("FILE_MERGE: oldpath "+full_root_file_path) + if os.path.exists(large_file_path): + logger_cron.error("FILE_MERGE: 文件 "+large_file_path+"已经存在,任然移动会被覆盖") + + os.rename(full_root_file_path, large_file_path) + logger_cron.info("FILE_MERGE: newpath "+large_file_path+"移动成功") + else: + logger_cron.info("FILE_MERGE: 文件小于500M需要参与合并"+full_root_file_path) + root_data = 
read_large_json_file(full_root_file_path) + file_objs.append(root_data) + file_objs.append(merge_tmp_data) + merge_tmp_data = merge_data_new(file_objs) + + ###################问题排查 + key=get_file_content() + if key in merge_tmp_data: + logger_trace.info("filemerge:"+full_root_file_path+":"+key+":"+str(merge_tmp_data[key])) + + logger_cron.info("FILE_MERGE: 所有文件合并完成") + write_large_file(full_root_file_path,json.dumps(merge_tmp_data)) logger_cron.info("FILE_MERGE: 写入合并文件完成") #准备删除合并文件 for del_file in file_full_path: logger_cron.info("FILE_MERGE: 准备删除 "+del_file) delete_frile(del_file) + #os.rename(del_file,del_file+".cmp") logger_cron.info("FILE_MERGE: 完成删除 "+del_file) + +#每半小时执行的合并 def entry(): #清洗目录 base_path = get_clean_file_path() @@ -72,3 +182,20 @@ def entry(): #合并所有文件 logger_cron.info("FILE_MERGE: 准备执行文件合并") merge_all_files(file_dict,base_path) + +#入库前执行的大文件合并 +def merge_large_entry(): + base_path = get_clean_file_path() + #清洗目录 + new_base_path = merge_large_file_path() + #将 2024-08-08.json 移动到merge_file目录下 等待入库前的合并 + mv_root_file_to_current(base_path,new_base_path) + #匹配待清洗的文件 + files = get_all_files(new_base_path) + logger_cron.info("INSERT: 待合并的文件"+json.dumps(files)) + #对待清洗的文件进行分组 + file_dict =get_file_merge_array(files) + #合并所有文件 + logger_cron.info("INSERT: 准备执行文件合并") + merge_large_files(file_dict,new_base_path) + diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py index 4193feb..2094720 100644 --- a/utils/file_to_pg.py +++ b/utils/file_to_pg.py @@ -4,10 +4,12 @@ import re,os,json import codecs,csv from db2json import DBUtils +import psycopg2 from datetime import datetime, timedelta -from ext_logging import logger_cron,get_clean_file_path -from file_helper import read_large_json_file -from file_merge import entry as merge_entry +from ext_logging import logger_cron,merge_large_file_path,logger_trace +from file_helper import read_large_json_file,write_large_file,get_file_content +from file_merge import merge_large_entry,entry as merge_entry +from dashboard_data_conversion import find_region_by_code,jobnum_region_dict from appsUtils.confutil import ConfUtil from dataInterface.functions import CFunction from dataInterface.db.params import CPgSqlParam @@ -15,6 +17,8 @@ from dataInterface.db.params import CPgSqlParam date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.json$') LOG_TABLE_NAME = "ueba_analysis_schema.logs" +FILED_NAMES = ['data_type', 'menu','ip', 'account','jobnum', 'interface',"company",'logdate','count'] +FILED_NAMES_TUMP = ('data_type', 'menu','ip', 'account','jobnum', 'interface',"company",'logdate','count') DATA_TYPE = { "IP": 1, @@ -25,8 +29,8 @@ DATA_TYPE = { # 获取当前日期并格式化为"年-月" def get_current_year_month(): - now = datetime.now() - return now.strftime("%Y_%m") + table_name = (datetime.now()-timedelta(days=1)).strftime("%Y_%m_%d") + return table_name # 获取当前月份的第一天并格式化为"年-月-日" def get_first_day_of_current_month(): @@ -44,10 +48,14 @@ def get_first_day_of_next_month(): return next_month.strftime("%Y-%m-%d") #获取表名 -def get_table_name(): +def get_table_data_range_new(): year_month = get_current_year_month() return LOG_TABLE_NAME+'_'+ year_month +def get_table_range(): + end = datetime.now().strftime("%Y-%m-%d") + start = (datetime.now()-timedelta(days=1)).strftime("%Y-%m-%d") + return start,end #获取表区间 def get_table_data_range(): start= get_first_day_of_current_month() @@ -55,9 +63,9 @@ def get_table_data_range(): return start,end #创建分区表 -def create_fq_table(): - table_name = get_table_name() - start,end = get_table_data_range() +def create_pq_table(): + table_name = 
get_table_data_range_new() + start,end = get_table_range() logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end)) sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end) @@ -83,21 +91,35 @@ def get_all_files(path): files = [] for filename in os.listdir(path): if date_pattern.search(filename): - #由于定时任务是凌晨3点执行 所以只处理昨天的数据,今天的不处理 + #由于定时任务是凌晨2点执行 所以只处理昨天的数据,今天的不处理 if datetime.now().strftime("%Y-%m-%d")+".json" != filename: files.append({"filename": filename, "path": os.path.join(path,filename)}) return files +#写csv文件不写列名 def json_to_csvFile(json_data, csv_file): - # 提取字段名 - fields = json_data[0].keys() # 假设第一个元素包含所有可能的键 with open(csv_file, 'wb') as csvfile: # 注意这里使用 'wb' 模式 - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() + writer = csv.DictWriter(csvfile, fieldnames=FILED_NAMES) + # writer.writeheader() for row in json_data: row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()} writer.writerow(row) +def copy_from_file(conn, table_name, file_path, columns): + with conn.cursor() as cur: + with open(file_path, 'r') as f: + cur.copy_from(f, table_name, sep=',', columns=columns) + conn.commit() + +def csv_to_pg_new(file_path): + confUtil = ConfUtil() + pgInfo = confUtil.getPostgresqlConf() + conn = psycopg2.connect(host=pgInfo["ip"], database=pgInfo["database"], user=pgInfo["username"], password=pgInfo["password"]) + table_name = LOG_TABLE_NAME + + copy_from_file(conn,table_name,file_path,FILED_NAMES_TUMP) + conn.close() + def csv_to_pg(sql): logger_cron.info("INSERT: 准备数据入库") confutil = ConfUtil() @@ -110,81 +132,49 @@ def csv_to_pg(sql): logger_cron.info("INSERT: 数据入库完成") #数据入库 -def insert_data(files): +def insert_data(files,base_path): for itemFile in files: if os.path.exists(itemFile.get("path",'')): - data =read_large_json_file(itemFile.get("path",'')) logger_cron.info("INSERT: 准备读取聚合文件:"+itemFile.get('path','')) + data =read_large_json_file(itemFile.get("path",'')) logger_cron.info("INSERT: 读取聚合文件完成") - ip_list = data.get('ip', []) - account_list = data.get('account', []) - interface_list = data.get('interface', []) - menu_list = data.get('menu', []) - logger_cron.info("INSERT: IP维度 " +str(len(ip_list))) - logger_cron.info("INSERT: ACCOUNT维度 " +str(len(account_list))) - logger_cron.info("INSERT: INTERFACE维度 " +str(len(interface_list))) - logger_cron.info("INSERT: MENU维度 " +str(len(menu_list))) + logger_cron.info("INSERT: 总数据 " +str(len(data))) + #########问题排查 + key=get_file_content() + if key in data: + logger_trace.info("filetopg:"+key+":"+str(data[key])) basename, extension = os.path.splitext(itemFile.get('filename', '')) log_date = basename # print ("filename:"+log_date) records = [] - for item in ip_list: - menu = item.get('menu', '') - ip = item.get('ip', '0.0.0.0') - account = item.get('account', '') - jobnum = item.get('jobnum', '') - count = item.get('count', 0) - logdate = log_date - datatype = DATA_TYPE.get("IP",1) - interface = item.get('interface', '') - records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface}) - for item in account_list: - menu = item.get('menu', '') - ip = item.get('ip', '0.0.0.0') - account = item.get('account', '') - jobnum = item.get('jobnum', '') - count = item.get('count', 0) - logdate = log_date - datatype = 
DATA_TYPE.get("ACCOUNT",2) - interface = item.get('interface', '') - records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface}) - for item in interface_list: - menu = item.get('menu', '') - ip = item.get('ip', '0.0.0.0') - account = item.get('account', '') - jobnum = item.get('jobnum', '') - count = item.get('count', 0) - logdate = log_date - datatype = DATA_TYPE.get("INTERFACE",3) - interface = item.get('interface', '') - records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface}) - for item in menu_list: - menu = item.get('menu', '') - ip = item.get('ip', '0.0.0.0') - account = item.get('account', '') - jobnum = item.get('jobnum', '') - count = item.get('count', 0) - logdate = log_date - datatype = DATA_TYPE.get("MENU",4) - interface = item.get('interface', '') - records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface}) - - csv_file = get_clean_file_path()+"/"+log_date+".csv" + for key, value in data.iteritems(): + #(datatype,menu,ip,account,jobnum,interface,company) count + res_str = ",".join([key,log_date, str(value)]) + records.append(res_str) + + res_str = "\n".join(records) + csv_file = base_path+"/"+log_date+".csv" logger_cron.info("INSERT: 开始写csv文件") - json_to_csvFile(records,csv_file) - sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file) - csv_to_pg(sql) + write_large_file(csv_file,res_str) + # json_to_csvFile(records,csv_file) + # sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file) + # csv_to_pg(sql) + + logger_cron.info("INSERT: 准备数据入库") + csv_to_pg_new(csv_file) + logger_cron.info("INSERT: 完成数据入库") - #重命名文件 + #重命名文件json文件 logger_cron.info(itemFile.get('path','')) logger_cron.info("done_"+itemFile.get('filename', '')) - os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', '')) + os.rename(itemFile.get('path',''),base_path+"/done_"+itemFile.get('filename', '')) logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', '')) - + + #重命名文件csv文件 logger_cron.info("done_"+itemFile.get('filename', '')) - os.rename(csv_file,get_clean_file_path()+"/done_"+log_date+".csv") + os.rename(csv_file,base_path+"/done_"+log_date+".csv") logger_cron.info("INSERT: csv重命名文件完成") def delete_files(directory_path): """ @@ -193,8 +183,8 @@ def delete_files(directory_path): :param directory_path: 要检查的目录的绝对路径 """ - # 计算10天前的日期 - ten_days_ago = datetime.now() - timedelta(days=10) + # 计算7天前的日期 + ten_days_ago = datetime.now() - timedelta(days=7) # 正则表达式模式,匹配形如 YYYY-MM-DD 的文件名 date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})') @@ -206,21 +196,61 @@ def delete_files(directory_path): file_date_str = match.group(1) file_date = datetime.strptime(file_date_str, '%Y-%m-%d') - # 检查文件日期是否在10天前 + # 检查文件日期是否在7天前 if file_date <= ten_days_ago: file_path = os.path.join(directory_path, filename) os.remove(file_path) logger_cron.info("INSERT: 删除文件"+file_path) - def entry(): - # 合并文件 + #将大于500M的文件再次做合并 merge_entry() - base_path = get_clean_file_path() + merge_large_entry() + base_path = merge_large_file_path() files = get_all_files(base_path) logger_cron.info("INSERT:获取文件数量"+str(len(files))) #创建分区表 - 
create_fq_table() + create_pq_table() #数据入库 - insert_data(files) + insert_data(files,base_path) #删除文件 delete_files(base_path) + + + + + + +# #创建分区表 +# def create_pq_table2(): +# table_name = LOG_TABLE_NAME+'_'+'2024_08_19' +# start,end = '2024-08-19','2024-08-20' +# logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end)) +# sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs +# FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end) +# CFunction.execute(CPgSqlParam(sql)) + +# sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1} +# PARTITION OF {TABLE_NAME} +# FOR VALUES FROM (1) TO (2); +# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2} +# PARTITION OF {TABLE_NAME} +# FOR VALUES FROM (2) TO (3); +# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3} +# PARTITION OF {TABLE_NAME} +# FOR VALUES FROM (3) TO (4); +# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4} +# PARTITION OF {TABLE_NAME} +# FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name) + +# CFunction.execute(CPgSqlParam(sql_type)) +# logger_cron.info("INSERT:创建分区表完成") + + +# create_pq_table2() + + +# # logger_cron.info("INSERT:01") +# # csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-15.csv") +# logger_cron.info("INSERT:02") +# csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-18.csv") +# logger_cron.info("INSERT:03") \ No newline at end of file diff --git a/views/dashboard_views.py b/views/dashboard_views.py index 818623e..c5e5eff 100644 --- a/views/dashboard_views.py +++ b/views/dashboard_views.py @@ -11,18 +11,20 @@ import traceback,time,codecs from rest_framework import viewsets from rest_framework.decorators import list_route, detail_route -from uebaMetricsAnalysis.utils.ext_logging import logger +from uebaMetricsAnalysis.utils.ext_logging import logger,logger_audit from uebaMetricsAnalysis.lib.result import Result from uebaMetricsAnalysis.utils import config from uebaMetricsAnalysis.utils.dashboard_data_pg import entry -class DashboardViewSets(viewsets.GenericViewSet): - #写入大文件5M - def write_large_file(self,filename, data_list, chunk_size=1024*1024*5): - with codecs.open(filename, 'w', encoding='utf-8') as f: - for i in range(0, len(data_list), chunk_size): - chunk = data_list[i:i + chunk_size] - f.write(chunk) +from uebaMetricsAnalysis.utils.dashboard_detail_data import detail_data_entry +from uebaMetricsAnalysis.utils.dashboard_summary_data import summary_data_entry +class DashboardViewSets(viewsets.GenericViewSet): + # #写入大文件5M + # def write_large_file(self,filename, data_list, chunk_size=1024*1024*5): + # with codecs.open(filename, 'w', encoding='utf-8') as f: + # for i in range(0, len(data_list), chunk_size): + # chunk = data_list[i:i + chunk_size] + # f.write(chunk) @list_route(methods=['GET']) def get_summary_data_list(self,request): try: @@ -43,9 +45,9 @@ class DashboardViewSets(viewsets.GenericViewSet): action =body.get("action") username = request.session.get('username',"unknown user") params = body.get("params") - logger.info("Audit_Log:"+username+","+action +",params:"+json.dumps(params)) + logger_audit.info("Audit_Log:"+username+","+action +",params:"+json.dumps(params)) except Exception, e: - logger.info("Audit_Log:{}, err: {}, traceback: {}".format(username, str(e), 
traceback.format_exc())) + logger_audit.info("Audit_Log:{}, err: {}, traceback: {}".format(username, str(e), traceback.format_exc())) return Result.ok("ok") @list_route(methods=['GET']) @@ -54,7 +56,42 @@ class DashboardViewSets(viewsets.GenericViewSet): jsonfile_path = os.path.join(conf_path, 'defaultRule.json') rule_data = config.read_json_config(jsonfile_path) return Result.ok(rule_data) - + #获取主页面数据 + @list_route(methods=['GET']) + def get_summary_data(self,request): + try: + data_type = request.GET.get('type') + startTime =request.GET.get('startDate') + endTime = request.GET.get('endDate') + #1:ip,2:账号,3:接口,4:菜单 + logger.info("获取汇总数据:"+data_type+";" + startTime +";"+ endTime) + return Result.ok(summary_data_entry(startTime,endTime,data_type)) + except Exception, e: + logger.error(traceback.format_exc()) + return Result.failed("查询失败", str(e)) + + #获取明细页面数据 + @list_route(methods=['GET']) + def get_detail_data(self,request): + try: + data_type = request.GET.get('type') + startTime =request.GET.get('startDate') + endTime = request.GET.get('endDate') + keyWord = request.GET.get('keyWord') + #1:ip,2:账号,3:接口,4:菜单 + logger.info("获取明细数据:"+data_type+";" + startTime +";"+ endTime+";"+keyWord) + return Result.ok(detail_data_entry(startTime,endTime,data_type,keyWord)) + except Exception, e: + logger.error(traceback.format_exc()) + return Result.failed("查询失败", str(e)) + + #获取允许查询的最大天数 + @list_route(methods=['GET']) + def get_search_limit(self,request): + conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf') + jsonfile_path = os.path.join(conf_path, 'sys_config.json') + rule_data = config.read_json_config(jsonfile_path) + return Result.ok(rule_data) # if __name__ == '__main__': # get_summary_data_list \ No newline at end of file
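
Reviewer sketch (illustrative only, not part of the patch): the dict produced by group_and_write_to_file() is keyed by "data_type,menu,ip,account,jobnum,interface,company" with a count as the value; merge_data_new() sums any list of such dicts (keeping summed counts as strings), and insert_data() appends logdate and count to each key to build the header-less CSV that csv_to_pg_new() streams into ueba_analysis_schema.logs via psycopg2 copy_from(). The snippet below walks through that flow with made-up sample values; merge_counts is a local stand-in for merge_data_new(), and dict.items() replaces the Python 2 iteritems() calls used in the patch.

# -*- coding: utf-8 -*-
# Illustrative walk-through of the aggregate -> merge -> COPY flow in this patch.
# Sample IPs, accounts, job numbers and company names are made up.

def merge_counts(datasets):
    # Same semantics as merge_data_new(): duplicate keys are summed,
    # and summed counts are kept as strings.
    result = {}
    for d in datasets:
        for key, value in d.items():  # the patch uses iteritems() (Python 2)
            if key in result:
                result[key] = str(int(result[key]) + int(value))
            else:
                result[key] = value
    return result

# Keys follow "data_type,menu,ip,account,jobnum,interface,company",
# as built by group_and_write_to_file().
half_hour_a = {"1,,10.0.0.1,user01,80001,,CompanyA": "3"}
half_hour_b = {"1,,10.0.0.1,user01,80001,,CompanyA": "5",
               "4,menuA,10.0.0.2,user02,80002,,CompanyB": "2"}

merged = merge_counts([half_hour_a, half_hour_b])

# insert_data() appends logdate and count to every key, producing the
# header-less CSV rows that csv_to_pg_new() feeds to copy_from() with
# the FILED_NAMES_TUMP column list.
log_date = "2024-08-26"
csv_rows = "\n".join(",".join([key, log_date, str(count)])
                     for key, count in merged.items())
print(csv_rows)
# e.g. (row order may vary):
# 1,,10.0.0.1,user01,80001,,CompanyA,2024-08-26,8
# 4,menuA,10.0.0.2,user02,80002,,CompanyB,2024-08-26,2

Because both the dict key and the final CSV row are plain comma-joined strings, remove_commas() in base_dataclean_pg.py strips embedded commas from the menu, account and interface values up front; otherwise a field containing a comma would shift every later column when copy_from() parses the row.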