Compare commits

2 Commits

Author   SHA1          Message                     Date
TANGWY   315f1470ec    '提交' (commit)             4 months ago
TANGWY   b97f549d89    '代码提交' (code commit)    4 months ago
Changed files (24; the number in parentheses is the changed-line count):

  1. conf/defaultRule.json (144)
  2. corn/ueba_corn_data_insert.py (78)
  3. corn/ueba_corn_pg.py (56)
  4. cron/log_alarm.py (142)
  5. cron/ueba_cron.py (0)
  6. cron/ueba_cron_data_insert.py (37)
  7. cron/ueba_cron_pg.py (73)
  8. files/2024-07-13.json (44)
  9. install.py (2)
  10. jobs/jobmeta.json (21)
  11. mock/mock_data.json (823)
  12. mock/mock_data2.json (221)
  13. package.json (6)
  14. right_config.json (14)
  15. test.py (18)
  16. utils/base_dataclean.py (292)
  17. utils/base_dataclean_pg.py (179)
  18. utils/dashboard_data.py (281)
  19. utils/dashboard_data_conversion.py (40)
  20. utils/dashboard_data_pg.py (63)
  21. utils/db2json.py (69)
  22. utils/es_operation.py (90)
  23. utils/ext_logging.py (7)
  24. utils/file_to_pg.py (205)

conf/defaultRule.json
@ -1,38 +1,150 @@
{ {
"white_list": { "white_list": {
"ip": [ "ip": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"account": [ "account": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"interface": [ "interface": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"menu": [ "menu": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
] ]
}, },
"grey_list": { "grey_list": {
"ip": [ "ip": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"account": [ "account": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"interface": [ "interface": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
], ],
"menu": [ "menu": [
400000, 510400,
400001 510401,
510402,
510405,
510406,
510407,
510404,
510403,
510030,
510031,
510009,
510008,
510004,
510408,
510410,
510409
] ]
} }
} }
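
The new conf/defaultRule.json keeps the same shape as before: parallel white_list and grey_list sections, each with ip, account, interface and menu arrays, now filled with 510xxx IDs instead of the two placeholder values 400000/400001. The IDs look like job numbers, but that is an inference from the rest of this change set, not something the file states. A minimal sketch of loading such a rule file and checking one record against it, under that assumption:

# -*- coding: utf-8 -*-
# Sketch only: load a defaultRule.json-style rule file and classify one record.
# Treating the numeric IDs as job numbers is an assumption inferred from the
# rest of this change set, not something the file itself states.
import json
import codecs

def load_rules(path="conf/defaultRule.json"):
    with codecs.open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def classify(rules, dimension, jobnum):
    """Return 'white', 'grey' or None for one dimension (ip/account/interface/menu)."""
    if jobnum in rules.get("white_list", {}).get(dimension, []):
        return "white"
    if jobnum in rules.get("grey_list", {}).get(dimension, []):
        return "grey"
    return None

if __name__ == "__main__":
    rules = load_rules()
    print(classify(rules, "ip", 510400))  # 'white' with the committed lists

As committed, the white and grey lists are identical, so every listed ID resolves to 'white' first; presumably the two lists are meant to diverge in real configurations.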

corn/ueba_corn_data_insert.py (file deleted)
@ -1,78 +0,0 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from commandCyberRange.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id():
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
#每5分钟执行一次
def processing(self):
try:
logger.info("job:开始执行")
start,end=self.get_job_period()
job_id =self.generate_job_id()
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
logger.info("job:"+"准备获取es数据")
#entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()
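
The deleted scheduler derives each run's window from the previous job's end_time and clamps any window that crosses midnight back to 23:59:59.999999 of the start day. Note that the original get_job_period calls datetime.datetime.now() even though only the datetime class is imported, which would raise an AttributeError. The self-contained sketch below reproduces just the clamping rule, with the class used directly:

# -*- coding: utf-8 -*-
# Sketch of the cross-day clamping used by the removed get_job_period /
# adjust_end_time_if_cross_day pair: if a 5-minute window crosses midnight,
# the end is pulled back to 23:59:59.999999 of the start day.
from datetime import datetime, time, timedelta

def adjust_end_time_if_cross_day(start_time, end_time):
    if start_time.date() != end_time.date():
        end_time = datetime.combine(start_time.date(), time(23, 59, 59, 999999))
    return start_time, end_time

if __name__ == "__main__":
    start = datetime(2024, 7, 13, 23, 58, 0)
    end = start + timedelta(minutes=5)  # 2024-07-14 00:03:00, crosses midnight
    print(adjust_end_time_if_cross_day(start, end))
    # -> (2024-07-13 23:58:00, 2024-07-13 23:59:59.999999)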

corn/ueba_corn_pg.py (file deleted)
@ -1,56 +0,0 @@
# coding=utf-8
"""
@Author: tangwy
@FileName: user_cron_pg.py
@DateTime: 2024/7/09 14:19
@Description: 定时清洗es数据
"""
from __future__ import unicode_literals
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#每5分钟执行一次
def processing(self):
job_id =self.generate_job_id()
try:
logger.info("job:开始执行")
start,end= DBUtils.get_job_period()
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
logger.info("job:"+"准备获取es数据")
entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc())
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise
if __name__ == '__main__':
UserCron().processing()

cron/log_alarm.py (new file)
@ -0,0 +1,142 @@
# coding:utf-8
import sys
import uuid
import json
import time
import random
path = str(sys.path[0])
home_path = path.split("isop_uebaapiData")[0]
sys.path.append(home_path)
from isop_uebaapiData.util import send_logs
def alarm(cookies, api):
"""2、HTTP日志"""
inputstr = '''[{"msgtype":1,"hash":"8DE9-BDAB-F622-2FA8","dev_ip":"10.67.5.17","product":"uts"},{"sid":"6004744450036c44f815500016d00a5f5151105430a3ed","timestamp":1567673939,"sip":"10.67.0.52","sport":5624,"dip":"10.67.0.53","dport":80,"protocol":6,"app":3087428650795009,"app_proto":8,"direct":4,"app.detail":{"method":"GET","http_protocol":"1.1","ret_code":200,"host":"10.67.1.1","uri":"/webtest/uploadFile.php","referer":"http://[2222::65]/webtest/","content_type":" multipart/form-data; boundary=----WebKitFormBoundary2zcCUl4lQf1h7A7S","content_type_server":" text/html","server":"Apache/2.4.4 (Win32) OpenSSL/0.9.8y PHP/5.4.19","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36","link":"","cookies":"loginmainacctid=wangshiguang;operatorId=d2601586;com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;","content_encoding":"","location":"","content_length":70080,"content_length_server":200,"set_cookie":"","range":"","connection":"keep-alive","connection_server":"Keep-Alive","x_forwarded_for":"","post_data":"LS0tLS0tV2ViS2l0Rm9ybUJvdW5kYXJ5MnpjQ1VsNGxRZjFoN0E3Uw0KQ29udGVudC1EaXNwb3NpdGlvbjogZm9ybS1kYXRhOyBuYW1lPSJmaWxlIjsgZmlsZW5hbWU9IjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1Ig0=","response_body":"VXBsb2FkOiAwMDAxYWQ0NDFkY2IzODYyMThhNzY5OTJhY2Y4YjcwNTxiciAvPlR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTxiciAvPlNpemU6IDY4LjEyNzkyOTY4NzUgS2I8YnIgLz5UZW1wIGZpbGU6IEQ6XHhhbXBwXHRtcFxwaHA2ODI1LnRtcDxiciAvPjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1IGFscmVhZHkgZXhpc3RzLiA="}}]'''
inputarr = json.loads(inputstr, strict=False)
# 随机生成timestamp
inputarr[1]["timestamp"] = int(time.time())
inputarr[1]["sid"] = str(uuid.uuid1())
# inputarr[1]["sip"] = "10.67.4.33"
inputarr[1]["sip"] = generate_random_ip()
inputarr[1]["dip"] = "10.67.1.1"
inputarr[1]["dport"] = "8180"
inputarr[1]["app.detail"]["uri"] = "/alarmtest.action?BMECID=352432757&BMETimestamp=1692788489260&queryNumber=158713459"
inputarr[1]["app.detail"]["host"] = api
inputarr[1]["app.detail"]["cookies"] = cookies
return json.dumps(inputarr)
def generate_random_ip():
# 固定前缀 "192.168."
prefix = "192.168."
# 生成随机的第三和第四段IP地址
third_octet = 1
fourth_octet = random.randint(0, 50)
# 拼接IP地址
ip = "{}{}.{}".format(prefix, third_octet, fourth_octet)
return ip
def AbIDVisitAPINums510404():
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com",
"good.alarm.com"]
info_list = [
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["武汉"][
0] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 60],
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["荆州"][
2] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 120]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
return "510405场景的告警数据已生成"
def get_random_jobnum():
# 定义包含不同前缀的字符串数组
prefix_strings = [
['10243', '10895', '10134', '10781', '10962'], # 10打头的字符串示例
['11089', '11057', '11023', '11016', '11030'], # 110打头的字符串示例
['14076', '14049', '14098', '14032', '14061'], # 140打头的字符串示例
['26054', '26013', '26087', '26029', '26061'], # 260打头的字符串示例
['20083', '20015', '20072', '20096', '20048'], # 200打头的字符串示例
['19035', '19017', '19049', '19082', '19096'], # 190打头的字符串示例
['180237', '180276', '180204', '180295', '180219'] # 1802打头的字符串示例
]
# 随机选择一个前缀数组
selected_prefix_array = random.choice(prefix_strings)
# 随机选择一个具体的字符串
selected_string = random.choice(selected_prefix_array)
return selected_string
def get_random_person():
people_list = [
"Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Henry", "Isabel", "Jack",
"Kate", "Liam", "Mia", "Noah", "Olivia"
# 继续添加更多的名称...
]
random_person = random.choice(people_list)
return random_person
def get_random_menu():
# 定义系统菜单列表
system_menu = [
"主页", "设置", "个人资料", "消息", "通知", "帮助", "帐户", "关于", "联系我们", "服务",
"购物车", "订单", "支付", "地址", "密码"
]
# 随机选择一个菜单项
random_menu_item = random.choice(system_menu)
return random_menu_item
if __name__ == '__main__':
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com", "good.alarm.com","baidu.com","sohu.com","xinlang.com","erpx.com"]
info_list = [
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
print "510405场景的告警数据已生成"

cron/ueba_cron_data_insert.py (new file)
@ -0,0 +1,37 @@
# coding=utf-8
"""
@Author: fu-zhe
@FileName: user_cron.py
@DateTime: 2024/5/23 14:19
@Description: 用户相关的定时任务 全量获取对已有数据进行先删除后写入 周期一天
"""
from __future__ import unicode_literals
import random,string
import traceback
import time
from datetime import datetime, timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.file_to_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
#每5分钟执行一次
def processing(self):
try:
logger_cron.info("INSERT:开始执行")
entry()
logger_cron.info("INSERT:"+"执行完成")
except Exception ,e:
err_info=traceback.format_exc()
logger_cron.error("INSERT:"+"执行失败,"+err_info)
if __name__ == '__main__':
UserCron().processing()

cron/ueba_cron_pg.py (new file)
@ -0,0 +1,73 @@
# coding=utf-8
"""
@Author: tangwy
@FileName: user_cron_pg.py
@DateTime: 2024/7/09 14:19
@Description: 定时清洗es数据
"""
from __future__ import unicode_literals
import random,string
import traceback,json
import time
from datetime import datetime,timedelta
import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
JOB_STATUS ={
"RUNNING":1,
"FINISH":2,
"ERROR":3
}
class UserCron:
#生成job_id
def generate_job_id(self):
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#每5分钟执行一次
def processing(self):
logger_cron.info("JOB:接收到执行指令")
job_id =self.generate_job_id()
task_run_count =0
try:
start,end,status,run_count,jobid= DBUtils.get_job_period()
if jobid !="":
job_id=jobid
logger_cron.info("JOB:"+job_id+"开始执行")
if status ==1:
logger_cron.info("JOB:"+job_id+"正在运行中不执行")
return
if status==3 and run_count >3:
logger_cron.info("JOB:"+job_id+"失败次数大于3不执行")
return
if start is None or end is None:
logger_cron.info("JOB:"+job_id+"结束时间大于(服务器时间-15分钟)不执行")
return
task_run_count = run_count+1
logger_cron.info("JOB:"+job_id+"运行参数:{},{}".format(start,end))
logger_cron.info("JOB:"+job_id+"准备将job写入job表")
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger_cron.info("JOB:"+job_id+"完成job表写入")
logger_cron.info("JOB:"+job_id+"准备获取es数据")
entry(start,end,job_id)
logger_cron.info("JOB:"+job_id+"完成es数据获取")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"",task_run_count)
logger_cron.info("JOB:"+job_id+"更新job表状态完成")
except Exception ,e:
err_info=traceback.format_exc()
logger_cron.error("JOB:"+job_id+"执行失败:"+err_info)
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info,task_run_count)
raise
if __name__ == '__main__':
UserCron().processing()

files/2024-07-13.json (file deleted)
@ -1,44 +0,0 @@
{
"ip":[
{
"ip":"192.168.1.1",
"jobnum":"1122222",
"count":212
},
{
"ip":"192.168.2.1",
"jobnum":"1122222",
"count":212
}
],
"account":[
{
"account":"zhangs",
"jobnum":"1122222",
"count":212
},
{
"account":"zhang3",
"jobnum":"112222",
"count":211
}
],
"interface":[
{
"interface":"www.baidu.com/user",
"jobnum":"1122222",
"account":"zhangs",
"ip":"192.168.1.1",
"count":212
}
],
"menu":[
{
"menu":"菜单1",
"jobnum":"1122222",
"account":"zhangs",
"ip":"192.168.1.1",
"count":212
}
]
}

install.py
@ -76,7 +76,7 @@ class Install():
     def install(self):
         try:
-            installDBSchema(["pg_struct.sql"])
+            installDBSchema(["pg_struct.sql","pg_data.sql"])
             add_task()
             logger.info('>>>安装结束!!!')
         except Exception as e:

@ -1,13 +1,24 @@
[ [
{ {
"task_name": "ueba_corn", "task_name": "ueba_cron",
"task_type": 1, "task_type": 1,
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/corn/ueba_corn.py", "exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_pg.py",
"task_owner": "uebaMetricsAnalysis", "task_owner": "uebaMetricsAnalysis",
"run_mode": 1, "run_mode": 1,
"duration_args": "*/2 * * * * *", "duration_args": "0 */1 * * * ?",
"retry_nums": 5, "retry_nums": 0,
"is_enable": 1, "is_enable": 1,
"task_description": "每天执行一次 清洗数据到es-ueba索引" "task_description": "每分钟执行一次数据清洗"
},
{
"task_name": "ueba_cron_data_insert",
"task_type": 1,
"exec_cmd": "python /home/master/ISOP/apps/uebaMetricsAnalysis/cron/ueba_cron_data_insert.py",
"task_owner": "uebaMetricsAnalysis",
"run_mode": 1,
"duration_args": "0 0 3 * * ?",
"retry_nums": 0,
"is_enable": 1,
"task_description": "每天执行一次 将汇总数据写入pg"
} }
] ]
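
The duration_args values switch from the old */2 * * * * * to 0 */1 * * * ? and 0 0 3 * * ?. These read like six-field, Quartz-style expressions (second, minute, hour, day-of-month, month, day-of-week); that field order is an assumption about the ISOP task scheduler rather than something this diff documents, but it matches the new task_description strings: 每分钟执行一次数据清洗 (run the data cleaning once a minute) and 每天执行一次 将汇总数据写入pg (once a day, write the aggregated data to PG). A small sketch decoding the two schedules under that assumption:

# -*- coding: utf-8 -*-
# Sketch: pretty-print the two schedules from jobmeta.json, assuming a
# Quartz-style field order (sec min hour day-of-month month day-of-week).
FIELDS = ["second", "minute", "hour", "day-of-month", "month", "day-of-week"]

def describe(expr):
    parts = expr.split()
    return ", ".join("{}={}".format(name, value) for name, value in zip(FIELDS, parts))

print(describe("0 */1 * * * ?"))  # second 0 of every minute -> roughly once a minute
print(describe("0 0 3 * * ?"))    # 03:00:00 every day, under the assumed field order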

mock/mock_data.json
@ -2,191 +2,784 @@
"summary": { "summary": {
"ip": [ "ip": [
{ {
"company": "湖北公司", "company": "宜昌分公司",
"req_frequency": 122, "req_frequency": 517,
"frequency_rate": 0.2, "frequency_rate": 17.1475953565506,
"ip_count": 323, "ip_count": 8,
"ip_reat": 0.3, "ip_rate": 0.195121951219512,
"ip_avg": 0.43, "ip_avg": 2.14344941956882,
"trend": 0.3 "trend": 0.09
}, },
{ {
"company": "宜昌公司", "company": "随州分公司",
"req_frequency": 122, "req_frequency": 329,
"frequency_rate": 0.2, "frequency_rate": 10.9121061359867,
"ip_count": 323, "ip_count": 7,
"ip_reat": 0.3, "ip_rate": 0.170731707317073,
"ip_avg": 0.43, "ip_avg": 1.55887230514096,
"trend": 0.3 "trend": 0.1
},
{
"company": "孝感分公司",
"req_frequency": 399,
"frequency_rate": 13.2338308457711,
"ip_count": 7,
"ip_rate": 0.170731707317073,
"ip_avg": 1.89054726368159,
"trend": -0.07
},
{
"company": "黄冈分公司",
"req_frequency": 495,
"frequency_rate": 16.4179104477612,
"ip_count": 9,
"ip_rate": 0.219512195121951,
"ip_avg": 1.82421227197347,
"trend": -0.02
},
{
"company": "省公司",
"req_frequency": 1275,
"frequency_rate": 42.2885572139304,
"ip_count": 10,
"ip_rate": 0.24390243902439,
"ip_avg": 4.22885572139304,
"trend": 0.1
} }
], ],
"account": [ "account": [
{ {
"company": "湖北公司", "company": "宜昌分公司",
"req_frequency": 122, "req_frequency": 134,
"frequency_rate": 0.2, "frequency_rate": 19.7058823529412,
"account_count": 323, "account_count": 8,
"account_reat": 0.3, "account_rate": 0.242424242424242,
"account_avg": 0.43, "account_avg": 2.46323529411765,
"trend": 0.3 "trend": 0.09
},
{
"company": "随州分公司",
"req_frequency": 73,
"frequency_rate": 10.7352941176471,
"account_count": 7,
"account_rate": 0.212121212121212,
"account_avg": 1.53361344537815,
"trend": 0.1
},
{
"company": "孝感分公司",
"req_frequency": 225,
"frequency_rate": 33.0882352941176,
"account_count": 7,
"account_rate": 0.212121212121212,
"account_avg": 4.72689075630252,
"trend": -0.07
},
{
"company": "黄冈分公司",
"req_frequency": 166,
"frequency_rate": 24.4117647058824,
"account_count": 9,
"account_rate": 0.272727272727273,
"account_avg": 2.71241830065359,
"trend": -0.02
}, },
{ {
"company": "宜昌公司", "company": "公司",
"req_frequency": 122, "req_frequency": 216,
"frequency_rate": 0.2, "frequency_rate": 31.7647058823529,
"account_count": 323, "account_count": 10,
"account_reat": 0.3, "account_rate": 0.303030303030303,
"account_avg": 0.43, "account_avg": 3.17647058823529,
"trend": 0.3 "trend": 0.1
} }
], ],
"interface": [ "interface": [
{ {
"interface_addr": "/getuser", "interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 122, "req_frequency": 212,
"frequency_rate": 0.2, "frequency_rate": 0.160727824109174,
"frequency_avg": 0.43, "frequency_avg": 0,
"trend": 0.3 "trend": 0.07
}, },
{ {
"interface_addr": "/getcpminfo", "interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 122, "req_frequency": 225,
"frequency_rate": 0.2, "frequency_rate": 0.170583775587566,
"frequency_avg": 0.43, "frequency_avg": 0,
"trend": 0.3 "trend": 0.02
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 882,
"frequency_rate": 0.66868840030326,
"frequency_avg": 0,
"trend": -0.09
} }
], ],
"menu": [ "menu": [
{ {
"menu_name": "接口地址", "menu_name": "菜单1",
"req_frequency": 122, "req_frequency": 333,
"frequency_rate": 0.2, "frequency_rate": 0.263449367088608,
"frequency_avg": 0.43, "frequency_avg": 111,
"trend": 0.3 "trend": 0.09
},
{
"menu_name": "菜单2",
"req_frequency": 315,
"frequency_rate": 0.249208860759494,
"frequency_avg": 105,
"trend": -0.01
}, },
{ {
"menu_name": "接口地址", "menu_name": "菜单3",
"req_frequency": 122, "req_frequency": 616,
"frequency_rate": 0.2, "frequency_rate": 0.487341772151899,
"frequency_avg": 0.43, "frequency_avg": 205.333333333333,
"trend": 0.3 "trend": 0.02
} }
] ]
}, },
"detail": { "detail": {
"ip": { "ip": {
"湖北公司": [ "宜昌分公司": [
{ {
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.11",
"req_frequency": 22 "req_frequency": 22
}, },
{ {
"req_ip": "xx1x.xx.xx.x", "req_ip": "192.156.3.12",
"req_frequency": 21 "req_frequency": 12
},
{
"req_ip": "192.156.3.19",
"req_frequency": 78
},
{
"req_ip": "192.156.3.20",
"req_frequency": 79
},
{
"req_ip": "192.156.3.21",
"req_frequency": 80
},
{
"req_ip": "192.156.3.22",
"req_frequency": 81
},
{
"req_ip": "192.156.3.23",
"req_frequency": 82
},
{
"req_ip": "192.156.3.24",
"req_frequency": 83
} }
], ],
"宜昌公司": [ "随州分公司": [
{ {
"req_ip": "xxx.xx.xx.x", "req_ip": "192.116.3.24",
"req_frequency": 22 "req_frequency": 44
},
{
"req_ip": "192.116.3.25",
"req_frequency": 45
},
{
"req_ip": "192.116.3.26",
"req_frequency": 46
},
{
"req_ip": "192.116.3.27",
"req_frequency": 47
},
{
"req_ip": "192.116.3.28",
"req_frequency": 48
},
{
"req_ip": "192.116.3.29",
"req_frequency": 49
},
{
"req_ip": "192.116.3.30",
"req_frequency": 50
}
],
"孝感分公司": [
{
"req_ip": "192.126.3.24",
"req_frequency": 54
},
{
"req_ip": "192.126.3.25",
"req_frequency": 55
},
{
"req_ip": "192.126.3.26",
"req_frequency": 56
},
{
"req_ip": "192.126.3.27",
"req_frequency": 57
},
{
"req_ip": "192.126.3.28",
"req_frequency": 58
},
{
"req_ip": "192.126.3.29",
"req_frequency": 59
}, },
{ {
"req_ip": "xx1x.xx.xx.x", "req_ip": "192.106.3.30",
"req_frequency": 21 "req_frequency": 60
}
],
"黄冈分公司": [
{
"req_ip": "192.106.3.30",
"req_frequency": 51
},
{
"req_ip": "192.106.3.31",
"req_frequency": 52
},
{
"req_ip": "192.106.3.32",
"req_frequency": 53
},
{
"req_ip": "192.106.3.33",
"req_frequency": 54
},
{
"req_ip": "192.106.3.34",
"req_frequency": 55
},
{
"req_ip": "192.106.3.35",
"req_frequency": 56
},
{
"req_ip": "192.106.3.36",
"req_frequency": 57
},
{
"req_ip": "192.106.3.37",
"req_frequency": 58
},
{
"req_ip": "192.106.3.38",
"req_frequency": 59
}
],
"省公司": [
{
"req_ip": "192.146.3.38",
"req_frequency": 123
},
{
"req_ip": "192.146.3.39",
"req_frequency": 124
},
{
"req_ip": "192.146.3.40",
"req_frequency": 125
},
{
"req_ip": "192.146.3.41",
"req_frequency": 126
},
{
"req_ip": "192.146.3.42",
"req_frequency": 127
},
{
"req_ip": "192.146.3.43",
"req_frequency": 128
},
{
"req_ip": "192.146.3.44",
"req_frequency": 129
},
{
"req_ip": "192.146.3.45",
"req_frequency": 130
},
{
"req_ip": "192.146.3.46",
"req_frequency": 131
},
{
"req_ip": "192.146.3.47",
"req_frequency": 132
} }
] ]
}, },
"account": { "account": {
"湖北公司": [ "宜昌分公司": [
{ {
"req_account": "admin", "req_account": "huqx",
"req_frequency": 22, "req_frequency": 33,
"req_jobnum": 98799 "req_jobnum": 54412
}, },
{ {
"req_account": "admin", "req_account": "zhangsf",
"req_frequency": 22, "req_frequency": 34,
"req_jobnum": 98799 "req_jobnum": 54413
},
{
"req_account": "zhaoj",
"req_frequency": 35,
"req_jobnum": 54414
} }
], ],
"宜昌公司": [ "随州分公司": [
{ {
"req_account": "admin", "req_account": "sangdq",
"req_frequency": 22, "req_frequency": 36,
"req_jobnum": 98799 "req_jobnum": 54415
}, },
{ {
"req_account": "admin", "req_account": "hujt",
"req_frequency": 22, "req_frequency": 37,
"req_jobnum": 98799 "req_jobnum": 54416
}
],
"孝感分公司": [
{
"req_account": "zhangs",
"req_frequency": 98,
"req_jobnum": 43325
},
{
"req_account": "lin",
"req_frequency": 43,
"req_jobnum": 43326
},
{
"req_account": "liuhr",
"req_frequency": 33,
"req_jobnum": 43327
},
{
"req_account": "sunxq01",
"req_frequency": 51,
"req_jobnum": 43328
}
],
"黄冈分公司": [
{
"req_account": "shicl",
"req_frequency": 47,
"req_jobnum": 65341
},
{
"req_account": "gongxs",
"req_frequency": 65,
"req_jobnum": 65342
},
{
"req_account": "sunzs",
"req_frequency": 54,
"req_jobnum": 65343
}
],
"省公司": [
{
"req_account": "maoxt",
"req_frequency": 37,
"req_jobnum": 98761
},
{
"req_account": "xiaod01",
"req_frequency": 29,
"req_jobnum": 98761
},
{
"req_account": "qingsx",
"req_frequency": 71,
"req_jobnum": 98761
},
{
"req_account": "guobg",
"req_frequency": 79,
"req_jobnum": 98761
} }
] ]
}, },
"interface": { "interface": {
"接口1": [ "http://190.89.233.2:8909/getUser": [
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 23,
"req_ip": "192.156.3.12",
"req_account": "zhangq",
"req_jobnum": 54411
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 24,
"req_ip": "192.156.3.12",
"req_account": "huqx",
"req_jobnum": 54412
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 25,
"req_ip": "192.156.3.13",
"req_account": "zhangsf",
"req_jobnum": 54413
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 26,
"req_ip": "192.156.3.14",
"req_account": "zhaoj",
"req_jobnum": 54414
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 27,
"req_ip": "192.156.3.15",
"req_account": "sangdq",
"req_jobnum": 54415
},
{ {
"interface_addr": "接口地址", "interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 122, "req_frequency": 28,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.16",
"req_account": 0.2, "req_account": "hujt",
"req_jobnum": 0.2 "req_jobnum": 54416
}, },
{ {
"interface_addr": "接口地址", "interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 122, "req_frequency": 29,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.17",
"req_account": 0.2, "req_account": "zhangs",
"req_jobnum": 0.2 "req_jobnum": 43325
},
{
"interface_addr": "http://190.89.233.2:8909/getUser",
"req_frequency": 30,
"req_ip": "192.156.3.18",
"req_account": "lin",
"req_jobnum": 43326
} }
], ],
"接口2": [ "http://190.89.233.2:8909/getpublicconfig": [
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 43,
"req_ip": "192.156.3.12",
"req_account": "liuhr",
"req_jobnum": 43327
},
{ {
"interface_addr": "接口地址", "interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 122, "req_frequency": 44,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.12",
"req_account": 0.2, "req_account": "sunxq01",
"req_jobnum": 0.2 "req_jobnum": 43328
}, },
{ {
"interface_addr": "接口地址", "interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 122, "req_frequency": 45,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.18",
"req_account": 0.2, "req_account": "shicl",
"req_jobnum": 0.2 "req_jobnum": 65341
},
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 46,
"req_ip": "192.106.3.33",
"req_account": "gongxs",
"req_jobnum": 65342
},
{
"interface_addr": "http://190.89.233.2:8909/getpublicconfig",
"req_frequency": 47,
"req_ip": "192.106.3.34",
"req_account": "sunzs",
"req_jobnum": 65343
}
],
"http://190.89.233.2:8909/update/sysconfig": [
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 34,
"req_ip": "192.106.3.35",
"req_account": "zhangsf",
"req_jobnum": 54415
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 23,
"req_ip": "192.106.3.36",
"req_account": "zhaoj",
"req_jobnum": 54416
},
{
"interface_addr": "http://190.89.233.2:8909/update/sysconfig",
"req_frequency": 78,
"req_ip": "192.106.3.37",
"req_account": "sangdq",
"req_jobnum": 43325
},
{
"interface_addr": "http://190.89.233.2:8910/update/sysconfig",
"req_frequency": 79,
"req_ip": "192.146.3.38",
"req_account": "hujt",
"req_jobnum": 43326
},
{
"interface_addr": "http://190.89.233.2:8911/update/sysconfig",
"req_frequency": 80,
"req_ip": "192.146.3.39",
"req_account": "zhangs",
"req_jobnum": 43327
},
{
"interface_addr": "http://190.89.233.2:8912/update/sysconfig",
"req_frequency": 81,
"req_ip": "192.146.3.40",
"req_account": "lin",
"req_jobnum": 43328
},
{
"interface_addr": "http://190.89.233.2:8913/update/sysconfig",
"req_frequency": 82,
"req_ip": "192.146.3.41",
"req_account": "liuhr",
"req_jobnum": 65341
},
{
"interface_addr": "http://190.89.233.2:8914/update/sysconfig",
"req_frequency": 83,
"req_ip": "192.146.3.42",
"req_account": "sunxq01",
"req_jobnum": 65342
},
{
"interface_addr": "http://190.89.233.2:8915/update/sysconfig",
"req_frequency": 84,
"req_ip": "192.146.3.43",
"req_account": "xiaod01",
"req_jobnum": 65343
},
{
"interface_addr": "http://190.89.233.2:8916/update/sysconfig",
"req_frequency": 85,
"req_ip": "192.146.3.44",
"req_account": "qingsx",
"req_jobnum": 98761
},
{
"interface_addr": "http://190.89.233.2:8917/update/sysconfig",
"req_frequency": 86,
"req_ip": "192.146.3.45",
"req_account": "guobg",
"req_jobnum": 98761
},
{
"interface_addr": "http://190.89.233.2:8918/update/sysconfig",
"req_frequency": 87,
"req_ip": "192.146.3.46",
"req_account": "zhangq",
"req_jobnum": 98761
} }
] ]
}, },
"menu": { "menu": {
"菜单1": [ "菜单1": [
{ {
"menu_name": "接口地址", "menu_name": "菜单1",
"req_frequency": 122, "req_frequency": 53,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.106.3.32",
"req_account": 0.2, "req_account": "lin",
"req_jobnum": 0.2 "req_jobnum": "43326"
},
{
"menu_name": "菜单1",
"req_frequency": 54,
"req_ip": "192.106.3.33",
"req_account": "liuhr",
"req_jobnum": "43327"
},
{
"menu_name": "菜单1",
"req_frequency": 55,
"req_ip": "192.106.3.34",
"req_account": "sunxq01",
"req_jobnum": "43328"
},
{
"menu_name": "菜单1",
"req_frequency": 56,
"req_ip": "192.106.3.35",
"req_account": "shicl",
"req_jobnum": "65341"
}, },
{ {
"menu_name": "接口地址", "menu_name": "菜单1",
"req_frequency": 122, "req_frequency": 57,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.106.3.36",
"req_account": 0.2, "req_account": "gongxs",
"req_jobnum": 0.2 "req_jobnum": "65342"
},
{
"menu_name": "菜单1",
"req_frequency": 58,
"req_ip": "192.106.3.37",
"req_account": "sunzs",
"req_jobnum": "65343"
} }
], ],
"菜单2": [ "菜单2": [
{ {
"menu_name": "接口地址", "menu_name": "菜单2",
"req_frequency": 122, "req_frequency": 31,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.156.3.12",
"req_account": 0.2, "req_account": "zhangq",
"req_jobnum": 0.2 "req_jobnum": "54411"
},
{
"menu_name": "菜单2",
"req_frequency": 32,
"req_ip": "192.156.3.12",
"req_account": "huqx",
"req_jobnum": "54412"
},
{
"menu_name": "菜单2",
"req_frequency": 33,
"req_ip": "192.156.3.13",
"req_account": "zhangsf",
"req_jobnum": "54413"
},
{
"menu_name": "菜单2",
"req_frequency": 34,
"req_ip": "192.156.3.14",
"req_account": "zhaoj",
"req_jobnum": "54414"
},
{
"menu_name": "菜单2",
"req_frequency": 35,
"req_ip": "192.156.3.15",
"req_account": "sangdq",
"req_jobnum": "54415"
},
{
"menu_name": "菜单2",
"req_frequency": 36,
"req_ip": "192.156.3.16",
"req_account": "hujt",
"req_jobnum": "54416"
},
{
"menu_name": "菜单2",
"req_frequency": 37,
"req_ip": "192.156.3.17",
"req_account": "zhangs",
"req_jobnum": "43325"
},
{
"menu_name": "菜单2",
"req_frequency": 38,
"req_ip": "192.156.3.18",
"req_account": "lin",
"req_jobnum": "43326"
},
{
"menu_name": "菜单2",
"req_frequency": 39,
"req_ip": "192.156.3.12",
"req_account": "liuhr",
"req_jobnum": "43327"
}
],
"菜单3": [
{
"menu_name": "菜单3",
"req_frequency": 51,
"req_ip": "192.106.3.33",
"req_account": "gongxs",
"req_jobnum": "65342"
},
{
"menu_name": "菜单3",
"req_frequency": 52,
"req_ip": "192.106.3.34",
"req_account": "sunzs",
"req_jobnum": "65343"
},
{
"menu_name": "菜单3",
"req_frequency": 53,
"req_ip": "192.106.3.35",
"req_account": "zhangsf",
"req_jobnum": "54415"
},
{
"menu_name": "菜单3",
"req_frequency": 54,
"req_ip": "192.106.3.36",
"req_account": "zhaoj",
"req_jobnum": "54416"
},
{
"menu_name": "菜单3",
"req_frequency": 55,
"req_ip": "192.106.3.37",
"req_account": "sangdq",
"req_jobnum": "43325"
},
{
"menu_name": "菜单3",
"req_frequency": 56,
"req_ip": "192.146.3.38",
"req_account": "hujt",
"req_jobnum": "43326"
},
{
"menu_name": "菜单3",
"req_frequency": 57,
"req_ip": "192.146.3.39",
"req_account": "zhangs",
"req_jobnum": "43327"
},
{
"menu_name": "菜单3",
"req_frequency": 58,
"req_ip": "192.146.3.40",
"req_account": "lin",
"req_jobnum": "43328"
},
{
"menu_name": "菜单3",
"req_frequency": 59,
"req_ip": "192.146.3.41",
"req_account": "liuhr",
"req_jobnum": "65341"
},
{
"menu_name": "菜单3",
"req_frequency": 60,
"req_ip": "192.146.3.42",
"req_account": "sunxq01",
"req_jobnum": "65342"
}, },
{ {
"menu_name": "接口地址", "menu_name": "菜单3",
"req_frequency": 122, "req_frequency": 61,
"req_ip": "xxx.xx.xx.x", "req_ip": "192.146.3.43",
"req_account": 0.2, "req_account": "xiaod01",
"req_jobnum": 0.2 "req_jobnum": "65343"
} }
] ]
} }
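
The new mock summary values are internally consistent with a simple derivation, at least for the ip block above: frequency_rate is the company's share of all requests in percent, ip_rate is its share of distinct source IPs, and ip_avg is frequency_rate divided by ip_count. The account block uses different totals, so treat this as an inference from the numbers rather than a documented formula. A worked check against the ip rows:

# -*- coding: utf-8 -*-
# Reproduces the ip summary figures in mock_data.json from the per-company
# request and IP counts. The formulas are inferred from the numbers themselves,
# not taken from project documentation.
rows = [
    ("宜昌分公司", 517, 8),
    ("随州分公司", 329, 7),
    ("孝感分公司", 399, 7),
    ("黄冈分公司", 495, 9),
    ("省公司", 1275, 10),
]
total_req = sum(r[1] for r in rows)  # 3015
total_ips = sum(r[2] for r in rows)  # 41

for company, req, ips in rows:
    frequency_rate = req * 100.0 / total_req  # share of all requests, in percent
    ip_rate = float(ips) / total_ips          # share of distinct source IPs
    ip_avg = frequency_rate / ips             # percent of traffic per source IP
    print("{} {:.6f} {:.6f} {:.6f}".format(company, frequency_rate, ip_rate, ip_avg))
    # 宜昌分公司 -> 17.147595 0.195122 2.143449, matching the summary block above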

mock/mock_data2.json (new file)
@ -0,0 +1,221 @@
{
"summary": {
"ip": [
{
"company": "孝感分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "宜昌分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "随州分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "黄冈分公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
},
{
"company": "省公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"ip_count": 323,
"ip_rate": 0.3,
"ip_avg": 0.43,
"trend": 0.3
}
],
"account": [
{
"company": "湖北公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_rate": 0.3,
"account_avg": 0.43,
"trend": 0.3
},
{
"company": "宜昌公司",
"req_frequency": 122,
"frequency_rate": 0.2,
"account_count": 323,
"account_rate": 0.3,
"account_avg": 0.43,
"trend": 0.3
}
],
"interface": [
{
"interface_addr": "/getuser",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
},
{
"interface_addr": "/getcpminfo",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
}
],
"menu": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"frequency_rate": 0.2,
"frequency_avg": 0.43,
"trend": 0.3
}
]
},
"detail": {
"ip": {
"湖北公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_frequency": 22
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
}
],
"宜昌公司": [
{
"req_ip": "xxx.xx.xx.x",
"req_frequency": 22
},
{
"req_ip": "xx1x.xx.xx.x",
"req_frequency": 21
}
]
},
"account": {
"湖北公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
}
],
"宜昌公司": [
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
},
{
"req_account": "admin",
"req_frequency": 22,
"req_jobnum": 98799
}
]
},
"interface": {
"接口1": [
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
],
"接口2": [
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"interface_addr": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
]
},
"menu": {
"菜单1": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
],
"菜单2": [
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
},
{
"menu_name": "接口地址",
"req_frequency": 122,
"req_ip": "xxx.xx.xx.x",
"req_account": 0.2,
"req_jobnum": 0.2
}
]
}
}
}

package.json
@ -1,8 +1,8 @@
 {
-  "name": "UebaMetricsAnalysis",
+  "name": "uebaMetricsAnalysis",
   "version": "V3.0R01F00",
-  "menuurl": "/UebaMetricsAnalysis",
+  "menuurl": "/uebaMetricsAnalysis",
-  "menuregx": "^/UebaMetricsAnalysis",
+  "menuregx": "^/uebaMetricsAnalysis",
   "menuname": "指标晾晒统计",
   "summary": "指标晾晒统计",
   "platform_mode": "simple_mode",

@ -1,16 +1,16 @@
{ {
"name": "指标晾晒统计", "name": "指标晾晒统计",
"key": "UebaMetricsAnalysis", "key": "uebaMetricsAnalysis",
"prefix": "UebaMetricsAnalysis", "prefix": "uebaMetricsAnalysis",
"allfix": [ "allfix": [
"^/UebaMetricsAnalysis" "^/uebaMetricsAnalysis"
], ],
"fix": [ "fix": [
"^/UebaMetricsAnalysis" "^/uebaMetricsAnalysis"
], ],
"link": "/WebApi/UebaMetricsAnalysis/static/dist/#/", "link": "/WebApi/uebaMetricsAnalysis/static/dist/#/",
"pinyin": "zhibiaoliangshaitongji", "pinyin": "zhibiaoliangshaitongji",
"app_name": "UebaMetricsAnalysis", "app_name": "uebaMetricsAnalysis",
"children": [ "children": [
], ],
@ -18,6 +18,6 @@
1 1
], ],
"role_menu_register":{ "role_menu_register":{
"指标晾晒统计": "UebaMetricsAnalysis" "指标晾晒统计": "uebaMetricsAnalysis"
} }
} }

test.py (file deleted)
@ -1,18 +0,0 @@
#encoding=utf-8
import json
from isoc.utils.esUtil import EsUtil
def createIndex():
map={
"field1": "text",
"field2": "text"
}
es_util_instance = EsUtil()
res = es_util_instance.create_index_simple("bsa_traffic*",3,scroll_search)
return res
res = createIndex()
print(res)

utils/base_dataclean.py (file deleted)
@ -1,292 +0,0 @@
#encoding=utf-8
import json
import time,datetime
import traceback
from datetime import datetime, timedelta
import calendar
from esUtil import EsUtil
import pytz
size = 1000# 可以根据实际情况调整
##01 创建索引
def createIndex(index):
map={
"data_type":"keyword",
"req_account":"keyword",
"req_frequency":"integer",
"req_jobnum":"keyword",
"interface_addr":"keyword",
"req_ip":"ip",
"menu_name":"keyword",
"date_time":"date"
}
es_util_instance = EsUtil()
reqs = es_util_instance.is_index_exist(index)
if reqs =="false":
try:
res = es_util_instance.create_index_simple(index,map)
except Exception,e:
print e.message
## IP维度
def get_ip_group_data(index,startTime,endTime):
try:
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"sip": { "terms": {"field": "sip"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
try:
response = es_util_instance.search(index,query_body)
except Exception,e:
print "err"
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = {
"data_type": "ip",
"req_account": "",
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "",
"req_ip":bucket['key']['sip'] ,
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
except Exception,e:
print "x_err:"+e.message
return datas
## 账号维度
def get_account_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"account": { "terms": {"field": "account"} }},
{"trojan_type": { "terms": { "field": "trojan_type"}}}
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "account",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "",
"req_ip":"0.0.0.0" ,
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 接口维度
def get_interface_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "interface",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": bucket['key']['interface'] ,
"req_ip":bucket['key']['sip'],
"menu_name": "",
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 菜单维度
def get_menu_group_data(index,startTime,endTime):
query_body={
"size": 0,
"query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}}
},
"aggs": {
"composite_buckets": {
"composite": {
"size": size,
"sources": [
{"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}},
]
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas=[]
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = {
"data_type": "menu",
"req_account": bucket['key']['account'],
"req_frequency": bucket['doc_count'],
"req_jobnum": bucket['key']['trojan_type'] ,
"interface_addr": "" ,
"req_ip":bucket['key']['sip'],
"menu_name": bucket['key']['worm_family'],
"date_time": int(time.time() * 1000) # 当前时间,使用isoformat格式化
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
##03 数据写入
def data_insert(index,data):
es_util_instance = EsUtil()
response = es_util_instance.bulk_insert(index,data)
return response
def clean_data(write_index,read_index,start,end):
data_ip = get_ip_group_data(read_index,start,end)
print "data_ip:"+str(len(data_ip))
data_account = get_account_group_data(read_index,start,end)
print "data_ip:"+str(len(data_account))
data_interface = get_interface_group_data(read_index,start,end)
print "data_ip:"+str(len(data_interface))
data_menu = get_menu_group_data(read_index,start,end)
print "data_ip:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
response = data_insert(write_index,res_data)
print json.dumps(response)
#入口
def entry(write_index,read_index,start,end):
createIndex(write_index)
clean_data(write_index,read_index,start,end)
#前一天的0点0分0秒
def get_start_end_time(hour,minute,second):
# 获取当前日期时间
now = datetime.now()
# 计算昨天的日期时间
yesterday = now - timedelta(days=1)
# 将时间部分设为 00:00:00
yesterday_midnight = yesterday.replace(hour=hour, minute=minute, second=second, microsecond=0)
# 使用 pytz 来获取 UTC 时区对象
utc = pytz.utc
# 将时间对象本地化为 UTC 时区
yesterday_midnight_utc = utc.localize(yesterday_midnight)
# 格式化为带时区的字符串(ISO 8601格式)
formatted_date = yesterday_midnight_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_date
def index():
try:
#写入的索引 按月创建,注意跨天的场景
write_index= "b_ueba_2024_07"
read_index ="bsa_traffic*"
#任务执行时间是每天 凌晨12点
#查询的范围 开始时间前一天的0点0分0秒,结束时间是 前一天的23.59.59秒
start = "2024-06-02T00:00:00Z"#get_start_end_time(0,0,0)
end = get_start_end_time(23,59,59)
print start +":"+ end
entry(write_index,read_index,start,end)
except Exception ,e:
print "定时任务执行失败:"+traceback.format_exc()
# logger.error("定时任务执行失败:".format(str(e), traceback.format_exc()))
index()
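
The deleted utils/base_dataclean.py pages through Elasticsearch composite aggregations by re-issuing the query with the previous page's after_key. The project goes through its own EsUtil wrapper, which is not part of this diff, so here is the same pagination loop as a generic sketch against the stock elasticsearch-py client (7.x-style search(index=..., body=...) call); the endpoint, index and field names are placeholders:

# -*- coding: utf-8 -*-
# Generic sketch of composite-aggregation paging with elasticsearch-py.
# Not the project's EsUtil API; endpoint, index and fields are placeholders.
from elasticsearch import Elasticsearch

def iter_composite_buckets(es, index, query, sources, size=1000):
    """Yield every bucket of a composite aggregation, following after_key."""
    body = {
        "size": 0,
        "query": query,
        "aggs": {"composite_buckets": {"composite": {"size": size, "sources": sources}}},
    }
    after_key = None
    while True:
        if after_key:
            body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
        resp = es.search(index=index, body=body)
        agg = resp["aggregations"]["composite_buckets"]
        for bucket in agg["buckets"]:
            yield bucket
        after_key = agg.get("after_key")
        if not after_key:  # last page carries no after_key
            break

if __name__ == "__main__":
    es = Elasticsearch(["http://localhost:9200"])  # placeholder endpoint
    query = {"range": {"timestamp": {"gte": "2024-06-02T00:00:00Z",
                                     "lte": "2024-06-02T23:59:59Z"}}}
    sources = [{"sip": {"terms": {"field": "sip"}}},
               {"trojan_type": {"terms": {"field": "trojan_type"}}}]
    for b in iter_composite_buckets(es, "bsa_traffic*", query, sources):
        print("{} {}".format(b["key"], b["doc_count"]))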

utils/base_dataclean_pg.py
@ -5,10 +5,11 @@ import time,datetime
import traceback import traceback
from datetime import datetime, timedelta from datetime import datetime, timedelta
import calendar import calendar
import codecs
from esUtil import EsUtil from esUtil import EsUtil
import pytz from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
size = 1000# 可以根据实际情况调整 size = 9999#根据实际情况调整
DATA_TYPE = { DATA_TYPE = {
"IP": 1, "IP": 1,
@ -19,11 +20,19 @@ DATA_TYPE = {
## IP维度 ## IP维度
def get_ip_group_data(index,startTime,endTime): def get_ip_group_data(index,startTime,endTime):
try:
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -43,10 +52,7 @@ def get_ip_group_data(index,startTime,endTime):
while True: while True:
if after_key: if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
try:
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
except Exception,e:
print "err"
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
data = { data = {
"data_type": DATA_TYPE.get("IP"), "data_type": DATA_TYPE.get("IP"),
@ -56,13 +62,9 @@ def get_ip_group_data(index,startTime,endTime):
} }
datas.append(data) datas.append(data)
after_key = bucket["key"] after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"): if not response["aggregations"]["composite_buckets"].get("after_key"):
break break
after_key = response["aggregations"]["composite_buckets"]["after_key"] after_key = response["aggregations"]["composite_buckets"]["after_key"]
except Exception,e:
print "x_err:"+e.message
return datas return datas
## 账号维度 ## 账号维度
@ -70,7 +72,16 @@ def get_account_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -92,7 +103,6 @@ def get_account_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("ACCOUNT"), "data_type": DATA_TYPE.get("ACCOUNT"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -114,7 +124,16 @@ def get_interface_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -124,7 +143,7 @@ def get_interface_group_data(index,startTime,endTime):
{"interface": { "terms": {"field": "interface"} }}, {"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}}, {"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}}, {"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}
] ]
} }
} }
@ -138,7 +157,6 @@ def get_interface_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("INTERFACE"), "data_type": DATA_TYPE.get("INTERFACE"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -162,7 +180,16 @@ def get_menu_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "bool": {
"filter": [
{"range": {"timestamp": {"gt": startTime,"lte": endTime}}},
{"bool": {
"must_not": [
{"match_phrase": {"account": ""}}
]
}}
]
}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -172,7 +199,7 @@ def get_menu_group_data(index,startTime,endTime):
{"worm_family": { "terms": {"field": "worm_family"} }}, {"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}}, {"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}}, {"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}
] ]
} }
} }
@ -186,7 +213,6 @@ def get_menu_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("MENU"), "data_type": DATA_TYPE.get("MENU"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -204,24 +230,48 @@ def get_menu_group_data(index,startTime,endTime):
return datas return datas
def datetime_to_timestamp(dt): def datetime_to_timestamp(dt):
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000) dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
def clean_data(read_index,start,end): return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end,jobid):
data_ip = get_ip_group_data(read_index,start,end) data_ip = get_ip_group_data(read_index,start,end)
# print "data_ip:"+str(len(data_ip))
data_account = get_account_group_data(read_index,start,end) data_account = get_account_group_data(read_index,start,end)
# print "data_account:"+str(len(data_account))
data_interface = get_interface_group_data(read_index,start,end) data_interface = get_interface_group_data(read_index,start,end)
# print "data_interface:"+str(len(data_interface))
data_menu = get_menu_group_data(read_index,start,end) data_menu = get_menu_group_data(read_index,start,end)
# print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
print ("resdata:"+json.dumps(res_data)) if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并")
return
logger_cron.info("JOB:"+jobid+",ip维度获取到 "+str(len(data_ip))+" 条数据")
logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
#todo 读取上一次5分钟的文件,与这5分钟的文件合并 #todo 读取上一次5分钟的文件,与这5分钟的文件合并
#合并完成后 写文件 #合并完成后 写文件
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start) group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start):
#读取大文件
def read_large_json_file(filename, chunk_size=1024*1024*5): # 每次读取5MB的数据
json_object = ''
with codecs.open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
json_object += chunk
data = json.loads(json_object)
return data
#写入大文件
def write_large_file(filename, data_list, chunk_size=1024*1024*5):
with codecs.open(filename, 'w', encoding='utf-8') as f:
for i in range(0, len(data_list), chunk_size):
chunk = data_list[i:i + chunk_size]
f.write(chunk)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
ipGroupStr = "ip,jobnum" ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr) ipGroup = group_and_sum(data_ip, ipGroupStr)
accountGroupStr = "account,jobnum" accountGroupStr = "account,jobnum"
@ -231,7 +281,6 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
menuGroupStr = "menu,ip,account,jobnum" menuGroupStr = "menu,ip,account,jobnum"
menuGroup = group_and_sum(data_menu, menuGroupStr) menuGroup = group_and_sum(data_menu, menuGroupStr)
data = {} data = {}
data["ip"] = ipGroup data["ip"] = ipGroup
data["account"] = accountGroup data["account"] = accountGroup
@ -239,34 +288,31 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
data["menu"] = menuGroup data["menu"] = menuGroup
# 获取当前工作目录 # 获取当前工作目录
current_dir = os.getcwd() # current_dir = os.getcwd()
base_path = get_clean_file_path()
logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
date_time = convert_utc_to_local_time(start) date_time = convert_utc_to_local_time(start)
date_str = time.strftime('%Y-%m-%d', date_time) date_str = time.strftime('%Y-%m-%d', date_time)
file_path = os.path.join(base_path,date_str + '.json')
file_path = os.path.join(current_dir, 'files/' + date_str + '.json') logger_cron.info("JOB:"+jobid+", 写入文件路径"+file_path)
all_data = [data] all_data = [data]
logger_cron.info("JOB: "+jobid+",准备读取已有文件")
if os.path.exists(file_path): if os.path.exists(file_path):
# 打开文件并读取内容 # 打开文件并读取内容
with codecs.open(file_path, 'r', encoding='utf-8') as file: old_json_data =read_large_json_file(file_path)
content = file.read()
old_json_data = json.loads(content)
all_data = [data, old_json_data] all_data = [data, old_json_data]
logger_cron.info("JOB:"+jobid+", 读取已有文件完成")
merged_data = merge_data(all_data) merged_data = merge_data(all_data)
# 使用codecs模块以UTF-8编码打开文件
f = codecs.open(file_path, 'w', encoding='utf-8')
json_data = json.dumps(merged_data) json_data = json.dumps(merged_data)
# 写入Unicode字符串 #写入文件
f.write(json_data) write_large_file(file_path,json_data)
# 关闭文件
f.close()
logger_cron.info("JOB: "+jobid+",写入文件完成")
def group_and_sum(data, by_fields="ip,jobnum"): def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表 # 将by_fields转换为列表
by_fields_list = by_fields.split(',') by_fields_list = by_fields.split(',')
@ -361,28 +407,33 @@ def convert_utc_to_local_time(timestamp):
return time_struct_beijing return time_struct_beijing
#入口 #入口
def entry(start,end): def entry(start,end,jobid):
base_index ="bsa_traffic*" base_index ="bsa_traffic*"
es_util_instance = EsUtil() es_util_instance = EsUtil()
# start = datetime_to_timestamp(start) start = datetime_to_timestamp(start)
# end = datetime_to_timestamp(end) end = datetime_to_timestamp(end)
logger_cron.info("JOB:"+jobid+",start为"+str(start))
logger_cron.info("JOB:"+jobid+",end为"+str(end))
res=es_util_instance.get_available_index_name(start,end,base_index) res=es_util_instance.get_available_index_name(start,end,base_index)
print "xxxx:"+str(len(res)) logger_cron.info("JOB:"+jobid+",index为"+json.dumps(res))
if len(res)==0: if len(res)==0:
return return
index =",".join(res) index =",".join(res)
clean_data(index,start,end) clean_data(index,start,end,jobid)
# start = '2024-07-18 15:20:31'
start = 1720772586000 # end = '2024-07-18 15:25:31'
end = 1720776186000 # date_format = "%Y-%m-%d %H:%M:%S"
# # 将 datetime 对象转换为秒级时间戳
# timestamp_seconds = time.mktime(dt.timetuple()) # date_str = datetime.strptime(start, date_format)
# # 获取微秒数 # end_str = datetime.strptime(end, date_format)
# microseconds = dt.microsecond # # # 将 datetime 对象转换为秒级时间戳
# # 转换为毫秒级时间戳 # # timestamp_seconds = time.mktime(dt.timetuple())
# timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0) # # # 获取微秒数
# # microseconds = dt.microsecond
# # # 转换为毫秒级时间戳
entry(start,end) # # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
# entry(date_str,end_str,"xxxxxxxxxxxxxxxxxxx")
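For reference, the revised entry now converts its datetime arguments itself; time.mktime uses the host timezone, so the absolute millisecond values depend on where the job runs. A hedged example (assuming a UTC+8 host; the jobid is an arbitrary sample string):

    from datetime import datetime
    start = datetime(2024, 7, 18, 15, 20, 31)
    end = datetime(2024, 7, 18, 15, 25, 31)
    # datetime_to_timestamp(start) == 1721287231000 on a UTC+8 host
    entry(start, end, "demo-job-id")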

@ -1,281 +0,0 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import json
import os,re
import codecs
import traceback
from isoc.utils.esUtil import EsUtil
from dashboard_data_conversion import ip_summary_data_format, account_summary_data_format, \
interface_summary_data_format, menu_summary_data_format
from ext_logging import logger
## IP维度
def es_get_ip_group_data(index,startTime,endTime):
page_size = 9000 #可以根据实际情况调整
query_body={
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "ip" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"ip":bucket['key']['req_ip'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
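The (now removed) helper above pages through an Elasticsearch composite aggregation: each response carries an after_key cursor that seeds the next request, and the loop stops when no after_key is returned; the per-bucket assignment of after_key inside the for loop is redundant. A stripped-down sketch of the same paging pattern, with the index and query body as placeholders:

    def page_composite_buckets(index, query_body):
        es_util_instance = EsUtil()
        buckets = []
        while True:
            resp = es_util_instance.search(index, query_body)
            agg = resp.get("aggregations", {}).get("composite_buckets", {})
            buckets.extend(agg.get("buckets", []))
            after_key = agg.get("after_key")
            if not after_key:  # last page reached
                return buckets
            query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key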
## 账号维度
def es_get_account_group_data(index,startTime,endTime):
page_size = 9000 #可以根据实际情况调整
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "account" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 菜单维度
def es_get_menu_group_data(index,startTime,endTime):
page_size = 9000 #可以根据实际情况调整
query_body={
"size": 0,
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "menu" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"composite_buckets": {
"composite": {
"size" : page_size,
"sources": [
{ "menu_name": { "terms": { "field": "menu_name" } } },
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"menu":bucket['key']['menu_name'],
"ip":bucket['key']['req_ip'],
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
## 接口维度
def es_get_interface_group_data(index,startTime,endTime):
page_size = 9999 #可以根据实际情况调整
query_body={
"query": {
"bool": {
"filter": [
{ "term": { "data_type": "interface" } },
{"range":{
"date_time": {
"gte": startTime,
"lte": endTime
}
}}
]
}
},
"aggs": {
"group_by_menu": {
"composite": {
"size" : page_size,
"sources": [
{ "interface_addr": { "terms": { "field": "interface_addr" } } },
{ "req_account": { "terms": { "field": "req_account" } } },
{ "req_ip": { "terms": { "field": "req_ip" } } },
{ "req_jobnum": { "terms": { "field": "req_jobnum" } } }
]
},
"aggregations": {
"total_count": {
"sum": {
"field": "req_frequency"
}
}
}
}
}
}
after_key = None
es_util_instance = EsUtil()
datas = []
while True:
if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body)
buckets = response.get("aggregations", {}).get("composite_buckets", {}).get("buckets", [])
for bucket in buckets:
data= {
"interface":bucket['key']['interface_addr'],
"ip":bucket['key']['req_ip'],
"account":bucket['key']['req_account'],
"jobnum":bucket['key']['req_jobnum'],
"count":bucket['total_count']['value']
}
datas.append(data)
after_key = bucket["key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
return datas
def entry(data_type,start,end):
base_index = 'c_ueba_001'
# es_util_instance = EsUtil()
# res=es_util_instance.get_available_index_name(start,end,base_index)
# if len(res)==0:
# return
# index =",".join(res)
index=base_index
try:
data = {}
if data_type == "1":
ip_summary_data = es_get_ip_group_data(index, start, end)
data = ip_summary_data_format(ip_summary_data)
if data_type == "2":
account_summary_data = es_get_account_group_data(index, start, end)
data = account_summary_data_format(account_summary_data)
if data_type == "3":
interface_summary_data = es_get_interface_group_data(index, start, end)
data = interface_summary_data_format(interface_summary_data)
if data_type == "4":
menu_summary_data = es_get_menu_group_data(index, start, end)
data = menu_summary_data_format(menu_summary_data)
return data
except Exception, e:
logger.error(traceback.format_exc())
raise e

@@ -31,7 +31,12 @@ def keep_digits_filter(code):
     """
     return ''.join(filter(str.isdigit, str(code)))
#安全除
def safe_divide(numerator, denominator):
if denominator == 0:
return
else:
return numerator / denominator
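safe_divide returns None instead of raising ZeroDivisionError, so a zero denominator surfaces downstream as a missing average rather than an exception. Note that on Python 2 the division is still floor division for integer inputs unless from __future__ import division is in effect in this module (the commit adds that import to dashboard_data_pg.py):

    # safe_divide(7, 0) -> None
    # safe_divide(7, 2) -> 3 without true division on Python 2, 3.5 with it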
 def find_region_by_code(code, region_dict):
     """
     查询工号对应公司
@@ -72,7 +77,6 @@ def ip_summary_data_format(ip_summary_data):
     # 统计总请求次数和独立IP数
     reqs_total = sum(data["reqs"] for data in grouped_data.values())
-    # 请求为0抛出
     if reqs_total == 0:
         return result
@@ -88,7 +92,7 @@ def ip_summary_data_format(ip_summary_data):
             # 本公司的 ip个数/所有公司 ip个数的合计
             "ip_rate": round(len(data["ips"]) / len(ips_total), 4),
             # 本公司的 请求次数/本公司 ip个数的合计
-            "ip_avg": data["reqs"] // len(data["ips"]),
+            "ip_avg": safe_divide(data["reqs"],len(data["ips"])),
         }
         for company, data in grouped_data.items()
     ]
@@ -146,7 +150,7 @@ def account_summary_data_format(account_summary_data):
             # 本公司的 账号次数/所有公司 账号次数的合计
             "account_rate": round(len(data["accounts"]) / len(accounts_total), 4),
             # 本公司的 请求次数/本公司 账号次数的合计
-            "account_avg": data["reqs"] // len(data["accounts"]),
+            "account_avg": safe_divide(data["reqs"],len(data["accounts"])),
         }
         for company, data in grouped_data.items()
     ]
@@ -199,9 +203,9 @@ def interface_summary_data_format(interface_summary_data):
             # 本接口的 请求次数/所有接口 请求次数的合计
             "frequency_rate": round(data["reqs"] / reqs_total, 4),
             # 本接口的 请求次数/ 20 查询top20接口
-            "frequency_avg": data["reqs"] // 20,
+            "frequency_avg": safe_divide(data["reqs"],20),
         }
-        for company, data in grouped_data.items()
+        for interface, data in grouped_data.items()
     ]
     result["summary"]["interface"] = sorted(interface_data_list, key=lambda x: x["req_frequency"], reverse=True)
@@ -257,9 +261,9 @@ def menu_summary_data_format(menu_summary_data):
             # 本菜单的 请求次数 /所有菜单 请求次数的合计
             "frequency_rate": round(data["reqs"] / reqs_total, 4),
             # 本菜单的 请求次数 /所有菜单 个数的合计
-            "frequency_avg": data["reqs"] // len(menu_total),
+            "frequency_avg": safe_divide(data["reqs"],len(menu_total)),
         }
-        for company, data in grouped_data.items()
+        for menu, data in grouped_data.items()
     ]
     result["summary"]["menu"] = sorted(menu_data_list, key=lambda x: x["req_frequency"], reverse=True)
@@ -269,13 +273,17 @@ def menu_summary_data_format(menu_summary_data):
     return result
+#调整时间
 def adjust_times(start_time, end_time):
-    # 计算开始时间和结束时间之间的天数差
-    delta_days = (end_time - start_time).days
-    # 从开始和结束时间各自减去这个天数差
-    adjusted_start_time = start_time - timedelta(days=delta_days)
-    adjusted_end_time = end_time - timedelta(days=delta_days)
-    return adjusted_start_time, adjusted_end_time
+    start_time = datetime.strptime(start_time, "%Y-%m-%d")
+    end_time = datetime.strptime(end_time, "%Y-%m-%d")
+    delta_days = (end_time - start_time).days
+    if delta_days == 0:
+        pre_date = start_time-timedelta(1)
+        pre_date = start_time-timedelta(1)
+        return pre_date.strftime("%Y-%m-%d"),pre_date.strftime("%Y-%m-%d")
+    if delta_days > 0:
+        pre_start_date = start_time-timedelta(delta_days+1)
+        pre_end_date = end_time-timedelta(delta_days+1)
+        return pre_start_date.strftime("%Y-%m-%d"),pre_end_date.strftime("%Y-%m-%d")
+    return start_time, end_time
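adjust_times now returns the immediately preceding window of the same length, as "%Y-%m-%d" strings, for example:

    # adjust_times("2024-07-10", "2024-07-12") -> ("2024-07-07", "2024-07-09")  # 3-day window shifted back 3 days
    # adjust_times("2024-07-10", "2024-07-10") -> ("2024-07-09", "2024-07-09")  # same-day window -> previous day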

@@ -1,7 +1,7 @@
 #!/usr/bin/python
 # encoding=utf-8
 # author: tangwy
+from __future__ import division
 import json
 import os, re
 import codecs
@@ -12,10 +12,9 @@ from dashboard_data_conversion import ip_summary_data_format, account_summary_da
     interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code
 from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
 from ext_logging import logger
-TABLE_NAME = "ueba_logs"
+TABLE_NAME = "ueba_analysis_schema.logs"
 DATA_TYPE = {
     "IP": 1,
@@ -24,7 +23,6 @@ DATA_TYPE = {
     "MENU": 4,
 }
 def pg_get_ip_group_data(startTime, endTime):
     """
     IP维度查询
@@ -115,7 +113,27 @@ def pg_get_menu_group_data(startTime, endTime):
     return result
-def pg_get_previous_company_count(startTime, endTime, data_type):
+def pg_get_account_previous_company_count(startTime, endTime, data_type):
"""
账号维度查询菜请求次数
:param startTime: 开始时间,
:param endTime: 结束时间,
:param data_type: 公司聚合类型 ACCOUNT or IP ,
"""
result = defaultdict(int)
if data_type in DATA_TYPE:
data_type = DATA_TYPE[data_type]
sql = """ select jobnum, account,sum(count) from {TABLE_NAME}
where logdate >= %s and logdate <= %s and data_type = %s
group by jobnum,account""".format(TABLE_NAME=TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
if res:
for item in res:
company = find_region_by_code(item[0], jobnum_region_dict)
result[company] += item[2]
return result
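Both previous-period helpers fold per-jobnum rows into per-company totals through find_region_by_code; a minimal illustration with fabricated rows:

    from collections import defaultdict
    rows = [("5104001", "alice", 10), ("5104002", "bob", 5), ("5100301", "carol", 7)]  # fabricated (jobnum, account, count) rows
    totals = defaultdict(int)
    for jobnum, account, count in rows:
        totals[find_region_by_code(jobnum, jobnum_region_dict)] += count
    # totals now maps each company to the summed request count of its jobnums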
+def pg_get_ip_previous_company_count(startTime, endTime, data_type):
     """
     账号维度查询菜请求次数
     :param startTime: 开始时间,
@@ -125,14 +143,14 @@ def pg_get_previous_company_count(startTime, endTime, data_type):
     result = defaultdict(int)
     if data_type in DATA_TYPE:
         data_type = DATA_TYPE[data_type]
-    sql = """ select jobnum, sum(count) from {TABLE_NAME}
+    sql = """ select jobnum,ip, sum(count) from {TABLE_NAME}
         where logdate >= %s and logdate <= %s and data_type = %s
-        group by jobnum""".format(TABLE_NAME=TABLE_NAME)
+        group by jobnum,ip """.format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, data_type))))
     if res:
         for item in res:
             company = find_region_by_code(item[0], jobnum_region_dict)
-            result[company] += item[1]
+            result[company] += item[2]
     return result
 def pg_get_previous_interface_count(startTime, endTime):
@@ -142,13 +160,13 @@ def pg_get_previous_interface_count(startTime, endTime):
     :param endTime: 结束时间,
     """
     result = defaultdict(int)
-    sql = """ select interface, sum(count) from {TABLE_NAME}
+    sql = """ select interface, ip, jobnum,account, sum(count) from {TABLE_NAME}
         where logdate >= %s and logdate <= %s and data_type = %s
-        group by interface""".format(TABLE_NAME=TABLE_NAME)
+        group by interface, ip, jobnum,account""".format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["INTERFACE"]))))
     if res:
         for item in res:
-            result[item[0]] += item[1]
+            result[item[0]] += item[4]
     return result
@@ -159,13 +177,13 @@ def pg_get_previous_menu_count(startTime, endTime):
     :param endTime: 结束时间,
     """
     result = defaultdict(int)
-    sql = """ select menu, sum(count) from {TABLE_NAME}
+    sql = """ select menu, ip,jobnum,account, sum(count) from {TABLE_NAME}
        where logdate >= %s and logdate <= %s and data_type = %s
-        group by menu""".format(TABLE_NAME=TABLE_NAME)
+        group by menu, ip,jobnum,account""".format(TABLE_NAME=TABLE_NAME)
     res = json.loads(CFunction.execute(CPgSqlParam(sql, params=(startTime, endTime, DATA_TYPE["MENU"]))))
     if res:
         for item in res:
-            result[item[0]] += item[1]
+            result[item[0]] += item[4]
     return result
 def entry(data_type, start, end):
@@ -174,17 +192,25 @@ def entry(data_type, start, end):
     start = datetime.strptime(start, date_format)
     end = datetime.strptime(end, date_format)
-    old_start,old_end = adjust_times(start, end)
+    start = start.strftime('%Y-%m-%d')
+    end = end.strftime('%Y-%m-%d')
+    old_start,old_end = adjust_times(start, end)
+    print (old_start)
+    print (old_end)
     data = {}
     if data_type == "1":
         ip_summary_data = pg_get_ip_group_data(start, end)
         data = ip_summary_data_format(ip_summary_data)
-        previous_company_dict = pg_get_previous_company_count(old_start, start, "IP")
+        previous_company_dict = pg_get_ip_previous_company_count(old_start, old_end, "IP")
         for d in data["summary"]["ip"]:
             if previous_company_dict.get(d["company"], 0) == 0:
                 d["trend"] = 0
+                print("xxx")
             else:
+                print("dddd:"+str(d["req_frequency"]))
+                print(previous_company_dict.get(d["company"], 0))
                 d["trend"] = round(
                     (d["req_frequency"] - previous_company_dict.get(d["company"], 0)) / previous_company_dict.get(
                         d["company"], 0), 4)
@@ -193,7 +219,7 @@ def entry(data_type, start, end):
         account_summary_data = pg_get_account_group_data(start, end)
         data = account_summary_data_format(account_summary_data)
-        previous_company_dict = pg_get_previous_company_count(old_start, start, "ACCOUNT")
+        previous_company_dict = pg_get_account_previous_company_count(old_start, start, "ACCOUNT")
         for d in data["summary"]["account"]:
             if previous_company_dict.get(d["company"], 0) == 0:
                 d["trend"] = 0
@@ -229,3 +255,6 @@ def entry(data_type, start, end):
                         d["menu_name"], 0), 4)
     return data
+# res = entry("1","2024-07-17 00:00:00","2024-07-17 23:59:59")
+# # print res
+# logger.info(json.dumps(res))

@@ -9,20 +9,18 @@ import json
 import traceback
 import random,string
 import traceback,json
-import time
-from datetime import datetime,timedelta
+from datetime import datetime,timedelta,time
 from dataInterface.functions import CFunction
 from dataInterface.db.params import CPgSqlParam
-from uebaMetricsAnalysis.utils.ext_logging import logger
+from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron
 class DBType(object):
     LIST = 'list'
     DICT = 'dict'
-JOB_TABLE_NAME = "ueba_jobs"
-ANALYSIS_TABLE_NAME = "ueba_analysis_log"
+JOB_TABLE_NAME = "ueba_analysis_schema.jobs"
 class DBUtils(object):
     @classmethod
@@ -77,7 +75,7 @@ class DBUtils(object):
         """
         try:
             sql_list = CPgSqlParam(sql)
-            logger.info("execute sql:"+sql)
+            #logger.info("execute sql:"+sql)
             data = CFunction.execute(sql_list)
             logger.info("execute result : {}".format(data))
             return json.loads(data)
@@ -103,7 +101,7 @@ class DBUtils(object):
         return camel_list
     @classmethod
-    def write_job_status(self,job_id,status,err):
+    def write_job_status(self,job_id,status,err,run_count):
         #success
         if status == 2:
             sql = """update {JOB_TABLE_NAME} set status=%s ,complate_time = %s
@@ -111,48 +109,67 @@ class DBUtils(object):
             CFunction.execute(CPgSqlParam(sql, params=(status,datetime.now(), job_id)))
         #failed
         if status == 3:
-            sql = """update {JOB_TABLE_NAME} set status=%s, err=%s
+            sql = """update {JOB_TABLE_NAME} set status=%s, err=%s ,run_count = %s
                 where job_id=%s """.format(JOB_TABLE_NAME=JOB_TABLE_NAME)
-            CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id)))
+            CFunction.execute(CPgSqlParam(sql, params=(status, err, job_id,run_count)))
     @classmethod
     def insert_job_record(self,job_id,start_time,end_time,status):
-        sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status) values(%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
-        CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status)))
+        sql = """insert into {JOB_TABLE_NAME}(job_id,start_time,end_time,status,run_count) values(%s,%s,%s,%s,%s)""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
+        CFunction.execute(CPgSqlParam(sql, params=(job_id,start_time, end_time,status,1)))
     #获取 job的执行时间 开始时间-结束时间
     @classmethod
     def get_job_period(self):
-        sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
+        sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time,status,run_count,to_char(start_time,'YYYY-MM-DD HH24:MI:SS') as start_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
         res = json.loads(CFunction.execute(CPgSqlParam(sql, params=())))
-        print json.dumps(res)
         data = {}
         if res:
             data["job_id"]=res[0][0]
             data["end_time"]=res[0][1]
+            data["status"]=res[0][2]
+            data["run_count"]=res[0][3]
+            data["start_time"]=res[0][4]
-        fields=["job_id", "end_time"]
-        #data = DBUtils.transition(fields, sql, DBType.LIST)
         if len(data)==0:
-            start_time = datetime.now() - timedelta(minutes=5)
-            end_time = datetime.now()
+            start_time = datetime.now() - timedelta(minutes=20)
+            end_time = datetime.now()- timedelta(minutes=15)
+            return start_time,end_time,2,0,""
         if len(data)>0:
+            #运行中
+            if data["status"] ==1:
+                return None,None, 1,data["run_count"],data["job_id"]
+            #运行失败 重试不超过3次
+            if data["status"] ==3 and data["run_count"]<=3:
+                start_time = data["start_time"]
+                end_time = data["end_time"]
+                return start_time,end_time,3,data["run_count"],data["job_id"]
             start_time = data["end_time"]
             start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
+            # 检查时间是否为23:59:59
-            end_time = data["end_time"]
-            end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + timedelta(minutes=5)
-            if end_time > datetime.now():
-                return None,None
+            if start_time.hour == 23 and start_time.minute == 59 and start_time.second == 59:
+                # 是的话,增加一天并设置时间为00:00:00
+                start_time = start_time + timedelta(days=1)
+                start_time = start_time.replace(hour=0, minute=0, second=0)
+            end_time = start_time + timedelta(minutes=5)
+            #kafka数据到es会存在5分钟左右的数据延迟这里设置15分钟
+            if end_time > (datetime.now()-timedelta(minutes=15)):
+                logger_cron.info("end_time:"+end_time.strftime("%Y-%m-%d %H:%M:%S")+",datetime.now:"+datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+                return None,None,2,999,""
             start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
-        return start_time,end_time
+            #需要在入口生成jobid 所以给空
+            return start_time,end_time,data["status"],data["run_count"],""
     @classmethod
     #处理跨天的场景
     def adjust_end_time_if_cross_day(self,start_time, end_time):
         if start_time.date() != end_time.date():
-            end_time = datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
+            end_time = datetime.combine(start_time.date(), time(23, 59, 59))
         return start_time, end_time
-# if __name__ == '__main__':
-#     DBUtils.get_job_period()
+if __name__ == '__main__':
+    start,end = DBUtils.get_job_period()
+    print ( "job:运行参数:{},{}".format(start,end))

@ -1,90 +0,0 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import json
import os,re
import codecs
import csv
import ConfigParser
from isoc.utils.esUtil import EsUtil
print json.dumps(es_host_list)
# conf_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'conf')
# ini_path = os.path.join(conf_path, 'conf.ini')
# config = ConfigParser.ConfigParser()
# config.read(ini_path)
# ES_HOST = config.get('COMMON', 'es_host')
# ES_PER_COUNT = config.get('COMMON', 'es_per_count')
# ES_INDEX_NAME = config.get('COMMON', 'es_index_name')
# CSV_FILE_PATH = config.get('COMMON', 'csv_file_path')
def createIndex():
es = Elasticsearch(es_host_list)
es.create(index="urba_analyse_2024_06", ignore=400)
map={
"ip1": "text",
"ip2": "text",
"ip3": "text",
"ip4": "text",
}
es_instance = EsUtil()
res = es_instance.create_index_simple("urba_analyse_2024_06")
return res
# def generate_ip_range(start_ip, end_ip):
# start_parts = list(map(int, start_ip.split('.')))
# end_parts = list(map(int, end_ip.split('.')))
# ip_range = []
# while start_parts < end_parts:
# ip_range.append('.'.join(map(str, start_parts)))
# start_parts[3] += 1
# for i in range(3, 0, -1):
# if start_parts[i] == 256:
# start_parts[i] = 0
# start_parts[i-1] += 1
# ip_range.append('.'.join(map(str, start_parts))) # 添加结束IP地址
# return ip_range
# # scroll查询数据
# def get_ip_summary_data(start_time,end_time,query_body):
# es = Elasticsearch(ES_HOST)
# msg = es.search(index=ES_INDEX_NAME,scroll="3m",size=ES_PER_COUNT,_source_includes= ["cookies","url","sip","dip"], query=query_body)
# result = msg['hits']['hits']
# total = msg['hits']['total']
# scroll_id = msg['_scroll_id']
# for i in range(0,int(total["value"]/ES_PER_COUNT)+1):
# query_scroll = es.scroll(scroll_id=scroll_id, scroll='3m')["hits"]["hits"]
# result += query_scroll
# return result
# # 读取csv文件 获取ip归属地
# def get_ip_area_relation(csv_file_path):
# iprange_map = {}
# with codecs.open(csv_file_path, mode='r',encoding='utf-8') as file:
# csv_reader = csv.reader(file)
# for row in csv_reader:
# headers = next(csv_reader)
# ip_start = headers[0]
# ip_end = headers[1]
# ip_range = generate_ip_range(ip_start, ip_end)
# ip_area = headers[5]
# print (ip_area)
# for ip in ip_range:
# iprange_map[ip] = ip_area
# return iprange_map
# get_ip_area_relation("/tmp/data/ip_area_relation.csv")

@@ -12,13 +12,13 @@ from appsUtils import env
 APPFOLDERNAME = 'uebaMetricsAnalysis'
+APP_CRON_FOLDERNAME = 'uebaMetricsAnalysis_cron'
-def get_clean_files():
+def get_clean_file_path():
     fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files"
     if not os.path.exists(fileroot):
         os.mkdir(fileroot)
+    return fileroot
 def get_logger(logfile):
     """
@@ -43,3 +43,4 @@ def get_logger(logfile):
 logger = get_logger(APPFOLDERNAME)
+logger_cron = get_logger(APP_CRON_FOLDERNAME)

@ -0,0 +1,205 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import re,os,json
import codecs
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam
date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}.json$')
LOG_TABLE_NAME = "ueba_analysis_schema.logs"
DATA_TYPE = {
"IP": 1,
"ACCOUNT": 2,
"INTERFACE": 3,
"MENU": 4,
}
# 获取当前日期并格式化为"年-月"
def get_current_year_month():
now = datetime.now()
return now.strftime("%Y_%m")
# 获取当前月份的第一天并格式化为"年-月-日"
def get_first_day_of_current_month():
now = datetime.now()
first_day = now.replace(day=1)
return first_day.strftime("%Y-%m-%d")
# 获取当前日期,然后计算下个月的第一天
def get_first_day_of_next_month():
now = datetime.now()
if now.month == 12:
next_month = now.replace(year=now.year+1, month=1, day=1)
else:
next_month = now.replace(month=now.month+1, day=1)
return next_month.strftime("%Y-%m-%d")
#获取表名
def get_table_name():
year_month = get_current_year_month()
return LOG_TABLE_NAME+'_'+ year_month
#获取表区间
def get_table_data_range():
start= get_first_day_of_current_month()
end = get_first_day_of_next_month()
return start,end
#创建分区表
def create_fq_table():
table_name = get_table_name()
start,end = get_table_data_range()
logger_cron.info("INSERT:准备创建分区表{},{},{}".format(table_name,start,end))
sql = """CREATE TABLE if not EXISTS {TABLE_NAME} PARTITION OF ueba_analysis_schema.logs
FOR VALUES FROM ('{START}') TO ('{END}');""".format(TABLE_NAME=table_name,START = start,END=end)
CFunction.execute(CPgSqlParam(sql))
logger_cron.info("INSERT:创建分区表完成")
#读取大文件
def read_large_json_file(filename, chunk_size=1024*1024*5): # 每次读取5MB的数据
json_object = ''
with codecs.open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
json_object += chunk
data = json.loads(json_object)
return data
def get_all_files(path):
# 列出所有包含匹配模式的文件名
files = []
for filename in os.listdir(path):
if date_pattern.search(filename):
if datetime.now().strftime("%Y-%m-%d")+".json" != filename:
files.append({"filename": filename, "path": os.path.join(path,filename)})
return files
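get_all_files therefore collects only the dated, not-yet-imported aggregate files for past days; for example, with 2024-07-13 as the current date:

    # "2024-07-12.json"      -> collected
    # "2024-07-13.json"      -> skipped (today's file may still be appended to)
    # "done_2024-07-12.json" -> skipped (the ^ anchor rejects the done_ prefix)
    # note: the unescaped '.' in date_pattern also admits names like "2024-07-12xjson"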
def insert_data(files):
for itemFile in files:
if os.path.exists(itemFile.get("path",'')):
data =read_large_json_file(itemFile.get("path",''))
logger_cron.info("INSERT: 准备读取聚合文件:"+itemFile.get('path',''))
logger_cron.info("INSERT: 读取聚合文件完成")
ip_list = data.get('ip', [])
account_list = data.get('account', [])
interface_list = data.get('interface', [])
menu_list = data.get('menu', [])
logger_cron.info("INSERT: IP维度 " +str(len(ip_list)))
logger_cron.info("INSERT: ACCOUNT维度 " +str(len(account_list)))
logger_cron.info("INSERT: INTERFACE维度 " +str(len(interface_list)))
logger_cron.info("INSERT: MENU维度 " +str(len(menu_list)))
basename, extension = os.path.splitext(itemFile.get('filename', ''))
log_date = basename
print ("filename:"+log_date)
records = []
for item in ip_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("IP",1)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in account_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in interface_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("INTERFACE",3)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
for item in menu_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("MENU",4)
interface = item.get('interface', '')
records.append({"menu":menu, "ip":ip, "account":account, "jobnum":jobnum, "count":count, "logdate":logdate,"datatype":datatype,"interface":interface})
data_list=[]
for item in records:
data_list.append(item.get('menu', ''))
data_list.append(item.get('ip', ''))
data_list.append(item.get('account', ''))
data_list.append(item.get('jobnum', ''))
data_list.append(item.get('count', ''))
data_list.append(item.get('logdate', ''))
data_list.append(item.get('datatype', ''))
data_list.append(item.get('interface', ''))
sql = """INSERT INTO ueba_analysis_schema.logs(menu,ip,account,jobnum,count,logdate,data_type,interface)
VALUES {}""".format(",".join(['(%s,%s,%s,%s,%s,%s,%s,%s)' for x in records]))
CFunction.execute(CPgSqlParam(sql, params=data_list))
logger_cron.info("INSERT: 数据插入完成")
#重命名文件
logger_cron.info(itemFile.get('path',''))
logger_cron.info("done_"+itemFile.get('filename', ''))
os.rename(itemFile.get('path',''),get_clean_file_path()+"/done_"+itemFile.get('filename', ''))
logger_cron.info("INSERT: 重命名文件完成,"+itemFile.get('filename', ''))
def delete_files(directory_path):
"""
删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件
其中日期部分早于10天以前
:param directory_path: 要检查的目录的绝对路径
"""
# 计算10天前的日期
ten_days_ago = datetime.now() - timedelta(days=10)
ten_days_ago_str = ten_days_ago.strftime('%Y-%m-%d')
# 正则表达式模式,匹配形如 YYYY-MM-DD 的文件名
date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')
# 遍历目录中的文件
for filename in os.listdir(directory_path):
match = date_pattern.search(filename)
if match:
file_date_str = match.group(1)
file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
# 检查文件日期是否在10天前
if file_date <= ten_days_ago:
file_path = os.path.join(directory_path, filename)
os.remove(file_path)
logger_cron.info("INSERT: 删除文件"+file_path)
def entry():
base_path = get_clean_file_path()
files = get_all_files(base_path)
logger_cron.info("INSERT:获取文件数量"+str(len(files)))
#创建分区表
create_fq_table()
#数据入库
insert_data(files)
#删除文件
delete_files(base_path)