From 35ad3067307e40b89d3ea8c131ac35c17f62ff1c Mon Sep 17 00:00:00 2001
From: 三个三 <2029364173@qq.com>
Date: 星期六, 17 六月 2023 23:13:02 +0800
Subject: [PATCH] Merge branch '5.X' of gitee.com:dromara/RuoYi-Vue-Plus into JustAuth

---
 ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java |   93 ++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 93 insertions(+), 0 deletions(-)

diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java
new file mode 100644
index 0000000..1498854
--- /dev/null
+++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java
@@ -0,0 +1,93 @@
+package org.dromara.job.processors;
+
+import cn.hutool.core.lang.Dict;
+import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.json.utils.JsonUtils;
+import org.springframework.stereotype.Component;
+import tech.powerjob.worker.core.processor.ProcessResult;
+import tech.powerjob.worker.core.processor.TaskContext;
+import tech.powerjob.worker.core.processor.TaskResult;
+import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
+import tech.powerjob.worker.log.OmsLogger;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * MapReduce 澶勭悊鍣ㄧず渚�
+ * 鎺у埗鍙板弬鏁帮細{"batchSize": 100, "batchNum": 2}
+ *
+ * @author tjq
+ * @since 2020/4/17
+ */
+@Slf4j
+@Component
+public class MapReduceProcessorDemo implements MapReduceProcessor {
+
+    @Override
+    public ProcessResult process(TaskContext context) throws Exception {
+
+        OmsLogger omsLogger = context.getOmsLogger();
+
+        log.info("============== TestMapReduceProcessor#process ==============");
+        log.info("isRootTask:{}", isRootTask());
+        log.info("taskContext:{}", JsonUtils.toJsonString(context));
+
+        // 鏍规嵁鎺у埗鍙板弬鏁拌幏鍙朚R鎵规鍙婂瓙浠诲姟澶у皬
+        final Dict jobParams = JsonUtils.parseMap(context.getJobParams());
+
+        Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
+        Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
+
+        if (isRootTask()) {
+            log.info("==== MAP ====");
+            omsLogger.info("[DemoMRProcessor] start root task~");
+            List<TestSubTask> subTasks = Lists.newLinkedList();
+            for (int j = 0; j < batchNum; j++) {
+                for (int i = 0; i < batchSize; i++) {
+                    int x = j * batchSize + i;
+                    subTasks.add(new TestSubTask("name" + x, x));
+                }
+                map(subTasks, "MAP_TEST_TASK");
+                subTasks.clear();
+            }
+            omsLogger.info("[DemoMRProcessor] map success~");
+            return new ProcessResult(true, "MAP_SUCCESS");
+        } else {
+            log.info("==== NORMAL_PROCESS ====");
+            omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask()));
+            log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask()));
+            Thread.sleep(1000);
+            if (context.getCurrentRetryTimes() == 0) {
+                return new ProcessResult(false, "FIRST_FAILED");
+            } else {
+                return new ProcessResult(true, "PROCESS_SUCCESS");
+            }
+        }
+    }
+
+    @Override
+    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
+        log.info("================ MapReduceProcessorDemo#reduce ================");
+        log.info("TaskContext: {}", JsonUtils.toJsonString(context));
+        log.info("List<TaskResult>: {}", JsonUtils.toJsonString(taskResults));
+        context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults);
+
+        boolean success = ThreadLocalRandom.current().nextBoolean();
+        return new ProcessResult(success, context + ": " + success);
+    }
+
+    @Getter
+    @ToString
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class TestSubTask {
+        private String name;
+        private int age;
+    }
+}

--
Gitblit v1.9.3