TANGWY 4 months ago
parent 10096ae4ec
commit 189d1c8cdd
  1. 6
      utils/dashboard_data_conversion.py
  2. 75
      utils/file_to_pg.py

@ -178,7 +178,7 @@ def interface_summary_data_format(interface_summary_data):
jobnum = interface_data["jobnum"]
account = interface_data["account"]
ip = interface_data["ip"]
interface_detail_dict_key = "{}{}".format(ip, account, jobnum)
interface_detail_dict_key = "{}{}{}".format(ip, account, jobnum)
# 更新统计数据
grouped_data[interface]["reqs"] += count
@ -208,12 +208,12 @@ def interface_summary_data_format(interface_summary_data):
}
for interface, data in grouped_data.items()
]
result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)[:500]
result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)[:20]
# 构建detail部分
result["detail"]["interface"] = {
company: sorted(data.values(), key=lambda x: x["req_frequency"], reverse=True)
for company, data in interface_detail_dict.items()
for company, data in interface_detail_dict.items()[:500]
}
return result

@ -2,12 +2,13 @@
#encoding=utf-8
# author: tangwy
import re,os,json
import codecs
import codecs,csv
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
from file_helper import read_large_json_file
from file_merge import entry as merge_entry
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
@ -59,10 +60,24 @@ def create_fq_table():
start,end = get_table_data_range()
logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end))
sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name,START = start,END=end)
FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end)
CFunction.execute(CPgSqlParam(sql))
logger_cron.info("INSERT:创建分区表完成")
sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1}
PARTITION OF {TABLE_NAME}
FOR VALUES FROM (1) TO (2);
CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2}
PARTITION OF {TABLE_NAME}
FOR VALUES FROM (2) TO (3);
CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3}
PARTITION OF {TABLE_NAME}
FOR VALUES FROM (3) TO (4);
CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4}
PARTITION OF {TABLE_NAME}
FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name)
CFunction.execute(CPgSqlParam(sql_type))
logger_cron.info("INSERT:创建分区表完成")
def get_all_files(path):
# 列出所有包含匹配模式的文件名
files = []
@ -73,6 +88,27 @@ def get_all_files(path):
files.append({"filename": filename, "path": os.path.join(path,filename)})
return files
def json_to_csvFile(json_data, csv_file):
# 提取字段名
fields = json_data[0].keys() # 假设第一个元素包含所有可能的键
with open(csv_file, 'wb') as csvfile: # 注意这里使用 'wb' 模式
writer = csv.DictWriter(csvfile, fieldnames=fields)
writer.writeheader()
for row in json_data:
row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
writer.writerow(row)
def csv_to_pg(sql):
logger_cron.info("INSERT: 准备数据入库")
confutil = ConfUtil()
cur_pg_conf = confutil.getPostgresqlConf()
cmd = """psql {} -U {} -w -c \"{}\"""".format(cur_pg_conf["database"],cur_pg_conf["username"],sql)
logger_cron.info("INSERT: "+ cmd)
rtn = os.popen(cmd)
cmd_rtn = rtn.readlines()
logger_cron.info("INSERT: "+ json.dumps(cmd_rtn))
logger_cron.info("INSERT: 数据入库完成")
#数据入库
def insert_data(files):
for itemFile in files:
@ -103,7 +139,7 @@ def insert_data(files):
logdate = log_date
datatype = DATA_TYPE.get("IP",1)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in account_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
@ -113,7 +149,7 @@ def insert_data(files):
logdate = log_date
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in interface_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
@ -123,7 +159,7 @@ def insert_data(files):
logdate = log_date
datatype = DATA_TYPE.get("INTERFACE",3)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in menu_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
@ -133,23 +169,13 @@ def insert_data(files):
logdate = log_date
datatype = DATA_TYPE.get("MENU",4)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
data_list=[]
for item in records:
data_list.append(item.get('menu', ''))
data_list.append(item.get('ip', ''))
data_list.append(item.get('account', ''))
data_list.append(item.get('jobnum', ''))
data_list.append(item.get('count', ''))
data_list.append(item.get('logdate', ''))
data_list.append(item.get('datatype', ''))
data_list.append(item.get('interface', ''))
sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface)
VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for x in records]))
CFunction.execute(CPgSqlParam(sql, params=data_list))
logger_cron.info("INSERT: 数据插入完成")
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
csv_file = get_clean_file_path()+"/"+log_date+".csv"
logger_cron.info("INSERT: 开始写csv文件")
json_to_csvFile(records,csv_file)
sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
csv_to_pg(sql)
#重命名文件
logger_cron.info(itemFile.get('path',''))
@ -157,6 +183,9 @@ def insert_data(files):
os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', ''))
logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', ''))
logger_cron.info("done_"+itemFile.get('filename', ''))
os.rename(csv_file,get_clean_file_path()+"/done_"+log_date+".csv")
logger_cron.info("INSERT: csv重命名文件完成")
def delete_files(directory_path):
"""
删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件

Loading…
Cancel
Save