|  |  |  | # coding=utf-8
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  |  @Author: tangwy
 | 
					
						
							|  |  |  |  @FileName: ueba_cron_pg.py
 | 
					
						
							|  |  |  |  @DateTime: 2024/7/09 14:19
 | 
					
						
							|  |  |  |  @Description: 定时清洗es数据
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | from __future__ import unicode_literals
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import random,string
 | 
					
						
							|  |  |  | import traceback,json
 | 
					
						
							|  |  |  | import time,threading
 | 
					
						
							|  |  |  | from uebaMetricsAnalysis.utils.ext_logging import logger_cron
 | 
					
						
							|  |  |  | from uebaMetricsAnalysis.utils.db2json import DBUtils, DBType
 | 
					
						
							|  |  |  | from uebaMetricsAnalysis.utils.base_dataclean_pg import entry
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | JOB_STATUS ={
 | 
					
						
							|  |  |  |     "RUNNING":1,
 | 
					
						
							|  |  |  |     "FINISH":2,
 | 
					
						
							|  |  |  |     "ERROR":3
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class DataCleanCron:
 | 
					
						
							|  |  |  |     #生成job_id
 | 
					
						
							|  |  |  |     def generate_job_id(self):
 | 
					
						
							|  |  |  |         timestamp = int(time.time() * 1000)
 | 
					
						
							|  |  |  |         random_letters = ''.join(random.choice(string.ascii_letters) for _ in range(7))
 | 
					
						
							|  |  |  |         return str(timestamp) + random_letters
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     #每5分钟执行一次
 | 
					
						
							|  |  |  |     def processing(self):
 | 
					
						
							|  |  |  |         logger_cron.info("JOB:接收到执行指令")
 | 
					
						
							|  |  |  |         job_id =self.generate_job_id()
 | 
					
						
							|  |  |  |         task_run_count =0
 | 
					
						
							|  |  |  |         try:
 | 
					
						
							|  |  |  |             start,end,status,run_count,jobid= DBUtils.get_job_period()
 | 
					
						
							|  |  |  |             if jobid !="":
 | 
					
						
							|  |  |  |                 job_id=jobid
 | 
					
						
							|  |  |  |             if end<start:
 | 
					
						
							|  |  |  |                 logger_cron.info("JOB:"+job_id+"开始时间大于结束时间不执行")
 | 
					
						
							|  |  |  |                 return
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"开始执行")
 | 
					
						
							|  |  |  |             if status ==1:
 | 
					
						
							|  |  |  |                 logger_cron.info("JOB:"+job_id+"正在运行中不执行")
 | 
					
						
							|  |  |  |                 return
 | 
					
						
							|  |  |  |             
 | 
					
						
							|  |  |  |             #延迟15分钟读取es数据
 | 
					
						
							|  |  |  |             if start is None or end is None:
 | 
					
						
							|  |  |  |                 logger_cron.info("JOB:"+job_id+"结束时间大于(服务器时间-15分钟)不执行")
 | 
					
						
							|  |  |  |                 return
 | 
					
						
							|  |  |  |             
 | 
					
						
							|  |  |  |             task_run_count = run_count+1
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"运行参数:{},{}".format(start,end))
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"准备将job写入job表")
 | 
					
						
							|  |  |  |             DBUtils.insert_job_record(job_id,start,end,JOB_STATUS.get("RUNNING"))
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"完成job表写入")
 | 
					
						
							|  |  |  |             
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"准备获取es数据")
 | 
					
						
							|  |  |  |             entry(start,end,job_id)
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"完成es数据获取")
 | 
					
						
							|  |  |  |             DBUtils.write_job_status(job_id,JOB_STATUS.get("FINISH"),"",task_run_count)
 | 
					
						
							|  |  |  |             logger_cron.info("JOB:"+job_id+"更新job表状态完成")
 | 
					
						
							|  |  |  |             
 | 
					
						
							|  |  |  |         except Exception ,e:
 | 
					
						
							|  |  |  |             err_info=traceback.format_exc()
 | 
					
						
							|  |  |  |             logger_cron.error("JOB:"+job_id+"执行失败:"+err_info)
 | 
					
						
							|  |  |  |             DBUtils.write_job_status(job_id,JOB_STATUS.get("ERROR"),err_info,task_run_count)
 | 
					
						
							|  |  |  |             raise
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__':
 | 
					
						
							|  |  |  |     DataCleanCron().processing()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     
 |