疯狂的狮子li
2022-01-13 a0bed51d966ab5d161d3fdd5423ba84f59fb60ff
ruoyi-extend/ruoyi-xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
@@ -15,190 +15,192 @@
/**
 * job registry instance
 *
 * @author xuxueli 2016-10-02 19:10:24
 */
public class JobRegistryHelper {
   private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
    private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
   private static JobRegistryHelper instance = new JobRegistryHelper();
   public static JobRegistryHelper getInstance(){
      return instance;
   }
    private static JobRegistryHelper instance = new JobRegistryHelper();
   private ThreadPoolExecutor registryOrRemoveThreadPool = null;
   private Thread registryMonitorThread;
   private volatile boolean toStop = false;
    public static JobRegistryHelper getInstance() {
        return instance;
    }
   public void start(){
    private ThreadPoolExecutor registryOrRemoveThreadPool = null;
    private Thread registryMonitorThread;
    private volatile boolean toStop = false;
      // for registry or remove
      registryOrRemoveThreadPool = new ThreadPoolExecutor(
            2,
            10,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                  return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
               }
            },
            new RejectedExecutionHandler() {
               @Override
               public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                  r.run();
                  logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
               }
            });
    public void start() {
      // for monitor
      registryMonitorThread = new Thread(new Runnable() {
         @Override
         public void run() {
            while (!toStop) {
               try {
                  // auto registry group
                  List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                  if (groupList!=null && !groupList.isEmpty()) {
        // for registry or remove
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
            2,
            10,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                }
            });
                     // remove dead address (admin/executor)
                     List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                     if (ids!=null && ids.size()>0) {
                        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                     }
        // for monitor
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // auto registry group
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        if (groupList != null && !groupList.isEmpty()) {
                     // fresh online address (admin/executor)
                     HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                     List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                     if (list != null) {
                        for (XxlJobRegistry item: list) {
                           if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                              String appname = item.getRegistryKey();
                              List<String> registryList = appAddressMap.get(appname);
                              if (registryList == null) {
                                 registryList = new ArrayList<String>();
                              }
                            // remove dead address (admin/executor)
                            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (ids != null && ids.size() > 0) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                            }
                              if (!registryList.contains(item.getRegistryValue())) {
                                 registryList.add(item.getRegistryValue());
                              }
                              appAddressMap.put(appname, registryList);
                           }
                        }
                     }
                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (list != null) {
                                for (XxlJobRegistry item : list) {
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        String appname = item.getRegistryKey();
                                        List<String> registryList = appAddressMap.get(appname);
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }
                     // fresh group address
                     for (XxlJobGroup group: groupList) {
                        List<String> registryList = appAddressMap.get(group.getAppname());
                        String addressListStr = null;
                        if (registryList!=null && !registryList.isEmpty()) {
                           Collections.sort(registryList);
                           StringBuilder addressListSB = new StringBuilder();
                           for (String item:registryList) {
                              addressListSB.append(item).append(",");
                           }
                           addressListStr = addressListSB.toString();
                           addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                        }
                        group.setAddressList(addressListStr);
                        group.setUpdateTime(new Date());
                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        appAddressMap.put(appname, registryList);
                                    }
                                }
                            }
                        XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                     }
                  }
               } catch (Exception e) {
                  if (!toStop) {
                     logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                  }
               }
               try {
                  TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
               } catch (InterruptedException e) {
                  if (!toStop) {
                     logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                  }
               }
            }
            logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
         }
      });
      registryMonitorThread.setDaemon(true);
      registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
      registryMonitorThread.start();
   }
                            // fresh group address
                            for (XxlJobGroup group : groupList) {
                                List<String> registryList = appAddressMap.get(group.getAppname());
                                String addressListStr = null;
                                if (registryList != null && !registryList.isEmpty()) {
                                    Collections.sort(registryList);
                                    StringBuilder addressListSB = new StringBuilder();
                                    for (String item : registryList) {
                                        addressListSB.append(item).append(",");
                                    }
                                    addressListStr = addressListSB.toString();
                                    addressListStr = addressListStr.substring(0, addressListStr.length() - 1);
                                }
                                group.setAddressList(addressListStr);
                                group.setUpdateTime(new Date());
   public void toStop(){
      toStop = true;
                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}" , e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}" , e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });
        registryMonitorThread.setDaemon(true);
        registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
        registryMonitorThread.start();
    }
      // stop registryOrRemoveThreadPool
      registryOrRemoveThreadPool.shutdownNow();
    public void toStop() {
        toStop = true;
      // stop monitir (interrupt and wait)
      registryMonitorThread.interrupt();
      try {
         registryMonitorThread.join();
      } catch (InterruptedException e) {
         logger.error(e.getMessage(), e);
      }
   }
        // stop registryOrRemoveThreadPool
        registryOrRemoveThreadPool.shutdownNow();
        // stop monitir (interrupt and wait)
        registryMonitorThread.interrupt();
        try {
            registryMonitorThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }
   // ---------------------- helper ----------------------
    // ---------------------- helper ----------------------
   public ReturnT<String> registry(RegistryParam registryParam) {
    public ReturnT<String> registry(RegistryParam registryParam) {
      // valid
      if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
         return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
      }
        // valid
        if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
        }
      // async execute
      registryOrRemoveThreadPool.execute(new Runnable() {
         @Override
         public void run() {
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
               XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
        // async execute
        registryOrRemoveThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
                if (ret < 1) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
               // fresh
               freshGroupRegistryInfo(registryParam);
            }
         }
      });
                    // fresh
                    freshGroupRegistryInfo(registryParam);
                }
            }
        });
      return ReturnT.SUCCESS;
   }
        return ReturnT.SUCCESS;
    }
   public ReturnT<String> registryRemove(RegistryParam registryParam) {
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
      // valid
      if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
         return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
      }
        // valid
        if (!StringUtils.hasText(registryParam.getRegistryGroup())
            || !StringUtils.hasText(registryParam.getRegistryKey())
            || !StringUtils.hasText(registryParam.getRegistryValue())) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
        }
      // async execute
      registryOrRemoveThreadPool.execute(new Runnable() {
         @Override
         public void run() {
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
            if (ret > 0) {
               // fresh
               freshGroupRegistryInfo(registryParam);
            }
         }
      });
        // async execute
        registryOrRemoveThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
                if (ret > 0) {
                    // fresh
                    freshGroupRegistryInfo(registryParam);
                }
            }
        });
      return ReturnT.SUCCESS;
   }
        return ReturnT.SUCCESS;
    }
   private void freshGroupRegistryInfo(RegistryParam registryParam){
      // Under consideration, prevent affecting core tables
   }
    private void freshGroupRegistryInfo(RegistryParam registryParam) {
        // Under consideration, prevent affecting core tables
    }
}