| | |
| | | private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class); |
| | | |
| | | private static JobScheduleHelper instance = new JobScheduleHelper(); |
| | | public static JobScheduleHelper getInstance(){ |
| | | |
| | | public static JobScheduleHelper getInstance() { |
| | | return instance; |
| | | } |
| | | |
| | |
| | | private volatile boolean ringThreadToStop = false; |
| | | private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>(); |
| | | |
| | | public void start(){ |
| | | public void start() { |
| | | |
| | | // schedule thread |
| | | scheduleThread = new Thread(new Runnable() { |
| | |
| | | public void run() { |
| | | |
| | | try { |
| | | TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); |
| | | TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000); |
| | | } catch (InterruptedException e) { |
| | | if (!scheduleThreadToStop) { |
| | | logger.error(e.getMessage(), e); |
| | |
| | | connAutoCommit = conn.getAutoCommit(); |
| | | conn.setAutoCommit(false); |
| | | |
| | | preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); |
| | | preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update"); |
| | | preparedStatement.execute(); |
| | | |
| | | // tx start |
| | |
| | | // 1、pre read |
| | | long nowTime = System.currentTimeMillis(); |
| | | List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); |
| | | if (scheduleList!=null && scheduleList.size()>0) { |
| | | if (scheduleList != null && scheduleList.size() > 0) { |
| | | // 2、push time-ring |
| | | for (XxlJobInfo jobInfo: scheduleList) { |
| | | for (XxlJobInfo jobInfo : scheduleList) { |
| | | |
| | | // time-ring jump |
| | | if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { |
| | |
| | | if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { |
| | | // FIRE_ONCE_NOW 》 trigger |
| | | JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId()); |
| | | } |
| | | |
| | | // 2、fresh next |
| | |
| | | |
| | | // 1、trigger |
| | | JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId()); |
| | | |
| | | // 2、fresh next |
| | | refreshNextValidTime(jobInfo, new Date()); |
| | | |
| | | // next-trigger-time in 5s, pre-read again |
| | | if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { |
| | | if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { |
| | | |
| | | // 1、make ring second |
| | | int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); |
| | | int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60); |
| | | |
| | | // 2、push time ring |
| | | pushTimeRing(ringSecond, jobInfo.getId()); |
| | |
| | | // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time |
| | | |
| | | // 1、make ring second |
| | | int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); |
| | | int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60); |
| | | |
| | | // 2、push time ring |
| | | pushTimeRing(ringSecond, jobInfo.getId()); |
| | |
| | | } |
| | | |
| | | // 3、update trigger info |
| | | for (XxlJobInfo jobInfo: scheduleList) { |
| | | for (XxlJobInfo jobInfo : scheduleList) { |
| | | XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | long cost = System.currentTimeMillis()-start; |
| | | long cost = System.currentTimeMillis() - start; |
| | | |
| | | |
| | | // Wait seconds, align second |
| | | if (cost < 1000) { // scan-overtime, not wait |
| | | try { |
| | | // pre-read period: success > scan each second; fail > skip this period; |
| | | TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); |
| | | TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000); |
| | | } catch (InterruptedException e) { |
| | | if (!scheduleThreadToStop) { |
| | | logger.error(e.getMessage(), e); |
| | |
| | | List<Integer> ringItemData = new ArrayList<>(); |
| | | int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; |
| | | for (int i = 0; i < 2; i++) { |
| | | List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); |
| | | List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60); |
| | | if (tmpData != null) { |
| | | ringItemData.addAll(tmpData); |
| | | } |
| | | } |
| | | |
| | | // ring trigger |
| | | logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); |
| | | logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData)); |
| | | if (ringItemData.size() > 0) { |
| | | // do trigger |
| | | for (int jobId: ringItemData) { |
| | | for (int jobId : ringItemData) { |
| | | // do trigger |
| | | JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); |
| | | } |
| | |
| | | jobInfo.setTriggerLastTime(0); |
| | | jobInfo.setTriggerNextTime(0); |
| | | logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}", |
| | | jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf()); |
| | | jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf()); |
| | | } |
| | | } |
| | | |
| | | private void pushTimeRing(int ringSecond, int jobId){ |
| | | private void pushTimeRing(int ringSecond, int jobId) { |
| | | // push async ring |
| | | List<Integer> ringItemData = ringData.get(ringSecond); |
| | | if (ringItemData == null) { |
| | |
| | | } |
| | | ringItemData.add(jobId); |
| | | |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); |
| | | logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData)); |
| | | } |
| | | |
| | | public void toStop(){ |
| | | public void toStop() { |
| | | |
| | | // 1、stop schedule |
| | | scheduleThreadToStop = true; |
| | |
| | | } catch (InterruptedException e) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | if (scheduleThread.getState() != Thread.State.TERMINATED){ |
| | | if (scheduleThread.getState() != Thread.State.TERMINATED) { |
| | | // interrupt and wait |
| | | scheduleThread.interrupt(); |
| | | try { |
| | |
| | | if (!ringData.isEmpty()) { |
| | | for (int second : ringData.keySet()) { |
| | | List<Integer> tmpData = ringData.get(second); |
| | | if (tmpData!=null && tmpData.size()>0) { |
| | | if (tmpData != null && tmpData.size() > 0) { |
| | | hasRingData = true; |
| | | break; |
| | | } |
| | |
| | | } catch (InterruptedException e) { |
| | | logger.error(e.getMessage(), e); |
| | | } |
| | | if (ringThread.getState() != Thread.State.TERMINATED){ |
| | | if (ringThread.getState() != Thread.State.TERMINATED) { |
| | | // interrupt and wait |
| | | ringThread.interrupt(); |
| | | try { |
| | |
| | | Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime); |
| | | return nextValidTime; |
| | | } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) { |
| | | return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 ); |
| | | return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf()) * 1000); |
| | | } |
| | | return null; |
| | | } |