疯狂的狮子li
2022-01-13 a0bed51d966ab5d161d3fdd5423ba84f59fb60ff
ruoyi-extend/ruoyi-xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java
@@ -20,83 +20,85 @@
 * @author xuxueli 2015-9-1 18:05:56
 */
public class JobCompleteHelper {
   private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class);
   private static JobCompleteHelper instance = new JobCompleteHelper();
   public static JobCompleteHelper getInstance(){
      return instance;
   }
    private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class);
   // ---------------------- monitor ----------------------
    private static JobCompleteHelper instance = new JobCompleteHelper();
   private ThreadPoolExecutor callbackThreadPool = null;
   private Thread monitorThread;
   private volatile boolean toStop = false;
   public void start(){
    public static JobCompleteHelper getInstance() {
        return instance;
    }
      // for callback
      callbackThreadPool = new ThreadPoolExecutor(
            2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(3000),
            new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                  return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
               }
            },
            new RejectedExecutionHandler() {
               @Override
               public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                  r.run();
                  logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
               }
            });
    // ---------------------- monitor ----------------------
    private ThreadPoolExecutor callbackThreadPool = null;
    private Thread monitorThread;
    private volatile boolean toStop = false;
    public void start() {
        // for callback
        callbackThreadPool = new ThreadPoolExecutor(
            2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(3000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
                }
            });
      // for monitor
      monitorThread = new Thread(new Runnable() {
        // for monitor
        monitorThread = new Thread(new Runnable() {
         @Override
         public void run() {
            @Override
            public void run() {
            // wait for JobTriggerPoolHelper-init
            try {
               TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
               if (!toStop) {
                  logger.error(e.getMessage(), e);
               }
            }
                // wait for JobTriggerPoolHelper-init
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            // monitor
            while (!toStop) {
               try {
                  // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                  Date losedTime = DateUtil.addMinutes(new Date(), -10);
                  List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
                // monitor
                while (!toStop) {
                    try {
                        // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                        Date losedTime = DateUtil.addMinutes(new Date(), -10);
                        List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
                  if (losedJobIds!=null && losedJobIds.size()>0) {
                     for (Long logId: losedJobIds) {
                        if (losedJobIds != null && losedJobIds.size() > 0) {
                            for (Long logId : losedJobIds) {
                        XxlJobLog jobLog = new XxlJobLog();
                        jobLog.setId(logId);
                                XxlJobLog jobLog = new XxlJobLog();
                                jobLog.setId(logId);
                        jobLog.setHandleTime(new Date());
                        jobLog.setHandleCode(ReturnT.FAIL_CODE);
                        jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
                                jobLog.setHandleTime(new Date());
                                jobLog.setHandleCode(ReturnT.FAIL_CODE);
                                jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail"));
                        XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                     }
                                XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                            }
                  }
               } catch (Exception e) {
                  if (!toStop) {
                     logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                  }
               }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}" , e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(60);
@@ -108,77 +110,76 @@
                }
            logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
                logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
         }
      });
      monitorThread.setDaemon(true);
      monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
      monitorThread.start();
   }
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
        monitorThread.start();
    }
   public void toStop(){
      toStop = true;
    public void toStop() {
        toStop = true;
      // stop registryOrRemoveThreadPool
      callbackThreadPool.shutdownNow();
        // stop registryOrRemoveThreadPool
        callbackThreadPool.shutdownNow();
      // stop monitorThread (interrupt and wait)
      monitorThread.interrupt();
      try {
         monitorThread.join();
      } catch (InterruptedException e) {
         logger.error(e.getMessage(), e);
      }
   }
        // stop monitorThread (interrupt and wait)
        monitorThread.interrupt();
        try {
            monitorThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }
   // ---------------------- helper ----------------------
    // ---------------------- helper ----------------------
   public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
      callbackThreadPool.execute(new Runnable() {
         @Override
         public void run() {
            for (HandleCallbackParam handleCallbackParam: callbackParamList) {
               ReturnT<String> callbackResult = callback(handleCallbackParam);
               logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
                     (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
            }
         }
      });
        callbackThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (HandleCallbackParam handleCallbackParam : callbackParamList) {
                    ReturnT<String> callbackResult = callback(handleCallbackParam);
                    logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}" ,
                        (callbackResult.getCode() == ReturnT.SUCCESS_CODE ? "success" : "fail"), handleCallbackParam, callbackResult);
                }
            }
        });
      return ReturnT.SUCCESS;
   }
        return ReturnT.SUCCESS;
    }
   private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
      // valid log item
      XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
      if (log == null) {
         return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
      }
      if (log.getHandleCode() > 0) {
         return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
      }
    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
        // valid log item
        XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
        if (log == null) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
        }
        if (log.getHandleCode() > 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
        }
      // handle msg
      StringBuffer handleMsg = new StringBuffer();
      if (log.getHandleMsg()!=null) {
         handleMsg.append(log.getHandleMsg()).append("<br>");
      }
      if (handleCallbackParam.getHandleMsg() != null) {
         handleMsg.append(handleCallbackParam.getHandleMsg());
      }
        // handle msg
        StringBuffer handleMsg = new StringBuffer();
        if (log.getHandleMsg() != null) {
            handleMsg.append(log.getHandleMsg()).append("<br>");
        }
        if (handleCallbackParam.getHandleMsg() != null) {
            handleMsg.append(handleCallbackParam.getHandleMsg());
        }
      // success, save log
      log.setHandleTime(new Date());
      log.setHandleCode(handleCallbackParam.getHandleCode());
      log.setHandleMsg(handleMsg.toString());
      XxlJobCompleter.updateHandleInfoAndFinish(log);
        // success, save log
        log.setHandleTime(new Date());
        log.setHandleCode(handleCallbackParam.getHandleCode());
        log.setHandleMsg(handleMsg.toString());
        XxlJobCompleter.updateHandleInfoAndFinish(log);
      return ReturnT.SUCCESS;
   }
        return ReturnT.SUCCESS;
    }
}