| | |
| | | package org.dromara.job.processors; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import cn.hutool.core.lang.Dict; |
| | | import com.google.common.collect.Lists; |
| | | import lombok.AllArgsConstructor; |
| | | import lombok.Getter; |
| | | import lombok.NoArgsConstructor; |
| | | import lombok.ToString; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.json.utils.JsonUtils; |
| | | import org.springframework.stereotype.Component; |
| | | import tech.powerjob.common.serialize.JsonUtils; |
| | | import tech.powerjob.worker.core.processor.ProcessResult; |
| | | import tech.powerjob.worker.core.processor.TaskContext; |
| | | import tech.powerjob.worker.core.processor.TaskResult; |
| | |
| | | |
| | | log.info("============== TestMapReduceProcessor#process =============="); |
| | | log.info("isRootTask:{}", isRootTask()); |
| | | log.info("taskContext:{}", JsonUtils.toJSONString(context)); |
| | | log.info("taskContext:{}", JsonUtils.toJsonString(context)); |
| | | |
| | | // 根据控制台参数获取MR批次及子任务大小 |
| | | final JSONObject jobParams = JSONObject.parseObject(context.getJobParams()); |
| | | final Dict jobParams = JsonUtils.parseMap(context.getJobParams()); |
| | | |
| | | Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100); |
| | | Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10); |
| | |
| | | return new ProcessResult(true, "MAP_SUCCESS"); |
| | | } else { |
| | | log.info("==== NORMAL_PROCESS ===="); |
| | | omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask())); |
| | | log.info("subTask: {}", JsonUtils.toJSONString(context.getSubTask())); |
| | | omsLogger.info("[DemoMRProcessor] process subTask: {}.", JsonUtils.toJsonString(context.getSubTask())); |
| | | log.info("subTask: {}", JsonUtils.toJsonString(context.getSubTask())); |
| | | Thread.sleep(1000); |
| | | if (context.getCurrentRetryTimes() == 0) { |
| | | return new ProcessResult(false, "FIRST_FAILED"); |
| | |
| | | @Override |
| | | public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) { |
| | | log.info("================ MapReduceProcessorDemo#reduce ================"); |
| | | log.info("TaskContext: {}", JSONObject.toJSONString(context)); |
| | | log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults)); |
| | | log.info("TaskContext: {}", JsonUtils.toJsonString(context)); |
| | | log.info("List<TaskResult>: {}", JsonUtils.toJsonString(taskResults)); |
| | | context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults); |
| | | |
| | | boolean success = ThreadLocalRandom.current().nextBoolean(); |