Code commit

Branch: dev
TANGWY, 2 months ago
parent e5e6fc0db8
commit 4910576413
  1. conf/sys_config.json (36 lines changed)
  2. package.json (2 lines changed)
  3. target/package.py (9 lines changed)
  4. utils/base_dataclean_pg.py (43 lines changed)
  5. utils/dashboard_data_conversion.py (5 lines changed)
  6. utils/dashboard_summary_data.py (16 lines changed)
  7. utils/file_to_pg.py (17 lines changed)

@@ -1,4 +1,38 @@
{
"search_limit": 15,
"dip":["192.12.1.5"]
"dip":["10.25.108.198","10.30.61.50","10.25.110.215"],
"static_ext":[".js",".css",".png",".jpg",".ico",".html",".gif",".woff",".woff2",".ttf",".htm",".svg"],
"region_dict":{
"10": "省公司",
"110": "武汉分公司",
"111": "武汉分公司",
"170": "襄阳分公司",
"171": "襄阳分公司",
"130": "鄂州分公司",
"131": "鄂州分公司",
"260": "孝感分公司",
"261": "孝感分公司",
"250": "黄冈分公司",
"251": "黄冈分公司",
"120": "黄石分公司",
"121": "黄石分公司",
"190": "咸宁分公司",
"191": "咸宁分公司",
"200": "荆州分公司",
"201": "荆州分公司",
"202": "荆州分公司",
"140": "宜昌分公司",
"141": "宜昌分公司",
"150": "恩施分公司",
"151": "恩施分公司",
"160": "十堰分公司",
"161": "十堰分公司",
"240": "随州分公司",
"241": "随州分公司",
"230": "荆门分公司",
"231": "荆门分公司",
"1801": "江汉分公司",
"1802": "潜江分公司",
"1803": "天门分公司"
}
}
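
The three new keys are consumed by the cleaning job later in this commit: dip lists the source IPs to query, static_ext lists file extensions treated as static resources and dropped from the interface dimension, and region_dict maps the leading digits of a job number to a branch company. A minimal sketch of loading the file and sanity-checking those keys (plain json is used here instead of the app's read_json_config, and the path is illustrative):

import json
import os

# Illustrative path; at runtime the app resolves it from env.get_isop_root().
config_path = os.path.normpath("conf/sys_config.json")

with open(config_path) as f:
    cfg = json.load(f)

# The cleaning job expects all of these keys to be present.
for key in ("search_limit", "dip", "static_ext", "region_dict"):
    assert key in cfg, "missing config key: %s" % key

# Extensions are compared against os.path.splitext() output,
# so each entry should start with a dot.
assert all(ext.startswith(".") for ext in cfg["static_ext"])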

@@ -1,6 +1,6 @@
{
"name": "uebaMetricsAnalysis",
"version": "V3.0R01F00",
"version": "V3.0R01F00.240904",
"menuurl": "/uebaMetricsAnalysis",
"menuregx": "^/uebaMetricsAnalysis",
"menuname": "指标晾晒统计",

@@ -18,7 +18,7 @@ class Package(object):
# Step 1: initialize some variables and get the app folder name, version, etc.
self.__svn_up()
self.app_name = self.__get_app_name()
self.version = '1.0.0'
self.version = self.__get_app_version()
self.work_path_base = '/tmp'
# Location for the generated dat package; defaults to the same directory as package.py
self.package_out_path = DIR_PATH
@@ -33,7 +33,7 @@ class Package(object):
self.packageFrontEnd()
# Step 3: get the SVN revision number
# self.build_version = self.__get_svn()
self.build_version = 23000
self.build_version = "cf71606"
self.package_name = "%s.%s.%s.tar.gz" % (self.app_name, self.version, self.build_version)
# Step 5: copy the directories and files to be packaged into the component into the temp directory
self.do_copy_work_dir()
@@ -50,7 +50,10 @@ class Package(object):
with open(os.path.join(BASE_PATH, "package.json"), "r+") as f:
pkgConfig = json.load(f)
return pkgConfig["name"]
def __get_app_version(self):
with open(os.path.join(BASE_PATH, "package.json"), "r+") as f:
pkgConfig = json.load(f)
return pkgConfig["version"]
def __get_py_not_encrypt(self):
return {
'files': [
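
With the version now read from package.json and the build id pinned to a commit hash, the archive name assembled a few lines above would come out roughly like this (a sketch using the values visible in this commit):

app_name = "uebaMetricsAnalysis"       # package.json "name"
version = "V3.0R01F00.240904"          # package.json "version"
build_version = "cf71606"              # hard-coded in this commit

package_name = "%s.%s.%s.tar.gz" % (app_name, version, build_version)
# -> uebaMetricsAnalysis.V3.0R01F00.240904.cf71606.tar.gz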

@@ -9,7 +9,7 @@ import codecs
from esUtil import EsUtil
from config import read_json_config
from file_helper import write_large_file,get_file_content,TRACE_PATH
from dashboard_data_conversion import find_region_by_code,jobnum_region_dict
from dashboard_data_conversion import find_region_by_code
from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
from collections import defaultdict
from appsUtils import env
@@ -222,13 +222,20 @@ def get_menu_group_data(index,startTime,endTime,diplist):
def datetime_to_timestamp(dt):
dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
# Get the file extension
def get_file_extension(filename):
base_name, extension = os.path.splitext(filename)
return extension
def clean_data(read_index,start,end,jobid):
APPHOME = env.get_isop_root() + "/apps/" + APPFOLDERNAME
config_path = os.path.normpath(APPHOME + "/conf/sys_config.json")
rule_data = read_json_config(config_path)
dips=rule_data["dip"]
static_ext = rule_data["static_ext"]
region_dict = rule_data["region_dict"]
logger_cron.info("JOB:dip "+json.dumps(dips))
data_ip = get_ip_group_data(read_index,start,end,dips)
@@ -244,11 +251,20 @@ def clean_data(read_index,start,end,jobid):
logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
new_interface_date=[]
logger_cron.info("JOB:"+jobid+","+json.dumps(static_ext))
for item in data_interface:
if get_file_extension(item.get('interface', '')) in static_ext:
continue
new_interface_date.append(item)
logger_cron.info("JOB:"+jobid+",接口维度实际数据量 "+str(len(new_interface_date))+" 条数据")
# TODO: read the previous 5-minute file and merge it with this 5-minute window's file
# After merging, write the file
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
group_and_write_to_file(data_ip, data_account, new_interface_date, data_menu, start,jobid,region_dict)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid,region_dict):
# Get the current working directory
base_path = get_clean_file_path()
logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
@@ -263,16 +279,19 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
logger_cron.info("JOB:"+jobid+", tmpfilepath"+tmp_file_path)
#(datatype,menu,ip,account,jobnum,interface) count
jobnum_max_length = 20
records = {}
for item in data_ip:
menu = remove_commas(item.get('menu', ''))
ip = item.get('ip', '0.0.0.0')
account = remove_commas(item.get('account', ''))
jobnum = item.get('jobnum', '')
if jobnum == "" or len(jobnum) >jobnum_max_length:
continue
count = item.get('count', 0)
datatype = DATA_TYPE.get("IP",1)
interface = remove_commas(item.get('interface', ''))
company = find_region_by_code(jobnum,jobnum_region_dict)
company = find_region_by_code(jobnum,region_dict)
records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
# Log tracing
if not os.path.exists(TRACE_PATH):
@@ -283,34 +302,40 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
ip = item.get('ip', '0.0.0.0')
account = remove_commas(item.get('account', ''))
jobnum = item.get('jobnum', '')
if jobnum == "" or len(jobnum) >jobnum_max_length:
continue
count = item.get('count', 0)
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = remove_commas(item.get('interface', ''))
company = find_region_by_code(jobnum,jobnum_region_dict)
company = find_region_by_code(jobnum,region_dict)
records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
for item in data_interface:
menu = remove_commas(item.get('menu', ''))
ip = item.get('ip', '0.0.0.0')
account = remove_commas(item.get('account', ''))
jobnum = item.get('jobnum', '')
if jobnum == "" or len(jobnum) >jobnum_max_length:
continue
count = item.get('count', 0)
datatype = DATA_TYPE.get("INTERFACE",3)
interface = remove_commas(item.get('interface', ''))
company = find_region_by_code(jobnum,jobnum_region_dict)
company = find_region_by_code(jobnum,region_dict)
records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
for item in data_menu:
menu = remove_commas(item.get('menu', ''))
ip = item.get('ip', '0.0.0.0')
account = remove_commas(item.get('account', ''))
jobnum = item.get('jobnum', '')
if menu == "" or len(jobnum) >jobnum_max_length:
continue
count = item.get('count', 0)
datatype = DATA_TYPE.get("MENU",4)
interface = remove_commas(item.get('interface', ''))
company = find_region_by_code(jobnum,jobnum_region_dict)
company = find_region_by_code(jobnum,region_dict)
records[",".join([str(datatype), menu, ip,account,jobnum,interface,company])]=count
json_data = json.dumps(records)
######## Troubleshooting #################
key=get_file_content()
if key in records:
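
The interface-dimension filter added above drops any record whose interface path ends in one of the configured static extensions before the data is grouped and written out. The same check as a standalone sketch (get_file_extension matches the diff; the sample records are made up):

import os

static_ext = [".js", ".css", ".png", ".jpg", ".ico", ".html", ".gif",
              ".woff", ".woff2", ".ttf", ".htm", ".svg"]

def get_file_extension(filename):
    # os.path.splitext("/static/app.js") -> ("/static/app", ".js")
    base_name, extension = os.path.splitext(filename)
    return extension

data_interface = [
    {"interface": "/api/user/login", "count": 12},  # kept
    {"interface": "/static/app.js", "count": 340},  # dropped
]

new_interface_data = [item for item in data_interface
                      if get_file_extension(item.get("interface", "")) not in static_ext]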

@@ -43,13 +43,16 @@ def find_region_by_code(code, region_dict):
If nothing matches, returns "错误工号" (invalid job number)
"""
code_str = keep_digits_filter(code)
# Check the dict for an exact match first
if code_str in region_dict:
return region_dict[code_str]
# Use a generator expression with next() to try to find a matching prefix
company = next(
(region_dict.get(code_str[:i]) for i in range(2, min(len(code_str), 5)) if code_str[:i] in region_dict),
"错误工号")
return company
def ip_summary_data_format(ip_summary_data):
"""
Conversion method for IP-dimension data
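
The lookup now checks for an exact match on the digits-only code first and then falls back to 2- to 4-digit prefixes, and it takes region_dict as a parameter (loaded from sys_config.json) instead of the previously imported module-level jobnum_region_dict. Illustrative usage with a slice of the mapping shown above (the job numbers are made up):

region_dict = {"10": "省公司", "110": "武汉分公司", "1801": "江汉分公司"}

find_region_by_code("1801", region_dict)     # exact match            -> 江汉分公司
find_region_by_code("1101234", region_dict)  # matched on prefix 110  -> 武汉分公司
find_region_by_code("9999999", region_dict)  # no match               -> 错误工号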

@@ -12,6 +12,7 @@ from dashboard_data_conversion import adjust_times
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
from ext_logging import logger
from collections import OrderedDict
TABLE_NAME = "ueba_analysis_schema.logs"
@@ -35,10 +36,11 @@ def get_ip_summary_data(startTime, endTime):
:param startTime: start time,
:param endTime: end time,
"""
result = {}
result = OrderedDict()
sql = """ select company, sum(count) as count from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by company""".format(TABLE_NAME=TABLE_NAME)
group by company order by count desc""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["IP"]))))
if res:
for item in res:
@@ -51,10 +53,10 @@ def get_account_summary_data(startTime, endTime):
:param startTime: start time,
:param endTime: end time,
"""
result = {}
result = OrderedDict()
sql = """ select company, sum(count) as count from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by company""".format(TABLE_NAME=TABLE_NAME)
group by company order by count desc""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["ACCOUNT"]))))
if res:
for item in res:
@@ -68,7 +70,7 @@ def get_interface_summary_data(startTime, endTime):
:param startTime: start time,
:param endTime: end time,
"""
result = {}
result = OrderedDict()
sql = """select interface, sum(count) as count from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by interface order by count desc limit 20""".format(TABLE_NAME=TABLE_NAME)
@@ -85,10 +87,10 @@ def get_menu_summary_data(startTime, endTime):
:param startTime: start time,
:param endTime: end time,
"""
result = {}
result = OrderedDict()
sql = """select menu, sum(count) as count from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by menu""".format(TABLE_NAME=TABLE_NAME)
group by menu order by count desc""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res:
for item in res:
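
The move from {} to OrderedDict matters because this code base appears to run on Python 2 (note the iteritems() usage elsewhere in this commit), where a plain dict does not preserve insertion order: the order by count desc added to the SQL would be lost as soon as the rows were copied into the result. A minimal illustration (the rows are made up):

from collections import OrderedDict

# Rows as returned by the query, already sorted by count descending.
rows = [("武汉分公司", 900), ("省公司", 500), ("黄石分公司", 100)]

result = OrderedDict()
for company, count in rows:
    result[company] = count

# Iteration now follows insertion order, i.e. descending count,
# which a plain Python 2 dict would not guarantee.
list(result.keys())  # ['武汉分公司', '省公司', '黄石分公司']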

@@ -151,10 +151,27 @@ def insert_data(files,base_path):
records = []
for key, value in data.iteritems():
#(datatype,menu,ip,account,jobnum,interface,company) count
#[str(datatype), menu, ip,account,jobnum,interface,company
v1,v2,v3,v4,v5,v6,v7 = key.split(",")
#menu
if len(v2)>50:
continue
#account
if len(v4)>30:
continue
#jobnum
if len(v5)>30:
continue
#interface
if len(v6)>300:
continue
res_str = ",".join([key,log_date, str(value)])
records.append(res_str)
res_str = "\n".join(records)
logger_cron.info("INSERT: 排除异常数据后总数据 " +str(len(records)))
csv_file = base_path+"/"+log_date+".csv"
logger_cron.info("INSERT: 开始写csv文件")
write_large_file(csv_file,res_str)
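
The new length checks drop any record whose menu, account, jobnum or interface field exceeds the expected column widths before the CSV line is assembled. The same guard as a standalone sketch (thresholds mirror the diff; the sample key is made up):

def is_valid_record(key):
    # key layout: datatype,menu,ip,account,jobnum,interface,company
    v1, v2, v3, v4, v5, v6, v7 = key.split(",")
    if len(v2) > 50:    # menu too long
        return False
    if len(v4) > 30:    # account too long
        return False
    if len(v5) > 30:    # jobnum too long
        return False
    if len(v6) > 300:   # interface too long
        return False
    return True

is_valid_record("1,menu_home,10.25.108.198,zhangsan,1101234,/api/user/login,武汉分公司")  # True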
