#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import re, os, json
import codecs, csv
from db2json import DBUtils
import psycopg2
from datetime import datetime, timedelta
from ext_logging import logger_cron, merge_large_file_path, logger_trace
from file_helper import read_large_json_file, write_large_file, get_file_content
from file_merge import merge_large_entry, entry as merge_entry
from dashboard_data_conversion import find_region_by_code, jobnum_region_dict
from appsUtils.confutil import ConfUtil
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')
LOG_TABLE_NAME = "ueba_analysis_schema.logs"

FILED_NAMES = ['data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count']
FILED_NAMES_TUMP = ('data_type', 'menu', 'ip', 'account', 'jobnum', 'interface', 'company', 'logdate', 'count')

DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}
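# Note: these numeric codes line up with the per-type sub-partitions created in
# create_pq_table() below: FOR VALUES FROM (1) TO (2) holds IP rows, (2) TO (3) ACCOUNT,
# (3) TO (4) INTERFACE, and (4) TO (5) MENU.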
# Yesterday's date formatted as "YYYY_MM_DD"; used as the suffix of the daily partition table
def get_current_year_month():
    table_name = (datetime.now() - timedelta(days=1)).strftime("%Y_%m_%d")
    return table_name

# First day of the current month, formatted as "YYYY-MM-DD"
def get_first_day_of_current_month():
    now = datetime.now()
    first_day = now.replace(day=1)
    return first_day.strftime("%Y-%m-%d")

# First day of the next month, formatted as "YYYY-MM-DD"
def get_first_day_of_next_month():
    now = datetime.now()
    if now.month == 12:
        next_month = now.replace(year=now.year + 1, month=1, day=1)
    else:
        next_month = now.replace(month=now.month + 1, day=1)
    return next_month.strftime("%Y-%m-%d")

# Build the name of the daily partition table
def get_table_data_range_new():
    year_month = get_current_year_month()
    return LOG_TABLE_NAME + '_' + year_month

# Partition bounds: yesterday (inclusive) to today (exclusive)
def get_table_range():
    end = datetime.now().strftime("%Y-%m-%d")
    start = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    return start, end

# Month-level bounds: first day of this month to first day of next month
def get_table_data_range():
    start = get_first_day_of_current_month()
    end = get_first_day_of_next_month()
    return start, end

# Create the daily partition table and its per-data_type sub-partitions
def create_pq_table():
    table_name = get_table_data_range_new()
    start, end = get_table_range()
    logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name, start, end))
    sql = """CREATE TABLE IF NOT EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
    FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name, START=start, END=end)
    CFunction.execute(CPgSqlParam(sql))

    sql_type = """CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE1}
    PARTITION OF {TABLE_NAME}
    FOR VALUES FROM (1) TO (2);
    CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE2}
    PARTITION OF {TABLE_NAME}
    FOR VALUES FROM (2) TO (3);
    CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE3}
    PARTITION OF {TABLE_NAME}
    FOR VALUES FROM (3) TO (4);
    CREATE TABLE IF NOT EXISTS {TABLE_NAME_TYPE4}
    PARTITION OF {TABLE_NAME}
    FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name + "_type_1",
                                          TABLE_NAME_TYPE2=table_name + "_type_2",
                                          TABLE_NAME_TYPE3=table_name + "_type_3",
                                          TABLE_NAME_TYPE4=table_name + "_type_4",
                                          TABLE_NAME=table_name)
    CFunction.execute(CPgSqlParam(sql_type))
    logger_cron.info("INSERT:创建分区表完成")
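
# For illustration only: assuming yesterday was 2024-08-19 (hypothetical date), the SQL built
# above would look roughly like this:
#   CREATE TABLE IF NOT EXISTS ueba_analysis_schema.logs_2024_08_19
#       PARTITION OF ueba_analysis_schema.logs
#       FOR VALUES FROM ('2024-08-19') TO ('2024-08-20') PARTITION BY RANGE (data_type);
#   CREATE TABLE IF NOT EXISTS ueba_analysis_schema.logs_2024_08_19_type_1
#       PARTITION OF ueba_analysis_schema.logs_2024_08_19
#       FOR VALUES FROM (1) TO (2);
#   -- ...and likewise for _type_2 through _type_4.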
def get_all_files(path):
    # List all file names matching the YYYY-MM-DD.json pattern
    files = []
    for filename in os.listdir(path):
        if date_pattern.search(filename):
            # The cron job runs at 2 a.m., so only yesterday's data is processed; today's file is skipped
            if datetime.now().strftime("%Y-%m-%d") + ".json" != filename:
                files.append({"filename": filename, "path": os.path.join(path, filename)})
    return files

# Write JSON rows to a CSV file without a header line
def json_to_csvFile(json_data, csv_file):
    with open(csv_file, 'wb') as csvfile:  # 'wb' mode: this script runs under Python 2
        writer = csv.DictWriter(csvfile, fieldnames=FILED_NAMES)
        # writer.writeheader()
        for row in json_data:
            row = {k: v.encode('utf-8') if isinstance(v, unicode) else v for k, v in row.items()}
            writer.writerow(row)

def copy_from_file(conn, table_name, file_path, columns):
    with conn.cursor() as cur:
        with open(file_path, 'r') as f:
            cur.copy_from(f, table_name, sep=',', columns=columns)
    conn.commit()
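
# The CSV fed to copy_from_file has no header; each row must follow the FILED_NAMES_TUMP
# column order (data_type,menu,ip,account,jobnum,interface,company,logdate,count), which is
# how insert_data() below assembles the lines.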
def csv_to_pg_new(file_path):
    confUtil = ConfUtil()
    pgInfo = confUtil.getPostgresqlConf()
    conn = psycopg2.connect(host=pgInfo["ip"], database=pgInfo["database"], user=pgInfo["username"], password=pgInfo["password"])
    table_name = LOG_TABLE_NAME
    copy_from_file(conn, table_name, file_path, FILED_NAMES_TUMP)
    conn.close()

def csv_to_pg(sql):
    logger_cron.info("INSERT: 准备数据入库")
    confutil = ConfUtil()
    cur_pg_conf = confutil.getPostgresqlConf()
    cmd = """psql {} -U {} -w -c \"{}\" """.format(cur_pg_conf["database"], cur_pg_conf["username"], sql)
    logger_cron.info("INSERT: " + cmd)
    rtn = os.popen(cmd)
    cmd_rtn = rtn.readlines()
    logger_cron.info("INSERT: " + json.dumps(cmd_rtn))
    logger_cron.info("INSERT: 数据入库完成")

# Load the aggregated JSON files and bulk-insert them into PostgreSQL
def insert_data(files, base_path):
    for itemFile in files:
        if os.path.exists(itemFile.get("path", '')):
            logger_cron.info("INSERT: 准备读取聚合文件:" + itemFile.get('path', ''))
            data = read_large_json_file(itemFile.get("path", ''))
            logger_cron.info("INSERT: 读取聚合文件完成")
            logger_cron.info("INSERT: 总数据 " + str(len(data)))

            # Troubleshooting: trace a specific key if it appears in this file
            key = get_file_content()
            if key in data:
                logger_trace.info("filetopg:" + key + ":" + str(data[key]))

            basename, extension = os.path.splitext(itemFile.get('filename', ''))
            log_date = basename
            # print ("filename:"+log_date)
            records = []
            for key, value in data.iteritems():
                # key = (data_type,menu,ip,account,jobnum,interface,company), value = count
                res_str = ",".join([key, log_date, str(value)])
                records.append(res_str)
            res_str = "\n".join(records)

            csv_file = base_path + "/" + log_date + ".csv"
            logger_cron.info("INSERT: 开始写csv文件")
            write_large_file(csv_file, res_str)
            # json_to_csvFile(records,csv_file)
            # sql = "\copy ueba_analysis_schema.logs(count,account,logdate,data_type,ip,interface,menu,jobnum) from '{}' with csv header DELIMITER ',';".format(csv_file)
            # csv_to_pg(sql)

            logger_cron.info("INSERT: 准备数据入库")
            csv_to_pg_new(csv_file)
            logger_cron.info("INSERT: 完成数据入库")

            # Rename the processed JSON file so it is not picked up again
            logger_cron.info(itemFile.get('path', ''))
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(itemFile.get('path', ''), base_path + "/done_" + itemFile.get('filename', ''))
            logger_cron.info("INSERT: 重命名文件完成," + itemFile.get('filename', ''))

            # Rename the generated CSV file as well
            logger_cron.info("done_" + itemFile.get('filename', ''))
            os.rename(csv_file, base_path + "/done_" + log_date + ".csv")
            logger_cron.info("INSERT: csv重命名文件完成")

def delete_files(directory_path):
    """
    Delete every file named like 'done_YYYY-MM-DD...' in the given directory
    whose date part is 7 or more days old.
    :param directory_path: absolute path of the directory to scan
    """
    # Cut-off date: 7 days ago
    seven_days_ago = datetime.now() - timedelta(days=7)

    # Pattern matching file names that carry a 'done_YYYY-MM-DD' prefix
    done_date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')

    # Walk the files in the directory
    for filename in os.listdir(directory_path):
        match = done_date_pattern.search(filename)
        if match:
            file_date_str = match.group(1)
            file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
            # Remove the file if its date is at or before the cut-off
            if file_date <= seven_days_ago:
                file_path = os.path.join(directory_path, filename)
                os.remove(file_path)
                logger_cron.info("INSERT: 删除文件 " + file_path)

def entry():
    # Re-merge files larger than 500 MB before loading
    merge_entry()
    merge_large_entry()

    base_path = merge_large_file_path()
    files = get_all_files(base_path)
    logger_cron.info("INSERT:获取文件数量" + str(len(files)))

    # Create the partition tables
    create_pq_table()
    # Load the data into PostgreSQL
    insert_data(files, base_path)
    # Clean up old processed files
    delete_files(base_path)
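
# Minimal usage sketch, assuming the scheduler normally imports this module and calls
# entry() itself; the guard below only takes effect when the script is run by hand.
if __name__ == "__main__":
    entry()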
# #创建分区表
# def create_pq_table2():
# table_name = LOG_TABLE_NAME+'_'+'2024_08_19'
# start,end = '2024-08-19','2024-08-20'
# logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end))
# sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
# FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end)
# CFunction.execute(CPgSqlParam(sql))
# sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1}
# PARTITION OF {TABLE_NAME}
# FOR VALUES FROM (1) TO (2);
# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2}
# PARTITION OF {TABLE_NAME}
# FOR VALUES FROM (2) TO (3);
# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3}
# PARTITION OF {TABLE_NAME}
# FOR VALUES FROM (3) TO (4);
# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4}
# PARTITION OF {TABLE_NAME}
# FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name)
# CFunction.execute(CPgSqlParam(sql_type))
# logger_cron.info("INSERT:创建分区表完成")
# create_pq_table2()
# # logger_cron.info("INSERT:01")
# # csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-15.csv")
# logger_cron.info("INSERT:02")
# csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-18.csv")
# logger_cron.info("INSERT:03")