Compare commits


2 Commits

Author  SHA1        Message        Date
TANGWY  315f1470ec  'Commit'       4 months ago
TANGWY  b97f549d89  'Code commit'  5 months ago
  1. conf/defaultRule.json (144)
  2. corn/ueba_corn_data_insert.py (78)
  3. corn/ueba_corn_pg.py (56)
  4. cron/log_alarm.py (142)
  5. cron/ueba_cron.py (0)
  6. cron/ueba_cron_data_insert.py (37)
  7. cron/ueba_cron_pg.py (73)
  8. files/2024-07-13.json (44)
  9. install.py (2)
  10. jobs/jobmeta.json (21)
  11. mock/mock_data.json (823)
  12. mock/mock_data2.json (221)
  13. package.json (6)
  14. right_config.json (14)
  15. test.py (18)
  16. utils/base_dataclean.py (292)
  17. utils/base_dataclean_pg.py (179)
  18. utils/dashboard_data.py (281)
  19. utils/dashboard_data_conversion.py (40)
  20. utils/dashboard_data_pg.py (63)
  21. utils/db2json.py (69)
  22. utils/es_operation.py (90)
  23. utils/ext_logging.py (7)
  24. utils/file_to_pg.py (205)
  25. views/dashboard_views.py (2)

@ -1,38 +1,150 @@
{
"white_list": {
"ip": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"account": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"interface": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"menu": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
]
},
"grey_list": {
"ip": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"account": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"interface": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
],
"menu": [
400000,
400001
510400,
510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
]
}
}
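
Note: the white/grey lists above are consumed by the cleaning jobs. As a rough sketch of how such a rule file can be read and checked (the loader below is illustrative; the app's real consumer of conf/defaultRule.json is not shown in this diff):

# coding=utf-8
# Minimal sketch (assumption: loader is illustrative, not the app's actual code).
import json
import codecs

def load_rules(path="conf/defaultRule.json"):
    with codecs.open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def is_listed(rules, list_name, dimension, jobnum):
    # list_name is "white_list" or "grey_list";
    # dimension is one of "ip", "account", "interface", "menu".
    return int(jobnum) in rules.get(list_name, {}).get(dimension, [])

rules = load_rules()
print(is_listed(rules, "white_list", "ip", 510400))  # True with the config above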

@ -1,78 +0,0 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
# Get the job execution window (start time - end time)
def get_job_period(self):
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.now() - timedelta(minutes=5)
end_time = datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
# Handle a window that crosses midnight
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.combine(start_time.date(), datetime.max.time())  # 23:59:59.999999
return start_time, end_time
# Runs every 5 minutes
def processing(self):
try:
logger.info("job:开始执行")
start,end=self.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
logger.info("job:"+"准备获取es数据")
#entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception, e:
err_info="定时任务执行失败:{} {}".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()
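
The deleted job above derives each run's query window from the last recorded end_time. A standalone sketch of that logic (pure datetime math, assuming the 5-minute cadence described in the comments) makes the midnight clamp easier to see:

# coding=utf-8
# Sketch of the 5-minute window logic, including the cross-day clamp.
from datetime import datetime, time, timedelta

def next_window(last_end):
    start = last_end
    end = last_end + timedelta(minutes=5)
    if end > datetime.now():
        return None, None           # window not yet complete; skip this run
    if start.date() != end.date():  # clamp a window that crosses midnight
        end = datetime.combine(start.date(), time(23, 59, 59, 999999))
    return start, end

print(next_window(datetime(2024, 5, 23, 23, 58)))  # end clamped to 23:59:59.999999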

@ -1,56 +0,0 @@
# coding=utf-8
"""
@Author: tangwy
@FileName: user_cron_pg.py
@DateTime: 2024/7/09 14:19
@Description: Scheduled job that cleans ES data.
"""
from __future__ import unicode_literals
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
# Runs every 5 minutes
def processing(self):
job_id =self.generate_job_id()
try:
logger.info("job:开始执行")
start,end= DBUtils.get_job_period()
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
logger.info("job:"+"准备获取es数据")
entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception, e:
err_info="定时任务执行失败:{} {}".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()

@ -0,0 +1,142 @@
# coding:utf-8
import sys
import uuid
import json
import time
import random
path = str(sys.path[0])
home_path = path.split("isop_uebaapiData")[0]
sys.path.append(home_path)
from isop_uebaapiData.util import send_logs
def alarm(cookies, api):
"""2、HTTP日志"""
inputstr = '''[{"msgtype":1,"hash":"8DE9-BDAB-F622-2FA8","dev_ip":"10.67.5.17","product":"uts"},{"sid":"6004744450036c44f815500016d00a5f5151105430a3ed","timestamp":1567673939,"sip":"10.67.0.52","sport":5624,"dip":"10.67.0.53","dport":80,"protocol":6,"app":3087428650795009,"app_proto":8,"direct":4,"app.detail":{"method":"GET","http_protocol":"1.1","ret_code":200,"host":"10.67.1.1","uri":"/webtest/uploadFile.php","referer":"http://[2222::65]/webtest/","content_type":" multipart/form-data; boundary=----WebKitFormBoundary2zcCUl4lQf1h7A7S","content_type_server":" text/html","server":"Apache/2.4.4 (Win32) OpenSSL/0.9.8y PHP/5.4.19","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36","link":"","cookies":"loginmainacctid=wangshiguang;operatorId=d2601586;com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;","content_encoding":"","location":"","content_length":70080,"content_length_server":200,"set_cookie":"","range":"","connection":"keep-alive","connection_server":"Keep-Alive","x_forwarded_for":"","post_data":"LS0tLS0tV2ViS2l0Rm9ybUJvdW5kYXJ5MnpjQ1VsNGxRZjFoN0E3Uw0KQ29udGVudC1EaXNwb3NpdGlvbjogZm9ybS1kYXRhOyBuYW1lPSJmaWxlIjsgZmlsZW5hbWU9IjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1Ig0=","response_body":"VXBsb2FkOiAwMDAxYWQ0NDFkY2IzODYyMThhNzY5OTJhY2Y4YjcwNTxiciAvPlR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTxiciAvPlNpemU6IDY4LjEyNzkyOTY4NzUgS2I8YnIgLz5UZW1wIGZpbGU6IEQ6XHhhbXBwXHRtcFxwaHA2ODI1LnRtcDxiciAvPjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1IGFscmVhZHkgZXhpc3RzLiA="}}]'''
inputarr = json.loads(inputstr, strict=False)
# Randomize the timestamp
inputarr[1]["timestamp"] = int(time.time())
inputarr[1]["sid"] = str(uuid.uuid1())
# inputarr[1]["sip"] = "10.67.4.33"
inputarr[1]["sip"] = generate_random_ip()
inputarr[1]["dip"] = "10.67.1.1"
inputarr[1]["dport"] = "8180"
inputarr[1]["app.detail"]["uri"] = "/alarmtest.action?BMECID=352432757&BMETimestamp=1692788489260&queryNumber=158713459"
inputarr[1]["app.detail"]["host"] = api
inputarr[1]["app.detail"]["cookies"] = cookies
return json.dumps(inputarr)
def generate_random_ip():
# Fixed prefix "192.168."
prefix = "192.168."
# Third octet fixed at 1; fourth octet random in [0, 50]
third_octet = 1
fourth_octet = random.randint(0, 50)
# Assemble the IP address
ip = "{}{}.{}".format(prefix, third_octet, fourth_octet)
return ip
def AbIDVisitAPINums510404():
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com",
"good.alarm.com"]
info_list = [
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["武汉"][
0] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 60],
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["荆州"][
2] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 120]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
return "510405场景的告警数据已生成"
def get_random_jobnum():
# Arrays of job-number strings grouped by prefix
prefix_strings = [
['10243', '10895', '10134', '10781', '10962'], # strings starting with 10
['11089', '11057', '11023', '11016', '11030'], # strings starting with 110
['14076', '14049', '14098', '14032', '14061'], # strings starting with 140
['26054', '26013', '26087', '26029', '26061'], # strings starting with 260
['20083', '20015', '20072', '20096', '20048'], # strings starting with 200
['19035', '19017', '19049', '19082', '19096'], # strings starting with 190
['180237', '180276', '180204', '180295', '180219'] # strings starting with 1802
]
# Pick a random prefix group
selected_prefix_array = random.choice(prefix_strings)
# Pick a random string from the chosen group
selected_string = random.choice(selected_prefix_array)
return selected_string
def get_random_person():
people_list = [
"Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Henry", "Isabel", "Jack",
"Kate", "Liam", "Mia", "Noah", "Olivia"
# Add more names here as needed...
]
random_person = random.choice(people_list)
return random_person
def get_random_menu():
# System menu item list
system_menu = [
"主页", "设置", "个人资料", "消息", "通知", "帮助", "帐户", "关于", "联系我们", "服务",
"购物车", "订单", "支付", "地址", "密码"
]
# Pick a random menu item
random_menu_item = random.choice(system_menu)
return random_menu_item
if __name__ == '__main__':
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com", "good.alarm.com","baidu.com","sohu.com","xinlang.com","erpx.com"]
info_list = [
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
print "510405场景的告警数据已生成"

@ -0,0 +1,37 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: User-related scheduled task. Fetches the full dataset, deletes the existing data, then rewrites it. Runs once per day.
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.file_to_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
# Runs every 5 minutes
def processing(self):
try:
logger_cron.info("INSERT:开始执行")
entry()
logger_cron.info("INSERT:"+"执行完成")
except Exception ,e:
err_info=traceback.format_exc()
logger_cron.error("INSERT:"+"执行失败,"+err_info)
if __name__ == '__main__':
UserCron().processing()

@ -0,0 +1,73 @@
# coding=utf-8
"""
@Author: tangwy
@FileName: user_cron_pg.py
@DateTime: 2024/7/09 14:19
@Description: Scheduled job that cleans ES data.
"""
from __future__ import unicode_literals
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
# Generate a job_id
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
# Runs every 5 minutes
def processing(self):
logger_cron.info("JOB:接收到执行指令")
job_id =self.generate_job_id()
task_run_count =0
try:
start,end,status,run_count,jobid= DBUtils.get_job_period()
if jobid !="":
job_id=jobid
logger_cron.info("JOB:"+job_id+"开始执行")
if status ==1:
logger_cron.info("JOB:"+job_id+"正在运行中不执行")
return
if status==3 and run_count >3:
logger_cron.info("JOB:"+job_id+"失败次数大于3不执行")
return
if start is None or end is None:
logger_cron.info("JOB:"+job_id+"结束时间大于(服务器时间-15分钟)不执行")
return
task_run_count = run_count+1
logger_cron.info("JOB:"+job_id+"运行参数:{},{}".format(start,end))
logger_cron.info("JOB:"+job_id+"准备将job写入job表")
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger_cron.info("JOB:"+job_id+"完成job表写入")
logger_cron.info("JOB:"+job_id+"准备获取es数据")
entry(start,end,job_id)
logger_cron.info("JOB:"+job_id+"完成es数据获取")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"",task_run_count)
logger_cron.info("JOB:"+job_id+"更新job表状态完成")
except Exception ,e:
err_info=traceback.format_exc()
logger_cron.error("JOB:"+job_id+"执行失败:"+err_info)
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info,task_run_count)
raise
if __name__ == '__main__':
UserCron().processing()
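
The new ueba_cron_pg.py gates each run on the previous job's recorded state. The decision logic reduces to a small predicate (a sketch, assuming the JOB_STATUS codes above and the retry cap of 3 used in processing()):

# coding=utf-8
# Gating rules applied before a cleaning run starts.
RUNNING, FINISH, ERROR = 1, 2, 3

def should_run(status, run_count, start, end):
    if status == RUNNING:
        return False  # a previous run is still in flight
    if status == ERROR and run_count > 3:
        return False  # too many failures; needs manual attention
    if start is None or end is None:
        return False  # window end would exceed server time - 15 minutes
    return True

print(should_run(ERROR, 4, "2024-07-18 15:20:31", "2024-07-18 15:25:31"))  # False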

@ -1,44 +0,0 @@
{
"ip":[
{
"ip":"192.168.1.1",
"jobnum":"1122222",
"count":212
},
{
"ip":"192.168.2.1",
"jobnum":"1122222",
"count":212
}
],
"account":[
{
"account":"zhangs",
"jobnum":"1122222",
"count":212
},
{
"account":"zhang3",
"jobnum":"112222",
"count":211
}
],
"interface":[
{
"interface":"www.baidu.com/user",
"jobnum":"1122222",
"account":"zhangs",
"ip":"192.168.1.1",
"count":212
}
],
"menu":[
{
"menu":"菜单1",
"jobnum":"1122222",
"account":"zhangs",
"ip":"192.168.1.1",
"count":212
}
]
}
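
The removed files/2024-07-13.json shows the per-day aggregate layout the cleaning job writes out. Totalling it back per dimension is straightforward (a sketch against the schema above; the path is just an example):

# coding=utf-8
# Sum request counts per dimension from a per-day aggregate file.
import json
import codecs

with codecs.open("files/2024-07-13.json", "r", encoding="utf-8") as f:
    day = json.load(f)
for dimension, rows in day.items():
    print("%s: %d requests" % (dimension, sum(row["count"] for row in rows)))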

@ -76,7 +76,7 @@ class Install():
def install(self):
try:
installDBSchema(["pg_struct.sql"])
installDBSchema(["pg_struct.sql","pg_data.sql"])
add_task()
logger.info('>>>安装结束!!!')
except Exception as e:

@ -1,13 +1,24 @@
[
{
"task_name": "ueba_corn",
"task_name": "ueba_cron",
"task_type": 1,
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/corn/ueba_corn.py",
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_pg.py",
"task_owner": "uebaMetricsAnalysis",
"run_mode": 1,
"duration_args": "*/2 * * * * *",
"retry_nums": 5,
"duration_args": "0 */1 * * * ?",
"retry_nums": 0,
"is_enable": 1,
"task_description": "每天执行一次 清洗数据到es-ueba索引"
"task_description": "每分钟执行一次数据清洗"
},
{
"task_name": "ueba_cron_data_insert",
"task_type": 1,
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_data_insert.py",
"task_owner": "uebaMetricsAnalysis",
"run_mode": 1,
"duration_args": "0 0 3 * * ?",
"retry_nums": 0,
"is_enable": 1,
"task_description": "每天执行一次 将汇总数据写入pg"
}
]
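
The duration_args values look like Quartz-style six-field cron expressions (an assumption based on the field count and the trailing "?"): seconds come first, so the two tasks run once a minute and daily at 03:00 respectively.

# Quartz-style reading of the schedules above (assumption: field order is
# second, minute, hour, day-of-month, month, day-of-week).
SCHEDULES = {
    "ueba_cron": "0 */1 * * * ?",            # second 0 of every minute
    "ueba_cron_data_insert": "0 0 3 * * ?",  # every day at 03:00:00
}
for name, expr in SCHEDULES.items():
    print("%s -> %s" % (name, expr))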

@ -2,191 +2,784 @@
"summary": {
"ip": [
{
"company": "湖北公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_reat": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "宜昌公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_reat": 0.3,
"ip_avg": 0.43,
"trend": 0.3
"company": "宜昌分公司",
"req_frequency": 517,
"frequency_rate": 17.1475953565506,
"ip_count": 8,
"ip_rate": 0.195121951219512,
"ip_avg": 2.14344941956882,
"trend": 0.09
},
{
"company": "随州分公司",
"req_frequency": 329,
"frequency_rate": 10.9121061359867,
"ip_count": 7,
"ip_rate": 0.170731707317073,
"ip_avg": 1.55887230514096,
"trend": 0.1
},
{
"company": "孝感分公司",
"req_frequency": 399,
"frequency_rate": 13.2338308457711,
"ip_count": 7,
"ip_rate": 0.170731707317073,
"ip_avg": 1.89054726368159,
"trend": -0.07
},
{
"company": "黄冈分公司",
"req_frequency": 495,
"frequency_rate": 16.4179104477612,
"ip_count": 9,
"ip_rate": 0.219512195121951,
"ip_avg": 1.82421227197347,
"trend": -0.02
},
{
"company": "省公司",
"req_frequency": 1275,
"frequency_rate": 42.2885572139304,
"ip_count": 10,
"ip_rate": 0.24390243902439,
"ip_avg": 4.22885572139304,
"trend": 0.1
}
],
"account": [
{
"company": "湖北公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_reat": 0.3,
"account_avg": 0.43,
"trend": 0.3
"company": "宜昌分公司",
"req_frequency": 134,
"frequency_rate": 19.7058823529412,
"account_count": 8,
"account_rate": 0.242424242424242,
"account_avg": 2.46323529411765,
"trend": 0.09
},
{
"company": "随州分公司",
"req_frequency": 73,
"frequency_rate": 10.7352941176471,
"account_count": 7,
"account_rate": 0.212121212121212,
"account_avg": 1.53361344537815,
"trend": 0.1
},
{
"company": "孝感分公司",
"req_frequency": 225,
"frequency_rate": 33.0882352941176,
"account_count": 7,
"account_rate": 0.212121212121212,
"account_avg": 4.72689075630252,
"trend": -0.07
},
{
"company": "黄冈分公司",
"req_frequency": 166,
"frequency_rate": 24.4117647058824,
"account_count": 9,
"account_rate": 0.272727272727273,
"account_avg": 2.71241830065359,
"trend": -0.02
},
{
"company": "宜昌公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_reat": 0.3,
"account_avg": 0.43,
"trend": 0.3
"company": "公司",
"req_frequency": 216,
"frequency_rate": 31.7647058823529,
"account_count": 10,
"account_rate": 0.303030303030303,
"account_avg": 3.17647058823529,
"trend": 0.1
}
],
"interface": [
{
"interface_addr": "/getuser",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 212,
"frequency_rate": 0.160727824109174,
"frequency_avg": 0,
"trend": 0.07
},
{
"interface_addr": "/getcpminfo",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 225,
"frequency_rate": 0.170583775587566,
"frequency_avg": 0,
"trend": 0.02
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 882,
"frequency_rate": 0.66868840030326,
"frequency_avg": 0,
"trend": -0.09
}
],
"menu": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
"menu_name": "菜单1",
"req_frequency": 333,
"frequency_rate": 0.263449367088608,
"frequency_avg": 111,
"trend": 0.09
},
{
"menu_name": "菜单2",
"req_frequency": 315,
"frequency_rate": 0.249208860759494,
"frequency_avg": 105,
"trend": -0.01
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
"menu_name": "菜单3",
"req_frequency": 616,
"frequency_rate": 0.487341772151899,
"frequency_avg": 205.333333333333,
"trend": 0.02
}
]
},
"detail": {
"ip": {
"湖北公司": [
"宜昌分公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_ip": "192.156.3.11",
"req_frequency": 22
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
"req_ip": "192.156.3.12",
"req_frequency": 12
},
{
"req_ip": "192.156.3.19",
"req_frequency": 78
},
{
"req_ip": "192.156.3.20",
"req_frequency": 79
},
{
"req_ip": "192.156.3.21",
"req_frequency": 80
},
{
"req_ip": "192.156.3.22",
"req_frequency": 81
},
{
"req_ip": "192.156.3.23",
"req_frequency": 82
},
{
"req_ip": "192.156.3.24",
"req_frequency": 83
}
],
"宜昌公司": [
"随州分公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_frequency": 22
"req_ip": "192.116.3.24",
"req_frequency": 44
},
{
"req_ip": "192.116.3.25",
"req_frequency": 45
},
{
"req_ip": "192.116.3.26",
"req_frequency": 46
},
{
"req_ip": "192.116.3.27",
"req_frequency": 47
},
{
"req_ip": "192.116.3.28",
"req_frequency": 48
},
{
"req_ip": "192.116.3.29",
"req_frequency": 49
},
{
"req_ip": "192.116.3.30",
"req_frequency": 50
}
],
"孝感分公司": [
{
"req_ip": "192.126.3.24",
"req_frequency": 54
},
{
"req_ip": "192.126.3.25",
"req_frequency": 55
},
{
"req_ip": "192.126.3.26",
"req_frequency": 56
},
{
"req_ip": "192.126.3.27",
"req_frequency": 57
},
{
"req_ip": "192.126.3.28",
"req_frequency": 58
},
{
"req_ip": "192.126.3.29",
"req_frequency": 59
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
"req_ip": "192.106.3.30",
"req_frequency": 60
}
],
"黄冈分公司": [
{
"req_ip": "192.106.3.30",
"req_frequency": 51
},
{
"req_ip": "192.106.3.31",
"req_frequency": 52
},
{
"req_ip": "192.106.3.32",
"req_frequency": 53
},
{
"req_ip": "192.106.3.33",
"req_frequency": 54
},
{
"req_ip": "192.106.3.34",
"req_frequency": 55
},
{
"req_ip": "192.106.3.35",
"req_frequency": 56
},
{
"req_ip": "192.106.3.36",
"req_frequency": 57
},
{
"req_ip": "192.106.3.37",
"req_frequency": 58
},
{
"req_ip": "192.106.3.38",
"req_frequency": 59
}
],
"省公司": [
{
"req_ip": "192.146.3.38",
"req_frequency": 123
},
{
"req_ip": "192.146.3.39",
"req_frequency": 124
},
{
"req_ip": "192.146.3.40",
"req_frequency": 125
},
{
"req_ip": "192.146.3.41",
"req_frequency": 126
},
{
"req_ip": "192.146.3.42",
"req_frequency": 127
},
{
"req_ip": "192.146.3.43",
"req_frequency": 128
},
{
"req_ip": "192.146.3.44",
"req_frequency": 129
},
{
"req_ip": "192.146.3.45",
"req_frequency": 130
},
{
"req_ip": "192.146.3.46",
"req_frequency": 131
},
{
"req_ip": "192.146.3.47",
"req_frequency": 132
}
]
},
"account": {
"湖北公司": [
"宜昌分公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
"req_account": "huqx",
"req_frequency": 33,
"req_jobnum": 54412
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
"req_account": "zhangsf",
"req_frequency": 34,
"req_jobnum": 54413
},
{
"req_account": "zhaoj",
"req_frequency": 35,
"req_jobnum": 54414
}
],
"宜昌公司": [
"随州分公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
"req_account": "sangdq",
"req_frequency": 36,
"req_jobnum": 54415
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
"req_account": "hujt",
"req_frequency": 37,
"req_jobnum": 54416
}
],
"孝感分公司": [
{
"req_account": "zhangs",
"req_frequency": 98,
"req_jobnum": 43325
},
{
"req_account": "lin",
"req_frequency": 43,
"req_jobnum": 43326
},
{
"req_account": "liuhr",
"req_frequency": 33,
"req_jobnum": 43327
},
{
"req_account": "sunxq01",
"req_frequency": 51,
"req_jobnum": 43328
}
],
"黄冈分公司": [
{
"req_account": "shicl",
"req_frequency": 47,
"req_jobnum": 65341
},
{
"req_account": "gongxs",
"req_frequency": 65,
"req_jobnum": 65342
},
{
"req_account": "sunzs",
"req_frequency": 54,
"req_jobnum": 65343
}
],
"省公司": [
{
"req_account": "maoxt",
"req_frequency": 37,
"req_jobnum": 98761
},
{
"req_account": "xiaod01",
"req_frequency": 29,
"req_jobnum": 98761
},
{
"req_account": "qingsx",
"req_frequency": 71,
"req_jobnum": 98761
},
{
"req_account": "guobg",
"req_frequency": 79,
"req_jobnum": 98761
}
]
},
"interface": {
"接口1": [
"http://190.89.233.2:8909/getUser": [
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 23,
"req_ip": "192.156.3.12",
"req_account": "zhangq",
"req_jobnum": 54411
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 24,
"req_ip": "192.156.3.12",
"req_account": "huqx",
"req_jobnum": 54412
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 25,
"req_ip": "192.156.3.13",
"req_account": "zhangsf",
"req_jobnum": 54413
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 26,
"req_ip": "192.156.3.14",
"req_account": "zhaoj",
"req_jobnum": 54414
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 27,
"req_ip": "192.156.3.15",
"req_account": "sangdq",
"req_jobnum": 54415
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 28,
"req_ip": "192.156.3.16",
"req_account": "hujt",
"req_jobnum": 54416
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 29,
"req_ip": "192.156.3.17",
"req_account": "zhangs",
"req_jobnum": 43325
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 30,
"req_ip": "192.156.3.18",
"req_account": "lin",
"req_jobnum": 43326
}
],
"接口2": [
"http://190.89.233.2:8909/getpublicconfig": [
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 43,
"req_ip": "192.156.3.12",
"req_account": "liuhr",
"req_jobnum": 43327
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 44,
"req_ip": "192.156.3.12",
"req_account": "sunxq01",
"req_jobnum": 43328
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 45,
"req_ip": "192.156.3.18",
"req_account": "shicl",
"req_jobnum": 65341
},
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 46,
"req_ip": "192.106.3.33",
"req_account": "gongxs",
"req_jobnum": 65342
},
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 47,
"req_ip": "192.106.3.34",
"req_account": "sunzs",
"req_jobnum": 65343
}
],
"http://190.89.233.2:8909/update/sysconfig": [
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 34,
"req_ip": "192.106.3.35",
"req_account": "zhangsf",
"req_jobnum": 54415
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 23,
"req_ip": "192.106.3.36",
"req_account": "zhaoj",
"req_jobnum": 54416
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 78,
"req_ip": "192.106.3.37",
"req_account": "sangdq",
"req_jobnum": 43325
},
{
"interface_addr": "http://190.89.233.2:8910/update/sysconfig",
"req_frequency": 79,
"req_ip": "192.146.3.38",
"req_account": "hujt",
"req_jobnum": 43326
},
{
"interface_addr": "http://190.89.233.2:8911/update/sysconfig",
"req_frequency": 80,
"req_ip": "192.146.3.39",
"req_account": "zhangs",
"req_jobnum": 43327
},
{
"interface_addr": "http://190.89.233.2:8912/update/sysconfig",
"req_frequency": 81,
"req_ip": "192.146.3.40",
"req_account": "lin",
"req_jobnum": 43328
},
{
"interface_addr": "http://190.89.233.2:8913/update/sysconfig",
"req_frequency": 82,
"req_ip": "192.146.3.41",
"req_account": "liuhr",
"req_jobnum": 65341
},
{
"interface_addr": "http://190.89.233.2:8914/update/sysconfig",
"req_frequency": 83,
"req_ip": "192.146.3.42",
"req_account": "sunxq01",
"req_jobnum": 65342
},
{
"interface_addr": "http://190.89.233.2:8915/update/sysconfig",
"req_frequency": 84,
"req_ip": "192.146.3.43",
"req_account": "xiaod01",
"req_jobnum": 65343
},
{
"interface_addr": "http://190.89.233.2:8916/update/sysconfig",
"req_frequency": 85,
"req_ip": "192.146.3.44",
"req_account": "qingsx",
"req_jobnum": 98761
},
{
"interface_addr": "http://190.89.233.2:8917/update/sysconfig",
"req_frequency": 86,
"req_ip": "192.146.3.45",
"req_account": "guobg",
"req_jobnum": 98761
},
{
"interface_addr": "http://190.89.233.2:8918/update/sysconfig",
"req_frequency": 87,
"req_ip": "192.146.3.46",
"req_account": "zhangq",
"req_jobnum": 98761
}
]
},
"menu": {
"菜单1": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"menu_name": "菜单1",
"req_frequency": 53,
"req_ip": "192.106.3.32",
"req_account": "lin",
"req_jobnum": "43326"
},
{
"menu_name": "菜单1",
"req_frequency": 54,
"req_ip": "192.106.3.33",
"req_account": "liuhr",
"req_jobnum": "43327"
},
{
"menu_name": "菜单1",
"req_frequency": 55,
"req_ip": "192.106.3.34",
"req_account": "sunxq01",
"req_jobnum": "43328"
},
{
"menu_name": "菜单1",
"req_frequency": 56,
"req_ip": "192.106.3.35",
"req_account": "shicl",
"req_jobnum": "65341"
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"menu_name": "菜单1",
"req_frequency": 57,
"req_ip": "192.106.3.36",
"req_account": "gongxs",
"req_jobnum": "65342"
},
{
"menu_name": "菜单1",
"req_frequency": 58,
"req_ip": "192.106.3.37",
"req_account": "sunzs",
"req_jobnum": "65343"
}
],
"菜单2": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"menu_name": "菜单2",
"req_frequency": 31,
"req_ip": "192.156.3.12",
"req_account": "zhangq",
"req_jobnum": "54411"
},
{
"menu_name": "菜单2",
"req_frequency": 32,
"req_ip": "192.156.3.12",
"req_account": "huqx",
"req_jobnum": "54412"
},
{
"menu_name": "菜单2",
"req_frequency": 33,
"req_ip": "192.156.3.13",
"req_account": "zhangsf",
"req_jobnum": "54413"
},
{
"menu_name": "菜单2",
"req_frequency": 34,
"req_ip": "192.156.3.14",
"req_account": "zhaoj",
"req_jobnum": "54414"
},
{
"menu_name": "菜单2",
"req_frequency": 35,
"req_ip": "192.156.3.15",
"req_account": "sangdq",
"req_jobnum": "54415"
},
{
"menu_name": "菜单2",
"req_frequency": 36,
"req_ip": "192.156.3.16",
"req_account": "hujt",
"req_jobnum": "54416"
},
{
"menu_name": "菜单2",
"req_frequency": 37,
"req_ip": "192.156.3.17",
"req_account": "zhangs",
"req_jobnum": "43325"
},
{
"menu_name": "菜单2",
"req_frequency": 38,
"req_ip": "192.156.3.18",
"req_account": "lin",
"req_jobnum": "43326"
},
{
"menu_name": "菜单2",
"req_frequency": 39,
"req_ip": "192.156.3.12",
"req_account": "liuhr",
"req_jobnum": "43327"
}
],
"菜单3": [
{
"menu_name": "菜单3",
"req_frequency": 51,
"req_ip": "192.106.3.33",
"req_account": "gongxs",
"req_jobnum": "65342"
},
{
"menu_name": "菜单3",
"req_frequency": 52,
"req_ip": "192.106.3.34",
"req_account": "sunzs",
"req_jobnum": "65343"
},
{
"menu_name": "菜单3",
"req_frequency": 53,
"req_ip": "192.106.3.35",
"req_account": "zhangsf",
"req_jobnum": "54415"
},
{
"menu_name": "菜单3",
"req_frequency": 54,
"req_ip": "192.106.3.36",
"req_account": "zhaoj",
"req_jobnum": "54416"
},
{
"menu_name": "菜单3",
"req_frequency": 55,
"req_ip": "192.106.3.37",
"req_account": "sangdq",
"req_jobnum": "43325"
},
{
"menu_name": "菜单3",
"req_frequency": 56,
"req_ip": "192.146.3.38",
"req_account": "hujt",
"req_jobnum": "43326"
},
{
"menu_name": "菜单3",
"req_frequency": 57,
"req_ip": "192.146.3.39",
"req_account": "zhangs",
"req_jobnum": "43327"
},
{
"menu_name": "菜单3",
"req_frequency": 58,
"req_ip": "192.146.3.40",
"req_account": "lin",
"req_jobnum": "43328"
},
{
"menu_name": "菜单3",
"req_frequency": 59,
"req_ip": "192.146.3.41",
"req_account": "liuhr",
"req_jobnum": "65341"
},
{
"menu_name": "菜单3",
"req_frequency": 60,
"req_ip": "192.146.3.42",
"req_account": "sunxq01",
"req_jobnum": "65342"
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
"menu_name": "菜单3",
"req_frequency": 61,
"req_ip": "192.146.3.43",
"req_account": "xiaod01",
"req_jobnum": "65343"
}
]
}
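
The summary numbers in the new mock data are internally consistent; the rates appear to be derived as shares of the column totals (inferred purely from the values themselves, not from documented code — e.g. 517 / 3015 * 100 = 17.1475..., 8 / 41 = 0.19512...):

# coding=utf-8
# Inferred derivation of the ip summary fields in mock/mock_data.json.
rows = [(u"宜昌分公司", 517, 8), (u"随州分公司", 329, 7), (u"孝感分公司", 399, 7),
        (u"黄冈分公司", 495, 9), (u"省公司", 1275, 10)]
total_freq = float(sum(r[1] for r in rows))  # 3015
total_ips = float(sum(r[2] for r in rows))   # 41
for company, freq, ips in rows:
    frequency_rate = freq * 100 / total_freq  # share of all requests, in percent
    ip_rate = ips / total_ips                 # share of all distinct source IPs
    ip_avg = frequency_rate / ips             # average percent share per IP
    print(u"%s %.6f %.6f %.6f" % (company, frequency_rate, ip_rate, ip_avg))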

@ -0,0 +1,221 @@
{
"summary": {
"ip": [
{
"company": "孝感分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "宜昌分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "随州分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "黄冈分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "省公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
}
],
"account": [
{
"company": "湖北公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_rate": 0.3,
"account_avg": 0.43,
"trend": 0.3
},
{
"company": "宜昌公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_rate": 0.3,
"account_avg": 0.43,
"trend": 0.3
}
],
"interface": [
{
"interface_addr": "/getuser",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
},
{
"interface_addr": "/getcpminfo",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
}
],
"menu": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
}
]
},
"detail": {
"ip": {
"湖北公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_frequency": 22
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
}
],
"宜昌公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_frequency": 22
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
}
]
},
"account": {
"湖北公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
}
],
"宜昌公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
}
]
},
"interface": {
"接口1": [
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
],
"接口2": [
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
]
},
"menu": {
"菜单1": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
],
"菜单2": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
]
}
}
}

@ -1,8 +1,8 @@
{
"name": "UebaMetricsAnalysis",
"name": "uebaMetricsAnalysis",
"version": "V3.0R01F00",
"menuurl": "/UebaMetricsAnalysis",
"menuregx": "^/UebaMetricsAnalysis",
"menuurl": "/uebaMetricsAnalysis",
"menuregx": "^/uebaMetricsAnalysis",
"menuname": "指标晾晒统计",
"summary": "指标晾晒统计",
"platform_mode": "simple_mode",

@ -1,16 +1,16 @@
{
"name": "指标晾晒统计",
"key": "UebaMetricsAnalysis",
"prefix": "UebaMetricsAnalysis",
"key": "uebaMetricsAnalysis",
"prefix": "uebaMetricsAnalysis",
"allfix": [
"^/UebaMetricsAnalysis"
"^/uebaMetricsAnalysis"
],
"fix": [
"^/UebaMetricsAnalysis"
"^/uebaMetricsAnalysis"
],
"link": "/WebApi/UebaMetricsAnalysis/static/dist/#/",
"link": "/WebApi/uebaMetricsAnalysis/static/dist/#/",
"pinyin": "zhibiaoliangshaitongji",
"app_name": "UebaMetricsAnalysis",
"app_name": "uebaMetricsAnalysis",
"children": [
],
@ -18,6 +18,6 @@
1
],
"role_menu_register":{
"指标晾晒统计": "UebaMetricsAnalysis"
"指标晾晒统计": "uebaMetricsAnalysis"
}
}

@ -1,18 +0,0 @@
#encoding=utf-8
import json
from isoc.utils.esUtil import EsUtil
def createIndex():
map={
"field1": "text",
"field2": "text"
}
es_util_instance = EsUtil()
# create_index_simple(index, mapping) matches its use elsewhere in this repo;
# the original call passed an undefined scroll_search variable
res = es_util_instance.create_index_simple("bsa_traffic*", map)
return res
res = createIndex()
print(res)

@ -1,292 +0,0 @@
#encoding=utf-8
import json
import time,datetime
import traceback
from datetime import datetime, timedelta
import calendar
from esUtil import EsUtil
import pytz
size = 1000  # adjust to fit the workload
## 01 Create the index
def createIndex(index):
map={
"data_type":"keyword",
"req_account":"keyword",
"req_frequency":"integer",
"req_jobnum":"keyword",
"interface_addr":"keyword",
"req_ip":"ip",
"menu_name":"keyword",
"date_time":"date"
}
es_util_instance = EsUtil()
reqs = es_util_instance.is_index_exist(index)
if reqs =="false":
try:
res = es_util_instance.create_index_simple(index,map)
except Exception,e:
print e.message
## IP dimension
def get_ip_group_data(index,startTime,endTime):
try:
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"sip": { "terms": {"field": "sip"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
try:
response = es_util_instance.search(index,query_body)
except Exception,e:
print "err"
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": "ip",
"req_account": "",
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "",
"req_ip":bucket['key']['sip'] ,
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
except Exception,e:
print "x_err:"+e.message
return datas
## Account dimension
def get_account_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"account": { "terms": {"field": "account"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "account",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "",
"req_ip":"0.0.0.0" ,
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## Interface dimension
def get_interface_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "interface",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": bucket['key']['interface'] ,
"req_ip":bucket['key']['sip'],
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## Menu dimension
def get_menu_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "menu",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "" ,
"req_ip":bucket['key']['sip'],
"menu_name": bucket['key']['worm_family'],
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 03 Write the data
def data_insert(index,data):
es_util_instance = EsUtil()
response = es_util_instance.bulk_insert(index,data)
return response
def clean_data(write_index,read_index,start,end):
data_ip = get_ip_group_data(read_index,start,end)
print "data_ip:"+str(len(data_ip))
data_account = get_account_group_data(read_index,start,end)
print "data_account:"+str(len(data_account))
data_interface = get_interface_group_data(read_index,start,end)
print "data_interface:"+str(len(data_interface))
data_menu = get_menu_group_data(read_index,start,end)
print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
response = data_insert(write_index,res_data)
print json.dumps(response)
# Entry point
def entry(write_index,read_index,start,end):
createIndex(write_index)
clean_data(write_index,read_index,start,end)
# hour/minute/second of the previous day (e.g. 0,0,0 gives yesterday 00:00:00)
def get_start_end_time(hour,minute,second):
# Current date and time
now = datetime.now()
# Yesterday's date and time
yesterday = now - timedelta(days=1)
# Set the time portion to the given hour/minute/second
yesterday_midnight = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0)
# Use pytz to get the UTC timezone object
utc = pytz.utc
# Localize the naive datetime to UTC
yesterday_midnight_utc = utc.localize(yesterday_midnight)
# Format as a timezone-aware string (ISO 8601)
formatted_date = yesterday_midnight_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_date
def index():
try:
# Destination index, created per month; beware of windows that cross a day boundary
write_index= "b_ueba_2024_07"
read_index ="bsa_traffic*"
# The task runs every day at midnight
# Query range: 00:00:00 of the previous day through 23:59:59 of the previous day
start = "2024-06-02T00:00:00Z"#get_start_end_time(0,0,0)
end = get_start_end_time(23,59,59)
print start +":"+ end
entry(write_index,read_index,start,end)
except Exception, e:
print "定时任务执行失败:"+traceback.format_exc()
# logger.error("定时任务执行失败:".format(str(e), traceback.format_exc()))
index()
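
Every dimension function in the removed utils/base_dataclean.py repeats the same Elasticsearch composite-aggregation paging pattern. Distilled, it looks like this (a sketch; EsUtil.search(index, body) is assumed to return the raw ES response dict, as the code above suggests):

# coding=utf-8
# Generic form of the composite-aggregation paging loop.
def scan_composite(es, index, query_body):
    datas, after_key = [], None
    while True:
        if after_key:
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        response = es.search(index, query_body)
        agg = response["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            datas.append({"key": bucket["key"], "count": bucket["doc_count"]})
        if not agg.get("after_key"):  # last page reached
            break
        after_key = agg["after_key"]  # resume after the last returned bucket
    return datas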

@ -5,10 +5,11 @@ import time,datetime
import traceback
from datetime import datetime, timedelta
import calendar
import codecs
from esUtil import EsUtil
import pytz
from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
size = 1000  # adjust to fit the workload
size = 9999  # adjust to fit the workload
DATA_TYPE = {
"IP": 1,
@ -19,11 +20,19 @@ DATA_TYPE = {
## IP dimension
def get_ip_group_data(index,startTime,endTime):
try:
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
@ -43,10 +52,7 @@ def get_ip_group_data(index,startTime,endTime):
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
try:
response = es_util_instance.search(index,query_body)
except Exception,e:
print "err"
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": DATA_TYPE.get("IP"),
@ -56,13 +62,9 @@ def get_ip_group_data(index,startTime,endTime):
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
except Exception,e:
print "x_err:"+e.message
return datas
## Account dimension
@ -70,7 +72,16 @@ def get_account_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
@ -92,7 +103,6 @@ def get_account_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": DATA_TYPE.get("ACCOUNT"),
"account": bucket['key']['account'],
@ -114,7 +124,16 @@ def get_interface_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
@ -124,7 +143,7 @@ def get_interface_group_data(index,startTime,endTime):
{"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
@ -138,7 +157,6 @@ def get_interface_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": DATA_TYPE.get("INTERFACE"),
"account": bucket['key']['account'],
@ -162,7 +180,16 @@ def get_menu_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
"bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
},
"aggs": {
"composite_buckets": {
@ -172,7 +199,7 @@ def get_menu_group_data(index,startTime,endTime):
{"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
@ -186,7 +213,6 @@ def get_menu_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": DATA_TYPE.get("MENU"),
"account": bucket['key']['account'],
@ -204,24 +230,48 @@ def get_menu_group_data(index,startTime,endTime):
return datas
def datetime_to_timestamp(dt):
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end):
dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end,jobid):
data_ip = get_ip_group_data(read_index,start,end)
# print "data_ip:"+str(len(data_ip))
data_account = get_account_group_data(read_index,start,end)
# print "data_account:"+str(len(data_account))
data_interface = get_interface_group_data(read_index,start,end)
# print "data_interface:"+str(len(data_interface))
data_menu = get_menu_group_data(read_index,start,end)
# print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
print ("resdata:"+json.dumps(res_data))
if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并")
return
logger_cron.info("JOB:"+jobid+",ip维度获取到 "+str(len(data_ip))+" 条数据")
logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
# TODO: read the previous 5-minute file and merge it with the current one
# once merged, write the file back out
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
# Read a large file
def read_large_json_file(filename, chunk_size=1024*1024*5): # read 5 MB per chunk
json_object = ''
with codecs.open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
json_object += chunk
data = json.loads(json_object)
return data
# Write a large file
def write_large_file(filename, data_list, chunk_size=1024*1024*5):
with codecs.open(filename, 'w', encoding='utf-8') as f:
for i in range(0, len(data_list), chunk_size):
chunk = data_list[i:i + chunk_size]
f.write(chunk)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr)
accountGroupStr = "account,jobnum"
@ -231,7 +281,6 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
menuGroupStr = "menu,ip,account,jobnum"
menuGroup = group_and_sum(data_menu, menuGroupStr)
data = {}
data["ip"] = ipGroup
data["account"] = accountGroup
@ -239,34 +288,31 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
data["menu"] = menuGroup
# Get the current working directory
current_dir = os.getcwd()
# current_dir = os.getcwd()
base_path = get_clean_file_path()
logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
date_time = convert_utc_to_local_time(start)
date_str = time.strftime('%Y-%m-%d', date_time)
file_path = os.path.join(base_path,date_str + '.json')
file_path = os.path.join(current_dir, 'files/' + date_str + '.json')
logger_cron.info("JOB:"+jobid+", 写入文件路径"+file_path)
all_data = [data]
logger_cron.info("JOB: "+jobid+",准备读取已有文件")
if os.path.exists(file_path):
# Open the file and read its contents
with codecs.open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
old_json_data = json.loads(content)
old_json_data =read_large_json_file(file_path)
all_data = [data, old_json_data]
logger_cron.info("JOB:"+jobid+", 读取已有文件完成")
merged_data = merge_data(all_data)
# Open the file with UTF-8 encoding via the codecs module
f = codecs.open(file_path, 'w', encoding='utf-8')
json_data = json.dumps(merged_data)
# Write the Unicode string
f.write(json_data)
# Close the file
f.close()
# Write the file
write_large_file(file_path,json_data)
logger_cron.info("JOB: "+jobid+",写入文件完成")
def group_and_sum(data, by_fields="ip,jobnum"):
# Convert by_fields into a list
by_fields_list = by_fields.split(',')
@ -361,28 +407,33 @@ def convert_utc_to_local_time(timestamp):
return time_struct_beijing
# Entry point
def entry(start,end):
def entry(start,end,jobid):
base_index ="bsa_traffic*"
es_util_instance = EsUtil()
# start = datetime_to_timestamp(start)
# end = datetime_to_timestamp(end)
start = datetime_to_timestamp(start)
end = datetime_to_timestamp(end)
logger_cron.info("JOB:"+jobid+",start为"+str(start))
logger_cron.info("JOB:"+jobid+",end为"+str(end))
res=es_util_instance.get_available_index_name(start,end,base_index)
print "xxxx:"+str(len(res))
logger_cron.info("JOB:"+jobid+",index为"+json.dumps(res))
if len(res)==0:
return
index =",".join(res)
clean_data(index,start,end)
start = 1720772586000
end = 1720776186000
# # Convert the datetime object to a seconds-level timestamp
# timestamp_seconds = time.mktime(dt.timetuple())
# # Get the microsecond component
# microseconds = dt.microsecond
# # Convert to a milliseconds-level timestamp
# timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
entry(start,end)
clean_data(index,start,end,jobid)
# start = '2024-07-18 15:20:31'
# end = '2024-07-18 15:25:31'
# date_format = "%Y-%m-%d %H:%M:%S"
# date_str = datetime.strptime(start, date_format)
# end_str = datetime.strptime(end, date_format)
# # # Convert the datetime object to a seconds-level timestamp
# # timestamp_seconds = time.mktime(dt.timetuple())
# # # Get the microsecond component
# # microseconds = dt.microsecond
# # # Convert to a milliseconds-level timestamp
# # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
# entry(date_str,end_str,"xxxxxxxxxxxxxxxxxxx")
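group_and_sum is called above but its body falls outside this hunk. Its apparent contract (group rows by a comma-separated field list and sum a count field) can be sketched as follows; the count_field name is an assumption based on the record shapes above:

# Sketch of the grouping step group_and_sum() performs.
from collections import defaultdict

def group_and_sum(data, by_fields="ip,jobnum", count_field="req_frequency"):
    fields = by_fields.split(',')
    totals = defaultdict(int)
    for row in data:
        key = tuple(row.get(f) for f in fields)
        totals[key] += row.get(count_field, 0)
    return [dict(zip(fields, k), **{count_field: v}) for k, v in totals.items()]

rows = [{"ip": "192.168.1.1", "jobnum": "1122222", "req_frequency": 2},
        {"ip": "192.168.1.1", "jobnum": "1122222", "req_frequency": 3}]
print(group_and_sum(rows))  # one merged row with req_frequency == 5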

@ -1,281 +0,0 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import json
import os,re
import codecs
import traceback
from isoc.utils.esUtil import EsUtil
from dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
interface_summary_data_format, menu_summary_data_format
from ext_logging import logger
## IP dimension
def es_get_ip_group_data(index,startTime,endTime):
page_size = 9000  # adjust to fit the workload
query_body={
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "ip" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"ip":bucket['key']['req_ip'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
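The after_key loop above is the standard composite-aggregation paging contract; isolated here for clarity, and assuming es_util_instance.search returns the raw ES response dict:

# Sketch: page through composite buckets until ES stops returning after_key.
def iterate_composite_buckets(es, index, query_body, agg_name="composite_buckets"):
    after_key = None
    while True:
        if after_key:
            query_body["aggs"][agg_name]["composite"]["after"] = after_key
        response = es.search(index, query_body)
        agg = response.get("aggregations", {}).get(agg_name, {})
        for bucket in agg.get("buckets", []):
            yield bucket
        after_key = agg.get("after_key")
        if not after_key:
            break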
## account dimension
def es_get_account_group_data(index,startTime,endTime):
page_size = 9000 # tune to the actual workload
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "account" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## menu dimension
def es_get_menu_group_data(index,startTime,endTime):
page_size = 9000 # tune to the actual workload
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "menu" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "menu_name": { "terms": { "field": "menu_name" } } },
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"menu":bucket['key']['menu_name'],
"ip":bucket['key']['req_ip'],
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## interface dimension
def es_get_interface_group_data(index,startTime,endTime):
page_size = 9999 # tune to the actual workload
query_body={
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "interface" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"group_by_menu": {
"composite": {
"size" : page_size,
"sources": [
{ "interface_addr": { "terms": { "field": "interface_addr" } } },
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"interface":bucket['key']['interface_addr'],
"ip":bucket['key']['req_ip'],
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
def entry(data_type,start,end):
base_index = 'c_ueba_001'
# es_util_instance = EsUtil()
# res=es_util_instance.get_available_index_name(start,end,base_index)
# if len(res)==0:
# return
# index =",".join(res)
index=base_index
try:
data = {}
if data_type == "1":
ip_summary_data = es_get_ip_group_data(index, start, end)
data = ip_summary_data_format(ip_summary_data)
if data_type == "2":
account_summary_data = es_get_account_group_data(index, start, end)
data = account_summary_data_format(account_summary_data)
if data_type == "3":
interface_summary_data = es_get_interface_group_data(index, start, end)
data = interface_summary_data_format(interface_summary_data)
if data_type == "4":
menu_summary_data = es_get_menu_group_data(index, start, end)
data = menu_summary_data_format(menu_summary_data)
return data
except Exception, e:
logger.error(traceback.format_exc())
raise e

@ -31,7 +31,12 @@ def keep_digits_filter(code):
"""
return ''.join(filter(str.isdigit, str(code)))
# safe division: returns None instead of raising when the denominator is 0
def safe_divide(numerator, denominator):
if denominator == 0:
return
else:
return numerator / denominator
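A quick behavior check (floats used to sidestep Python 2 integer division; the zero case yields None, which surfaces as null in JSON output):

assert safe_divide(10.0, 4) == 2.5
assert safe_divide(10, 0) is None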
def find_region_by_code(code, region_dict):
"""
Look up the company that a job number belongs to
@ -72,7 +77,6 @@ def ip_summary_data_format(ip_summary_data):
# total request count and distinct IP count
reqs_total = sum(data["reqs"] for data in grouped_data.values())
# return early when total requests is 0
if reqs_total == 0:
return result
@ -88,7 +92,7 @@ def ip_summary_data_format(ip_summary_data):
# this company's IP count / total IP count across all companies
"ip_rate": round(len(data["ips"]) / len(ips_total), 4),
# this company's request count / this company's IP count
"ip_avg": data["reqs"] // len(data["ips"]),
"ip_avg": safe_divide(data["reqs"],len(data["ips"])),
}
for company, data in grouped_data.items()
]
@ -146,7 +150,7 @@ def account_summary_data_format(account_summary_data):
# this company's account count / total account count across all companies
"account_rate": round(len(data["accounts"]) / len(accounts_total), 4),
# this company's request count / this company's account count
"account_avg": data["reqs"] // len(data["accounts"]),
"account_avg": safe_divide(data["reqs"],len(data["accounts"])),
}
for company, data in grouped_data.items()
]
@ -199,9 +203,9 @@ def interface_summary_data_format(interface_summary_data):
# this interface's request count / total request count across all interfaces
"frequency_rate": round(data["reqs"] / reqs_total, 4),
# this interface's request count / 20 (the query keeps the top-20 interfaces)
"frequency_avg": data["reqs"] // 20,
"frequency_avg": safe_divide(data["reqs"],20),
}
for company, data in grouped_data.items()
for interface, data in grouped_data.items()
]
result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)
@ -257,9 +261,9 @@ def menu_summary_data_format(menu_summary_data):
# this menu's request count / total request count across all menus
"frequency_rate": round(data["reqs"] / reqs_total, 4),
# this menu's request count / total number of menus
"frequency_avg": data["reqs"] // len(menu_total),
"frequency_avg": safe_divide(data["reqs"],len(menu_total)),
}
for company, data in grouped_data.items()
for menu, data in grouped_data.items()
]
result["summary"]["menu"] = sorted(menu_data_list, key=lambda x: x["req_frequency"], reverse=True)
@ -269,13 +273,17 @@ def menu_summary_data_format(menu_summary_data):
return result
# shift the query window back to the previous period of equal length
def adjust_times(start_time, end_time):
# days between start and end
delta_days = (end_time - start_time).days
# subtract that many days from both start and end
adjusted_start_time = start_time - timedelta(days=delta_days)
adjusted_end_time = end_time - timedelta(days=delta_days)
start_time = datetime.strptime(start_time, "%Y-%m-%d")
end_time = datetime.strptime(end_time, "%Y-%m-%d")
return adjusted_start_time, adjusted_end_time
delta_days = (end_time - start_time).days
if delta_days == 0:
pre_date = start_time-timedelta(1)
return pre_date.strftime("%Y-%m-%d"),pre_date.strftime("%Y-%m-%d")
if delta_days > 0:
pre_start_date = start_time-timedelta(delta_days+1)
pre_end_date = end_time-timedelta(delta_days+1)
return pre_start_date.strftime("%Y-%m-%d"),pre_end_date.strftime("%Y-%m-%d")
return start_time, end_time
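Worked examples of the shift, assuming "%Y-%m-%d" string inputs: a single day is compared against the day before it, and an N-day window against the N days immediately preceding it.

print(adjust_times("2024-07-17", "2024-07-17"))  # ('2024-07-16', '2024-07-16')
print(adjust_times("2024-07-15", "2024-07-17"))  # ('2024-07-12', '2024-07-14')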

@ -1,7 +1,7 @@
#!/usr/bin/python
# encoding=utf-8
# author: tangwy
from __future__ import division
import json
import os, re
import codecs
@ -12,10 +12,9 @@ from dashboard_data_conversion import ip_summary_data_format, account_summary_da
interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
from ext_logging import logger
TABLE_NAME = "ueba_logs"
TABLE_NAME = "ueba_analysis_schema.logs"
DATA_TYPE = {
"IP": 1,
@ -24,7 +23,6 @@ DATA_TYPE = {
"MENU": 4,
}
def pg_get_ip_group_data(startTime, endTime):
"""
IP-dimension query
@ -115,7 +113,27 @@ def pg_get_menu_group_data(startTime, endTime):
return result
def pg_get_previous_company_count(startTime, endTime, data_type):
def pg_get_account_previous_company_count(startTime, endTime, data_type):
"""
Account-dimension request count per company
:param startTime: start time,
:param endTime: end time,
:param data_type: company aggregation type, ACCOUNT or IP,
"""
result = defaultdict(int)
if data_type in DATA_TYPE:
data_type = DATA_TYPE[data_type]
sql = """ select jobnum, account,sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
if res:
for item in res:
company = find_region_by_code(item[0], jobnum_region_dict)
result[company] += item[2]
return result
def pg_get_ip_previous_company_count(startTime, endTime, data_type):
"""
IP-dimension request count per company
:param startTime: start time,
@ -125,14 +143,14 @@ def pg_get_previous_company_count(startTime, endTime, data_type):
result = defaultdict(int)
if data_type in DATA_TYPE:
data_type = DATA_TYPE[data_type]
sql = """ select jobnum, sum(count) from {TABLE_NAME}
sql = """ select jobnum,ip, sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by jobnum""".format(TABLE_NAME=TABLE_NAME)
group by jobnum,ip """.format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
if res:
for item in res:
company = find_region_by_code(item[0], jobnum_region_dict)
result[company] += item[1]
result[company] += item[2]
return result
def pg_get_previous_interface_count(startTime, endTime):
@ -142,13 +160,13 @@ def pg_get_previous_interface_count(startTime, endTime):
:param endTime: end time,
"""
result = defaultdict(int)
sql = """ select interface, sum(count) from {TABLE_NAME}
sql = """ select interface, ip, jobnum,account, sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by interface""".format(TABLE_NAME=TABLE_NAME)
group by interface, ip, jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
if res:
for item in res:
result[item[0]] += item[1]
result[item[0]] += item[4]
return result
@ -159,13 +177,13 @@ def pg_get_previous_menu_count(startTime, endTime):
:param endTime: end time,
"""
result = defaultdict(int)
sql = """ select menu, sum(count) from {TABLE_NAME}
sql = """ select menu, ip,jobnum,account, sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by menu""".format(TABLE_NAME=TABLE_NAME)
group by menu, ip,jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
if res:
for item in res:
result[item[0]] += item[1]
result[item[0]] += item[4]
return result
def entry(data_type, start, end):
@ -174,17 +192,25 @@ def entry(data_type, start, end):
start = datetime.strptime(start, date_format)
end = datetime.strptime(end, date_format)
old_start,old_end = adjust_times(start, end)
start = start.strftime('%Y-%m-%d')
end = end.strftime('%Y-%m-%d')
old_start,old_end = adjust_times(start, end)
data = {}
if data_type == "1":
ip_summary_data = pg_get_ip_group_data(start, end)
data = ip_summary_data_format(ip_summary_data)
previous_company_dict = pg_get_previous_company_count(old_start, start, "IP")
previous_company_dict = pg_get_ip_previous_company_count(old_start, old_end, "IP")
for d in data["summary"]["ip"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
print("xxx")
else:
print("dddd:"+str(d["req_frequency"]))
print(previous_company_dict.get(d["company"], 0))
d["trend"] = round(
(d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
d["company"], 0), 4)
@ -193,7 +219,7 @@ def entry(data_type, start, end):
account_summary_data = pg_get_account_group_data(start, end)
data = account_summary_data_format(account_summary_data)
previous_company_dict = pg_get_previous_company_count(old_start, start, "ACCOUNT")
previous_company_dict = pg_get_account_previous_company_count(old_start, old_end, "ACCOUNT")
for d in data["summary"]["account"]:
if previous_company_dict.get(d["company"], 0) == 0:
d["trend"] = 0
@ -229,3 +255,6 @@ def entry(data_type, start, end):
d["menu_name"], 0), 4)
return data
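The trend fields above all follow the same formula: relative change against the previous period, rounded to four places, with 0 as the no-baseline fallback. Restated in isolation with illustrative numbers:

current, previous = 150, 120
trend = 0 if previous == 0 else round((current - previous) / float(previous), 4)
# trend == 0.25, i.e. a 25% increase over the prior period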
# res = entry("1","2024-07-17 00:00:00","2024-07-17 23:59:59")
# # print res
# logger.info(json.dumps(res))

@ -9,20 +9,18 @@ import json
import traceback
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
from datetime import datetime,timedelta,time
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
from uebaMetricsAnalysis.utils.ext_logging import logger
from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron
class DBType(object):
LIST = 'list'
DICT = 'dict'
JOB_TABLE_NAME = "ueba_jobs"
ANALYSIS_TABLE_NAME = "ueba_analysis_log"
JOB_TABLE_NAME = "ueba_analysis_schema.jobs"
class DBUtils(object):
@classmethod
@ -77,7 +75,7 @@ class DBUtils(object):
"""
try:
sql_list = CPgSqlParam(sql)
logger.info("execute sql:"+sql)
#logger.info("execute sql:"+sql)
data = CFunction.execute(sql_list)
logger.info("execute result : {}".format(data))
return json.loads(data)
@ -103,7 +101,7 @@ class DBUtils(object):
return camel_list
@classmethod
def write_job_status(self,job_id,status,err):
def write_job_status(self,job_id,status,err,run_count):
#success
if status == 2:
sql = """update {JOB_TABLE_NAME} set status=%s ,complate_time = %s
@ -111,48 +109,67 @@ class DBUtils(object):
CFunction.execute(CPgSqlParam(sql, params=(status,datetime.now(), job_id)))
#failed
if status == 3:
sql = """update {JOB_TABLE_NAME} set status=%s, err=%s
sql = """update {JOB_TABLE_NAME} set status=%s, err=%s ,run_count = %s
where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id)))
CFunction.execute(CPgSqlParam(sql, params=(status, err, run_count, job_id)))
@classmethod
def insert_job_record(self,job_id,start_time,end_time,status):
sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status) values(%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status)))
sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status,run_count) values(%s,%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status,1)))
# get the job execution window: start time - end time
@classmethod
def get_job_period(self):
sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time,status,run_count,to_char(start_time,'YYYY-MM-DD HH24:MI:SS') as start_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=())))
data = {}
if res:
data["job_id"]=res[0][0]
data["end_time"]=res[0][1]
fields=["job_id", "end_time"]
#data = DBUtils.transition(fields, sql, DBType.LIST)
data["status"]=res[0][2]
data["run_count"]=res[0][3]
data["start_time"]=res[0][4]
if len(data)==0:
start_time = datetime.now() - timedelta(minutes=5)
end_time = datetime.now()
start_time = datetime.now() - timedelta(minutes=20)
end_time = datetime.now()- timedelta(minutes=15)
return start_time,end_time,2,0,""
if len(data)>0:
# still running
if data["status"] ==1:
return None,None, 1,data["run_count"],data["job_id"]
# job failed: retry at most 3 times
if data["status"] ==3 and data["run_count"]<=3:
start_time = data["start_time"]
end_time = data["end_time"]
return start_time,end_time,3,data["run_count"],data["job_id"]
start_time = data["end_time"]
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
end_time = data["end_time"]
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + timedelta(minutes=5)
if end_time > datetime.now():
return None,None
# check whether the time is exactly 23:59:59
if start_time.hour == 23 and start_time.minute == 59 and start_time.second == 59:
# if so, advance one day and reset the time to 00:00:00
start_time = start_time + timedelta(days=1)
start_time = start_time.replace(hour=0, minute=0, second=0)
end_time = start_time + timedelta(minutes=5)
# data from Kafka reaches ES with roughly a 5-minute delay, so keep a 15-minute buffer here
if end_time > (datetime.now()-timedelta(minutes=15)):
logger_cron.info("end_time:"+end_time.strftime("%Y-%m-%d %H:%M:%S")+",datetime.now:"+datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
return None,None,2,999,""
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
# the job id is generated at the entry point, so return an empty string here
return start_time,end_time,data["status"],data["run_count"],""
@classmethod
# handle windows that cross a day boundary
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
end_time = datetime.combine(start_time.date(), time(23, 59, 59))
return start_time, end_time
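A sketch of how the clamp and the 23:59:59 check in get_job_period cooperate across midnight, assuming naive datetimes:

s = datetime(2024, 7, 17, 23, 57, 0)
e = datetime(2024, 7, 18, 0, 2, 0)  # window crosses midnight
s2, e2 = DBUtils.adjust_end_time_if_cross_day(s, e)
# e2 == datetime(2024, 7, 17, 23, 59, 59); on the next run the 23:59:59
# branch rolls the window forward to start at 2024-07-18 00:00:00.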
# if __name__ == '__main__':
# DBUtils.get_job_period()
if __name__ == '__main__':
start,end,status,run_count,job_id = DBUtils.get_job_period()
print ("job: run parameters: {},{},{},{}".format(start,end,status,run_count))

@ -1,90 +0,0 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import json
import os,re
import codecs
import csv
import ConfigParser
from isoc.utils.esUtil import EsUtil
print json.dumps(es_host_list)
# conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf')
# ini_path = os.path.join(conf_path, 'conf.ini')
# config = ConfigParser.ConfigParser()
# config.read(ini_path)
# ES_HOST = config.get('COMMON', 'es_host')
# ES_PER_COUNT = config.get('COMMON', 'es_per_count')
# ES_INDEX_NAME = config.get('COMMON', 'es_index_name')
# CSV_FILE_PATH = config.get('COMMON', 'csv_file_path')
def createIndex():
es = Elasticsearch(es_host_list)
es.create(index="urba_analyse_2024_06", ignore=400)
map={
"ip1": "text",
"ip2": "text",
"ip3": "text",
"ip4": "text",
}
es_instance = EsUtil()
res = es_instance.create_index_simple("urba_analyse_2024_06")
return res
# def generate_ip_range(start_ip, end_ip):
# start_parts = list(map(int, start_ip.split('.')))
# end_parts = list(map(int, end_ip.split('.')))
# ip_range = []
# while start_parts < end_parts:
# ip_range.append('.'.join(map(str, start_parts)))
# start_parts[3] += 1
# for i in range(3, 0, -1):
# if start_parts[i] == 256:
# start_parts[i] = 0
# start_parts[i-1] += 1
# ip_range.append('.'.join(map(str, start_parts))) # append the end IP address
# return ip_range
# # fetch data with a scroll query
# def get_ip_summary_data(start_time,end_time,query_body):
# es = Elasticsearch(ES_HOST)
# msg = es.search(index=ES_INDEX_NAME,scroll="3m",size=ES_PER_COUNT,_source_includes= ["cookies","url","sip","dip"], query=query_body)
# result = msg['hits']['hits']
# total = msg['hits']['total']
# scroll_id = msg['_scroll_id']
# for i in range(0,int(total["value"]/ES_PER_COUNT)+1):
# query_scroll = es.scroll(scroll_id=scroll_id, scroll='3m')["hits"]["hits"]
# result += query_scroll
# return result
# # read the CSV file to map IPs to their regions
# def get_ip_area_relation(csv_file_path):
# iprange_map = {}
# with codecs.open(csv_file_path, mode='r',encoding='utf-8') as file:
# csv_reader = csv.reader(file)
# for row in csv_reader:
# headers = next(csv_reader)
# ip_start = headers[0]
# ip_end = headers[1]
# ip_range = generate_ip_range(ip_start, ip_end)
# ip_area = headers[5]
# print (ip_area)
# for ip in ip_range:
# iprange_map[ip] = ip_area
# return iprange_map
# get_ip_area_relation("/tmp/data/ip_area_relation.csv")

@ -12,13 +12,13 @@ from appsUtils import env
APPFOLDERNAME = 'uebaMetricsAnalysis'
APP_CRON_FOLDERNAME = 'uebaMetricsAnalysis_cron'
def get_clean_files():
def get_clean_file_path():
fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files"
if not os.path.exists(fileroot):
os.mkdir(fileroot)
return fileroot
def get_logger(logfile):
"""
@ -43,3 +43,4 @@ def get_logger(logfile):
logger = get_logger(APPFOLDERNAME)
logger_cron = get_logger(APP_CRON_FOLDERNAME)

@ -0,0 +1,205 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import re,os,json
import codecs
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}\.json$')
LOG_TABLE_NAME = "ueba_analysis_schema.logs"
DATA_TYPE = {
"IP": 1,
"ACCOUNT": 2,
"INTERFACE": 3,
"MENU": 4,
}
# current date formatted as "year_month"
def get_current_year_month():
now = datetime.now()
return now.strftime("%Y_%m")
# first day of the current month as "YYYY-MM-DD"
def get_first_day_of_current_month():
now = datetime.now()
first_day = now.replace(day=1)
return first_day.strftime("%Y-%m-%d")
# first day of the next month as "YYYY-MM-DD"
def get_first_day_of_next_month():
now = datetime.now()
if now.month == 12:
next_month = now.replace(year=now.year+1, month=1, day=1)
else:
next_month = now.replace(month=now.month+1, day=1)
return next_month.strftime("%Y-%m-%d")
# build the partition table name
def get_table_name():
year_month = get_current_year_month()
return LOG_TABLE_NAME+'_'+ year_month
# date range covered by the partition
def get_table_data_range():
start= get_first_day_of_current_month()
end = get_first_day_of_next_month()
return start,end
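Illustrative values, assuming the job runs during July 2024:

table_name = get_table_name()        # 'ueba_analysis_schema.logs_2024_07'
start, end = get_table_data_range()  # ('2024-07-01', '2024-08-01')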
# create the monthly partition table
def create_fq_table():
table_name = get_table_name()
start,end = get_table_data_range()
logger_cron.info("INSERT: about to create partition table {},{},{}".format(table_name,start,end))
sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name,START = start,END=end)
CFunction.execute(CPgSqlParam(sql))
logger_cron.info("INSERT: partition table created")
# read a large JSON file in chunks
def read_large_json_file(filename, chunk_size=1024*1024*5): # read 5MB at a time
json_object = ''
with codecs.open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
json_object += chunk
data = json.loads(json_object)
return data
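Usage note: the chunked loop avoids a single giant f.read() call, but the text still accumulates in memory before json.loads, so peak usage remains roughly the file size. A hypothetical call:

data = read_large_json_file('/path/to/files/2024-07-13.json')  # path is illustrative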
def get_all_files(path):
# list files whose names match the date pattern, skipping today's file
files = []
for filename in os.listdir(path):
if date_pattern.search(filename):
if datetime.now().strftime("%Y-%m-%d")+".json" != filename:
files.append({"filename": filename, "path": os.path.join(path,filename)})
return files
def insert_data(files):
for itemFile in files:
if os.path.exists(itemFile.get("path",'')):
logger_cron.info("INSERT: about to read aggregate file: "+itemFile.get('path',''))
data = read_large_json_file(itemFile.get("path",''))
logger_cron.info("INSERT: finished reading aggregate file")
ip_list = data.get('ip', [])
account_list = data.get('account', [])
interface_list = data.get('interface', [])
menu_list = data.get('menu', [])
logger_cron.info("INSERT: IP维度 " +str(len(ip_list)))
logger_cron.info("INSERT: ACCOUNT维度 " +str(len(account_list)))
logger_cron.info("INSERT: INTERFACE维度 " +str(len(interface_list)))
logger_cron.info("INSERT: MENU维度 " +str(len(menu_list)))
basename, extension = os.path.splitext(itemFile.get('filename', ''))
log_date = basename
print ("filename:"+log_date)
records = []
for item in ip_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("IP",1)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in account_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in interface_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("INTERFACE",3)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in menu_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("MENU",4)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
data_list=[]
for item in records:
data_list.append(item.get('menu', ''))
data_list.append(item.get('ip', ''))
data_list.append(item.get('account', ''))
data_list.append(item.get('jobnum', ''))
data_list.append(item.get('count', ''))
data_list.append(item.get('logdate', ''))
data_list.append(item.get('datatype', ''))
data_list.append(item.get('interface', ''))
sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface)
VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for x in records]))
CFunction.execute(CPgSqlParam(sql, params=data_list))
logger_cron.info("INSERT: 数据插入完成")
#重命名文件
logger_cron.info(itemFile.get('path',''))
logger_cron.info("done_"+itemFile.get('filename', ''))
os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', ''))
logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', ''))
def delete_files(directory_path):
"""
删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件
其中日期部分早于10天以前
:param directory_path: 要检查的目录的绝对路径
"""
# 计算10天前的日期
ten_days_ago = datetime.now() - timedelta(days=10)
ten_days_ago_str = ten_days_ago.strftime('%Y-%m-%d')
# 正则表达式模式,匹配形如 YYYY-MM-DD 的文件名
date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')
# walk the files in the directory
for filename in os.listdir(directory_path):
match = date_pattern.search(filename)
if match:
file_date_str = match.group(1)
file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
# delete when the file date is 10 or more days old
if file_date <= ten_days_ago:
file_path = os.path.join(directory_path, filename)
os.remove(file_path)
logger_cron.info("INSERT: 删除文件"+file_path)
def entry():
base_path = get_clean_file_path()
files = get_all_files(base_path)
logger_cron.info("INSERT:获取文件数量"+str(len(files)))
#创建分区表
create_fq_table()
#数据入库
insert_data(files)
#删除文件
delete_files(base_path)

@ -20,7 +20,7 @@ class DashboardViewSets(viewsets.GenericViewSet):
def get_summary_data_list(self,request):
try:
data_type = request.GET.get('type')
startTime = request.GET.get('startDate')
startTime =request.GET.get('startDate')
endTime = request.GET.get('endDate')
#1: ip, 2: account, 3: interface, 4: menu
logger.info("fetching analysis summary data: "+data_type+";" + startTime +";"+ endTime)
