代码还原

dev
TANGWY 6 months ago
parent 1d0236c5d6
commit e0cbf716d5
  1. 88
      utils/file_helper.py
  2. 37
      utils/file_merge.py
  3. 64
      utils/file_to_pg.py

@ -2,18 +2,11 @@
#encoding=utf-8
# author: tangwy
import re,os,json
import codecs,csv
import codecs
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
# Numeric codes stored in each record's "data_type" field, one per
# aggregation list (ip / account / interface / menu).
DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}
#写入大文件5M
def write_large_file(filename, data_list, chunk_size=1024*1024*5):
with codecs.open(filename, 'w', encoding='utf-8') as f:
@ -22,13 +15,17 @@ def write_large_file(filename, data_list, chunk_size=1024*1024*5):
f.write(chunk)
#读取大文件
def read_large_json_file(filename):
    """Load a CSV file and return its rows serialized as a JSON string.

    Despite the function name, the file is parsed with ``csv.DictReader``:
    the first line supplies the column names and every following line
    becomes one object of the resulting JSON array.

    :param filename: path of the CSV file to read.
    :return: JSON text encoding a list of row dictionaries (``"[]"`` when
        the file has no data rows).
    """
    import io
    import sys

    # The csv module expects a binary file on Python 2 but a text file
    # opened with newline='' on Python 3; pick the mode for this runtime.
    if sys.version_info[0] >= 3:
        csvfile = io.open(filename, 'r', encoding='utf-8', newline='')
    else:
        csvfile = open(filename, 'rb')

    with csvfile:
        rows = [dict(row) for row in csv.DictReader(csvfile)]
    return json.dumps(rows)
def read_large_json_file(filename, chunk_size=1024*1024*5):
    """Read a UTF-8 encoded JSON file chunk by chunk and return the parsed data.

    :param filename: path of the JSON file to read.
    :param chunk_size: amount of data requested from the file per read call
        (defaults to 5 * 1024 * 1024).
    :return: whatever ``json.loads`` produces for the complete file content.
    :raises ValueError: if the file content is not valid JSON (this includes
        an empty file).
    """
    pieces = []
    with codecs.open(filename, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                # An empty read marks end of file.
                break
            pieces.append(chunk)
    # Join once at the end instead of growing a string with "+=" per chunk,
    # which re-copies the accumulated text on every iteration.
    return json.loads(''.join(pieces))
#删除文件
def delete_frile(file_path):
@ -85,63 +82,4 @@ def merge_data(datasets):
]
return aggregated_data
# json 转 csv
def json_to_csv_data(data,log_date):
    """Flatten the per-dimension lists of ``data`` into one list of row dicts.

    ``data`` may carry the lists ``'ip'``, ``'account'``, ``'interface'`` and
    ``'menu'``; a missing list is treated as empty. Every item of every list
    becomes one record, tagged with the ``DATA_TYPE`` code of the list it
    came from and with ``log_date``. Records are emitted in the order ip,
    account, interface, menu.

    :param data: dict holding the lists described above; each item is a dict.
    :param log_date: value stored unchanged in every record's ``"logdate"``.
    :return: list of dicts with the keys ``menu``, ``ip``, ``account``,
        ``jobnum``, ``count``, ``logdate``, ``data_type`` and ``interface``.
    """
    # (key in ``data``, name in DATA_TYPE, fallback code), in output order.
    sections = (
        ('ip', "IP", 1),
        ('account', "ACCOUNT", 2),
        ('interface', "INTERFACE", 3),
        ('menu', "MENU", 4),
    )

    records = []
    for list_key, type_name, fallback in sections:
        datatype = DATA_TYPE.get(type_name, fallback)
        for item in data.get(list_key, []):
            # Keys are inserted in the same order as before so that code
            # deriving a column order from a record's keys is unaffected.
            records.append({
                "menu": item.get('menu', ''),
                "ip": item.get('ip', '0.0.0.0'),
                "account": item.get('account', ''),
                "jobnum": item.get('jobnum', ''),
                "count": item.get('count', 0),
                "logdate": log_date,
                "data_type": datatype,
                "interface": item.get('interface', ''),
            })
    return records
def write_csv(json_data, csv_file):
    """Write a list of flat dictionaries to ``csv_file`` as UTF-8 CSV.

    The header row is taken from the keys of the first record, so that
    record is assumed to contain every column. An empty ``json_data``
    produces an empty file instead of raising ``IndexError``.

    :param json_data: sequence of dicts, one per CSV row.
    :param csv_file: destination path; an existing file is overwritten.
    :raises ValueError: from ``csv.DictWriter`` if a later record has a key
        that the first record lacks.
    """
    import io
    import sys

    # The csv module wants a binary file on Python 2 and a text file opened
    # with newline='' on Python 3.
    py3 = sys.version_info[0] >= 3
    if py3:
        csvfile = io.open(csv_file, 'w', encoding='utf-8', newline='')
    else:
        csvfile = open(csv_file, 'wb')

    text_type = type(u'')  # ``unicode`` on Python 2, ``str`` on Python 3
    with csvfile:
        if not json_data:
            # No record to take the header from: leave the file empty.
            return
        fields = list(json_data[0].keys())
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()
        for row in json_data:
            if not py3:
                # Python 2's csv writer cannot handle non-ASCII unicode
                # objects, so encode text values to UTF-8 bytes first.
                row = {k: v.encode('utf-8') if isinstance(v, text_type) else v for k, v in row.items()}
            writer.writerow(row)

