diff --git a/utils/dashboard_data_conversion.py b/utils/dashboard_data_conversion.py
index c50c249..dacb709 100644
--- a/utils/dashboard_data_conversion.py
+++ b/utils/dashboard_data_conversion.py
@@ -178,7 +178,7 @@ def interface_summary_data_format(interface_summary_data):
         jobnum = interface_data["jobnum"]
         account = interface_data["account"]
         ip = interface_data["ip"]
-        interface_detail_dict_key = "{}{}".format(ip, account, jobnum)
+        interface_detail_dict_key = "{}{}{}".format(ip, account, jobnum)
 
         # 更新统计数据
         grouped_data[interface]["reqs"] += count
@@ -208,12 +208,12 @@
         }
         for interface, data in grouped_data.items()
     ]
-    result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)[:500]
+    result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)[:20]
 
     # 构建detail部分
     result["detail"]["interface"] = {
         company: sorted(data.values(), key=lambda x: x["req_frequency"], reverse=True)
-        for company, data in interface_detail_dict.items()
+        for company, data in interface_detail_dict.items()[:500]
     }
 
     return result
diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py
index 1cec781..4193feb 100644
--- a/utils/file_to_pg.py
+++ b/utils/file_to_pg.py
@@ -2,12 +2,13 @@
 #encoding=utf-8
 # author: tangwy
 import re,os,json
-import codecs
+import codecs,csv
 from db2json import DBUtils
 from datetime import datetime, timedelta
 from ext_logging import logger_cron,get_clean_file_path
 from file_helper import read_large_json_file
 from file_merge import entry as merge_entry
+from appsUtils.confutil import ConfUtil
 from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
 
@@ -59,10 +60,24 @@ def create_fq_table():
     start,end = get_table_data_range()
     logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end))
     sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs 
-FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name,START = start,END=end)
+FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end)
     CFunction.execute(CPgSqlParam(sql))
-    logger_cron.info("INSERT:创建分区表完成")
+    sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1}
+PARTITION OF {TABLE_NAME}
+FOR VALUES FROM (1) TO (2);
+CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2}
+PARTITION OF {TABLE_NAME}
+FOR VALUES FROM (2) TO (3);
+CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3}
+PARTITION OF {TABLE_NAME}
+FOR VALUES FROM (3) TO (4);
+CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4}
+PARTITION OF {TABLE_NAME}
+FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name)
+
+    CFunction.execute(CPgSqlParam(sql_type))
+    logger_cron.info("INSERT:创建分区表完成")
 
 def get_all_files(path):
     # 列出所有包含匹配模式的文件名
     files = []
@@ -73,6 +88,27 @@ def get_all_files(path):
             files.append({"filename": filename, "path": os.path.join(path,filename)})
     return files
 
+def json_to_csvFile(json_data, csv_file):
+    # 提取字段名
+    fields = json_data[0].keys()  # 假设第一个元素包含所有可能的键
+    with open(csv_file, 'wb') as csvfile:  # 注意这里使用 'wb' 模式
+        writer = csv.DictWriter(csvfile, fieldnames=fields)
+        writer.writeheader()
+        for row in json_data:
+            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
+            writer.writerow(row)
+
+def csv_to_pg(sql):
+    logger_cron.info("INSERT: 准备数据入库")
+    confutil = ConfUtil()
+    cur_pg_conf = confutil.getPostgresqlConf()
+    cmd = """psql {} -U {} -w -c \"{}\"""".format(cur_pg_conf["database"],cur_pg_conf["username"],sql)
+    logger_cron.info("INSERT: "+ cmd)
+    rtn = os.popen(cmd)
+    cmd_rtn = rtn.readlines()
+    logger_cron.info("INSERT: "+ json.dumps(cmd_rtn))
+    logger_cron.info("INSERT: 数据入库完成")
+
 #数据入库
 def insert_data(files):
     for itemFile in files:
@@ -103,7 +139,7 @@
             logdate = log_date
             datatype = DATA_TYPE.get("IP",1)
             interface = item.get('interface', '')
-            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
         for item in account_list:
             menu = item.get('menu', '')
             ip = item.get('ip', '0.0.0.0')
@@ -113,7 +149,7 @@
             logdate = log_date
             datatype = DATA_TYPE.get("ACCOUNT",2)
             interface = item.get('interface', '')
-            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
         for item in interface_list:
             menu = item.get('menu', '')
             ip = item.get('ip', '0.0.0.0')
@@ -123,7 +159,7 @@
             logdate = log_date
             datatype = DATA_TYPE.get("INTERFACE",3)
             interface = item.get('interface', '')
-            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
         for item in menu_list:
             menu = item.get('menu', '')
             ip = item.get('ip', '0.0.0.0')
@@ -133,23 +169,13 @@
             logdate = log_date
             datatype = DATA_TYPE.get("MENU",4)
             interface = item.get('interface', '')
-            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
-
-        data_list=[]
-        for item in records:
-            data_list.append(item.get('menu', ''))
-            data_list.append(item.get('ip', ''))
-            data_list.append(item.get('account', ''))
-            data_list.append(item.get('jobnum', ''))
-            data_list.append(item.get('count', ''))
-            data_list.append(item.get('logdate', ''))
-            data_list.append(item.get('datatype', ''))
-            data_list.append(item.get('interface', ''))
-
-        sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface)
-        VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for x in records]))
-        CFunction.execute(CPgSqlParam(sql, params=data_list))
-        logger_cron.info("INSERT: 数据插入完成")
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
+
+        csv_file = get_clean_file_path()+"/"+log_date+".csv"
+        logger_cron.info("INSERT: 开始写csv文件")
+        json_to_csvFile(records,csv_file)
+        sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
+        csv_to_pg(sql)
 
         #重命名文件
         logger_cron.info(itemFile.get('path',''))
@@ -157,6 +183,9 @@
         os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', ''))
         logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', ''))
+        logger_cron.info("done_"+itemFile.get('filename', ''))
+        os.rename(csv_file,get_clean_file_path()+"/done_"+log_date+".csv")
+        logger_cron.info("INSERT: csv重命名文件完成")
 
 def delete_files(directory_path):
     """
     删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件,