'代码提交'

dev
TANGWY 4 months ago
parent 3e379403df
commit b97f549d89
  1. 210
      corn/log_alarm.py
  2. 57
      corn/ueba_corn_data_insert.py
  3. 22
      corn/ueba_corn_pg.py
  4. 18
      test.py
  5. 141
      utils/base_dataclean_pg.py
  6. 4
      utils/dashboard_data_pg.py
  7. 31
      utils/db2json.py
  8. 7
      utils/ext_logging.py
  9. 129
      utils/file_to_pg.py
  10. 4
      views/dashboard_views.py

@ -0,0 +1,210 @@
# coding:utf-8
import sys
import uuid
import json
import time
import random
path = str(sys.path[0])
home_path = path.split("isop_uebaapiData")[0]
sys.path.append(home_path)
from isop_uebaapiData.util import send_logs
def alarm(cookies, api):
"""2、HTTP日志"""
inputstr = '''[{"msgtype":1,"hash":"8DE9-BDAB-F622-2FA8","dev_ip":"10.67.5.17","product":"uts"},{"sid":"6004744450036c44f815500016d00a5f5151105430a3ed","timestamp":1567673939,"sip":"10.67.0.52","sport":5624,"dip":"10.67.0.53","dport":80,"protocol":6,"app":3087428650795009,"app_proto":8,"direct":4,"app.detail":{"method":"GET","http_protocol":"1.1","ret_code":200,"host":"10.67.1.1","uri":"/webtest/uploadFile.php","referer":"http://[2222::65]/webtest/","content_type":" multipart/form-data; boundary=----WebKitFormBoundary2zcCUl4lQf1h7A7S","content_type_server":" text/html","server":"Apache/2.4.4 (Win32) OpenSSL/0.9.8y PHP/5.4.19","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36","link":"","cookies":"loginmainacctid=wangshiguang;operatorId=d2601586;com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;","content_encoding":"","location":"","content_length":70080,"content_length_server":200,"set_cookie":"","range":"","connection":"keep-alive","connection_server":"Keep-Alive","x_forwarded_for":"","post_data":"LS0tLS0tV2ViS2l0Rm9ybUJvdW5kYXJ5MnpjQ1VsNGxRZjFoN0E3Uw0KQ29udGVudC1EaXNwb3NpdGlvbjogZm9ybS1kYXRhOyBuYW1lPSJmaWxlIjsgZmlsZW5hbWU9IjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1Ig0=","response_body":"VXBsb2FkOiAwMDAxYWQ0NDFkY2IzODYyMThhNzY5OTJhY2Y4YjcwNTxiciAvPlR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTxiciAvPlNpemU6IDY4LjEyNzkyOTY4NzUgS2I8YnIgLz5UZW1wIGZpbGU6IEQ6XHhhbXBwXHRtcFxwaHA2ODI1LnRtcDxiciAvPjAwMDFhZDQ0MWRjYjM4NjIxOGE3Njk5MmFjZjhiNzA1IGFscmVhZHkgZXhpc3RzLiA="}}]'''
inputarr = json.loads(inputstr, strict=False)
# 随机生成timestamp
inputarr[1]["timestamp"] = 1720782138
inputarr[1]["sid"] = str(uuid.uuid1())
# inputarr[1]["sip"] = "10.67.4.33"
inputarr[1]["sip"] = generate_random_ip()
inputarr[1]["dip"] = "10.67.1.1"
inputarr[1]["dport"] = "8180"
inputarr[1]["app.detail"]["uri"] = "/alarmtest.action?BMECID=352432757&BMETimestamp=1692788489260&queryNumber=158713459"
inputarr[1]["app.detail"]["host"] = api
inputarr[1]["app.detail"]["cookies"] = cookies
return json.dumps(inputarr)
def generate_random_ip():
# 固定前缀 "192.168."
prefix = "192.168."
# 生成随机的第三和第四段IP地址
third_octet = random.randint(0, 255)
fourth_octet = random.randint(0, 255)
# 拼接IP地址
ip = "{}{}.{}".format(prefix, third_octet, fourth_octet)
return ip
def AbIDVisitAPINums510404():
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com",
"good.alarm.com"]
info_list = [
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["武汉"][
0] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 60],
["u-locale=zh_CN;loginmainacctid=zhang3;operatorId=" + ID2Area["荆州"][
2] + ";com.huawei.boss.CURRENT_MENUID=BLAR_ChargeCrm3_WEB;", 120]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
return "510405场景的告警数据已生成"
def get_random_jobnum():
# 定义包含不同前缀的字符串数组
prefix_strings = [
['10243', '10895', '10134', '10781', '10962'], # 10打头的字符串示例
['11089', '11057', '11023', '11016', '11030'], # 110打头的字符串示例
['14076', '14049', '14098', '14032', '14061'], # 140打头的字符串示例
['26054', '26013', '26087', '26029', '26061'], # 260打头的字符串示例
['20083', '20015', '20072', '20096', '20048'], # 200打头的字符串示例
['19035', '19017', '19049', '19082', '19096'], # 190打头的字符串示例
['180237', '180276', '180204', '180295', '180219'] # 1802打头的字符串示例
]
# 随机选择一个前缀数组
selected_prefix_array = random.choice(prefix_strings)
# 随机选择一个具体的字符串
selected_string = random.choice(selected_prefix_array)
return selected_string
def get_random_person():
people_list = [
"Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Henry", "Isabel", "Jack",
"Kate", "Liam", "Mia", "Noah", "Olivia", "Patrick", "Quinn", "Rachel", "Samuel", "Taylor",
"Ursula", "Victor", "Wendy", "Xavier", "Yvonne", "Zachary", "Amelia", "Benjamin", "Catherine",
"Daniel", "Ella", "Finn", "Gabriella", "Hugo", "Isabella", "Jacob", "Katherine", "Lucas",
"Madeline", "Nathan", "Olivia", "Peter", "Quincy", "Riley", "Sophia", "Thomas", "Uma",
"Vincent", "Willa", "Xander", "Yasmine", "Zoe", "Aaron", "Bella", "Connor", "Daisy", "Ethan",
"Freya", "George", "Hannah", "Isaac", "Julia", "Kevin", "Lily", "Matthew", "Nora", "Owen",
"Penelope", "Quentin", "Rebecca", "Samantha", "Tristan", "Ursula", "Violet", "Wyatt", "Ximena",
"Yara", "Zane", "Anna", "Blake", "Charlotte", "David", "Eva", "Felix", "Grace", "Hector",
"Ivy", "James", "Kylie", "Luna", "Milo", "Natalie", "Oscar", "Paige", "Quinn", "Ruby",
"Simon", "Tessa", "Uriel", "Victoria", "Wesley", "Xavier", "Yasmine", "Zara"
# 继续添加更多的名称...
]
random_person = random.choice(people_list)
return random_person
def get_random_menu():
# 定义系统菜单列表
system_menu = [
"主页", "设置", "个人资料", "消息", "通知", "帮助", "帐户", "关于", "联系我们", "服务",
"购物车", "订单", "支付", "地址", "密码", "登出", "登入", "注册", "搜索", "反馈",
"隐私政策", "条款与条件", "FAQ", "文档", "论坛", "博客", "新闻", "视频", "图片", "音频",
"下载", "上传", "社交", "分享", "喜欢", "收藏", "评论", "点赞", "订阅", "播放列表",
"播放历史", "推荐", "推广", "活动", "招聘", "加入我们", "团队", "合作伙伴", "协议",
"项目", "贡献", "捐赠", "赞助", "开发", "设计", "产品", "技术支持", "客户支持",
"销售", "市场营销", "业务", "管理", "数据", "分析", "报告", "绩效", "策略",
"创新", "优化", "测试", "安全", "备份", "恢复", "更新", "版本", "发布",
"变更日志", "许可证", "授权", "注册码", "订购", "付款方式", "配置", "设置向导", "快捷方式",
"自定义", "调整", "模板", "样式", "主题", "颜色", "字体", "大小", "布局",
"格式", "检查更新", "下载中心", "资源", "链接", "网站地图", "计划", "时间表", "日历",
"事件", "提醒", "警报", "通讯录", "联系人", "目录", "分类", "标签", "搜索结果",
"页面", "文章", "产品", "服务", "项目", "案例", "作品", "示例", "演示",
"展示", "参考", "指南", "教程", "培训", "学习", "认证", "证书", "奖章",
"徽章", "勋章", "成就", "积分", "排名", "比赛", "竞赛", "评估", "评价",
"考核", "调查", "研究", "分析", "文章", "书籍", "参考文献", "论文", "报告",
"期刊", "杂志", "图书馆", "书架", "存档", "档案", "历史", "数据", "统计",
"指标", "指数", "系列", "序列", "集合", "列表", "图表", "图形", "统计",
"数字", "计数", "数量", "比率", "百分比", "概述", "汇总", "详情", "全球",
"国家", "地区", "城市", "位置", "地点", "位置", "方向", "距离", "路线",
"导航", "地图", "位置", "坐标", "GPS", "导航", "位置", "追踪", "监控",
"控制台", "管理面板", "仪表板", "仪表盘", "仪表板", "仪表盘", "指示灯", "信号", "状态",
"进度", "完成", "处理", "操作", "任务", "流程", "工作流", "记录", "日志",
"日志", "评论", "反馈", "意见", "建议", "建议", "改进建议", "问题", "解决方案",
"答案", "解释", "说明", "描述", "详情", "信息", "数据", "内容", "媒体",
"文档", "文件", "附件", "图像", "图片", "照片", "图表", "图形", "表格",
"表单", "输入", "输出", "导入", "导出", "分享", "链接", "电子邮件", "消息",
"聊天", "对话", "会话", "会议", "通话", "视频", "音频", "音乐", "歌曲",
"播放", "暂停", "停止", "跳过", "前进", "回放", "录制", "编辑", "剪辑",
"修剪", "调整", "滤镜", "效果", "转换", "格式", "编码", "解码", "播放器",
"播放列表", "收藏夹", "书签", "标签", "标签", "评论", "反馈", "评分", "评级",
"排名", "推荐", "推广", "广告", "宣传", "促销", "优惠", "折扣", "优惠券",
"礼品卡", "优惠码", "资料", "信息", "内容", "资源", "资产", "库存", "存储",
"存储", "备份", "还原", "升级", "更新", "版本", "修复", "修复", "故障",
"错误", "问题", "错误", "故障", "问题", "警告", "异常", "异常", "异常",
"重试", "恢复", "恢复", "取消", "撤销", "回滚", "复制", "粘贴", "剪切",
"移动", "重命名", "删除", "清除", "清理", "清除", "清理", "优化", "优化",
"增加", "增强", "强化", "加强", "改进", "改善", "优化", "优化", "设计",
"开发", "测试", "部署", "配置", "设置", "安装", "卸载", "升级", "更新",
"修复", "修正", "修补", "更新", "安全", "保护", "防护", "防御", "防止",
"检查", "扫描", "监控", "跟踪", "追踪", "审计", "审查", "测试", "测量"
]
# 随机选择一个菜单项
random_menu_item = random.choice(system_menu)
return random_menu_item
if __name__ == '__main__':
datalist = {"TCP_5011": list()}
ID2Area = {
"武汉": ["1101820", "1101821", "1101822", "1101823", "1101825"],
"荆州": ["2001800", "2001801", "2001808"],
"江汉": ["1801820", "1801810"],
"省公司市场部": ["1002011", "1002012", "1002013"]
}
api_list = ["test.alarm.com/webtest", "alarm.com/testalarm", "business.system..alarmcom", "hhh.alarm.com", "good.alarm.com","baidu.com","sohu.com","xinlang.com","erpx.com"]
info_list = [
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 5000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 5000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 5000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 2000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 2000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 2000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 2000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 3000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000],
["u-locale=zh_CN; loginmainacctid="+get_random_person()+"; userticket=209@9889@23223@10.0.1.183@lis8; operatorId=" + get_random_jobnum() + "; com.huawei.boss.CURRENT_MENUID="+get_random_menu()+";", 1000]
]
for i in range(len(info_list)):
cookies = info_list[i][0]
count = info_list[i][1]
for j in range(count):
api = random.choice(api_list)
datalist["TCP_5011"].append(alarm(cookies, api))
for key in datalist.keys():
send_logs(datalist[key])
print "510405场景的告警数据已生成"

