配置优化 (Configuration optimization)

dev
TANGWY 2 months ago
parent 216269d3cb
commit e5e6fc0db8
6 changed files:
  conf/sys_config.json        3 changed lines
  jobs/jobmeta.json           4 changed lines
  utils/base_dataclean_pg.py  52 changed lines
  utils/dashboard_data_pg.py  1 changed line
  utils/file_merge.py         1 changed line
  utils/file_to_pg.py         40 changed lines

conf/sys_config.json
@@ -1,3 +1,4 @@
 {
-    "search_limit": 15
+    "search_limit": 15,
+    "dip":["192.12.1.5"]
 }
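
The new "dip" key lists the destination IPs that the aggregation queries in utils/base_dataclean_pg.py now filter on (see the terms filters further down). A minimal sketch of how the value is read, assuming read_json_config simply parses the JSON file into a dict:

    # Minimal sketch, assuming read_json_config parses the JSON file into a dict;
    # the .get() fallback is illustrative only -- the commit indexes rule_data["dip"] directly.
    import json

    def load_dip_list(config_path):
        with open(config_path) as f:
            rule_data = json.load(f)
        return rule_data.get("dip", [])  # e.g. ["192.12.1.5"]
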

jobs/jobmeta.json
@@ -25,9 +25,9 @@
     "exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_data_insert.py",
     "task_owner": "uebaMetricsAnalysis",
     "run_mode": 1,
-    "duration_args": "0 10 2 * * ?",
+    "duration_args": "0 10 1 * * ?",
     "retry_nums": 3,
     "is_enable": 1,
-    "task_description": "凌晨2点10分执行一次 将汇总数据写入pg"
+    "task_description": "凌晨1点10分执行一次 将汇总数据写入pg"
   }
 ]
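
duration_args is a Quartz-style cron expression (second, minute, hour, day-of-month, month, day-of-week), so this change moves the daily summary job from 02:10 to 01:10, matching the updated task_description ("runs once at 01:10 a.m. and writes the aggregated data to pg"). A throwaway sketch that just labels the fields:

    # Illustrative only: label the six fields of a Quartz-style cron expression.
    FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")

    def describe_quartz_cron(expr):
        return dict(zip(FIELDS, expr.split()))

    print(describe_quartz_cron("0 10 1 * * ?"))
    # {'second': '0', 'minute': '10', 'hour': '1', 'day_of_month': '*', 'month': '*', 'day_of_week': '?'}
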

utils/base_dataclean_pg.py
@@ -7,11 +7,13 @@ from datetime import datetime, timedelta
 import calendar
 import codecs
 from esUtil import EsUtil
+from config import read_json_config
 from file_helper import write_large_file,get_file_content,TRACE_PATH
 from dashboard_data_conversion import find_region_by_code,jobnum_region_dict
 from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
 from collections import defaultdict
-from ext_logging import logger_trace
+from appsUtils import env
+from ext_logging import logger_trace,APPFOLDERNAME

 size = 9999#根据实际情况调整
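
The added imports support loading conf/sys_config.json at runtime: read_json_config parses it, env.get_isop_root() resolves the ISOP install root, and APPFOLDERNAME names the app directory. Assuming the root is /home/master/ISOP and APPFOLDERNAME is "uebaMetricsAnalysis" (both inferred from the exec_cmd path in jobs/jobmeta.json, not stated in this hunk), the resolved path would be:

    # Path resolution sketch; the concrete values are assumptions taken from the
    # exec_cmd path in jobs/jobmeta.json, not from this diff.
    import os

    APPHOME = "/home/master/ISOP" + "/apps/" + "uebaMetricsAnalysis"
    config_path = os.path.normpath(APPHOME + "/conf/sys_config.json")
    # -> /home/master/ISOP/apps/uebaMetricsAnalysis/conf/sys_config.json
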
@@ -23,18 +25,14 @@ DATA_TYPE = {
 }

 ## IP维度
-def get_ip_group_data(index,startTime,endTime):
+def get_ip_group_data(index,startTime,endTime,diplist):
     query_body={
         "size": 0,
         "query": {
             "bool": {
                 "filter": [
                     {"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
-                    {"bool": {
-                        "must_not": [
-                            {"match_phrase": {"account": ""}}
-                        ]
-                    }}
+                    {"terms": {"dip": diplist}}
                 ]
             }
         },
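
The filter change drops the old must_not clause that excluded empty accounts and instead restricts the aggregation to the destination IPs configured in sys_config.json. With diplist = ["192.12.1.5"], the filter list inside query_body resolves to the sketch below (timestamps are epoch milliseconds, as produced by datetime_to_timestamp):

    # What the rewritten bool filter evaluates to for diplist = ["192.12.1.5"];
    # the timestamp values are made-up epoch milliseconds for illustration.
    start_ms, end_ms = 1723680000000, 1723683600000
    query_filter = [
        {"range": {"timestamp": {"gt": start_ms, "lte": end_ms}}},
        {"terms": {"dip": ["192.12.1.5"]}},  # match any configured destination IP
    ]
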
@@ -72,18 +70,14 @@ def get_ip_group_data(index,startTime,endTime):
     return datas

 ## 账号维度
-def get_account_group_data(index,startTime,endTime):
+def get_account_group_data(index,startTime,endTime,diplist):
     query_body={
         "size": 0,
         "query": {
             "bool": {
                 "filter": [
                     {"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
-                    {"bool": {
-                        "must_not": [
-                            {"match_phrase": {"account": ""}}
-                        ]
-                    }}
+                    {"terms": {"dip": diplist}}
                 ]
             }
         },
@@ -124,18 +118,14 @@ def get_account_group_data(index,startTime,endTime):
     return datas

 ## 接口维度
-def get_interface_group_data(index,startTime,endTime):
+def get_interface_group_data(index,startTime,endTime,diplist):
     query_body={
         "size": 0,
         "query": {
             "bool": {
                 "filter": [
                     {"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
-                    {"bool": {
-                        "must_not": [
-                            {"match_phrase": {"account": ""}}
-                        ]
-                    }}
+                    {"terms": {"dip": diplist}}
                 ]
             }
         },
@@ -180,18 +170,14 @@ def get_interface_group_data(index,startTime,endTime):
     return datas

 ## 菜单维度
-def get_menu_group_data(index,startTime,endTime):
+def get_menu_group_data(index,startTime,endTime,diplist):
     query_body={
         "size": 0,
         "query": {
             "bool": {
                 "filter": [
                     {"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
-                    {"bool": {
-                        "must_not": [
-                            {"match_phrase": {"account": ""}}
-                        ]
-                    }}
+                    {"terms": {"dip": diplist}}
                 ]
             }
         },
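
The account, interface, and menu queries above receive the identical signature and filter change, so all four get_*_group_data functions now build the same filter list. A hypothetical helper (not part of this commit) could factor that out:

    # Hypothetical refactor, not in the commit: build the shared filter list in one place.
    def build_base_filter(startTime, endTime, diplist):
        return [
            {"range": {"timestamp": {"gt": startTime, "lte": endTime}}},
            {"terms": {"dip": diplist}},
        ]

    # each get_*_group_data() could then start its query with:
    # query_body = {"size": 0, "query": {"bool": {"filter": build_base_filter(startTime, endTime, diplist)}}}
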
@@ -237,10 +223,18 @@ def datetime_to_timestamp(dt):
     dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
     return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)

 def clean_data(read_index,start,end,jobid):
-    data_ip = get_ip_group_data(read_index,start,end)
-    data_account = get_account_group_data(read_index,start,end)
-    data_interface = get_interface_group_data(read_index,start,end)
-    data_menu = get_menu_group_data(read_index,start,end)
+    APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME
+    config_path = os.path.normpath(APPHOME + "/conf/sys_config.json")
+    rule_data = read_json_config(config_path)
+    dips=rule_data["dip"]
+    logger_cron.info("JOB:dip "+json.dumps(dips))
+
+    data_ip = get_ip_group_data(read_index,start,end,dips)
+    data_account = get_account_group_data(read_index,start,end,dips)
+    data_interface = get_interface_group_data(read_index,start,end,dips)
+    data_menu = get_menu_group_data(read_index,start,end,dips)
     if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
         logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并")
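
clean_data now resolves the app's conf directory, reads sys_config.json, logs the configured dip list, and threads it through every dimension query. Note that rule_data["dip"] raises a KeyError if the key is missing, and the new lines rely on os and json being imported at the top of the module (not visible in this hunk). A defensive variant, purely as a sketch:

    # Illustrative guard, not in the commit: validate the "dip" setting before querying.
    def get_dips_or_none(rule_data, log):
        dips = rule_data.get("dip")
        if not dips:
            log.info("JOB:dip missing or empty in sys_config.json")
            return None
        return dips

    # in clean_data: dips = get_dips_or_none(rule_data, logger_cron); return early if None
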

utils/dashboard_data_pg.py
@@ -14,7 +14,6 @@ from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
 from ext_logging import logger

-TRACE_KEY = ""
 TABLE_NAME = "ueba_analysis_schema.logs"

 DATA_TYPE = {

utils/file_merge.py
@@ -5,7 +5,6 @@ import re,os,json,time
 import codecs
 from db2json import DBUtils
 from datetime import datetime, timedelta
-from base_dataclean_pg import TRACE_KEY
 from ext_logging import logger_cron,get_clean_file_path,merge_large_file_path,logger_trace
 from file_helper import read_large_json_file,write_large_file,get_file_content,delete_frile,is_file_larger_than_500mb,merge_data_new
 from collections import defaultdict

utils/file_to_pg.py
@@ -214,43 +214,3 @@ def entry():
     insert_data(files,base_path)
     #删除文件
     delete_files(base_path)
-
-# #创建分区表
-# def create_pq_table2():
-#     table_name = LOG_TABLE_NAME+'_'+'2024_08_19'
-#     start,end = '2024-08-19','2024-08-20'
-#     logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end))
-#     sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
-#     FOR VALUES FROM ('{START}') TO ('{END}') PARTITION BY RANGE (data_type);""".format(TABLE_NAME=table_name,START = start,END=end)
-#     CFunction.execute(CPgSqlParam(sql))
-
-#     sql_type="""CREATE TABLE if not EXISTS {TABLE_NAME_TYPE1}
-#     PARTITION OF {TABLE_NAME}
-#     FOR VALUES FROM (1) TO (2);
-#     CREATE TABLE if not EXISTS {TABLE_NAME_TYPE2}
-#     PARTITION OF {TABLE_NAME}
-#     FOR VALUES FROM (2) TO (3);
-#     CREATE TABLE if not EXISTS {TABLE_NAME_TYPE3}
-#     PARTITION OF {TABLE_NAME}
-#     FOR VALUES FROM (3) TO (4);
-#     CREATE TABLE if not EXISTS {TABLE_NAME_TYPE4}
-#     PARTITION OF {TABLE_NAME}
-#     FOR VALUES FROM (4) TO (5);""".format(TABLE_NAME_TYPE1=table_name+"_type_1",TABLE_NAME_TYPE2=table_name+"_type_2",TABLE_NAME_TYPE3=table_name+"_type_3",TABLE_NAME_TYPE4=table_name+"_type_4",TABLE_NAME=table_name)
-#     CFunction.execute(CPgSqlParam(sql_type))
-#     logger_cron.info("INSERT:创建分区表完成")
-
-# create_pq_table2()
-# # logger_cron.info("INSERT:01")
-# # csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-15.csv")
-# logger_cron.info("INSERT:02")
-# csv_to_pg_new("/home/master/ISOP/apps/uebaMetricsAnalysis/files/merge_files/2024-08-18.csv")
-# logger_cron.info("INSERT:03")