package com.dingzhuo.compute.engine.actor.monitor; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.UntypedAbstractActor; import com.alibaba.fastjson.JSONArray; import com.dingzhuo.compute.engine.actor.indexcalc.CalculationIndexActor; import com.dingzhuo.compute.engine.message.ExecuteType; import com.dingzhuo.compute.engine.message.calculation.CalculateMessage; import com.dingzhuo.compute.engine.utils.ActorUtil; import com.dingzhuo.compute.engine.utils.CacheService; import com.dingzhuo.energy.common.utils.time.TimeManager; import com.dingzhuo.energy.common.utils.time.TimeType; import java.io.File; import java.nio.charset.Charset; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** * @author fanxinfu */ @Component("recastDataActor") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class RecastDataActor extends UntypedAbstractActor { public static final String ACTOR_NAME = "recastDataActor"; private final CacheService cacheService; private Cancellable timerCancelable; private final ActorSystem actorSystem; private ActorSelection calculationActor; Logger logger = LoggerFactory.getLogger(RecastDataActor.class.getName()); public RecastDataActor(ActorSystem actorSystem, CacheService cacheService) { this.actorSystem = actorSystem; this.cacheService = cacheService; } @Override public void preStart() { calculationActor = getContext() .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME)); FiniteDuration interval = Duration.create(10, TimeUnit.SECONDS); FiniteDuration delay = Duration.Zero(); this.timerCancelable = actorSystem.scheduler().scheduleAtFixedRate(delay, interval, self(), Message.TIMER, actorSystem.dispatcher(), self()); } @Override public void postStop() throws Exception { super.postStop(); if (this.timerCancelable != null) { this.timerCancelable.cancel(); } } @Override public void onReceive(Object message) { if (message instanceof Message) { if (message == Message.TIMER) { verifyRecastData(); } } } private void verifyRecastData() { String pathname = "recast.json"; try { File jsonFile = new File(pathname); if (jsonFile.exists()) { sendRecastData(jsonFile); } } catch (Exception e) { logger.error("", e); } } private void sendRecastData(File jsonFile) { try { String jsonString = FileUtils.readFileToString(jsonFile, Charset.defaultCharset()); List configs = JSONArray.parseArray(jsonString, RecastConfig.class); configs.forEach(config -> { DateTime time = config.getBeginTime(); TimeType timeType = config.getTimeType(); Set actorIds = cacheService.getRegisters().get(timeType); if (actorIds != null && actorIds.size() > 0) { while (time.isBefore(config.getEndTime())) { String timeCode = TimeManager.getTimeCode(time.toDate(), timeType); for (String actorId : actorIds) { CalculateMessage msg = new CalculateMessage(actorId, timeCode, timeType, ExecuteType.TIMER); calculationActor.tell(msg, getSelf()); } if (timeType == TimeType.HOUR) { time = time.plusHours(1); } else if (timeType == TimeType.DAY) { time = time.plusDays(1); } else if (timeType == TimeType.MONTH) { time = time.plusMonths(1); } else if (timeType == TimeType.YEAR) { time = time.plusYears(1); } try { Thread.sleep(0); } catch (InterruptedException e) { logger.error("", e); } } } }); jsonFile.delete(); } catch (Exception ex) { logger.error("", ex); } } private static class RecastConfig { private DateTime beginTime; private DateTime endTime; private TimeType timeType; public DateTime getBeginTime() { return beginTime; } public void setBeginTime(DateTime beginTime) { this.beginTime = beginTime; } public DateTime getEndTime() { return endTime; } public void setEndTime(DateTime endTime) { this.endTime = endTime; } public TimeType getTimeType() { return timeType; } public void setTimeType(TimeType timeType) { this.timeType = timeType; } } public enum Message { /** * 时间触发 */ TIMER } }