From e5e6fc0db86a280ba51e3adb949109a796452995 Mon Sep 17 00:00:00 2001 From: TANGWY Date: Tue, 3 Sep 2024 10:20:07 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/sys_config.json | 3 ++- jobs/jobmeta.json | 4 +-- utils/base_dataclean_pg.py | 52 +++++++++++++++++--------------------- utils/dashboard_data_pg.py | 1 - utils/file_merge.py | 1 - utils/file_to_pg.py | 40 ----------------------------- 6 files changed, 27 insertions(+), 74 deletions(-) diff --git a/conf/sys_config.json b/conf/sys_config.json index 745cdb2..24e5eb7 100644 --- a/conf/sys_config.json +++ b/conf/sys_config.json @@ -1,3 +1,4 @@ { - "search_limit": 15 + "search_limit": 15, + "dip":["192.12.1.5"] } \ No newline at end of file diff --git a/jobs/jobmeta.json b/jobs/jobmeta.json index b0480bb..2184505 100644 --- a/jobs/jobmeta.json +++ b/jobs/jobmeta.json @@ -25,9 +25,9 @@ "exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_data_insert.py", "task_owner": "uebaMetricsAnalysis", "run_mode": 1, - "duration_args": "0 10 2 * * ?", + "duration_args": "0 10 1 * * ?", "retry_nums": 3, "is_enable": 1, - "task_description": "凌晨2点10分执行一次 将汇总数据写入pg" + "task_description": "凌晨1点10分执行一次 将汇总数据写入pg" } ] \ No newline at end of file diff --git a/utils/base_dataclean_pg.py b/utils/base_dataclean_pg.py index 51015ee..aecab6a 100644 --- a/utils/base_dataclean_pg.py +++ b/utils/base_dataclean_pg.py @@ -7,11 +7,13 @@ from datetime import datetime, timedelta import calendar import codecs from esUtil import EsUtil +from config import read_json_config from file_helper import write_large_file,get_file_content,TRACE_PATH from dashboard_data_conversion import find_region_by_code,jobnum_region_dict from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path from collections import defaultdict -from ext_logging import logger_trace +from appsUtils import env +from ext_logging import logger_trace,APPFOLDERNAME size = 9999#根据实际情况调整 @@ -23,18 +25,14 @@ DATA_TYPE = { } ## IP维度 -def get_ip_group_data(index,startTime,endTime): +def get_ip_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, - {"bool": { - "must_not": [ - {"match_phrase": {"account": ""}} - ] - }} + {"terms": {"dip": diplist}} ] } }, @@ -72,18 +70,14 @@ def get_ip_group_data(index,startTime,endTime): return datas ## 账号维度 -def get_account_group_data(index,startTime,endTime): +def get_account_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, - {"bool": { - "must_not": [ - {"match_phrase": {"account": ""}} - ] - }} + {"terms": {"dip": diplist}} ] } }, @@ -124,18 +118,14 @@ def get_account_group_data(index,startTime,endTime): return datas ## 接口维度 -def get_interface_group_data(index,startTime,endTime): +def get_interface_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, - {"bool": { - "must_not": [ - {"match_phrase": {"account": ""}} - ] - }} + {"terms": {"dip": diplist}} ] } }, @@ -180,18 +170,14 @@ def get_interface_group_data(index,startTime,endTime): return datas ## 菜单维度 -def get_menu_group_data(index,startTime,endTime): +def get_menu_group_data(index,startTime,endTime,diplist): query_body={ "size": 0, "query": { "bool": { "filter": [ {"range": {"timestamp": {"gt": startTime,"lte": endTime}}}, - {"bool": { - "must_not": [ - {"match_phrase": {"account": ""}} - ] - }} + {"terms": {"dip": diplist}} ] } }, @@ -237,10 +223,18 @@ def datetime_to_timestamp(dt): dtstr=dt.strftime("%Y-%m-%d %H:%M:%S") return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000) def clean_data(read_index,start,end,jobid): - data_ip = get_ip_group_data(read_index,start,end) - data_account = get_account_group_data(read_index,start,end) - data_interface = get_interface_group_data(read_index,start,end) - data_menu = get_menu_group_data(read_index,start,end) + + APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME + config_path = os.path.normpath(APPHOME + "/conf/sys_config.json") + rule_data = read_json_config(config_path) + dips=rule_data["dip"] + + logger_cron.info("JOB:dip "+json.dumps(dips)) + + data_ip = get_ip_group_data(read_index,start,end,dips) + data_account = get_account_group_data(read_index,start,end,dips) + data_interface = get_interface_group_data(read_index,start,end,dips) + data_menu = get_menu_group_data(read_index,start,end,dips) if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0: logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并") diff --git a/utils/dashboard_data_pg.py b/utils/dashboard_data_pg.py index 20824e7..94b16de 100644 --- a/utils/dashboard_data_pg.py +++ b/utils/dashboard_data_pg.py @@ -14,7 +14,6 @@ from dataInterface.functions import CFunction from dataInterface.db.params import CPgSqlParam from ext_logging import logger -TRACE_KEY = "" TABLE_NAME = "ueba_analysis_schema.logs" DATA_TYPE = { diff --git a/utils/file_merge.py b/utils/file_merge.py index 5600d4e..2571c08 100644 --- a/utils/file_merge.py +++ b/utils/file_merge.py @@ -5,7 +5,6 @@ import re,os,json,time import codecs from db2json import DBUtils from datetime import datetime, timedelta -from base_dataclean_pg import TRACE_KEY from ext_logging import logger_cron,get_clean_file_path,merge_large_file_path,logger_trace from file_helper import read_large_json_file,write_large_file,get_file_content,delete_frile,is_file_larger_than_500mb,merge_data_new from collections import defaultdict diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py index 2094720..6c84df2 100644 --- a/utils/file_to_pg.py +++ b/utils/file_to_pg.py @@ -214,43 +214,3 @@ def entry(): insert_data(files,base_path) #删除文件 delete_files(base_path) - - - - - - -# #创建分区表 -# def create_pq_table2(): -# table_name = LOG_TABLE_NAME+'_'+'2024_08_19' -# start,end = '2024-08-19','2024-08-20' -# logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end)) -# sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs -# FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end) -# CFunction.execute(CPgSqlParam(sql)) - -# sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1} -# PARTITION OF {TABLE_NAME} -# FOR VALUES FROM (1) TO (2); -# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2} -# PARTITION OF {TABLE_NAME} -# FOR VALUES FROM (2) TO (3); -# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3} -# PARTITION OF {TABLE_NAME} -# FOR VALUES FROM (3) TO (4); -# CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4} -# PARTITION OF {TABLE_NAME} -# FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name) - -# CFunction.execute(CPgSqlParam(sql_type)) -# logger_cron.info("INSERT:创建分区表完成") - - -# create_pq_table2() - - -# # logger_cron.info("INSERT:01") -# # csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-15.csv") -# logger_cron.info("INSERT:02") -# csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-18.csv") -# logger_cron.info("INSERT:03") \ No newline at end of file