You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
197 lines
8.1 KiB
197 lines
8.1 KiB
#!/usr/bin/python
|
|
#encoding=utf-8
|
|
# author: tangwy
|
|
import re,os,json
|
|
import codecs
|
|
from db2json import DBUtils
|
|
from datetime import datetime, timedelta
|
|
from ext_logging import logger_cron,get_clean_file_path
|
|
from file_helper import read_large_json_file
|
|
from file_merge import entry as merge_entry
|
|
from dataInterface.functions import CFunction
|
|
from dataInterface.db.params import CPgSqlParam
|
|
|
|
# Filenames eligible for ingestion: exactly "YYYY-MM-DD.json".
# NOTE: the dot is escaped — the previous pattern used a bare '.' which
# matched any character (e.g. "2024-01-01xjson" would have been picked up).
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')

# Parent (partitioned) table that monthly partitions attach to.
LOG_TABLE_NAME = "ueba_analysis_schema.logs"

# Dimension name -> numeric data_type code stored in the logs table.
DATA_TYPE = {
    "IP": 1,
    "ACCOUNT": 2,
    "INTERFACE": 3,
    "MENU": 4,
}
|
|
|
|
# 获取当前日期并格式化为"年-月"
|
|
def get_current_year_month():
    """Return the current date formatted as "YYYY_MM" (partition suffix)."""
    return datetime.now().strftime("%Y_%m")
|
|
|
|
# 获取当前月份的第一天并格式化为"年-月-日"
|
|
def get_first_day_of_current_month():
    """Return the first day of the current month as "YYYY-MM-DD"."""
    today = datetime.now()
    return today.replace(day=1).strftime("%Y-%m-%d")
|
|
|
|
# 获取当前日期,然后计算下个月的第一天
|
|
def get_first_day_of_next_month():
    """Return the first day of the following month as "YYYY-MM-DD".

    December rolls over into January of the next year.
    """
    today = datetime.now()
    if today.month < 12:
        rollover = today.replace(month=today.month + 1, day=1)
    else:
        rollover = today.replace(year=today.year + 1, month=1, day=1)
    return rollover.strftime("%Y-%m-%d")
|
|
|
|
#获取表名
|
|
def get_table_name():
    """Return this month's partition table name, e.g. "...logs_2024_06"."""
    return "{}_{}".format(LOG_TABLE_NAME, get_current_year_month())
|
|
|
|
#获取表区间
|
|
def get_table_data_range():
    """Return the (start, end) date range covered by this month's partition.

    start is the first day of the current month, end the first day of the
    next month — both "YYYY-MM-DD" strings (end is exclusive in the DDL).
    """
    return get_first_day_of_current_month(), get_first_day_of_next_month()
|
|
|
|
#创建分区表
|
|
def create_fq_table():
    """Create this month's partition of ueba_analysis_schema.logs if missing.

    The partition covers [first day of current month, first day of next
    month). Idempotent thanks to IF NOT EXISTS.
    """
    table_name = get_table_name()
    start, end = get_table_data_range()
    logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name, start, end))
    sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
    FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name, START=start, END=end)
    CFunction.execute(CPgSqlParam(sql))
    logger_cron.info("INSERT:创建分区表完成")
|
|
|
|
def get_all_files(path):
    """List files in *path* named "YYYY-MM-DD.json", excluding today's file.

    The cron job runs at 3 a.m., so only yesterday's (and older) data is
    processed; today's file is still being written and is skipped.

    Returns a list of {"filename": ..., "path": ...} dicts.
    """
    today_name = datetime.now().strftime("%Y-%m-%d") + ".json"
    matches = []
    for name in os.listdir(path):
        if not date_pattern.search(name):
            continue
        if name == today_name:
            continue
        matches.append({"filename": name, "path": os.path.join(path, name)})
    return matches
|
|
|
|
#数据入库
|
|
def _dim_records(items, datatype, log_date):
    """Convert one dimension's aggregated entries into row dicts for insert.

    :param items: list of aggregation dicts (keys: menu/ip/account/jobnum/
                  count/interface — missing keys get safe defaults)
    :param datatype: numeric DATA_TYPE code for this dimension
    :param log_date: log date string (the source filename without extension)
    """
    records = []
    for item in items:
        records.append({
            "menu": item.get('menu', ''),
            "ip": item.get('ip', '0.0.0.0'),
            "account": item.get('account', ''),
            "jobnum": item.get('jobnum', ''),
            "count": item.get('count', 0),
            "logdate": log_date,
            "datatype": datatype,
            "interface": item.get('interface', ''),
        })
    return records


