疯狂的狮子Li
2023-06-17 bda0e0ec64a8f08f211975d1526b17f19cf0b130
ruoyi-modules/ruoyi-job/src/main/java/org/dromara/job/processors/MapReduceProcessorDemo.java
@@ -1,15 +1,14 @@
package org.dromara.job.processors;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cn.hutool.core.lang.Dict;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.json.utils.JsonUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
@@ -37,10 +36,10 @@
        log.info("============== TestMapReduceProcessor#process ==============");
        log.info("isRootTask:{}", isRootTask());
        log.info("taskContext:{}", JsonUtils.toJSONString(context));
        log.info("taskContext:{}", JsonUtils.toJsonString(context));
        // 根据控制台参数获取MR批次及子任务大小
        final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
        final Dict jobParams = JsonUtils.parseMap(context.getJobParams());
        Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
        Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
@@ -61,8 +60,8 @@
            return new ProcessResult(true, "MAP_SUCCESS");
        } else {
            log.info("==== NORMAL_PROCESS ====");
            omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask()));
            log.info("subTask: {}", JsonUtils.toJSONString(context.getSubTask()));
            omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask()));
            log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask()));
            Thread.sleep(1000);
            if (context.getCurrentRetryTimes() == 0) {
                return new ProcessResult(false, "FIRST_FAILED");
@@ -75,8 +74,8 @@
    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        log.info("================ MapReduceProcessorDemo#reduce ================");
        log.info("TaskContext: {}", JSONObject.toJSONString(context));
        log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults));
        log.info("TaskContext: {}", JsonUtils.toJsonString(context));
        log.info("List<TaskResult>: {}", JsonUtils.toJsonString(taskResults));
        context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults);
        boolean success = ThreadLocalRandom.current().nextBoolean();