@ -12,9 +12,9 @@ import traceback
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
import calendar import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from commandCyberRange.utils.db2json import DBUtils, DBType from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean import entry from uebaMetricsAnalysis.utils.file_to_pg import entry
JOB_STATUS ={ JOB_STATUS ={
"RUNNING":1, "RUNNING":1,
@ -23,56 +23,15 @@ JOB_STATUS ={
} }
class UserCron: class UserCron:
def generate_job_id():
timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
return str(timestamp) + random_letters
#获取 job的执行时间 开始时间-结束时间
def get_job_period(self):
sql = "select job_id, end_time from ueba_jobs order by end_time desc limit 1"
fields=["job_id", "end_time"]
data = DBUtils.transition(fields, sql, DBType.LIST)
start_time = ''
end_time = ''
if len(data)==0:
start_time = datetime.datetime.now() - timedelta(minutes=5)
end_time = datetime.datetime.now()
if len(data)>0:
start_time = data[0].get('end_time')
end_time = data[0].get('end_time') + timedelta(minutes=5)
if end_time > datetime.datetime.now():
return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time
#处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date():
end_time = datetime.datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999))
return start_time, end_time
#每5分钟执行一次 #每5分钟执行一次
def processing(self): def processing(self):
try: try:
logger.info("job:开始执行") logger_cron.info("INSERT:开始执行")
start,end=self.get_job_period() entry()
job_id =self.generate_job_id() logger_cron.info("INSERT:"+"执行完成")
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end))
if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行")
return
logger.info("job:"+"准备获取es数据")
#entry(start,end)
logger.info("job:"+"执行完成")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
except Exception ,e: except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc()) err_info=traceback.format_exc()
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info) logger_cron.error("INSERT:"+"执行失败,"+err_info)
logger.error(err_info)
raise
if __name__ == '__main__': if __name__ == '__main__':
UserCron().processing() UserCron().processing()