def insert_data(files):
    """Load aggregated JSON files into ueba_analysis_schema.logs.

    For each existing file: read it, flatten the four dimensions
    (ip/account/interface/menu) into rows, bulk-insert them in one
    parameterized statement, then rename the file with a "done_" prefix
    so it is not processed again.

    :param files: list of {"filename": ..., "path": ...} dicts
                  (as produced by get_all_files)
    """
    for item_file in files:
        path = item_file.get("path", '')
        filename = item_file.get('filename', '')
        if not os.path.exists(path):
            continue

        # Log BEFORE reading (the original logged "准备读取" after the read).
        logger_cron.info("INSERT: 准备读取聚合文件:" + path)
        data = read_large_json_file(path)
        logger_cron.info("INSERT: 读取聚合文件完成")

        ip_list = data.get('ip', [])
        account_list = data.get('account', [])
        interface_list = data.get('interface', [])
        menu_list = data.get('menu', [])

        logger_cron.info("INSERT: IP维度 " + str(len(ip_list)))
        logger_cron.info("INSERT: ACCOUNT维度 " + str(len(account_list)))
        logger_cron.info("INSERT: INTERFACE维度 " + str(len(interface_list)))
        logger_cron.info("INSERT: MENU维度 " + str(len(menu_list)))

        # The filename (minus extension) is the log date, e.g. "2024-01-01".
        log_date, _ = os.path.splitext(filename)

        # One code path for all four dimensions instead of four copied loops.
        records = []
        records.extend(_dim_records(ip_list, DATA_TYPE.get("IP", 1), log_date))
        records.extend(_dim_records(account_list, DATA_TYPE.get("ACCOUNT", 2), log_date))
        records.extend(_dim_records(interface_list, DATA_TYPE.get("INTERFACE", 3), log_date))
        records.extend(_dim_records(menu_list, DATA_TYPE.get("MENU", 4), log_date))

        # Guard: with zero records the original still executed
        # "VALUES " with an empty placeholder list — a malformed statement.
        if records:
            # Flatten rows into one flat parameter list matching the
            # (%s,...) placeholder groups, column order as in the INSERT.
            data_list = []
            for rec in records:
                data_list.append(rec.get('menu', ''))
                data_list.append(rec.get('ip', ''))
                data_list.append(rec.get('account', ''))
                data_list.append(rec.get('jobnum', ''))
                data_list.append(rec.get('count', ''))
                data_list.append(rec.get('logdate', ''))
                data_list.append(rec.get('datatype', ''))
                data_list.append(rec.get('interface', ''))

            sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface)
            VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for _ in records]))
            CFunction.execute(CPgSqlParam(sql, params=data_list))
            logger_cron.info("INSERT: 数据插入完成")

        # Rename the processed file so the next run skips it.
        logger_cron.info(path)
        logger_cron.info("done_" + filename)
        os.rename(path, get_clean_file_path() + "/done_" + filename)
        logger_cron.info("INSERT: 重命名文件完成," + filename)
|
|
|
|
def delete_files(directory_path):
    """Delete processed files named 'done_YYYY-MM-DD...' older than 10 days.

    :param directory_path: absolute path of the directory to scan
    """
    # Anything dated on or before this moment minus 10 days gets removed.
    cutoff = datetime.now() - timedelta(days=10)

    # Matches the date portion of processed-file names, e.g. done_2024-01-01
    done_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')

    for name in os.listdir(directory_path):
        match = done_pattern.search(name)
        if not match:
            continue
        file_date = datetime.strptime(match.group(1), '%Y-%m-%d')
        if file_date <= cutoff:
            full_path = os.path.join(directory_path, name)
            os.remove(full_path)
            logger_cron.info("INSERT: 删除文件" + full_path)
|
|
|
|
def entry():
    """Cron entry point: merge raw files, ensure the monthly partition
    exists, ingest the merged files, then purge expired done-files."""
    # Merge raw per-run files into daily aggregates first.
    merge_entry()

    base_path = get_clean_file_path()
    pending = get_all_files(base_path)
    logger_cron.info("INSERT:获取文件数量" + str(len(pending)))

    # Make sure this month's partition table exists before inserting.
    create_fq_table()

    # Bulk-insert the aggregated data.
    insert_data(pending)

    # Clean up files processed more than 10 days ago.
    delete_files(base_path)
|
|
|