#!/usr/bin/python
#encoding=utf-8
# author: tangwy
"""Cron job: load daily aggregated UEBA JSON files into PostgreSQL.

For every ``YYYY-MM-DD.json`` file in the clean-file directory (except the
file for the current day) the records of the four dimensions (ip, account,
interface, menu) are flattened into one CSV file which is bulk-loaded with
psql's ``\\copy``. Successfully imported files are renamed with a ``done_``
prefix so that they are not picked up again.

NOTE: this module targets Python 2 (``unicode``, csv files opened in binary
mode).
"""
import re
import os
import json
import subprocess
import codecs
import csv
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron, get_clean_file_path
from file_helper import read_large_json_file
from file_merge import entry as merge_entry
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam

# Aggregated files are named "<YYYY-MM-DD>.json"; the dot is escaped so that
# only a literal ".json" suffix matches.
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')

LOG_TABLE_NAME = "ueba_analysis_schema.logs2"

DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}

# Column order of the generated CSV. It must match the column list of the
# \copy statement because COPY maps CSV columns by position (the header line
# is only skipped, not interpreted).
CSV_COLUMNS = ("count", "account", "logdate", "data_type", "ip",
               "interface", "menu", "jobnum", "keys")

# (key in the aggregated JSON, DATA_TYPE name, record fields forming "keys")
_DIMENSIONS = (
    ("ip", "IP", ("ip", "jobnum")),
    ("account", "ACCOUNT", ("account", "jobnum")),
    ("interface", "INTERFACE", ("interface", "ip", "account", "jobnum")),
    ("menu", "MENU", ("menu", "ip", "account", "jobnum")),
)


def get_current_year_month():
    """Return the current year and month formatted as ``YYYY_MM``."""
    now = datetime.now()
    return now.strftime("%Y_%m")


def get_first_day_of_current_month():
    """Return the first day of the current month as ``YYYY-MM-DD``."""
    now = datetime.now()
    first_day = now.replace(day=1)
    return first_day.strftime("%Y-%m-%d")


def get_first_day_of_next_month():
    """Return the first day of the next month as ``YYYY-MM-DD``."""
    now = datetime.now()
    if now.month == 12:
        # December rolls over into January of the following year.
        next_month = now.replace(year=now.year + 1, month=1, day=1)
    else:
        next_month = now.replace(month=now.month + 1, day=1)
    return next_month.strftime("%Y-%m-%d")


def get_table_name():
    """Return the log table name suffixed with the current ``YYYY_MM``."""
    year_month = get_current_year_month()
    return LOG_TABLE_NAME + '_' + year_month


def get_table_data_range():
    """Return ``(start, end)``: first day of this month and of next month."""
    start = get_first_day_of_current_month()
    end = get_first_day_of_next_month()
    return start, end


def get_all_files(path):
    """List the aggregated JSON files in *path* that are ready for import.

    A file qualifies when its name matches ``date_pattern`` and it is not
    the file of the current day (the job runs in the early morning, so
    today's file is not imported yet).

    Returns a list of ``{"filename": ..., "path": ...}`` dicts.
    """
    today_file = datetime.now().strftime("%Y-%m-%d") + ".json"
    files = []
    for filename in os.listdir(path):
        if date_pattern.search(filename) and filename != today_file:
            files.append({"filename": filename,
                          "path": os.path.join(path, filename)})
    return files


def json_to_csvFile(json_data, csv_file, fields=None):
    """Write a list of dicts to *csv_file* with a header line.

    json_data: list of dicts, one per CSV row.
    csv_file:  path of the CSV file to (over)write.
    fields:    optional column order. When omitted the keys of the first row
               are used (in dict iteration order), which raises IndexError
               for empty *json_data*.

    ``unicode`` values are encoded as UTF-8 because the Python 2 csv module
    writes byte strings.
    """
    if fields is None:
        # Assumes the first element contains every possible key.
        fields = list(json_data[0].keys())
    # Binary mode is what the Python 2 csv module expects.
    with open(csv_file, 'wb') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()
        for row in json_data:
            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v
                   for k, v in row.items()}
            writer.writerow(row)