@ -12,7 +12,7 @@ import traceback,json
import time import time
from datetime import datetime,timedelta from datetime import datetime,timedelta
import calendar import calendar
from uebaMetricsAnalysis.utils.ext_logging import logger from uebaMetricsAnalysis.utils.ext_logging import logger_cron
from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
from uebaMetricsAnalysis.utils.base_dataclean_pg import entry from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
@ -24,6 +24,7 @@ JOB_STATUS ={
class UserCron: class UserCron:
#生成job_id
def generate_job_id(self): def generate_job_id(self):
timestamp = int(time.time() * 1000) timestamp = int(time.time() * 1000)
random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7)) random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
@ -33,23 +34,26 @@ class UserCron:
def processing(self): def processing(self):
job_id =self.generate_job_id() job_id =self.generate_job_id()
try: try:
logger.info("job:开始执行") logger_cron.info("JOB:"+job_id+"开始执行")
start,end= DBUtils.get_job_period() start,end= DBUtils.get_job_period()
if start is None or end is None: if start is None or end is None:
logger.info("job:结束时间大于服务器时间不执行") logger_cron.info("JOB:"+job_id+"结束时间大于服务器时间不执行")
return return
logger_cron.info("JOB:"+job_id+"运行参数:{},{}".format(start,end))
logger_cron.info("JOB:"+job_id+"准备将job写入job表")
DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING")) DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
logger.info("job:运行参数:{}".format(start,end)) logger_cron.info("JOB:"+job_id+"完成job表写入")
logger.info("job:"+"准备获取es数据") logger_cron.info("JOB:"+job_id+"准备获取es数据")
entry(start,end) entry(start,end,job_id)
logger.info("job"+"执行完成") logger_cron.info("JOB"+job_id+"完成es数据获取")
DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"") DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"")
logger_cron.info("JOB:"+job_id+"更新job表状态完成")
except Exception ,e: except Exception ,e:
err_info="定时任务执行失败:".format(str(e), traceback.format_exc()) err_info=traceback.format_exc()
logger_cron.error("JOB:"+job_id+"执行失败:"+err_info)
DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info) DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info)
logger.error(err_info)
raise raise
if __name__ == '__main__': if __name__ == '__main__':

