From 35ad3067307e40b89d3ea8c131ac35c17f62ff1c Mon Sep 17 00:00:00 2001 From: 三个三 <2029364173@qq.com> Date: 星期六, 17 六月 2023 23:13:02 +0800 Subject: [PATCH] Merge branch '5.X' of gitee.com:dromara/RuoYi-Vue-Plus into JustAuth --- ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 93 insertions(+), 0 deletions(-) diff --git a/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java new file mode 100644 index 0000000..1498854 --- /dev/null +++ b/ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java @@ -0,0 +1,93 @@ +package org.dromara.job.processors; + +import cn.hutool.core.lang.Dict; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.json.utils.JsonUtils; +import org.springframework.stereotype.Component; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; +import tech.powerjob.worker.log.OmsLogger; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * MapReduce 澶勭悊鍣ㄧず渚� + * 鎺у埗鍙板弬鏁帮細{"batchSize": 100, "batchNum": 2} + * + * @author tjq + * @since 2020/4/17 + */ +@Slf4j +@Component +public class MapReduceProcessorDemo implements MapReduceProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + OmsLogger omsLogger = context.getOmsLogger(); + + log.info("============== TestMapReduceProcessor#process =============="); + log.info("isRootTask:{}", isRootTask()); + log.info("taskContext:{}", JsonUtils.toJsonString(context)); + + // 鏍规嵁鎺у埗鍙板弬鏁拌幏鍙朚R鎵规鍙婂瓙浠诲姟澶у皬 + final Dict jobParams = JsonUtils.parseMap(context.getJobParams()); + + Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100); + Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10); + + if (isRootTask()) { + log.info("==== MAP ===="); + omsLogger.info("[DemoMRProcessor] start root task~"); + List<TestSubTask> subTasks = Lists.newLinkedList(); + for (int j = 0; j < batchNum; j++) { + for (int i = 0; i < batchSize; i++) { + int x = j * batchSize + i; + subTasks.add(new TestSubTask("name" + x, x)); + } + map(subTasks, "MAP_TEST_TASK"); + subTasks.clear(); + } + omsLogger.info("[DemoMRProcessor] map success~"); + return new ProcessResult(true, "MAP_SUCCESS"); + } else { + log.info("==== NORMAL_PROCESS ===="); + omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask())); + log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask())); + Thread.sleep(1000); + if (context.getCurrentRetryTimes() == 0) { + return new ProcessResult(false, "FIRST_FAILED"); + } else { + return new ProcessResult(true, "PROCESS_SUCCESS"); + } + } + } + + @Override + public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) { + log.info("================ MapReduceProcessorDemo#reduce ================"); + log.info("TaskContext: {}", JsonUtils.toJsonString(context)); + log.info("List<TaskResult>: {}", JsonUtils.toJsonString(taskResults)); + context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults); + + boolean success = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(success, context + ": " + success); + } + + @Getter + @ToString + @NoArgsConstructor + @AllArgsConstructor + public static class TestSubTask { + private String name; + private int age; + } +} -- Gitblit v1.9.3