diff --git a/conf/sys_config.json b/conf/sys_config.json
index 24e5eb7..101686f 100644
--- a/conf/sys_config.json
+++ b/conf/sys_config.json
@@ -1,4 +1,38 @@
 {
   "search_limit": 15,
-  "dip":["192.12.1.5"]
+  "dip":["10.25.108.198","10.30.61.50","10.25.110.215"],
+  "static_ext":[".js",".css",".png",".jpg",".ico",".html",".gif",".woff",".woff2",".ttf",".htm",".svg"],
+  "region_dict":{
+    "10": "省公司",
+    "110": "武汉分公司",
+    "111": "武汉分公司",
+    "170": "襄阳分公司",
+    "171": "襄阳分公司",
+    "130": "鄂州分公司",
+    "131": "鄂州分公司",
+    "260": "孝感分公司",
+    "261": "孝感分公司",
+    "250": "黄冈分公司",
+    "251": "黄冈分公司",
+    "120": "黄石分公司",
+    "121": "黄石分公司",
+    "190": "咸宁分公司",
+    "191": "咸宁分公司",
+    "200": "荆州分公司",
+    "201": "荆州分公司",
+    "202": "荆州分公司",
+    "140": "宜昌分公司",
+    "141": "宜昌分公司",
+    "150": "恩施分公司",
+    "151": "恩施分公司",
+    "160": "十堰分公司",
+    "161": "十堰分公司",
+    "240": "随州分公司",
+    "241": "随州分公司",
+    "230": "荆门分公司",
+    "231": "荆门分公司",
+    "1801": "江汉分公司",
+    "1802": "潜江分公司",
+    "1803": "天门分公司"
+  }
 }
\ No newline at end of file
diff --git a/package.json b/package.json
index 1ebceec..3f036ba 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "uebaMetricsAnalysis",
-  "version": "V3.0R01F00",
+  "version": "V3.0R01F00.240904",
   "menuurl": "/uebaMetricsAnalysis",
   "menuregx": "^/uebaMetricsAnalysis",
   "menuname": "指标晾晒统计",
diff --git a/target/package.py b/target/package.py
index 7149b47..1e8286a 100644
--- a/target/package.py
+++ b/target/package.py
@@ -18,7 +18,7 @@ class Package(object):
         # Step 1: initialize variables and read the app folder name, version, etc.
         self.__svn_up()
         self.app_name = self.__get_app_name()
-        self.version = '1.0.0'
+        self.version = self.__get_app_version()
         self.work_path_base = '/tmp'
         # Where the generated .dat package goes; defaults to the same directory as package.py
         self.package_out_path = DIR_PATH
@@ -33,7 +33,7 @@ class Package(object):
         self.packageFrontEnd()
         # Step 3: get the SVN revision number
         # self.build_version = self.__get_svn()
-        self.build_version = 23000
+        self.build_version = "cf71606"
         self.package_name = "%s.%s.%s.tar.gz" % (self.app_name, self.version, self.build_version)
         # Step 5: copy the directories and files to be packaged into the temp directory
         self.do_copy_work_dir()
@@ -50,7 +50,10 @@ class Package(object):
         with open(os.path.join(BASE_PATH, "package.json"), "r+") as f:
             pkgConfig = json.load(f)
             return pkgConfig["name"]
-
+    def __get_app_version(self):
+        with open(os.path.join(BASE_PATH, "package.json"), "r+") as f:
+            pkgConfig = json.load(f)
+            return pkgConfig["version"]
     def __get_py_not_encrypt(self):
         return {
             'files': [
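Taken together, the package.json and package.py changes stamp the build artifact with the real component version plus a git short hash, instead of the hard-coded '1.0.0' and SVN revision 23000. A worked example of the resulting archive name, using the values above (an illustrative sketch, not code from the repo):

    app_name = "uebaMetricsAnalysis"   # package.json "name", via __get_app_name()
    version = "V3.0R01F00.240904"      # package.json "version", via the new __get_app_version()
    build_version = "cf71606"          # git short hash replacing the fixed SVN revision
    package_name = "%s.%s.%s.tar.gz" % (app_name, version, build_version)
    # -> "uebaMetricsAnalysis.V3.0R01F00.240904.cf71606.tar.gz"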
diff --git a/utils/base_dataclean_pg.py b/utils/base_dataclean_pg.py
index aecab6a..f1a02d7 100644
--- a/utils/base_dataclean_pg.py
+++ b/utils/base_dataclean_pg.py
@@ -9,7 +9,7 @@ import codecs
 from esUtil import EsUtil
 from config import read_json_config
 from file_helper import write_large_file,get_file_content,TRACE_PATH
-from dashboard_data_conversion import find_region_by_code,jobnum_region_dict
+from dashboard_data_conversion import find_region_by_code
 from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
 from collections import defaultdict
 from appsUtils import env
@@ -222,13 +222,20 @@ def get_menu_group_data(index,startTime,endTime,diplist):
 def datetime_to_timestamp(dt):
     dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
     return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
+
+# Get the file extension of a path
+def get_file_extension(filename):
+    base_name, extension = os.path.splitext(filename)
+    return extension
+
 def clean_data(read_index,start,end,jobid):
     APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME
     config_path = os.path.normpath(APPHOME + "/conf/sys_config.json")
     rule_data = read_json_config(config_path)
     dips=rule_data["dip"]
-
+    static_ext = rule_data["static_ext"]
+    region_dict = rule_data["region_dict"]
     logger_cron.info("JOB:dip "+json.dumps(dips))
     data_ip = get_ip_group_data(read_index,start,end,dips)
@@ -244,11 +251,20 @@
     logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
     logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
     logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
+
+    new_interface_data = []
+    logger_cron.info("JOB:"+jobid+","+json.dumps(static_ext))
+    for item in data_interface:
+        if get_file_extension(item.get('interface', '')) in static_ext:
+            continue
+        new_interface_data.append(item)
+
+    logger_cron.info("JOB:"+jobid+",接口维度实际数据量 "+str(len(new_interface_data))+" 条数据")
     #todo: read the previous 5-minute file and merge it with this 5-minute one,
     #then write the merged result to a file
-    group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
+    group_and_write_to_file(data_ip, data_account, new_interface_data, data_menu, start,jobid,region_dict)

-def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
+def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid,region_dict):
     # Get the current working directory
     base_path = get_clean_file_path()
     logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
@@ -263,16 +279,19 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
     logger_cron.info("JOB:"+jobid+", tmpfilepath"+tmp_file_path)
     #(datatype,menu,ip,account,jobnum,interface) count
+    jobnum_max_length = 20
     records = {}
     for item in data_ip:
         menu = remove_commas(item.get('menu', ''))
         ip = item.get('ip', '0.0.0.0')
         account = remove_commas(item.get('account', ''))
         jobnum = item.get('jobnum', '')
+        if jobnum == "" or len(jobnum) > jobnum_max_length:
+            continue
         count = item.get('count', 0)
         datatype = DATA_TYPE.get("IP",1)
         interface = remove_commas(item.get('interface', ''))
-        company = find_region_by_code(jobnum,jobnum_region_dict)
+        company = find_region_by_code(jobnum,region_dict)
         records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
     # Log tracing
     if not os.path.exists(TRACE_PATH):
@@ -283,34 +302,40 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
         ip = item.get('ip', '0.0.0.0')
         account = remove_commas(item.get('account', ''))
         jobnum = item.get('jobnum', '')
+        if jobnum == "" or len(jobnum) > jobnum_max_length:
+            continue
         count = item.get('count', 0)
         datatype = DATA_TYPE.get("ACCOUNT",2)
         interface = remove_commas(item.get('interface', ''))
-        company = find_region_by_code(jobnum,jobnum_region_dict)
+        company = find_region_by_code(jobnum,region_dict)
         records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
     for item in data_interface:
         menu = remove_commas(item.get('menu', ''))
         ip = item.get('ip', '0.0.0.0')
         account = remove_commas(item.get('account', ''))
         jobnum = item.get('jobnum', '')
+        if jobnum == "" or len(jobnum) > jobnum_max_length:
+            continue
         count = item.get('count', 0)
         datatype = DATA_TYPE.get("INTERFACE",3)
         interface = remove_commas(item.get('interface', ''))
-        company = find_region_by_code(jobnum,jobnum_region_dict)
+        company = find_region_by_code(jobnum,region_dict)
         records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
     for item in data_menu:
         menu = remove_commas(item.get('menu', ''))
         ip = item.get('ip', '0.0.0.0')
         account = remove_commas(item.get('account', ''))
         jobnum = item.get('jobnum', '')
+        if menu == "" or len(jobnum) > jobnum_max_length:
+            continue
         count = item.get('count', 0)
         datatype = DATA_TYPE.get("MENU",4)
         interface = remove_commas(item.get('interface', ''))
-        company = find_region_by_code(jobnum,jobnum_region_dict)
+        company = find_region_by_code(jobnum,region_dict)
         records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
     json_data = json.dumps(records)
-
+    ######## troubleshooting #################
     key=get_file_content()
     if key in records:
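A behavioural caveat with the new static-resource filter is worth keeping in mind: get_file_extension() is built on os.path.splitext(), which treats everything from the last dot onward as the extension, so an interface URL that carries a query string will not match static_ext and is kept. A small sketch of the edge cases (plain Python; the URLs and the static_ext subset are illustrative):

    import os

    static_ext = [".js", ".css", ".png"]  # subset of "static_ext" in conf/sys_config.json

    def is_static(interface):
        # Mirrors get_file_extension(): everything from the last dot onwards.
        return os.path.splitext(interface)[1] in static_ext

    print(is_static("/portal/app.min.js"))  # True  -> row is dropped
    print(is_static("/api/getUserList"))    # False -> row is kept
    print(is_static("/portal/app.js?v=2"))  # False -> splitext() yields ".js?v=2",
                                            # so URLs with query strings bypass the filter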
== "" or len(jobnum) >jobnum_max_length: + continue count = item.get('count', 0) datatype = DATA_TYPE.get("MENU",4) interface = remove_commas(item.get('interface', '')) - company = find_region_by_code(jobnum,jobnum_region_dict) + company = find_region_by_code(jobnum,region_dict) records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count json_data = json.dumps(records) - + ########问题排查################# key=get_file_content() if key in records: diff --git a/utils/dashboard_data_conversion.py b/utils/dashboard_data_conversion.py index dacb709..8e362eb 100644 --- a/utils/dashboard_data_conversion.py +++ b/utils/dashboard_data_conversion.py @@ -43,13 +43,16 @@ def find_region_by_code(code, region_dict): 未查询到 返回错误工号 """ code_str = keep_digits_filter(code) + # 在字典中特别检查 + if code_str in region_dict: + return region_dict[code_str] + # 使用生成器表达式和next函数尝试找到匹配的前缀 company = next( (region_dict.get(code_str[:i]) for i in range(2, min(len(code_str), 5)) if code_str[:i] in region_dict), "错误工号") return company - def ip_summary_data_format(ip_summary_data): """ ip维度数据转换方法 diff --git a/utils/dashboard_summary_data.py b/utils/dashboard_summary_data.py index 1a7b5c4..3fde3f1 100644 --- a/utils/dashboard_summary_data.py +++ b/utils/dashboard_summary_data.py @@ -12,6 +12,7 @@ from dashboard_data_conversion import adjust_times from dataInterface.functions import CFunction from dataInterface.db.params import CPgSqlParam from ext_logging import logger +from collections import OrderedDict TABLE_NAME = "ueba_analysis_schema.logs" @@ -35,10 +36,11 @@ def get_ip_summary_data(startTime, endTime): :param startTime: 开始时间, :param endTime: 结束时间, """ - result = {} + result = OrderedDict() + sql = """ select company, sum(count) as count from {TABLE_NAME} where logdate >= %s and logdate <= %s and data_type = %s - group by company""".format(TABLE_NAME=TABLE_NAME) + group by company order by count desc""".format(TABLE_NAME=TABLE_NAME) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"])))) if res: for item in res: @@ -51,10 +53,10 @@ def get_account_summary_data(startTime, endTime): :param startTime: 开始时间, :param endTime: 结束时间, """ - result = {} + result = OrderedDict() sql = """ select company, sum(count) as count from {TABLE_NAME} where logdate >= %s and logdate <= %s and data_type = %s - group by company""".format(TABLE_NAME=TABLE_NAME) + group by company order by count desc""".format(TABLE_NAME=TABLE_NAME) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"])))) if res: for item in res: @@ -68,7 +70,7 @@ def get_interface_summary_data(startTime, endTime): :param startTime: 开始时间, :param endTime: 结束时间, """ - result = {} + result = OrderedDict() sql = """select interface, sum(count) as count from {TABLE_NAME} where logdate >= %s and logdate <= %s and data_type = %s group by interface order by count desc limit 20""".format(TABLE_NAME=TABLE_NAME) @@ -85,10 +87,10 @@ def get_menu_summary_data(startTime, endTime): :param startTime: 开始时间, :param endTime: 结束时间, """ - result = {} + result = OrderedDict() sql = """select menu, sum(count) as count from {TABLE_NAME} where logdate >= %s and logdate <= %s and data_type = %s - group by menu""".format(TABLE_NAME=TABLE_NAME) + group by menu order by count desc""".format(TABLE_NAME=TABLE_NAME) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"])))) if res: for item in res: diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py index 
diff --git a/utils/dashboard_summary_data.py b/utils/dashboard_summary_data.py
index 1a7b5c4..3fde3f1 100644
--- a/utils/dashboard_summary_data.py
+++ b/utils/dashboard_summary_data.py
@@ -12,6 +12,7 @@ from dashboard_data_conversion import adjust_times
 from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
 from ext_logging import logger
+from collections import OrderedDict

 TABLE_NAME = "ueba_analysis_schema.logs"

@@ -35,10 +36,11 @@ def get_ip_summary_data(startTime, endTime):
     :param startTime: start time,
     :param endTime: end time,
     """
-    result = {}
+    result = OrderedDict()
+
     sql = """ select company, sum(count) as count from {TABLE_NAME}
             where logdate >= %s and logdate <= %s and data_type = %s
-            group by company""".format(TABLE_NAME=TABLE_NAME)
+            group by company order by count desc""".format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"]))))
     if res:
         for item in res:
@@ -51,10 +53,10 @@ def get_account_summary_data(startTime, endTime):
     :param startTime: start time,
     :param endTime: end time,
     """
-    result = {}
+    result = OrderedDict()
     sql = """ select company, sum(count) as count from {TABLE_NAME}
             where logdate >= %s and logdate <= %s and data_type = %s
-            group by company""".format(TABLE_NAME=TABLE_NAME)
+            group by company order by count desc""".format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"]))))
     if res:
         for item in res:
@@ -68,7 +70,7 @@ def get_interface_summary_data(startTime, endTime):
     :param startTime: start time,
     :param endTime: end time,
     """
-    result = {}
+    result = OrderedDict()
     sql = """select interface, sum(count) as count from {TABLE_NAME}
             where logdate >= %s and logdate <= %s and data_type = %s
             group by interface order by count desc limit 20""".format(TABLE_NAME=TABLE_NAME)
@@ -85,10 +87,10 @@ def get_menu_summary_data(startTime, endTime):
     :param startTime: start time,
     :param endTime: end time,
     """
-    result = {}
+    result = OrderedDict()
     sql = """select menu, sum(count) as count from {TABLE_NAME}
             where logdate >= %s and logdate <= %s and data_type = %s
-            group by menu""".format(TABLE_NAME=TABLE_NAME)
+            group by menu order by count desc""".format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
     if res:
         for item in res:
diff --git a/utils/file_to_pg.py b/utils/file_to_pg.py
index 6c84df2..5ffb0fc 100644
--- a/utils/file_to_pg.py
+++ b/utils/file_to_pg.py
@@ -151,10 +151,27 @@ def insert_data(files,base_path):
     records = []
     for key, value in data.iteritems():
         #(datatype,menu,ip,account,jobnum,interface,company) count
+        #[str(datatype), menu, ip, account, jobnum, interface, company]
+
+        v1, v2, v3, v4, v5, v6, v7 = key.split(",")
+        #menu
+        if len(v2) > 50:
+            continue
+        #account
+        if len(v4) > 30:
+            continue
+        #jobnum
+        if len(v5) > 30:
+            continue
+        #interface
+        if len(v6) > 300:
+            continue
+
         res_str = ",".join([key,log_date, str(value)])
         records.append(res_str)
     res_str = "\n".join(records)
+    logger_cron.info("INSERT: 排除异常数据后总数据 " +str(len(records)))
     csv_file = base_path+"/"+log_date+".csv"
     logger_cron.info("INSERT: 开始写csv文件")
     write_large_file(csv_file,res_str)
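Two notes on the last two files. In dashboard_summary_data.py, switching result to OrderedDict is what makes the new "order by count desc" visible to callers: this codebase runs on Python 2 (file_to_pg.py iterates with iteritems()), where a plain dict would discard the row order returned by Postgres. In file_to_pg.py, the seven-way unpack is safe because the key is assembled in base_dataclean_pg.py from comma-joined fields whose free-text parts (menu, account, interface) were passed through remove_commas first. A minimal sketch of that round trip (assuming remove_commas simply strips commas; the values are illustrative):

    def remove_commas(s):
        # Assumed behaviour of the upstream helper: drop the join delimiter.
        return s.replace(",", "")

    # Producer side (base_dataclean_pg.group_and_write_to_file):
    key = ",".join([str(3), remove_commas("menuA"), "10.25.108.198",
                    remove_commas("alice"), "1101234",
                    remove_commas("/api/user,list"), "武汉分公司"])

    # Consumer side (file_to_pg.insert_data): always exactly seven parts.
    v1, v2, v3, v4, v5, v6, v7 = key.split(",")
    assert v6 == "/api/userlist"  # the embedded comma was stripped upstream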