| | |
| | | * @author xuxueli 2015-9-1 18:05:56 |
| | | */ |
| | | public class JobCompleteHelper { |
| | | private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); |
| | | |
| | | private static JobCompleteHelper instance = new JobCompleteHelper(); |
| | | public static JobCompleteHelper getInstance(){ |
| | | return instance; |
| | | } |
| | | private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class); |
| | | |
| | | // ---------------------- monitor ---------------------- |
| | | private static JobCompleteHelper instance = new JobCompleteHelper(); |
| | | |
| | | private ThreadPoolExecutor callbackThreadPool = null; |
| | | private Thread monitorThread; |
| | | private volatile boolean toStop = false; |
| | | public void start(){ |
| | | public static JobCompleteHelper getInstance() { |
| | | return instance; |
| | | } |
| | | |
| | | // for callback |
| | | callbackThreadPool = new ThreadPoolExecutor( |
| | | 2, |
| | | 20, |
| | | 30L, |
| | | TimeUnit.SECONDS, |
| | | new LinkedBlockingQueue<Runnable>(3000), |
| | | new ThreadFactory() { |
| | | @Override |
| | | public Thread newThread(Runnable r) { |
| | | return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); |
| | | } |
| | | }, |
| | | new RejectedExecutionHandler() { |
| | | @Override |
| | | public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { |
| | | r.run(); |
| | | logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); |
| | | } |
| | | }); |
| | | // ---------------------- monitor ---------------------- |
| | | |
| | | private ThreadPoolExecutor callbackThreadPool = null; |
| | | private Thread monitorThread; |
| | | private volatile boolean toStop = false; |
| | | |
| | | public void start() { |
| | | |
| | | // for callback |
| | | callbackThreadPool = new ThreadPoolExecutor( |
| | | 2, |
| | | 20, |
| | | 30L, |
| | | TimeUnit.SECONDS, |
| | | new LinkedBlockingQueue<Runnable>(3000), |
| | | new ThreadFactory() { |
| | | @Override |
| | | public Thread newThread(Runnable r) { |
| | | return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); |
| | | } |
| | | }, |
| | | new RejectedExecutionHandler() { |
| | | @Override |
| | | public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { |
| | | r.run(); |
| | | logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); |
| | | } |
| | | }); |
| | | |
| | | |
| | | // for monitor |
| | | monitorThread = new Thread(new Runnable() { |
| | | // for monitor |
| | | monitorThread = new Thread(new Runnable() { |
| | | |
| | | @Override |
| | | public void run() { |
| | | @Override |
| | | public void run() { |
| | | |
| | | // wait for JobTriggerPoolHelper-init |
| | | try { |
| | | TimeUnit.MILLISECONDS.sleep(50); |
| | | } catch (InterruptedException e) { |
| | | if (!toStop) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | // wait for JobTriggerPoolHelper-init |
| | | try { |
| | | TimeUnit.MILLISECONDS.sleep(50); |
| | | } catch (InterruptedException e) { |
| | | if (!toStop) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | // monitor |
| | | while (!toStop) { |
| | | try { |
| | | // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; |
| | | Date losedTime = DateUtil.addMinutes(new Date(), -10); |
| | | List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); |
| | | // monitor |
| | | while (!toStop) { |
| | | try { |
| | | // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; |
| | | Date losedTime = DateUtil.addMinutes(new Date(), -10); |
| | | List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); |
| | | |
| | | if (losedJobIds!=null && losedJobIds.size()>0) { |
| | | for (Long logId: losedJobIds) { |
| | | if (losedJobIds != null && losedJobIds.size() > 0) { |
| | | for (Long logId : losedJobIds) { |
| | | |
| | | XxlJobLog jobLog = new XxlJobLog(); |
| | | jobLog.setId(logId); |
| | | XxlJobLog jobLog = new XxlJobLog(); |
| | | jobLog.setId(logId); |
| | | |
| | | jobLog.setHandleTime(new Date()); |
| | | jobLog.setHandleCode(ReturnT.FAIL_CODE); |
| | | jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); |
| | | jobLog.setHandleTime(new Date()); |
| | | jobLog.setHandleCode(ReturnT.FAIL_CODE); |
| | | jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail")); |
| | | |
| | | XxlJobCompleter.updateHandleInfoAndFinish(jobLog); |
| | | } |
| | | XxlJobCompleter.updateHandleInfoAndFinish(jobLog); |
| | | } |
| | | |
| | | } |
| | | } catch (Exception e) { |
| | | if (!toStop) { |
| | | logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | if (!toStop) { |
| | | logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}" , e); |
| | | } |
| | | } |
| | | |
| | | try { |
| | | TimeUnit.SECONDS.sleep(60); |
| | |
| | | |
| | | } |
| | | |
| | | logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); |
| | | logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); |
| | | |
| | | } |
| | | }); |
| | | monitorThread.setDaemon(true); |
| | | monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); |
| | | monitorThread.start(); |
| | | } |
| | | } |
| | | }); |
| | | monitorThread.setDaemon(true); |
| | | monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); |
| | | monitorThread.start(); |
| | | } |
| | | |
| | | public void toStop(){ |
| | | toStop = true; |
| | | public void toStop() { |
| | | toStop = true; |
| | | |
| | | // stop registryOrRemoveThreadPool |
| | | callbackThreadPool.shutdownNow(); |
| | | // stop registryOrRemoveThreadPool |
| | | callbackThreadPool.shutdownNow(); |
| | | |
| | | // stop monitorThread (interrupt and wait) |
| | | monitorThread.interrupt(); |
| | | try { |
| | | monitorThread.join(); |
| | | } catch (InterruptedException e) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | // stop monitorThread (interrupt and wait) |
| | | monitorThread.interrupt(); |
| | | try { |
| | | monitorThread.join(); |
| | | } catch (InterruptedException e) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | |
| | | // ---------------------- helper ---------------------- |
| | | // ---------------------- helper ---------------------- |
| | | |
| | | public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { |
| | | public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { |
| | | |
| | | callbackThreadPool.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | for (HandleCallbackParam handleCallbackParam: callbackParamList) { |
| | | ReturnT<String> callbackResult = callback(handleCallbackParam); |
| | | logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", |
| | | (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); |
| | | } |
| | | } |
| | | }); |
| | | callbackThreadPool.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | for (HandleCallbackParam handleCallbackParam : callbackParamList) { |
| | | ReturnT<String> callbackResult = callback(handleCallbackParam); |
| | | logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}" , |
| | | (callbackResult.getCode() == ReturnT.SUCCESS_CODE ? "success" : "fail"), handleCallbackParam, callbackResult); |
| | | } |
| | | } |
| | | }); |
| | | |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { |
| | | // valid log item |
| | | XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); |
| | | if (log == null) { |
| | | return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); |
| | | } |
| | | if (log.getHandleCode() > 0) { |
| | | return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc |
| | | } |
| | | private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { |
| | | // valid log item |
| | | XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); |
| | | if (log == null) { |
| | | return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); |
| | | } |
| | | if (log.getHandleCode() > 0) { |
| | | return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc |
| | | } |
| | | |
| | | // handle msg |
| | | StringBuffer handleMsg = new StringBuffer(); |
| | | if (log.getHandleMsg()!=null) { |
| | | handleMsg.append(log.getHandleMsg()).append("<br>"); |
| | | } |
| | | if (handleCallbackParam.getHandleMsg() != null) { |
| | | handleMsg.append(handleCallbackParam.getHandleMsg()); |
| | | } |
| | | // handle msg |
| | | StringBuffer handleMsg = new StringBuffer(); |
| | | if (log.getHandleMsg() != null) { |
| | | handleMsg.append(log.getHandleMsg()).append("<br>"); |
| | | } |
| | | if (handleCallbackParam.getHandleMsg() != null) { |
| | | handleMsg.append(handleCallbackParam.getHandleMsg()); |
| | | } |
| | | |
| | | // success, save log |
| | | log.setHandleTime(new Date()); |
| | | log.setHandleCode(handleCallbackParam.getHandleCode()); |
| | | log.setHandleMsg(handleMsg.toString()); |
| | | XxlJobCompleter.updateHandleInfoAndFinish(log); |
| | | // success, save log |
| | | log.setHandleTime(new Date()); |
| | | log.setHandleCode(handleCallbackParam.getHandleCode()); |
| | | log.setHandleMsg(handleMsg.toString()); |
| | | XxlJobCompleter.updateHandleInfoAndFinish(log); |
| | | |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | |
| | | } |