@ -6,10 +6,10 @@ import codecs
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
from file_helper import read_large_json_file,write_large_file,merge_data,delete_frile,json_to_csv_data,write_csv
from file_helper import read_large_json_file,write_large_file,merge_data,delete_frile
from collections import defaultdict
date_pattern = re.compile(r'\d{8}-\d{4}\.csv')
date_pattern = re.compile(r'\d{8}-\d{4}\.json')
def get_all_files(path):
# 列出所有包含匹配模式的文件名
@ -35,42 +35,27 @@ def merge_all_files(file_dict,base_path):
# 遍历字典中的每一个键值对
for date_str, files in file_dict.items():
#20240721
root_file_path = "{}-{}-{}.csv".format(date_str[:4], date_str[4:6], date_str[6:])
root_file_path = "{}-{}-{}.json".format(date_str[:4], date_str[4:6], date_str[6:])
full_root_file_path = os.path.join(base_path,root_file_path)
if len(files)>0:
file_objs=[]
if os.path.exists(full_root_file_path):
root_data = read_large_json_file(full_root_file_path)
file_objs.append(root_data)
file_full_path = []
aggregated_data = {}
for filename in files:
file_objs=[]
#20240721-0170.json
full_path = os.path.join(base_path,filename)
file_full_path.append(full_path)
logger_cron.info("FILE_MERGE: 准备读取文件"+full_path)
tmp_data =read_large_json_file(full_path)
file_objs.append(tmp_data)
if aggregated_data:
file_objs.append(aggregated_data)
aggregated_data = merge_data(file_objs)
# 最后合并基础文件
result_data_array = []
if os.path.exists(full_root_file_path):
root_data = read_large_json_file(full_root_file_path)
result_data_array.append(root_data)
if aggregated_data:
result_data_array.append(aggregated_data)
logger_cron.info("FILE_MERGE: 准备合并文件")
data = merge_data(result_data_array)
logger_cron.info("FILE_MERGE: 准备合并文件")
data = merge_data(file_objs)
logger_cron.info("FILE_MERGE: 准备写入合并的文件")
csv_data = json_to_csv_data(data,""),
write_csv(csv_data,full_root_file_path)
write_large_file(full_root_file_path,json.dumps(data))
logger_cron.info("FILE_MERGE: 写入合并文件完成")
#准备删除合并文件
for del_file in file_full_path:

@ -6,13 +6,13 @@ import codecs,csv
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
from file_helper import read_large_json_file,json_to_csv_data,write_csv
from file_helper import read_large_json_file
from file_merge import entry as merge_entry
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.csv$')
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.json$')
LOG_TABLE_NAME = "ueba_analysis_schema.logs"
@ -84,10 +84,20 @@ def get_all_files(path):
for filename in os.listdir(path):
if date_pattern.search(filename):
#由于定时任务是凌晨3点执行 所以只处理昨天的数据,今天的不处理
if datetime.now().strftime("%Y-%m-%d")+".csv" != filename:
if datetime.now().strftime("%Y-%m-%d")+".json" != filename:
files.append({"filename": filename, "path": os.path.join(path,filename)})
return files
def json_to_csvFile(json_data, csv_file):
    """Write a list of flat dictionaries to ``csv_file`` as UTF-8 CSV.

    The header row is taken from the keys of the first record, so that
    record is assumed to contain every column. An empty ``json_data``
    produces an empty file instead of raising ``IndexError``.

    :param json_data: sequence of dicts, one per CSV row.
    :param csv_file: destination path; an existing file is overwritten.
    :raises ValueError: from ``csv.DictWriter`` if a later record has a key
        that the first record lacks.
    """
    import io
    import sys

    # The csv module wants a binary file on Python 2 and a text file opened
    # with newline='' on Python 3.
    py3 = sys.version_info[0] >= 3
    if py3:
        csvfile = io.open(csv_file, 'w', encoding='utf-8', newline='')
    else:
        csvfile = open(csv_file, 'wb')

    text_type = type(u'')  # ``unicode`` on Python 2, ``str`` on Python 3
    with csvfile:
        if not json_data:
            # No record to take the header from: leave the file empty.
            return
        fields = list(json_data[0].keys())
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()
        for row in json_data:
            if not py3:
                # Python 2's csv writer cannot handle non-ASCII unicode
                # objects, so encode text values to UTF-8 bytes first.
                row = {k: v.encode('utf-8') if isinstance(v, text_type) else v for k, v in row.items()}
            writer.writerow(row)
def csv_to_pg(sql):
logger_cron.info("INSERT: 准备数据入库")
confutil = ConfUtil()
@ -118,12 +128,52 @@ def insert_data(files):
basename, extension = os.path.splitext(itemFile.get('filename', ''))
log_date = basename
csv_file = get_clean_file_path()+"/"+log_date+".csv"
records = json_to_csv_data(data,log_date)
# print ("filename:"+log_date)
records = []
for item in ip_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("IP",1)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in account_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in interface_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("INTERFACE",3)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
for item in menu_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("MENU",4)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"data_type":datatype,"interface":interface})
csv_file = get_clean_file_path()+"/"+log_date+".csv"
logger_cron.info("INSERT: 开始写csv文件")
write_csv(records,csv_file)
json_to_csvFile(records,csv_file)
sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
csv_to_pg(sql)

Loading…
Cancel
Save