package com.shlb.timescaledbutils.task;

import com.alibaba.fastjson.JSONObject;
import com.shlb.timescaledbutils.constant.AppConstants;
import com.shlb.timescaledbutils.entity.*;
import com.shlb.timescaledbutils.service.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Redis-to-PostgreSQL synchronisation task.
 *
 * <p>Once the application has started, a single background thread repeatedly reads the
 * current shift id from the Redis hash {@code shift} (field {@code ID}), fetches the hash of
 * every equipment key ({@code shiftId + equipmentCode}) in one pipelined round trip, converts
 * each non-empty hash into the entity matching its equipment type and batch-saves the entities
 * through the corresponding service.
 *
 * <p>The loop ends when {@link #stop()} is called (wired to bean destruction through
 * {@link DisposableBean}) or when the worker thread is interrupted.
 */
@Component
@Slf4j
public class RedisToPostgresSyncTask implements ApplicationRunner, DisposableBean {

    /** Lower bound enforced on the configured polling interval. */
    private static final long MIN_INTERVAL_MS = 1000L;

    /** Pause applied after a cycle fails with an unexpected exception. */
    private static final long ERROR_BACKOFF_SECONDS = 5L;

    /** Equipment code appended to the shift id to build the hoister key (e.g. "1601"). */
    private static final String HOISTER_EQUIPMENT_CODE = "601";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // One service per equipment table.
    @Autowired
    private IRollerTimeDataService rollerService;
    @Autowired
    private IPackerTimeDataService packerService;
    @Autowired
    private IBoxTimeDataService boxService;
    @Autowired
    private IMakeupTimeDataService makeupService;
    @Autowired
    private ITransTimeDataService transService;
    @Autowired
    private IHoisterTimeDataService hoisterService;
    @Autowired
    private IFeedmatchTimeDataService feedmatchService;

    @Value("${spring.redis.sync-interval-ms:5000}")
    private long intervalMs;

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private volatile boolean running = true;