def csv_to_pg(sql):
    """Run *sql* through the ``psql`` command line client.

    Database name and user come from ``ConfUtil().getPostgresqlConf()``;
    ``-w`` makes psql fail instead of prompting for a password.

    Returns True when psql exits with status 0, False otherwise (including
    when psql cannot be started). The combined stdout/stderr output is
    logged either way.
    """
    logger_cron.info("INSERT: 准备数据入库")
    confutil = ConfUtil()
    cur_pg_conf = confutil.getPostgresqlConf()
    database = cur_pg_conf["database"]
    username = cur_pg_conf["username"]

    # Human readable form, used for logging only.
    cmd = """psql {} -U {} -w -c \"{}\"""".format(database, username, sql)
    logger_cron.info("INSERT: " + cmd)

    # An argument list (no shell) avoids quoting problems with the SQL text.
    args = ["psql", database, "-U", username, "-w", "-c", sql]
    try:
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        output, _ = proc.communicate()
    except OSError as err:
        # Only .info is known to exist on logger_cron from this file.
        logger_cron.info("INSERT: failed to start psql: " + str(err))
        return False

    cmd_rtn = output.splitlines(True)
    logger_cron.info("INSERT: " + json.dumps(cmd_rtn))
    if proc.returncode != 0:
        logger_cron.info("INSERT: psql exited with status "
                         + str(proc.returncode))
        return False
    logger_cron.info("INSERT: 数据入库完成")
    return True


def _build_records(items, data_type, key_fields, log_date):
    """Flatten the aggregated *items* of one dimension into table rows.

    key_fields names the record fields whose values are JSON-encoded into
    the ``keys`` column.
    """
    records = []
    for item in items:
        record = {
            "menu": item.get('menu', ''),
            "ip": item.get('ip', '0.0.0.0'),
            "account": item.get('account', ''),
            "jobnum": item.get('jobnum', ''),
            "count": item.get('count', 0),
            "logdate": log_date,
            "data_type": data_type,
            "interface": item.get('interface', ''),
        }
        record["keys"] = json.dumps([record[name] for name in key_fields])
        records.append(record)
    return records


def insert_data(files):
    """Import every aggregated JSON file in *files* into the log table.

    files: list of ``{"filename": ..., "path": ...}`` dicts as returned by
           ``get_all_files``.

    For each existing file the records are written to ``<date>.csv`` in the
    clean-file directory and loaded with ``\\copy``. The JSON and CSV files
    are renamed with a ``done_`` prefix only after a successful import; on
    failure both are left in place so that the next run retries the file.
    A file without any record is marked done without touching the database.
    """
    for itemFile in files:
        path = itemFile.get("path", '')
        filename = itemFile.get('filename', '')
        if not os.path.exists(path):
            continue

        logger_cron.info("INSERT: 准备读取聚合文件:" + path)
        data = read_large_json_file(path)
        logger_cron.info("INSERT: 读取聚合文件完成")
        if not isinstance(data, dict):
            logger_cron.info("INSERT: unexpected content, file skipped: "
                             + path)
            continue

        # The file name without extension is the log date.
        log_date = os.path.splitext(filename)[0]

        records = []
        for json_key, type_name, key_fields in _DIMENSIONS:
            items = data.get(json_key, [])
            logger_cron.info("INSERT: " + type_name + "维度 "
                             + str(len(items)))
            records.extend(_build_records(items, DATA_TYPE[type_name],
                                          key_fields, log_date))

        clean_dir = get_clean_file_path()
        done_json = os.path.join(clean_dir, "done_" + filename)

        if not records:
            # Nothing to import; mark the file done so it is not retried on
            # every run.
            logger_cron.info("INSERT: no records, import skipped: " + path)
            logger_cron.info("done_" + filename)
            os.rename(path, done_json)
            logger_cron.info("INSERT: 重命名文件完成," + filename)
            continue

        csv_file = os.path.join(clean_dir, log_date + ".csv")
        logger_cron.info("INSERT: 开始写csv文件")
        json_to_csvFile(records, csv_file, CSV_COLUMNS)

        sql = "\\copy {}({}) from '{}' with csv header DELIMITER ',';".format(
            LOG_TABLE_NAME, ",".join(CSV_COLUMNS), csv_file)
        if not csv_to_pg(sql):
            # Keep the source file untouched so the next run retries it.
            logger_cron.info("INSERT: import failed, file kept for retry: "
                             + path)
            continue

        # Mark the source file and the CSV file as processed.
        logger_cron.info(path)
        logger_cron.info("done_" + filename)
        os.rename(path, done_json)
        logger_cron.info("INSERT: 重命名文件完成," + filename)
        logger_cron.info("done_" + filename)
        os.rename(csv_file, os.path.join(clean_dir,
                                         "done_" + log_date + ".csv"))
        logger_cron.info("INSERT: csv重命名文件完成")


def entry():
    """Collect the pending aggregated files and import them."""
    base_path = get_clean_file_path()
    files = get_all_files(base_path)
    logger_cron.info("INSERT:获取文件数量" + str(len(files)))
    # Load the data into the database.
    insert_data(files)


# NOTE(review): runs on import as well as on direct execution; kept as is
# because the scheduler's way of invoking this module is not visible here.
entry()