@ -1,18 +0,0 @@
#encoding=utf-8
import json
from isoc.utils.esUtil import EsUtil
def createIndex():
map={
"field1": "text",
"field2": "text"
}
es_util_instance = EsUtil()
res = es_util_instance.create_index_simple("bsa_traffic*",3,scroll_search)
return res
res = createIndex()
print(res)

@ -6,9 +6,9 @@ import traceback
from datetime import datetime, timedelta from datetime import datetime, timedelta
import calendar import calendar
from esUtil import EsUtil from esUtil import EsUtil
import pytz from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron,get_clean_file_path
size = 1000# 可以根据实际情况调整 size = 9999#根据实际情况调整
DATA_TYPE = { DATA_TYPE = {
"IP": 1, "IP": 1,
@ -19,11 +19,10 @@ DATA_TYPE = {
## IP维度 ## IP维度
def get_ip_group_data(index,startTime,endTime): def get_ip_group_data(index,startTime,endTime):
try: query_body={
query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "range": {"timestamp": {"gt": startTime,"lte": endTime}}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -37,32 +36,25 @@ def get_ip_group_data(index,startTime,endTime):
} }
} }
} }
after_key = None after_key = None
es_util_instance = EsUtil() es_util_instance = EsUtil()
datas=[] datas=[]
while True: while True:
if after_key: if after_key:
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
try: response = es_util_instance.search(index,query_body)
response = es_util_instance.search(index,query_body) for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
except Exception,e: data = {
print "err" "data_type": DATA_TYPE.get("IP"),
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: "count": bucket['doc_count'],
data = { "jobnum": bucket['key']['trojan_type'] ,
"data_type": DATA_TYPE.get("IP"), "ip":bucket['key']['sip']
"count": bucket['doc_count'], }
"jobnum": bucket['key']['trojan_type'] , datas.append(data)
"ip":bucket['key']['sip'] after_key = bucket["key"]
} if not response["aggregations"]["composite_buckets"].get("after_key"):
datas.append(data) break
after_key = bucket["key"] after_key = response["aggregations"]["composite_buckets"]["after_key"]
if not response["aggregations"]["composite_buckets"].get("after_key"):
break
after_key = response["aggregations"]["composite_buckets"]["after_key"]
except Exception,e:
print "x_err:"+e.message
return datas return datas
## 账号维度 ## 账号维度
@ -70,7 +62,7 @@ def get_account_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "range": {"timestamp": {"gt": startTime,"lte": endTime}}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -92,7 +84,6 @@ def get_account_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("ACCOUNT"), "data_type": DATA_TYPE.get("ACCOUNT"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -114,7 +105,7 @@ def get_interface_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "range": {"timestamp": {"gt": startTime,"lte": endTime}}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -124,7 +115,7 @@ def get_interface_group_data(index,startTime,endTime):
{"interface": { "terms": {"field": "interface"} }}, {"interface": { "terms": {"field": "interface"} }},
{"sip": { "terms": { "field": "sip"}}}, {"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}}, {"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}
] ]
} }
} }
@ -138,7 +129,6 @@ def get_interface_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("INTERFACE"), "data_type": DATA_TYPE.get("INTERFACE"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -162,7 +152,7 @@ def get_menu_group_data(index,startTime,endTime):
query_body={ query_body={
"size": 0, "size": 0,
"query": { "query": {
"range": {"timestamp": {"gte": startTime,"lte": endTime}} "range": {"timestamp": {"gt": startTime,"lte": endTime}}
}, },
"aggs": { "aggs": {
"composite_buckets": { "composite_buckets": {
@ -172,7 +162,7 @@ def get_menu_group_data(index,startTime,endTime):
{"worm_family": { "terms": {"field": "worm_family"} }}, {"worm_family": { "terms": {"field": "worm_family"} }},
{"sip": { "terms": { "field": "sip"}}}, {"sip": { "terms": { "field": "sip"}}},
{"account": { "terms": { "field": "account"}}}, {"account": { "terms": { "field": "account"}}},
{"trojan_type": { "terms": { "field": "trojan_type"}}}, {"trojan_type": { "terms": { "field": "trojan_type"}}}
] ]
} }
} }
@ -186,7 +176,6 @@ def get_menu_group_data(index,startTime,endTime):
query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key query_body["aggs"]["composite_buckets"]["composite"]["after"] = after_key
response = es_util_instance.search(index,query_body) response = es_util_instance.search(index,query_body)
for bucket in response["aggregations"]["composite_buckets"]["buckets"]: for bucket in response["aggregations"]["composite_buckets"]["buckets"]:
#print(bucket['key']['sip'] + ":" + str(bucket['doc_count']))
data = { data = {
"data_type": DATA_TYPE.get("MENU"), "data_type": DATA_TYPE.get("MENU"),
"account": bucket['key']['account'], "account": bucket['key']['account'],
@ -204,24 +193,26 @@ def get_menu_group_data(index,startTime,endTime):
return datas return datas
def datetime_to_timestamp(dt): def datetime_to_timestamp(dt):
return int(time.mktime(time.strptime(dt, "%Y-%m-%d %H:%M:%S"))*1000) dtstr=dt.strftime("%Y-%m-%d %H:%M:%S")
def clean_data(read_index,start,end): return int(time.mktime(time.strptime(dtstr,"%Y-%m-%d %H:%M:%S"))*1000)
def clean_data(read_index,start,end,jobid):
data_ip = get_ip_group_data(read_index,start,end) data_ip = get_ip_group_data(read_index,start,end)
# print "data_ip:"+str(len(data_ip))
data_account = get_account_group_data(read_index,start,end) data_account = get_account_group_data(read_index,start,end)
# print "data_account:"+str(len(data_account))
data_interface = get_interface_group_data(read_index,start,end) data_interface = get_interface_group_data(read_index,start,end)
# print "data_interface:"+str(len(data_interface))
data_menu = get_menu_group_data(read_index,start,end) data_menu = get_menu_group_data(read_index,start,end)
# print "data_menu:"+str(len(data_menu))
res_data = data_ip+data_account+data_interface+data_menu
print ("resdata:"+json.dumps(res_data)) if len(data_ip) == 0 and len(data_account) == 0 and len(data_interface) == 0 and len(data_menu) == 0:
logger_cron.info("JOB:"+jobid+",es中未获取到数据,无需做数据合并")
return
logger_cron.info("JOB:"+jobid+",ip维度获取到 "+str(len(data_ip))+" 条数据")
logger_cron.info("JOB:"+jobid+",账号维度获取到 "+str(len(data_account))+" 条数据")
logger_cron.info("JOB:"+jobid+",接口维度获取到 "+str(len(data_interface))+" 条数据")
logger_cron.info("JOB:"+jobid+",菜单维度获取到 "+str(len(data_menu))+" 条数据")
#todo 读取上一次5分钟的文件,与这5分钟的文件合并 #todo 读取上一次5分钟的文件,与这5分钟的文件合并
#合并完成后 写文件 #合并完成后 写文件
group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start) group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid)
def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start): def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, start,jobid):
ipGroupStr = "ip,jobnum" ipGroupStr = "ip,jobnum"
ipGroup = group_and_sum(data_ip, ipGroupStr) ipGroup = group_and_sum(data_ip, ipGroupStr)
accountGroupStr = "account,jobnum" accountGroupStr = "account,jobnum"
@ -231,7 +222,6 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
menuGroupStr = "menu,ip,account,jobnum" menuGroupStr = "menu,ip,account,jobnum"
menuGroup = group_and_sum(data_menu, menuGroupStr) menuGroup = group_and_sum(data_menu, menuGroupStr)
data = {} data = {}
data["ip"] = ipGroup data["ip"] = ipGroup
data["account"] = accountGroup data["account"] = accountGroup
@ -239,34 +229,32 @@ def group_and_write_to_file(data_ip, data_account, data_interface, data_menu, st
data["menu"] = menuGroup data["menu"] = menuGroup
# 获取当前工作目录 # 获取当前工作目录
current_dir = os.getcwd() # current_dir = os.getcwd()
base_path = get_clean_file_path()
logger_cron.info("JOB: "+jobid+",写入文件base路径"+base_path)
time_struct = time.gmtime(long(start) / 1000.0) # UTC时间 time_struct = time.gmtime(long(start) / 1000.0) # UTC时间
date_time = time.strftime('%Y-%m-%d', time_struct) date_time = time.strftime('%Y-%m-%d', time_struct)
file_path = os.path.join(base_path,date_time + '.json')
file_path = os.path.join(current_dir, 'files/' + date_time + '.json')
logger_cron.info("JOB:"+jobid+", 写入文件路径"+file_path)
all_data = [data] all_data = [data]
logger_cron.info("JOB: "+jobid+",准备读取已有文件")
if os.path.exists(file_path): if os.path.exists(file_path):
# 打开文件并读取内容 # 打开文件并读取内容
with codecs.open(file_path, 'r', encoding='utf-8') as file: with codecs.open(file_path, 'r', encoding='utf-8') as file:
content = file.read() content = file.read()
old_json_data = json.loads(content) old_json_data = json.loads(content)
all_data = [data, old_json_data] all_data = [data, old_json_data]
logger_cron.info("JOB:"+jobid+", 读取已有文件完成")
merged_data = merge_data(all_data) merged_data = merge_data(all_data)
# 使用codecs模块以UTF-8编码打开文件 # 使用codecs模块以UTF-8编码打开文件
f = codecs.open(file_path, 'w', encoding='utf-8') f = codecs.open(file_path, 'w', encoding='utf-8')
json_data = json.dumps(merged_data) json_data = json.dumps(merged_data)
# 写入Unicode字符串
f.write(json_data) f.write(json_data)
# 关闭文件
f.close() f.close()
logger_cron.info("JOB: "+jobid+",写入文件完成")
def group_and_sum(data, by_fields="ip,jobnum"): def group_and_sum(data, by_fields="ip,jobnum"):
# 将by_fields转换为列表 # 将by_fields转换为列表
by_fields_list = by_fields.split(',') by_fields_list = by_fields.split(',')
@ -347,28 +335,23 @@ def merge_data(datasets):
#入口 #入口
def entry(start,end): def entry(start,end,jobid):
base_index ="bsa_traffic*" base_index ="bsa_traffic*"
es_util_instance = EsUtil() es_util_instance = EsUtil()
# start = datetime_to_timestamp(start) start = datetime_to_timestamp(start)
# end = datetime_to_timestamp(end) end = datetime_to_timestamp(end)
res=es_util_instance.get_available_index_name(start,end,base_index) res=es_util_instance.get_available_index_name(start,end,base_index)
print "xxxx:"+str(len(res)) logger_cron.info("JOB:"+jobid+",index为"+json.dumps(res))
if len(res)==0: if len(res)==0:
return return
index =",".join(res) index =",".join(res)
clean_data(index,start,end) clean_data(index,start,end,jobid)
# start = 1720776186000
# end = 1720776486000
# # # 将 datetime 对象转换为秒级时间戳
start = 1720772586000 # # timestamp_seconds = time.mktime(dt.timetuple())
end = 1720776186000 # # # 获取微秒数
# # 将 datetime 对象转换为秒级时间戳 # # microseconds = dt.microsecond
# timestamp_seconds = time.mktime(dt.timetuple()) # # # 转换为毫秒级时间戳
# # 获取微秒数 # # timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
# microseconds = dt.microsecond # entry(start,end)
# # 转换为毫秒级时间戳
# timestamp_milliseconds = int(timestamp_seconds * 1000 + microseconds / 1000.0)
entry(start,end)

@ -12,10 +12,9 @@ from dashboard_data_conversion import ip_summary_data_format, account_summary_da
interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code interface_summary_data_format, menu_summary_data_format, adjust_times, jobnum_region_dict,find_region_by_code
from dataInterface.functions import CFunction from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam from dataInterface.db.params import CPgSqlParam
from ext_logging import logger from ext_logging import logger
TABLE_NAME = "ueba_logs" TABLE_NAME = "ueba_analysis_schema.logs"
DATA_TYPE = { DATA_TYPE = {
"IP": 1, "IP": 1,
@ -24,7 +23,6 @@ DATA_TYPE = {
"MENU": 4, "MENU": 4,
} }
def pg_get_ip_group_data(startTime, endTime): def pg_get_ip_group_data(startTime, endTime):
""" """
IP维度查询 IP维度查询

@ -9,20 +9,18 @@ import json
import traceback import traceback
import random,string import random,string
import traceback,json import traceback,json
import time from datetime import datetime,timedelta,time
from datetime import datetime,timedelta
from dataInterface.functions import CFunction from dataInterface.functions import CFunction
from dataInterface.db.params import CPgSqlParam from dataInterface.db.params import CPgSqlParam
from uebaMetricsAnalysis.utils.ext_logging import logger from uebaMetricsAnalysis.utils.ext_logging import logger,logger_cron
class DBType(object): class DBType(object):
LIST = 'list' LIST = 'list'
DICT = 'dict' DICT = 'dict'
JOB_TABLE_NAME = "ueba_jobs" JOB_TABLE_NAME = "ueba_analysis_schema.jobs"
ANALYSIS_TABLE_NAME = "ueba_analysis_log"
class DBUtils(object): class DBUtils(object):
@classmethod @classmethod
@ -77,7 +75,7 @@ class DBUtils(object):
""" """
try: try:
sql_list = CPgSqlParam(sql) sql_list = CPgSqlParam(sql)
logger.info("execute sql:"+sql) #logger.info("execute sql:"+sql)
data = CFunction.execute(sql_list) data = CFunction.execute(sql_list)
logger.info("execute result : {}".format(data)) logger.info("execute result : {}".format(data))
return json.loads(data) return json.loads(data)
@ -125,23 +123,27 @@ class DBUtils(object):
def get_job_period(self): def get_job_period(self):
sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME) sql = """select job_id, to_char(end_time,'YYYY-MM-DD HH24:MI:SS') as end_time from {JOB_TABLE_NAME} order by end_time desc limit 1""".format(JOB_TABLE_NAME=JOB_TABLE_NAME)
res = json.loads(CFunction.execute(CPgSqlParam(sql, params=()))) res = json.loads(CFunction.execute(CPgSqlParam(sql, params=())))
print json.dumps(res)
data = {} data = {}
if res: if res:
data["job_id"]=res[0][0] data["job_id"]=res[0][0]
data["end_time"]=res[0][1] data["end_time"]=res[0][1]
fields=["job_id", "end_time"]
#data = DBUtils.transition(fields, sql, DBType.LIST)
if len(data)==0: if len(data)==0:
start_time = datetime.now() - timedelta(minutes=5) start_time = datetime.now() - timedelta(minutes=5)
end_time = datetime.now() end_time = datetime.now()
if len(data)>0: if len(data)>0:
start_time = data["end_time"] start_time = data["end_time"]
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
# 检查时间是否为23:59:59
if start_time.hour == 23 and start_time.minute == 59 and start_time.second == 59:
# 是的话,增加一天并设置时间为00:00:00
start_time = start_time + timedelta(days=1)
start_time = start_time.replace(hour=0, minute=0, second=0)
end_time = data["end_time"] end_time = start_time + timedelta(minutes=5)
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + timedelta(minutes=5)
if end_time > datetime.now(): if end_time > datetime.now():
logger_cron.info("end_time:"+end_time.strftime("%Y-%m-%d %H:%M:%S")+",datetime.now:"+datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
return None,None return None,None
start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time) start_time ,end_time =self.adjust_end_time_if_cross_day(start_time,end_time)
return start_time,end_time return start_time,end_time
@ -150,9 +152,10 @@ class DBUtils(object):
#处理跨天的场景 #处理跨天的场景
def adjust_end_time_if_cross_day(self,start_time, end_time): def adjust_end_time_if_cross_day(self,start_time, end_time):
if start_time.date() != end_time.date(): if start_time.date() != end_time.date():
end_time = datetime.combine(start_time.date(), datetime.time(23, 59, 59, 999999)) end_time = datetime.combine(start_time.date(), time(23, 59, 59))
return start_time, end_time return start_time, end_time
# if __name__ == '__main__': if __name__ == '__main__':
# DBUtils.get_job_period() start,end = DBUtils.get_job_period()
print ( "job:运行参数:{},{}".format(start,end))

@ -12,13 +12,13 @@ from appsUtils import env
APPFOLDERNAME = 'uebaMetricsAnalysis' APPFOLDERNAME = 'uebaMetricsAnalysis'
APP_CRON_FOLDERNAME = 'uebaMetricsAnalysis_cron'
def get_clean_files(): def get_clean_file_path():
fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files" fileroot = env.get_isop_root() + "/apps/" + APPFOLDERNAME + "/files"
if not os.path.exists(fileroot): if not os.path.exists(fileroot):
os.mkdir(fileroot) os.mkdir(fileroot)
return fileroot
def get_logger(logfile): def get_logger(logfile):
""" """
@ -43,3 +43,4 @@ def get_logger(logfile):
logger = get_logger(APPFOLDERNAME) logger = get_logger(APPFOLDERNAME)
logger_cron = get_logger(APP_CRON_FOLDERNAME)

@ -0,0 +1,129 @@
#!/usr/bin/python
#encoding=utf-8
# author: tangwy
import re,os,json
from db2json import DBUtils
from datetime import datetime, timedelta
from ext_logging import logger_cron,get_clean_file_path
date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}')
DATA_TYPE = {
"IP": 1,
"ACCOUNT": 2,
"INTERFACE": 3,
"MENU": 4,
}
def get_all_files(path):
# 列出所有包含匹配模式的文件名
files = []
for filename in os.listdir(path):
if date_pattern.search(filename):
files.append({"filename": filename, "path": os.path.join(path,filename)})
return files
def insert_data(files):
for item in files:
if os.path.exists(item.get("path",'')):
with open(os.path.join(item.get('path','')), 'r') as file:
data = json.load(file)
ip_list = data.get('ip', [])
account_list = data.get('account', [])
interface_list = data.get('interface', [])
menu_list = data.get('menu', [])
logger_cron.info("INSERT: IP维度 " +str(len(ip_list)))
logger_cron.info("INSERT: ACCOUNT维度 " +str(len(account_list)))
logger_cron.info("INSERT: INTERFACE维度 " +str(len(interface_list)))
logger_cron.info("INSERT: MENU维度 " +str(len(menu_list)))
basename, extension = os.path.splitext(item.get('filename', ''))
log_date = basename
print ("filename:"+log_date)
records = []
for item in ip_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("IP",1)
interface = item.get('interface', '')
records.append((menu, ip, account, jobnum, count, logdate,datatype,interface))
for item in account_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("ACCOUNT",2)
interface = item.get('interface', '')
records.append((menu, ip, account, jobnum, count, logdate,datatype,interface))
for item in interface_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("INTERFACE",3)
interface = item.get('interface', '')
records.append((menu, ip, account, jobnum, count, logdate,datatype,interface))
for item in menu_list:
menu = item.get('menu', '')
ip = item.get('ip', '0.0.0.0')
account = item.get('account', '')
jobnum = item.get('jobnum', '')
count = item.get('count', 0)
logdate = log_date
datatype = DATA_TYPE.get("MENU",4)
interface = item.get('interface', '')
records.append((menu, ip, account, jobnum, count, logdate,datatype,interface))
# 构建SQL插入语句
sql = "INSERT INTO ueba_analysis_schema.logs (menu, ip, account, jobnum, count, logdate,data_type,interface) VALUES "
values = ["('%s', '%s', '%s', '%s', %d, '%s','%s','%s')" % (menu, ip, account, jobnum, count, logdate,datatype,interface)
for menu, ip, account, jobnum, count, logdate,datatype,interface in records]
sql += ",".join(values)
logger_cron.info("INSERT: 准备数据插入")
DBUtils.execute(sql)
logger_cron.info("INSERT: 数据插入完成")
#重命名文件
os.rename(item.get('path',''),"done_"+item.get('path',''))
logger_cron.info("INSERT: 重命名文件完成,"+item.get('path',''))
def delete_files(directory_path):
"""
删除指定目录下所有形如 'done_YYYY-MM-DD' 的文件
其中日期部分早于10天以前
:param directory_path: 要检查的目录的绝对路径
"""
# 计算10天前的日期
ten_days_ago = datetime.now() - timedelta(days=10)
ten_days_ago_str = ten_days_ago.strftime('%Y-%m-%d')
# 正则表达式模式,匹配形如 YYYY-MM-DD 的文件名
date_pattern = re.compile(r'done_(\d{4}-\d{2}-\d{2})')
# 遍历目录中的文件
for filename in os.listdir(directory_path):
match = date_pattern.search(filename)
if match:
file_date_str = match.group(1)
file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
# 检查文件日期是否在10天前
if file_date <= ten_days_ago:
file_path = os.path.join(directory_path, filename)
os.remove(file_path)
logger_cron.info("INSERT: 删除文件"+file_path)
def entry():
files = get_all_files(get_clean_file_path())
insert_data(files)
delete_files()

@ -20,8 +20,8 @@ class DashboardViewSets(viewsets.GenericViewSet):
def get_summary_data_list(self,request): def get_summary_data_list(self,request):
try: try:
data_type = request.GET.get('type') data_type = request.GET.get('type')
startTime = request.GET.get('startDate') startTime ="2024-06-13 00:00:00"# request.GET.get('startDate')
endTime = request.GET.get('endDate') endTime = "2024-08-13 00:00:00"#request.GET.get('endDate')
#1:ip,2:账号,3:接口,4:菜单 #1:ip,2:账号,3:接口,4:菜单
logger.info("获取分析结构数据:"+data_type+";" + startTime +";"+ endTime) logger.info("获取分析结构数据:"+data_type+";" + startTime +";"+ endTime)
return Result.ok(entry(data_type,startTime,endTime)) return Result.ok(entry(data_type,startTime,endTime))

Loading…
Cancel
Save