    /**
     * Clamps the configured interval to at least {@value #MIN_INTERVAL_MS} ms and starts the
     * background polling loop.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        if (intervalMs < MIN_INTERVAL_MS) {
            log.warn("配置的同步间隔 {}ms 小于最小限制 1000ms,已自动修正为 1000ms", intervalMs);
            intervalMs = MIN_INTERVAL_MS;
        }
        executorService.submit(this::processSync);
        log.info("Redis -> PostgreSQL 多表分流数据采集任务已启动,采集间隔: {}ms", intervalMs);
    }

    /**
     * Polling loop executed on the worker thread: one sync cycle, then sleep for the rest of
     * the interval. A failed cycle is logged and followed by a fixed back-off.
     */
    private void processSync() {
        // Also leave on interrupt: once the flag is set every sleep would throw immediately
        // and the loop would otherwise spin without pausing.
        while (running && !Thread.currentThread().isInterrupted()) {
            long startTime = System.currentTimeMillis();
            try {
                syncOnce(startTime);
                sleepRemaining(startTime);
            } catch (Exception e) {
                log.error("同步任务异常", e);
                try {
                    TimeUnit.SECONDS.sleep(ERROR_BACKOFF_SECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs a single collection cycle: read shift, fetch all equipment hashes, map them to
     * entities and save them in batches.
     *
     * @param startTime cycle start in epoch milliseconds, used only for the cost log line
     */
    private void syncOnce(long startTime) {
        // 1. Current shift: Redis hash "shift", field "ID".
        Object shiftIdObj = stringRedisTemplate.opsForHash().get("shift", "ID");
        String shiftId = shiftIdObj != null ? shiftIdObj.toString() : null;
        if (shiftId == null || shiftId.isEmpty()) {
            // Redis may simply not be populated yet; skip this cycle and retry next interval.
            log.error("Redis中未找到班次信息 (key='shift', field='ID'),跳过本次采集");
            return;
        }

        // 2. Redis key of every equipment = shift id + equipment code, e.g. "2" + "101" -> "2101".
        List<String> equipmentKeys = new ArrayList<>(AppConstants.EQUIPMENT_LIST.size());
        for (String code : AppConstants.EQUIPMENT_LIST) {
            equipmentKeys.add(shiftId + code);
        }

        // 3. Fetch all hashes in one pipelined round trip.
        Map<String, Map<?, ?>> equipmentDataMap = fetchEquipmentData(equipmentKeys);
        Map<?, ?> hoisterData = equipmentDataMap.get(shiftId + HOISTER_EQUIPMENT_CODE);

        // 4. Classify and map. All rows of one cycle share the same timestamp.
        Date now = new Date();
        List<RollerTimeData> rollerList = new ArrayList<>();
        List<PackerTimeData> packerList = new ArrayList<>();
        List<BoxTimeData> boxList = new ArrayList<>();
        List<MakeupTimeData> makeupList = new ArrayList<>();
        List<TransTimeData> transList = new ArrayList<>();
        List<HoisterTimeData> hoisterList = new ArrayList<>();
        List<FeedmatchTimeData> feedmatchList = new ArrayList<>();

        for (String fullKey : equipmentKeys) {
            Map<?, ?> data = equipmentDataMap.get(fullKey);
            if (data == null || data.isEmpty()) {
                continue;
            }
            try {
                int type = AppConstants.getEquipmentType(fullKey);

                // First character is the shift, the remainder the equipment number.
                Integer shift = null;
                Integer equNo = null;
                if (fullKey.length() >= 4) {
                    try {
                        shift = Integer.parseInt(fullKey.substring(0, 1));
                        equNo = Integer.parseInt(fullKey.substring(1));
                    } catch (NumberFormatException e) {
                        log.warn("解析班次/设备号失败 Key: {}", fullKey);
                    }
                }

                JSONObject json = toJson(data);

                switch (type) {
                    case AppConstants.TYPE_ROLLING:
                        RollerTimeData roller = json.toJavaObject(RollerTimeData.class);
                        roller.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        rollerList.add(roller);
                        break;
                    case AppConstants.TYPE_PACKAGING:
                        PackerTimeData packer = json.toJavaObject(PackerTimeData.class);
                        packer.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        fillPackerQuantities(packer, json, equNo, hoisterData, fullKey);
                        packerList.add(packer);
                        break;
                    case AppConstants.TYPE_SEALING:
                        BoxTimeData box = json.toJavaObject(BoxTimeData.class);
                        box.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        boxList.add(box);
                        break;
                    case AppConstants.TYPE_FORMING:
                        MakeupTimeData makeup = json.toJavaObject(MakeupTimeData.class);
                        makeup.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        makeupList.add(makeup);
                        break;
                    case AppConstants.TYPE_LAUNCHING:
                        TransTimeData trans = json.toJavaObject(TransTimeData.class);
                        trans.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        transList.add(trans);
                        break;
                    case AppConstants.TYPE_LIFTING:
                        HoisterTimeData hoister = json.toJavaObject(HoisterTimeData.class);
                        hoister.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        hoisterList.add(hoister);
                        break;
                    case AppConstants.TYPE_FEED:
                        FeedmatchTimeData feed = json.toJavaObject(FeedmatchTimeData.class);
                        feed.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                        feedmatchList.add(feed);
                        break;
                    default:
                        log.warn("未知的设备类型,Key: {}", fullKey);
                }
            } catch (Exception e) {
                // One malformed hash must not abort the whole cycle.
                log.error("数据转换失败 Key: {}", fullKey, e);
            }
        }

        // 5. Batch insert, one call per table, skipping empty lists.
        if (!rollerList.isEmpty()) {
            rollerService.saveBatch(rollerList);
        }
        if (!packerList.isEmpty()) {
            packerService.saveBatch(packerList);
        }
        if (!boxList.isEmpty()) {
            boxService.saveBatch(boxList);
        }
        if (!makeupList.isEmpty()) {
            makeupService.saveBatch(makeupList);
        }
        if (!transList.isEmpty()) {
            transService.saveBatch(transList);
        }
        if (!hoisterList.isEmpty()) {
            hoisterService.saveBatch(hoisterList);
        }
        if (!feedmatchList.isEmpty()) {
            feedmatchService.saveBatch(feedmatchList);
        }

        int total = rollerList.size() + packerList.size() + boxList.size() + makeupList.size()
                + transList.size() + hoisterList.size() + feedmatchList.size();
        if (total > 0) {
            log.info("采集完成: Shift={}, Total={}, Cost={}ms", shiftId, total,
                    System.currentTimeMillis() - startTime);
        }
    }

    /**
     * Reads the hash entries of every given key through a single Redis pipeline.
     *
     * @param equipmentKeys Redis keys to read, in request order
     * @return map from key to its hash entries; keys whose pipelined result is not a map are
     *         omitted, keys without data map to an empty map
     */
    private Map<String, Map<?, ?>> fetchEquipmentData(final List<String> equipmentKeys) {
        List<Object> results = stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            @SuppressWarnings("unchecked") // the template is a StringRedisTemplate, so keys are Strings
            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                RedisOperations<String, String> ops = (RedisOperations<String, String>) operations;
                for (String key : equipmentKeys) {
                    ops.opsForHash().entries(key);
                }
                // The pipeline collects the results itself; the callback returns null.
                return null;
            }
        });

        Map<String, Map<?, ?>> dataByKey = new HashMap<>();
        if (results == null) {
            return dataByKey;
        }
        // Results are positional: the i-th result belongs to the i-th requested key.
        int count = Math.min(results.size(), equipmentKeys.size());
        for (int i = 0; i < count; i++) {
            Object res = results.get(i);
            if (res instanceof Map) {
                dataByKey.put(equipmentKeys.get(i), (Map<?, ?>) res);
            }
        }
        return dataByKey;
    }

    /**
     * Copies a Redis hash into a {@link JSONObject} so fastjson can convert it to an entity.
     *
     * @param data hash entries read from Redis
     * @return JSON object with the stringified hash fields as keys
     */
    private JSONObject toJson(Map<?, ?> data) {
        JSONObject json = new JSONObject();
        for (Map.Entry<?, ?> entry : data.entrySet()) {
            json.put(entry.getKey().toString(), entry.getValue());
        }
        return json;
    }

    /**
     * Sets the packer quantities that are not mapped by name: {@code chcQty} -> qty,
     * {@code ptchQty} -> mainQty, and the hoister field {@code tQty<n>} -> tsQty, where
     * {@code n} is the last two digits of the equipment number. Each value is handled
     * independently, so one unparsable field does not prevent the others from being set.
     *
     * @param packer      entity to fill
     * @param json        packer hash as JSON
     * @param equNo       equipment number parsed from the key, may be {@code null}
     * @param hoisterData hash of the hoister for the current shift, may be {@code null}
     * @param fullKey     Redis key of the packer, used for log context only
     */
    private void fillPackerQuantities(PackerTimeData packer, JSONObject json, Integer equNo,
                                      Map<?, ?> hoisterData, String fullKey) {
        Double qty = parseQuantity(json.get("chcQty"), "chcQty", fullKey);
        if (qty != null) {
            packer.setQty(qty);
        }
        Double mainQty = parseQuantity(json.get("ptchQty"), "ptchQty", fullKey);
        if (mainQty != null) {
            packer.setMainQty(mainQty);
        }

        if (equNo == null || equNo <= 0 || hoisterData == null) {
            return;
        }
        // NOTE(review): an earlier comment described the mapping as packers 1-3 -> tQty1-3,
        // packer 4 -> none, packers 5-13 -> tQty4-12 (number - 1), but the active code uses
        // the machine number directly. The direct mapping is kept; confirm which is intended.
        int machineNo = equNo % 100;
        String targetField = "tQty" + machineNo;
        Double tsQty = parseQuantity(hoisterData.get(targetField), targetField, fullKey);
        if (tsQty != null) {
            packer.setTsQty(tsQty);
        }
    }

    /**
     * Parses a raw Redis value as a double.
     *
     * @param raw     raw value, may be {@code null}
     * @param field   field name, used for log context only
     * @param fullKey Redis key, used for log context only
     * @return the parsed value, or {@code null} when the value is absent or not numeric
     */
    private Double parseQuantity(Object raw, String field, String fullKey) {
        if (raw == null) {
            return null;
        }
        try {
            return Double.parseDouble(raw.toString());
        } catch (NumberFormatException e) {
            log.warn("包装机产量字段解析失败 Key: {}, Field: {}, Value: {}", fullKey, field, raw);
            return null;
        }
    }

    /**
     * Sleeps for whatever is left of the polling interval, if anything.
     *
     * @param startTime cycle start in epoch milliseconds
     */
    private void sleepRemaining(long startTime) {
        long elapsed = System.currentTimeMillis() - startTime;
        long sleepTime = intervalMs - elapsed;
        if (sleepTime > 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops the polling loop after the current cycle and shuts the executor down.
     * Safe to call more than once.
     */
    public void stop() {
        this.running = false;
        executorService.shutdown();
    }

    /** Stops the task when the Spring container destroys this bean. */
    @Override
    public void destroy() {
        stop();
    }
}