广丰卷烟厂数采质量分析系统
zhuguifei
2026-03-02 974c7aa4010d77bb410b99931b4435d5442deb4b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package com.shlb.timescaledbutils.task;
 
import com.alibaba.fastjson.JSONObject;
import com.shlb.timescaledbutils.constant.AppConstants;
import com.shlb.timescaledbutils.entity.*;
import com.shlb.timescaledbutils.service.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
 
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
/**
 * Redis 数据同步任务
 * 定期从 Redis 获取设备实时数据并分流写入不同 PostgreSQL 表
 */
@Component
@Slf4j
public class RedisToPostgresSyncTask implements ApplicationRunner {
 
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
 
    // 各设备对应的 Service
    @Autowired
    private IRollerTimeDataService rollerService;
    @Autowired
    private IPackerTimeDataService packerService;
    @Autowired
    private IBoxTimeDataService boxService;
    @Autowired
    private IMakeupTimeDataService makeupService;
    @Autowired
    private ITransTimeDataService transService;
    @Autowired
    private IHoisterTimeDataService hoisterService;
    @Autowired
    private IFeedmatchTimeDataService feedmatchService;
 
    @Value("${spring.redis.sync-interval-ms:5000}")
    private long intervalMs;
 
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    private volatile boolean running = true;
 
    @Override
    public void run(ApplicationArguments args) {
        // 强制限制最小间隔为 1000ms
        if (intervalMs < 1000) {
            log.warn("配置的同步间隔 {}ms 小于最小限制 1000ms,已自动修正为 1000ms", intervalMs);
            intervalMs = 1000;
        }
 
        executorService.submit(this::processSync);
        log.info("Redis -> PostgreSQL 多表分流数据采集任务已启动,采集间隔: {}ms", intervalMs);
    }
 
