From e0cbf716d5ab4ea9809510b272e1717cd9c5f0a4 Mon Sep 17 00:00:00 2001
From: TANGWY
Date: Tue, 6 Aug 2024 10:47:36 +0800
Subject: [PATCH] Code restoration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 utils/file_helper.py | 88 +++++++-------------------------------------
 utils/file_merge.py  | 37 ++++++-------------
 utils/file_to_pg.py  | 64 ++++++++++++++++++++++++++++----
 3 files changed, 81 insertions(+), 108 deletions(-)

diff --git a/utils/file_helper.py b/utils/file_helper.py
index 0f4efb7..3d4303a 100644
--- a/utils/file_helper.py
+++ b/utils/file_helper.py
@@ -2,18 +2,11 @@
 #encoding=utf-8
 # author: tangwy
 import re,os,json
-import codecs,csv
+import codecs
 from db2json import DBUtils
 from datetime import datetime, timedelta
 from ext_logging import logger_cron,get_clean_file_path
 
-DATA_TYPE = {
-    "IP": 1,
-    "ACCOUNT": 2,
-    "INTERFACE": 3,
-    "MENU": 4,
-}
-
 # Write a large file in 5MB chunks
 def write_large_file(filename, data_list, chunk_size=1024*1024*5):
     with codecs.open(filename, 'w', encoding='utf-8') as f:
@@ -22,13 +15,17 @@ def write_large_file(filename, data_list, chunk_size=1024*1024*5):
             f.write(chunk)
 
 # Read a large file
-def read_large_json_file(filename):
-    rows = []
-    with open(filename, 'rb') as csvfile:
-        reader = csv.DictReader(csvfile)
-        for row in reader:
-            rows.append(dict(row))
-    return json.dumps(rows)
+def read_large_json_file(filename, chunk_size=1024*1024*5):  # read 5MB of data per iteration
+    json_object = ''
+    with codecs.open(filename, 'r', encoding='utf-8') as f:
+        while True:
+            chunk = f.read(chunk_size)
+            if not chunk:
+                break
+            json_object += chunk
+
+    data = json.loads(json_object)
+    return data
 
 # Delete a file
 def delete_frile(file_path):
@@ -85,63 +82,4 @@ def merge_data(datasets):
     ]
 
     return aggregated_data
-
-# Convert JSON data to CSV rows
-def json_to_csv_data(data,log_date):
-    ip_list = data.get('ip', [])
-    account_list = data.get('account', [])
-    interface_list = data.get('interface', [])
-    menu_list = data.get('menu', [])
-
-    records = []
-    for item in ip_list:
-        menu = item.get('menu', '')
-        ip = item.get('ip', '0.0.0.0')
-        account = item.get('account', '')
-        jobnum = item.get('jobnum', '')
-        count = item.get('count', 0)
-        logdate = log_date
-        datatype = DATA_TYPE.get("IP",1)
-        interface = item.get('interface', '')
-        records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
-    for item in account_list:
-        menu = item.get('menu', '')
-        ip = item.get('ip', '0.0.0.0')
-        account = item.get('account', '')
-        jobnum = item.get('jobnum', '')
-        count = item.get('count', 0)
-        logdate = log_date
-        datatype = DATA_TYPE.get("ACCOUNT",2)
-        interface = item.get('interface', '')
-        records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
-    for item in interface_list:
-        menu = item.get('menu', '')
-        ip = item.get('ip', '0.0.0.0')
-        account = item.get('account', '')
-        jobnum = item.get('jobnum', '')
-        count = item.get('count', 0)
-        logdate = log_date
-        datatype = DATA_TYPE.get("INTERFACE",3)
-        interface = item.get('interface', '')
-        records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
-    for item in menu_list:
-        menu = item.get('menu', '')
-        ip = item.get('ip', '0.0.0.0')
-        account = item.get('account', '')
-        jobnum = item.get('jobnum', '')
-        count = item.get('count', 0)
-        logdate = log_date
-        datatype = DATA_TYPE.get("MENU",4)
-        interface = item.get('interface', '')
-        records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
-    return records
-
-def write_csv(json_data, csv_file):
-    # Extract the field names
-    fields = json_data[0].keys()  # assumes the first element contains all possible keys
-    with open(csv_file, 'wb') as csvfile:  # note that 'wb' mode is used here
-        writer = csv.DictWriter(csvfile, fieldnames=fields)
-        writer.writeheader()
-        for row in json_data:
-            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
-            writer.writerow(row)
+ 
\ No newline at end of file
diff --git a/utils/file_merge.py b/utils/file_merge.py
index 30909e7..a732ed0 100644
--- a/utils/file_merge.py
+++ b/utils/file_merge.py
@@ -6,10 +6,10 @@ import codecs
 from db2json import DBUtils
 from datetime import datetime, timedelta
 from ext_logging import logger_cron,get_clean_file_path
-from file_helper import read_large_json_file,write_large_file,merge_data,delete_frile,json_to_csv_data,write_csv
+from file_helper import read_large_json_file,write_large_file,merge_data,delete_frile
 from collections import defaultdict
 
-date_pattern = re.compile(r'\d{8}-\d{4}\.csv')
+date_pattern = re.compile(r'\d{8}-\d{4}\.json')
 
 def get_all_files(path):
     # List all filenames that match the pattern
@@ -35,42 +35,27 @@
     # Iterate over every key/value pair in the dict
     for date_str, files in file_dict.items():
         #20240721
-        root_file_path = "{}-{}-{}.csv".format(date_str[:4], date_str[4:6], date_str[6:])
+        root_file_path = "{}-{}-{}.json".format(date_str[:4], date_str[4:6], date_str[6:])
         full_root_file_path = os.path.join(base_path,root_file_path)
 
         if len(files)>0:
-            
+            file_objs=[]
+            if os.path.exists(full_root_file_path):
+                root_data = read_large_json_file(full_root_file_path)
+                file_objs.append(root_data)
+
             file_full_path = []
-            aggregated_data = {}
             for filename in files:
-                file_objs=[]
                 #20240721-0170.json
                 full_path = os.path.join(base_path,filename)
                 file_full_path.append(full_path)
                 logger_cron.info("FILE_MERGE: about to read file "+full_path)
-
                 tmp_data =read_large_json_file(full_path)
                 file_objs.append(tmp_data)
-                if aggregated_data:
-                    file_objs.append(aggregated_data)
-
-                aggregated_data = merge_data(file_objs)
-
-            # Finally merge with the base file
-            result_data_array = []
-            if os.path.exists(full_root_file_path):
-                root_data = read_large_json_file(full_root_file_path)
-                result_data_array.append(root_data)
-
-            if aggregated_data:
-                result_data_array.append(aggregated_data)
-
-            logger_cron.info("FILE_MERGE: about to merge files")
-            data = merge_data(result_data_array)
+            logger_cron.info("FILE_MERGE: about to merge files")
+            data = merge_data(file_objs)
             logger_cron.info("FILE_MERGE: about to write the merged file")
-
-            csv_data = json_to_csv_data(data,""),
-            write_csv(csv_data,full_root_file_path)
+            write_large_file(full_root_file_path,json.dumps(data))
             logger_cron.info("FILE_MERGE: finished writing the merged file")
             # Prepare to delete the merged source files
             for del_file in file_full_path:
diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py
index f08e244..4193feb 100644
--- a/utils/file_to_pg.py
+++ b/utils/file_to_pg.py
@@ -6,13 +6,13 @@ import codecs,csv
 from db2json import DBUtils
 from datetime import datetime, timedelta
 from ext_logging import logger_cron,get_clean_file_path
-from file_helper import read_large_json_file,json_to_csv_data,write_csv
+from file_helper import read_large_json_file
 from file_merge import entry as merge_entry
 from appsUtils.confutil import ConfUtil
 from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
 
-date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.csv$')
+date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.json$')
 
 LOG_TABLE_NAME = "ueba_analysis_schema.logs"
 
@@ -84,10 +84,20 @@ def get_all_files(path):
     for filename in os.listdir(path):
         if date_pattern.search(filename):
             # The scheduled job runs at 3 a.m., so only yesterday's data is processed; today's file is skipped
-            if datetime.now().strftime("%Y-%m-%d")+".csv" != filename:
+            if datetime.now().strftime("%Y-%m-%d")+".json" != filename:
                 files.append({"filename": filename, "path": os.path.join(path,filename)})
     return files
 
+def json_to_csvFile(json_data, csv_file):
+    # Extract the field names
+    fields = json_data[0].keys()  # assumes the first element contains all possible keys
+    with open(csv_file, 'wb') as csvfile:  # note that 'wb' mode is used here
+        writer = csv.DictWriter(csvfile, fieldnames=fields)
+        writer.writeheader()
+        for row in json_data:
+            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
+            writer.writerow(row)
+
 def csv_to_pg(sql):
     logger_cron.info("INSERT: preparing to load data into the database")
     confutil = ConfUtil()
@@ -118,12 +128,52 @@ def insert_data(files):
         basename, extension = os.path.splitext(itemFile.get('filename', ''))
         log_date = basename
-
-        csv_file = get_clean_file_path()+"/"+log_date+".csv"
-        records = json_to_csv_data(data,log_date)
+        # print ("filename:"+log_date)
+        records = []
+        for item in ip_list:
+            menu = item.get('menu', '')
+            ip = item.get('ip', '0.0.0.0')
+            account = item.get('account', '')
+            jobnum = item.get('jobnum', '')
+            count = item.get('count', 0)
+            logdate = log_date
+            datatype = DATA_TYPE.get("IP",1)
+            interface = item.get('interface', '')
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
+        for item in account_list:
+            menu = item.get('menu', '')
+            ip = item.get('ip', '0.0.0.0')
+            account = item.get('account', '')
+            jobnum = item.get('jobnum', '')
+            count = item.get('count', 0)
+            logdate = log_date
+            datatype = DATA_TYPE.get("ACCOUNT",2)
+            interface = item.get('interface', '')
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
+        for item in interface_list:
+            menu = item.get('menu', '')
+            ip = item.get('ip', '0.0.0.0')
+            account = item.get('account', '')
+            jobnum = item.get('jobnum', '')
+            count = item.get('count', 0)
+            logdate = log_date
+            datatype = DATA_TYPE.get("INTERFACE",3)
+            interface = item.get('interface', '')
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
+        for item in menu_list:
+            menu = item.get('menu', '')
+            ip = item.get('ip', '0.0.0.0')
+            account = item.get('account', '')
+            jobnum = item.get('jobnum', '')
+            count = item.get('count', 0)
+            logdate = log_date
+            datatype = DATA_TYPE.get("MENU",4)
+            interface = item.get('interface', '')
+            records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
+        csv_file = get_clean_file_path()+"/"+log_date+".csv"
         logger_cron.info("INSERT: start writing the csv file")
-        write_csv(records,csv_file)
+        json_to_csvFile(records,csv_file)
         sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
         csv_to_pg(sql)
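
Below is a minimal usage sketch of the flow this patch restores: per-day JSON shards are read and merged back into the day's base JSON file, and only at load time are the records flattened to CSV for the \copy bulk load. It is illustrative only, not part of the change; the shard paths, output paths and sample values are hypothetical, and it assumes the utils modules above (plus their db2json/ext_logging/dataInterface dependencies) are importable under Python 2.

# -*- coding: utf-8 -*-
# Illustrative sketch only; paths and sample values below are hypothetical.
import json
from file_helper import read_large_json_file, write_large_file, merge_data
from file_to_pg import json_to_csvFile

# 1) Merge two per-day shard files into the day's base JSON file,
#    mirroring how merge_all_files chains the helpers in this patch.
shards = ["/tmp/clean/20240721-0170.json", "/tmp/clean/20240721-0171.json"]
file_objs = [read_large_json_file(p) for p in shards]
merged = merge_data(file_objs)
write_large_file("/tmp/clean/2024-07-21.json", json.dumps(merged))

# 2) Flatten the "ip" bucket into rows and write the CSV that \copy ingests.
rows = []
for item in merged.get("ip", []):
    rows.append({"menu": item.get("menu", ""), "ip": item.get("ip", "0.0.0.0"),
                 "account": item.get("account", ""), "jobnum": item.get("jobnum", ""),
                 "count": item.get("count", 0), "logdate": "2024-07-21",
                 "data_type": 1, "interface": item.get("interface", "")})
if rows:
    json_to_csvFile(rows, "/tmp/clean/2024-07-21.csv")

Note that write_large_file is handed a pre-serialized JSON string, matching how merge_all_files calls it in the patch.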