hbyd_ueba/utils/file_to_pg.py

#!/usr/bin/python
#encoding=utf-8
# author: tangwy
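"""
Cron job: load per-day aggregated JSON files into the partitioned PostgreSQL
table ueba_analysis_schema.logs.

Flow: re-merge oversized files, read each day's JSON aggregate, filter out
abnormal rows, write a header-less CSV, COPY it into the daily partition,
then rename processed files with a "done_" prefix and purge old "done_" files.
"""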
import re,os,json
import codecs,csv
from db2json import DBUtils
import psycopg2
from datetime import datetime, timedelta
from ext_logging import logger_cron,merge_large_file_path,logger_trace
from file_helper import read_large_json_file,write_large_file,get_file_content
from file_merge import merge_large_entry,entry as merge_entry
from dashboard_data_conversion import find_region_by_code,jobnum_region_dict
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
# matches daily aggregate files named YYYY-MM-DD.json
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')
LOG_TABLE_NAME = "ueba_analysis_schema.logs"
FILED_NAMES = ['data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count']
FILED_NAMES_TUMP = ('data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count')
DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}
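# The numeric data_type values above correspond to the per-type
# subpartitions (_type_1 .. _type_4) created in create_pq_table().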
# Yesterday's date formatted as "YYYY_MM_DD"; used as the daily partition table suffix
def get_current_year_month():
    table_name = (datetime.now() - timedelta(days=1)).strftime("%Y_%m_%d")
    return table_name
# First day of the current month formatted as "YYYY-MM-DD"
def get_first_day_of_current_month():
    now = datetime.now()
    first_day = now.replace(day=1)
    return first_day.strftime("%Y-%m-%d")
# First day of the next month formatted as "YYYY-MM-DD"
def get_first_day_of_next_month():
    now = datetime.now()
    if now.month == 12:
        next_month = now.replace(year=now.year + 1, month=1, day=1)
    else:
        next_month = now.replace(month=now.month + 1, day=1)
    return next_month.strftime("%Y-%m-%d")
# Name of the daily partition table
def get_table_data_range_new():
    year_month = get_current_year_month()
    return LOG_TABLE_NAME + '_' + year_month
# Date range covered by the daily partition: yesterday (inclusive) to today (exclusive)
def get_table_range():
    end = datetime.now().strftime("%Y-%m-%d")
    start = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    return start, end
# Month range: first day of the current month to first day of the next month
def get_table_data_range():
    start = get_first_day_of_current_month()
    end = get_first_day_of_next_month()
    return start, end
# Create the daily partition table (and its four data_type subpartitions) if they do not exist
def create_pq_table():
    table_name = get_table_data_range_new()
    start, end = get_table_range()
    logger_cron.info("INSERT: about to create partition table {},{},{}".format(table_name, start, end))
    sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
        FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name, START=start, END=end)
    CFunction.execute(CPgSqlParam(sql))

    sql_type = """CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1}
        PARTITION OF {TABLE_NAME}
        FOR VALUES FROM (1) TO (2);
        CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2}
        PARTITION OF {TABLE_NAME}
        FOR VALUES FROM (2) TO (3);
        CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3}
        PARTITION OF {TABLE_NAME}
        FOR VALUES FROM (3) TO (4);
        CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4}
        PARTITION OF {TABLE_NAME}
        FOR VALUES FROM (4) TO (5);""".format(
        TABLE_NAME_TYPE1=table_name + "_type_1",
        TABLE_NAME_TYPE2=table_name + "_type_2",
        TABLE_NAME_TYPE3=table_name + "_type_3",
        TABLE_NAME_TYPE4=table_name + "_type_4",
        TABLE_NAME=table_name)
    CFunction.execute(CPgSqlParam(sql_type))
    logger_cron.info("INSERT: partition tables created")
# List the daily JSON files that are ready to be loaded
def get_all_files(path):
    files = []
    for filename in os.listdir(path):
        if date_pattern.search(filename):
            # The cron job runs at 2 a.m., so only yesterday's data is processed; today's file is skipped
            if datetime.now().strftime("%Y-%m-%d") + ".json" != filename:
                files.append({"filename": filename, "path": os.path.join(path, filename)})
    return files
# Write rows to a csv file without a header row
def json_to_csvFile(json_data, csv_file):
    with open(csv_file, 'wb') as csvfile:  # note: 'wb' mode (Python 2 csv)
        writer = csv.DictWriter(csvfile, fieldnames=FILED_NAMES)
        # writer.writeheader()
        for row in json_data:
            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
            writer.writerow(row)
def copy_from_file(conn, table_name, file_path, columns):
    with conn.cursor() as cur:
        with open(file_path, 'r') as f:
            cur.copy_from(f, table_name, sep=',', columns=columns)
    conn.commit()
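# Expected input: a header-less CSV whose columns follow FILED_NAMES order,
# e.g. (hypothetical row)
#   1,menuA,10.0.0.1,user01,12345,/api/login,companyA,2024-06-01,42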
def csv_to_pg_new(file_path):
    confUtil = ConfUtil()
    pgInfo = confUtil.getPostgresqlConf()
    conn = psycopg2.connect(host=pgInfo["ip"], database=pgInfo["database"], user=pgInfo["username"], password=pgInfo["password"])
    table_name = LOG_TABLE_NAME
    copy_from_file(conn, table_name, file_path, FILED_NAMES_TUMP)
    conn.close()
def csv_to_pg(sql):
    logger_cron.info("INSERT: preparing to load data")
    confutil = ConfUtil()
    cur_pg_conf = confutil.getPostgresqlConf()
    cmd = """psql {} -U {} -w -c \"{}\"""".format(cur_pg_conf["database"], cur_pg_conf["username"], sql)
    logger_cron.info("INSERT: " + cmd)
    rtn = os.popen(cmd)
    cmd_rtn = rtn.readlines()
    logger_cron.info("INSERT: " + json.dumps(cmd_rtn))
    logger_cron.info("INSERT: data load finished")
# Load each aggregated JSON file into the database
def insert_data(files, base_path):
    for itemFile in files:
        if os.path.exists(itemFile.get("path", '')):
            logger_cron.info("INSERT: about to read aggregated file: " + itemFile.get('path', ''))
            data = read_large_json_file(itemFile.get("path", ''))
            logger_cron.info("INSERT: finished reading aggregated file")
            logger_cron.info("INSERT: total records " + str(len(data)))

            # troubleshooting: trace a specific key if it appears in the data
            key = get_file_content()
            if key in data:
                logger_trace.info("filetopg:" + key + ":" + str(data[key]))

            basename, extension = os.path.splitext(itemFile.get('filename', ''))
            log_date = basename
            records = []
            for key, value in data.iteritems():
                # key layout: datatype,menu,ip,account,jobnum,interface,company; value: count
                v1, v2, v3, v4, v5, v6, v7 = key.split(",")
                # menu
                if len(v2) > 50:
                    continue
                # account
                if len(v4) > 30:
                    continue
                # jobnum
                if len(v5) > 30:
                    continue
                # interface
                if len(v6) > 300:
                    continue
                res_str = ",".join([key, log_date, str(value)])
                records.append(res_str)
            res_str = "\n".join(records)
            logger_cron.info("INSERT: total records after filtering abnormal rows " + str(len(records)))

            csv_file = base_path + "/" + log_date + ".csv"
            logger_cron.info("INSERT: start writing csv file")
            write_large_file(csv_file, res_str)
            # json_to_csvFile(records, csv_file)
            # sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
            # csv_to_pg(sql)

            logger_cron.info("INSERT: about to load data into the database")
            csv_to_pg_new(csv_file)
            logger_cron.info("INSERT: data load finished")

            # rename the processed json file
            logger_cron.info(itemFile.get('path', ''))
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(itemFile.get('path', ''), base_path + "/done_" + itemFile.get('filename', ''))
            logger_cron.info("INSERT: json file renamed, " + itemFile.get('filename', ''))

            # rename the processed csv file
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(csv_file, base_path + "/done_" + log_date + ".csv")
            logger_cron.info("INSERT: csv file renamed")
def delete_files(directory_path):
    """
    Delete files named like 'done_YYYY-MM-DD*' in the given directory
    whose date part is 7 or more days old.
    :param directory_path: absolute path of the directory to scan
    """
    # date 7 days ago
    seven_days_ago = datetime.now() - timedelta(days=7)
    # regex matching file names containing done_YYYY-MM-DD
    date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')
    # walk the files in the directory
    for filename in os.listdir(directory_path):
        match = date_pattern.search(filename)
        if match:
            file_date_str = match.group(1)
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
            # remove the file if its date is at least 7 days old
            if file_date <= seven_days_ago:
                file_path = os.path.join(directory_path, filename)
                os.remove(file_path)
                logger_cron.info("INSERT: deleted file " + file_path)
def entry():
    # re-merge files larger than 500 MB
    merge_entry()
    merge_large_entry()

    base_path = merge_large_file_path()
    files = get_all_files(base_path)
    logger_cron.info("INSERT: number of files found " + str(len(files)))

    # create the partition tables
    create_pq_table()
    # load the data
    insert_data(files, base_path)
    # remove expired files
    delete_files(base_path)
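# Hypothetical convenience entry point for running the job by hand; in production
# this module is presumably invoked by the scheduler, which calls entry() directly.
if __name__ == '__main__':
    entry()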