    private void processSync() {
        while (running) {
            try {
                long startTime = System.currentTimeMillis();
 
                // 1. 获取班次信息
                // Redis Key: "shift", Field: "ID"
                Object shiftIdObj = stringRedisTemplate.opsForHash().get("shift", "ID");
                String shiftId = shiftIdObj != null ? shiftIdObj.toString() : null;
 
                if (shiftId == null || shiftId.isEmpty()) {
                    // 如果获取不到班次,可能是Redis暂时没数据,记录error日志并等待下一次
                    log.error("Redis中未找到班次信息 (key='shift', field='id'),跳过本次采集");
                    sleepRemaining(startTime);
                    continue;
                }
 
                // 2. 构造所有设备的 Redis Key (班次 + 设备3位码)
                // 例如:班次"2" + 设备"101" = "2101"
                List<String> equipmentKeys = new ArrayList<>(AppConstants.EQUIPMENT_LIST.size());
                for (String code : AppConstants.EQUIPMENT_LIST) {
                    equipmentKeys.add(shiftId + code);
                }
 
                // 3. 批量获取设备数据 (Pipeline)
                // 使用 Pipeline 减少网络 RTT
                List<Object> results = stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
                    @Override
                    public Object execute(RedisOperations operations) throws DataAccessException {
                        for (String key : equipmentKeys) {
                            operations.opsForHash().entries(key);
                        }
                        return null;
                    }
                });
 
                // 将 results 转为 Map,方便后续通过 Key 获取其他设备数据
                Map<String, Map<Object, Object>> equipmentDataMap = new HashMap<>();
                if (results != null) {
                    for (int i = 0; i < results.size(); i++) {
                        Object res = results.get(i);
                        if (res instanceof Map && i < equipmentKeys.size()) {
                            equipmentDataMap.put(equipmentKeys.get(i), (Map<Object, Object>) res);
                        }
                    }
                }
 
                // 4. 数据分类与映射
                Date now = new Date();
                
                List<RollerTimeData> rollerList = new ArrayList<>();
                List<PackerTimeData> packerList = new ArrayList<>();
                List<BoxTimeData> boxList = new ArrayList<>();
                List<MakeupTimeData> makeupList = new ArrayList<>();
                List<TransTimeData> transList = new ArrayList<>();
                List<HoisterTimeData> hoisterList = new ArrayList<>();
                List<FeedmatchTimeData> feedmatchList = new ArrayList<>();
 
                for (int i = 0; i < results.size(); i++) {
                    Object result = results.get(i);
                    // executePipelined 返回的结果中,hash entries 返回的是 Map<Object, Object>
                    if (!(result instanceof Map)) {
                        continue;
                    }
                    Map<Object, Object> map = (Map<Object, Object>) result;
                    if (map.isEmpty()) {
                        continue;
                    }
 
                    String fullKey = equipmentKeys.get(i);
                    int type = AppConstants.getEquipmentType(fullKey);
 
                    try {
                        // 解析班次和设备号
                        Integer shift = null;
                        Integer equNo = null;
                        if (fullKey != null && fullKey.length() >= 4) {
                             try {
                                 shift = Integer.parseInt(fullKey.substring(0, 1));
                                 equNo = Integer.parseInt(fullKey.substring(1));
                             } catch (NumberFormatException e) {
                                 log.warn("解析班次/设备号失败 Key: {}", fullKey);
                             }
                        }
 
                        // 将 Map 转为 JSONObject 以便利用 fastjson 的类型转换功能
                        JSONObject json = new JSONObject();
                        for (Map.Entry<Object, Object> entry : map.entrySet()) {
                            json.put(entry.getKey().toString(), entry.getValue());
                        }
 
                        // 根据设备类型转换并添加到对应的列表
                        switch (type) {
                            case AppConstants.TYPE_ROLLING:
                                RollerTimeData roller = json.toJavaObject(RollerTimeData.class);
                                roller.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                rollerList.add(roller);
                                break;
                            case AppConstants.TYPE_PACKAGING:
                                PackerTimeData packer = json.toJavaObject(PackerTimeData.class);
                                packer.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                try {
                                    if(json.containsKey("chcQty")){
                                        packer.setQty(Double.parseDouble(json.get("chcQty").toString()));
                                    }
                                    if(json.containsKey("ptchQty")){
                                        packer.setMainQty(Double.parseDouble(json.get("ptchQty").toString()));
                                    }
                                    // 包装机需要获取提升机的产量,提升机的key为 早1601、中2601、晚3601
                                    // 逻辑:1-3号包装机对应tQty1-3;4号无;5-13号对应tQty4-12 (即号数-1)
                                    if (equNo != null && equNo > 0) {
                                        int machineNo = equNo % 100;
                                        String targetField =  "tQty" + machineNo;;
//                                        if (machineNo < 4) {
//                                            targetField = "tQty" + machineNo;
//                                        } else if (machineNo > 4) {
//                                            targetField = "tQty" + (machineNo - 1);
//                                        }
 
                                        Map<Object, Object> tsjMap = equipmentDataMap.get(shiftId + "601");
                                        if (tsjMap != null) {
                                            Object qtyObj = tsjMap.get(targetField);
                                            if (qtyObj != null) {
                                                packer.setTsQty(Double.parseDouble(qtyObj.toString()));
                                            }
                                        }
                                    }
                                }catch (Exception e){}
                                packerList.add(packer);
                                break;
                            case AppConstants.TYPE_SEALING:
                                BoxTimeData box = json.toJavaObject(BoxTimeData.class);
                                box.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                boxList.add(box);
                                break;
                            case AppConstants.TYPE_FORMING:
                                MakeupTimeData makeup = json.toJavaObject(MakeupTimeData.class);
                                makeup.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                makeupList.add(makeup);
                                break;
                            case AppConstants.TYPE_LAUNCHING:
                                TransTimeData trans = json.toJavaObject(TransTimeData.class);
                                trans.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                transList.add(trans);
                                break;
                            case AppConstants.TYPE_LIFTING:
                                HoisterTimeData hoister = json.toJavaObject(HoisterTimeData.class);
                                hoister.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                hoisterList.add(hoister);
                                break;
                            case AppConstants.TYPE_FEED:
                                FeedmatchTimeData feed = json.toJavaObject(FeedmatchTimeData.class);
                                feed.setTime(now).setKey(fullKey).setShift(shift).setEquNo(equNo);
                                feedmatchList.add(feed);
                                break;
                            default:
                                log.warn("未知的设备类型,Key: {}", fullKey);
                        }
                    } catch (Exception e) {
                        log.error("数据转换失败 Key: {}", fullKey, e);
                    }
                }
 
                // 5. 批量入库
                if (!rollerList.isEmpty()) rollerService.saveBatch(rollerList);
                if (!packerList.isEmpty()) packerService.saveBatch(packerList);
                if (!boxList.isEmpty()) boxService.saveBatch(boxList);
                if (!makeupList.isEmpty()) makeupService.saveBatch(makeupList);
                if (!transList.isEmpty()) transService.saveBatch(transList);
                if (!hoisterList.isEmpty()) hoisterService.saveBatch(hoisterList);
                if (!feedmatchList.isEmpty()) feedmatchService.saveBatch(feedmatchList);
 
                int total = rollerList.size() + packerList.size() + boxList.size() + 
                            makeupList.size() + transList.size() + hoisterList.size() + feedmatchList.size();
                
                if (total > 0) {
                    log.info("采集完成: Shift={}, Total={}, Cost={}ms", shiftId, total, System.currentTimeMillis() - startTime);
                }
 
                sleepRemaining(startTime);
 
            } catch (Exception e) {
                log.error("同步任务异常", e);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
 
    private void sleepRemaining(long startTime) {
        long elapsed = System.currentTimeMillis() - startTime;
        long sleepTime = intervalMs - elapsed;
        if (sleepTime > 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
 
    // 应用关闭时停止任务
    public void stop() {
        this.running = false;
        executorService.shutdown();
    }
}