package com.shlb.timescaledbutils.task;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.shlb.timescaledbutils.constant.AppConstants;
|
import com.shlb.timescaledbutils.entity.*;
|
import com.shlb.timescaledbutils.service.*;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.dao.DataAccessException;
|
import org.springframework.data.redis.core.RedisOperations;
|
import org.springframework.data.redis.core.SessionCallback;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.stereotype.Component;
|
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* Redis 数据同步任务
|
* 定期从 Redis 获取设备实时数据并分流写入不同 PostgreSQL 表
|
*/
|
@Component
|
@Slf4j
|
public class RedisToPostgresSyncTask implements ApplicationRunner {
|
|
@Autowired
|
private StringRedisTemplate stringRedisTemplate;
|
|
// 各设备对应的 Service
|
@Autowired
|
private IRollerTimeDataService rollerService;
|
@Autowired
|
private IPackerTimeDataService packerService;
|
@Autowired
|
private IBoxTimeDataService boxService;
|
@Autowired
|
private IMakeupTimeDataService makeupService;
|
@Autowired
|
private ITransTimeDataService transService;
|
@Autowired
|
private IHoisterTimeDataService hoisterService;
|
@Autowired
|
private IFeedmatchTimeDataService feedmatchService;
|
|
@Value("${spring.redis.sync-interval-ms:5000}")
|
private long intervalMs;
|
|
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
private volatile boolean running = true;
|
|
@Override
|
public void run(ApplicationArguments args) {
|
// 强制限制最小间隔为 1000ms
|
if (intervalMs < 1000) {
|
log.warn("配置的同步间隔 {}ms 小于最小限制 1000ms,已自动修正为 1000ms", intervalMs);
|
intervalMs = 1000;
|
}
|
|
executorService.submit(this::processSync);
|
log.info("Redis -> PostgreSQL 多表分流数据采集任务已启动,采集间隔: {}ms", intervalMs);
|
}
|
|
private void processSync() {
|
while (running) {
|
try {
|
long startTime = System.currentTimeMillis();
|
|
// 1. 获取班次信息
|
// Redis Key: "shift", Field: "ID"
|
Object shiftIdObj = stringRedisTemplate.opsForHash().get("shift", "ID");
|
String shiftId = shiftIdObj != null ? shiftIdObj.toString() : null;
|
|
if (shiftId == null || shiftId.isEmpty()) {
|
// 如果获取不到班次,可能是Redis暂时没数据,记录error日志并等待下一次
|
log.error("Redis中未找到班次信息 (key='shift', field='id'),跳过本次采集");
|
sleepRemaining(startTime);
|
continue;
|
}
|
|
// 2. 构造所有设备的 Redis Key (班次 + 设备3位码)
|
// 例如:班次"2" + 设备"101" = "2101"
|
List<String> equipmentKeys = new ArrayList<>(AppConstants.EQUIPMENT_LIST.size());
|
for (String code : AppConstants.EQUIPMENT_LIST) {
|
equipmentKeys.add(shiftId + code);
|
}
|
|
// 3. 批量获取设备数据 (Pipeline)
|
// 使用 Pipeline 减少网络 RTT
|
List<Object> results = stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
|
@Override
|
public Object execute(RedisOperations operations) throws DataAccessException {
|
for (String key : equipmentKeys) {
|
operations.opsForHash().entries(key);
|
}
|
return null;
|
}
|
});
|
|
// 将 results 转为 Map,方便后续通过 Key 获取其他设备数据
|
Map<String, Map<Object, Object>> equipmentDataMap = new HashMap<>();
|
if (results != null) {
|
for (int i = 0; i < results.size(); i++) {
|
Object res = results.get(i);
|
if (res instanceof Map && i < equipmentKeys.size()) {
|
equipmentDataMap.put(equipmentKeys.get(i), (Map<Object, Object>) res);
|
}
|
}
|
}
|
|
// 4. 数据分类与映射
|
Date now = new Date();
|
|
List<RollerTimeData> rollerList = new ArrayList<>();
|
List<PackerTimeData> packerList = new ArrayList<>();
|
List<BoxTimeData> boxList = new ArrayList<>();
|
List<MakeupTimeData> makeupList = new ArrayList<>();
|
List<TransTimeData> transList = new ArrayList<>();
|
List<HoisterTimeData> hoisterList = new ArrayList<>();
|
List<FeedmatchTimeData> feedmatchList = new ArrayList<>();
|
|
for (int i = 0; i < results.size(); i++) {
|
Object result = results.get(i);
|
// executePipelined 返回的结果中,hash entries 返回的是 Map<Object, Object>
|
if (!(result instanceof Map)) {
|
continue;
|
}
|
Map<Object, Object> map = (Map<Object, Object>) result;
|
if (map.isEmpty()) {
|
continue;
|
}
|
|
String fullKey = equipmentKeys.get(i);
|
int type = AppConstants.getEquipmentType(fullKey);
|
|
try {
|
// 解析班次和设备号
|
Integer shift = null;
|
Integer equNo = null;
|
if (fullKey != null && fullKey.length() >= 4) {
|
try {
|
shift = Integer.parseInt(fullKey.substring(0, 1));
|
equNo = Integer.parseInt(fullKey.substring(1));
|
} catch (NumberFormatException e) {
|
log.warn("解析班次/设备号失败 Key: {}", fullKey);
|
}
|
}
|
|
// 将 Map 转为 JSONObject 以便利用 fastjson 的类型转换功能
|
JSONObject json = new JSONObject();
|
for (Map.Entry<Object, Object> entry : map.entrySet()) {
|
json.put(entry.getKey().toString(), entry.getValue());
|
}
|
|
// 根据设备类型转换并添加到对应的列表
|
switch (type) {
|
case AppConstants.TYPE_ROLLING:
|
RollerTimeData roller = json.toJavaObject(RollerTimeData.class);
|
roller.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
rollerList.add(roller);
|
break;
|
case AppConstants.TYPE_PACKAGING:
|
PackerTimeData packer = json.toJavaObject(PackerTimeData.class);
|
packer.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
try {
|
if(json.containsKey("chcQty")){
|
packer.setQty(Double.parseDouble(json.get("chcQty").toString()));
|
}
|
if(json.containsKey("ptchQty")){
|
packer.setMainQty(Double.parseDouble(json.get("ptchQty").toString()));
|
}
|
// 包装机需要获取提升机的产量,提升机的key为 早1601、中2601、晚3601
|
// 逻辑:1-3号包装机对应tQty1-3;4号无;5-13号对应tQty4-12 (即号数-1)
|
if (equNo != null && equNo > 0) {
|
int machineNo = equNo % 100;
|
String targetField = "tQty" + machineNo;;
|
// if (machineNo < 4) {
|
// targetField = "tQty" + machineNo;
|
// } else if (machineNo > 4) {
|
// targetField = "tQty" + (machineNo - 1);
|
// }
|
|
Map<Object, Object> tsjMap = equipmentDataMap.get(shiftId + "601");
|
if (tsjMap != null) {
|
Object qtyObj = tsjMap.get(targetField);
|
if (qtyObj != null) {
|
packer.setTsQty(Double.parseDouble(qtyObj.toString()));
|
}
|
}
|
}
|
}catch (Exception e){}
|
packerList.add(packer);
|
break;
|
case AppConstants.TYPE_SEALING:
|
BoxTimeData box = json.toJavaObject(BoxTimeData.class);
|
box.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
boxList.add(box);
|
break;
|
case AppConstants.TYPE_FORMING:
|
MakeupTimeData makeup = json.toJavaObject(MakeupTimeData.class);
|
makeup.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
makeupList.add(makeup);
|
break;
|
case AppConstants.TYPE_LAUNCHING:
|
TransTimeData trans = json.toJavaObject(TransTimeData.class);
|
trans.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
transList.add(trans);
|
break;
|
case AppConstants.TYPE_LIFTING:
|
HoisterTimeData hoister = json.toJavaObject(HoisterTimeData.class);
|
hoister.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
hoisterList.add(hoister);
|
break;
|
case AppConstants.TYPE_FEED:
|
FeedmatchTimeData feed = json.toJavaObject(FeedmatchTimeData.class);
|
feed.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
|
feedmatchList.add(feed);
|
break;
|
default:
|
log.warn("未知的设备类型,Key: {}", fullKey);
|
}
|
} catch (Exception e) {
|
log.error("数据转换失败 Key: {}", fullKey, e);
|
}
|
}
|
|
// 5. 批量入库
|
if (!rollerList.isEmpty()) rollerService.saveBatch(rollerList);
|
if (!packerList.isEmpty()) packerService.saveBatch(packerList);
|
if (!boxList.isEmpty()) boxService.saveBatch(boxList);
|
if (!makeupList.isEmpty()) makeupService.saveBatch(makeupList);
|
if (!transList.isEmpty()) transService.saveBatch(transList);
|
if (!hoisterList.isEmpty()) hoisterService.saveBatch(hoisterList);
|
if (!feedmatchList.isEmpty()) feedmatchService.saveBatch(feedmatchList);
|
|
int total = rollerList.size() + packerList.size() + boxList.size() +
|
makeupList.size() + transList.size() + hoisterList.size() + feedmatchList.size();
|
|
if (total > 0) {
|
log.info("采集完成: Shift={}, Total={}, Cost={}ms", shiftId, total, System.currentTimeMillis() - startTime);
|
}
|
|
sleepRemaining(startTime);
|
|
} catch (Exception e) {
|
log.error("同步任务异常", e);
|
try {
|
TimeUnit.SECONDS.sleep(5);
|
} catch (InterruptedException ex) {
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
}
|
|
private void sleepRemaining(long startTime) {
|
long elapsed = System.currentTimeMillis() - startTime;
|
long sleepTime = intervalMs - elapsed;
|
if (sleepTime > 0) {
|
try {
|
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
|
// 应用关闭时停止任务
|
public void stop() {
|
this.running = false;
|
executorService.shutdown();
|
}
|
}
|