#!/usr/bin/python
#encoding=utf-8
# author: tangwy

import re, os, json
import codecs, csv
from db2json import DBUtils
import psycopg2
from datetime import datetime, timedelta
from ext_logging import logger_cron, merge_large_file_path, logger_trace
from file_helper import read_large_json_file, write_large_file, get_file_content
from file_merge import merge_large_entry, entry as merge_entry
from dashboard_data_conversion import find_region_by_code, jobnum_region_dict
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam

date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')

LOG_TABLE_NAME = "ueba_analysis_schema.logs"

FILED_NAMES = ['data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count']
FILED_NAMES_TUMP = ('data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count')

DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}

# Yesterday's date formatted as "YYYY_MM_DD"; used as the partition table suffix
def get_current_year_month():
    table_name = (datetime.now() - timedelta(days=1)).strftime("%Y_%m_%d")
    return table_name

# First day of the current month, formatted as "YYYY-MM-DD"
def get_first_day_of_current_month():
    now = datetime.now()
    first_day = now.replace(day=1)
    return first_day.strftime("%Y-%m-%d")

# First day of the next month, formatted as "YYYY-MM-DD"
def get_first_day_of_next_month():
    now = datetime.now()
    if now.month == 12:
        next_month = now.replace(year=now.year + 1, month=1, day=1)
    else:
        next_month = now.replace(month=now.month + 1, day=1)
    return next_month.strftime("%Y-%m-%d")

# Build the name of the daily partition table
def get_table_data_range_new():
    year_month = get_current_year_month()
    return LOG_TABLE_NAME + '_' + year_month

# Date range covered by the daily partition: yesterday to today
def get_table_range():
    end = datetime.now().strftime("%Y-%m-%d")
    start = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    return start, end

# Date range covering the current month
def get_table_data_range():
    start = get_first_day_of_current_month()
    end = get_first_day_of_next_month()
    return start, end

# Create the daily partition table and its data_type sub-partitions
def create_pq_table():
    table_name = get_table_data_range_new()
    start, end = get_table_range()
    logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name, start, end))
    sql = """CREATE TABLE IF NOT EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
        FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(
        TABLE_NAME=table_name, START=start, END=end)
    CFunction.execute(CPgSqlParam(sql))

    sql_type = """CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE1} PARTITION OF {TABLE_NAME} FOR VALUES FROM (1) TO (2);
        CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE2} PARTITION OF {TABLE_NAME} FOR VALUES FROM (2) TO (3);
        CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE3} PARTITION OF {TABLE_NAME} FOR VALUES FROM (3) TO (4);
        CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE4} PARTITION OF {TABLE_NAME} FOR VALUES FROM (4) TO (5);""".format(
        TABLE_NAME_TYPE1=table_name + "_type_1",
        TABLE_NAME_TYPE2=table_name + "_type_2",
        TABLE_NAME_TYPE3=table_name + "_type_3",
        TABLE_NAME_TYPE4=table_name + "_type_4",
        TABLE_NAME=table_name)
    CFunction.execute(CPgSqlParam(sql_type))
    logger_cron.info("INSERT:创建分区表完成")

def get_all_files(path):
    # List all file names matching the YYYY-MM-DD.json pattern
    files = []
    for filename in os.listdir(path):
        if date_pattern.search(filename):
            # The cron job runs at 2 AM, so only yesterday's data is processed; today's file is skipped
            if datetime.now().strftime("%Y-%m-%d") + ".json" != filename:
                files.append({"filename": filename, "path": os.path.join(path, filename)})
    return files

# Write a CSV file without a header row
def json_to_csvFile(json_data, csv_file):
    with open(csv_file, 'wb') as csvfile:  # 'wb' mode: the Python 2 csv module expects a binary file
        writer = csv.DictWriter(csvfile, fieldnames=FILED_NAMES)
        # writer.writeheader()
        for row in json_data:
            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
            writer.writerow(row)
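# Illustrative note (sample values are hypothetical, not taken from real data): the aggregated
# JSON files handled below are expected to map a comma-joined key of the seven dimension fields
# to a count, and insert_data() appends the log date and the count so that the resulting CSV
# columns line up with FILED_NAMES / FILED_NAMES_TUMP:
#
#   aggregated entry : {"1,menu_home,10.0.0.1,user01,10001,/api/login,HQ": 3}
#   CSV line written : 1,menu_home,10.0.0.1,user01,10001,/api/login,HQ,2024-06-01,3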
def copy_from_file(conn, table_name, file_path, columns):
    # Bulk-load the CSV file into PostgreSQL via COPY
    with conn.cursor() as cur:
        with open(file_path, 'r') as f:
            cur.copy_from(f, table_name, sep=',', columns=columns)
    conn.commit()

def csv_to_pg_new(file_path):
    confUtil = ConfUtil()
    pgInfo = confUtil.getPostgresqlConf()
    conn = psycopg2.connect(host=pgInfo["ip"], database=pgInfo["database"],
                            user=pgInfo["username"], password=pgInfo["password"])
    table_name = LOG_TABLE_NAME
    copy_from_file(conn, table_name, file_path, FILED_NAMES_TUMP)
    conn.close()

# Legacy path: load the CSV through the psql command line client
def csv_to_pg(sql):
    logger_cron.info("INSERT: 准备数据入库")
    confutil = ConfUtil()
    cur_pg_conf = confutil.getPostgresqlConf()
    cmd = """psql {} -U {} -w -c \"{}\"""".format(cur_pg_conf["database"], cur_pg_conf["username"], sql)
    logger_cron.info("INSERT: " + cmd)
    rtn = os.popen(cmd)
    cmd_rtn = rtn.readlines()
    logger_cron.info("INSERT: " + json.dumps(cmd_rtn))
    logger_cron.info("INSERT: 数据入库完成")

# Load the aggregated files into the database
def insert_data(files, base_path):
    for itemFile in files:
        if os.path.exists(itemFile.get("path", '')):
            logger_cron.info("INSERT: 准备读取聚合文件:" + itemFile.get('path', ''))
            data = read_large_json_file(itemFile.get("path", ''))
            logger_cron.info("INSERT: 读取聚合文件完成")
            logger_cron.info("INSERT: 总数据 " + str(len(data)))

            # Troubleshooting aid: trace a specific key if it appears in this file
            key = get_file_content()
            if key in data:
                logger_trace.info("filetopg:" + key + ":" + str(data[key]))

            basename, extension = os.path.splitext(itemFile.get('filename', ''))
            log_date = basename
            # print ("filename:" + log_date)
            records = []
            for key, value in data.iteritems():
                # key layout: data_type,menu,ip,account,jobnum,interface,company -> count
                v1, v2, v3, v4, v5, v6, v7 = key.split(",")
                # Skip rows whose fields exceed the column length limits
                if len(v2) > 50:    # menu
                    continue
                if len(v4) > 30:    # account
                    continue
                if len(v5) > 30:    # jobnum
                    continue
                if len(v6) > 300:   # interface
                    continue
                res_str = ",".join([key, log_date, str(value)])
                records.append(res_str)
            res_str = "\n".join(records)
            logger_cron.info("INSERT: 排除异常数据后总数据 " + str(len(records)))
            csv_file = base_path + "/" + log_date + ".csv"
            logger_cron.info("INSERT: 开始写csv文件")
            write_large_file(csv_file, res_str)
            # json_to_csvFile(records, csv_file)
            # sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
            # csv_to_pg(sql)
            logger_cron.info("INSERT: 准备数据入库")
            csv_to_pg_new(csv_file)
            logger_cron.info("INSERT: 完成数据入库")

            # Rename the processed JSON file
            logger_cron.info(itemFile.get('path', ''))
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(itemFile.get('path', ''), base_path + "/done_" + itemFile.get('filename', ''))
            logger_cron.info("INSERT: 重命名文件完成," + itemFile.get('filename', ''))

            # Rename the generated CSV file
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(csv_file, base_path + "/done_" + log_date + ".csv")
            logger_cron.info("INSERT: csv重命名文件完成")

def delete_files(directory_path):
    """
    Delete files named like 'done_YYYY-MM-DD...' in the given directory
    whose date part is 7 or more days old.

    :param directory_path: absolute path of the directory to scan
    """
    # Cutoff date: 7 days ago
    cutoff_date = datetime.now() - timedelta(days=7)

    # Pattern matching processed files that carry a YYYY-MM-DD date
    done_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')

    for filename in os.listdir(directory_path):
        match = done_pattern.search(filename)
        if match:
            file_date_str = match.group(1)
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
            # Remove the file if its date is on or before the cutoff
            if file_date <= cutoff_date:
                file_path = os.path.join(directory_path, filename)
                os.remove(file_path)
                logger_cron.info("INSERT: 删除文件" + file_path)
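# Illustrative note (hypothetical file names): if today were 2024-06-10, delete_files() above
# would remove done_2024-06-01.json and done_2024-06-01.csv (dated 7+ days ago) but keep
# done_2024-06-08.csv, and it never touches files that do not match the done_ prefix, such as
# a not-yet-processed 2024-06-09.json.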
# Cron entry point
def entry():
    # Re-merge files larger than 500 MB before loading
    merge_entry()
    merge_large_entry()
    base_path = merge_large_file_path()
    files = get_all_files(base_path)
    logger_cron.info("INSERT:获取文件数量" + str(len(files)))
    # Create the partition tables
    create_pq_table()
    # Load the data
    insert_data(files, base_path)
    # Clean up old processed files
    delete_files(base_path)
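# Manual-run hook (an assumption: the scheduling mechanism is not shown in this file, which
# appears to be driven by an external cron job). The guard keeps an import of this module
# free of side effects while still allowing the script to be executed directly for a manual run.
if __name__ == '__main__':
    entry()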