package com.dingzhuo.compute.engine.actor.indexcalc;

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.dingzhuo.compute.engine.message.ExecuteType;
import com.dingzhuo.compute.engine.message.calculation.CalculateMessage;
import com.dingzhuo.compute.engine.message.timer.RegisterTimeMessage;
import com.dingzhuo.compute.engine.message.timer.RegisterType;
import com.dingzhuo.compute.engine.utils.ActorUtil;
import com.dingzhuo.compute.engine.utils.CacheService;
import com.dingzhuo.compute.engine.utils.ServiceProvicer;
import com.dingzhuo.energy.common.utils.time.TimeManager;
import com.dingzhuo.energy.common.utils.time.TimeType;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Actor that sends itself a {@link Message#TIMER} tick at a fixed rate and, on every tick,
 * asks the calculation actor to compute each actor id registered per {@link TimeType}.
 *
 * <p>Registrations arrive as {@link RegisterTimeMessage}s and are stored in the map exposed by
 * {@link CacheService#getRegisters()}.
 *
 * @author fanxinfu
 */
@Component("timerActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TimerActor extends UntypedAbstractActor {

  /** Name under which this actor is registered (also its Spring bean name). */
  public static final String ACTOR_NAME = "timerActor";

  private final CacheService cacheService;
  final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  private final ActorSystem actorSystem;

  /** Handle of the periodic tick; {@code null} until {@link #preStart()} has scheduled it. */
  private Cancellable timerCancelable;

  /** Selection of the calculation actor; resolved in {@link #preStart()}. */
  private ActorSelection calculationActor;

  /**
   * Creates the actor.
   *
   * @param actorSystem actor system whose scheduler and dispatcher drive the periodic tick
   * @param cacheService service holding the per-time-type registration sets
   */
  public TimerActor(ActorSystem actorSystem, CacheService cacheService) {
    this.actorSystem = actorSystem;
    this.cacheService = cacheService;
  }

  /**
   * Resolves the calculation actor and schedules the periodic {@link Message#TIMER} tick.
   *
   * <p>The first tick fires immediately (zero initial delay); the period is the configured
   * calculation interval, interpreted as seconds.
   *
   * @throws Exception if the superclass hook fails
   */
  @Override
  public void preStart() throws Exception {
    super.preStart();
    calculationActor = getContext()
        .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME));

    FiniteDuration interval = Duration
        .create(ServiceProvicer.getCalculationConfig().getInterval(), TimeUnit.SECONDS);
    FiniteDuration delay = Duration.Zero();
    this.timerCancelable = actorSystem.scheduler()
        .scheduleAtFixedRate(delay, interval, self(), Message.TIMER, actorSystem.dispatcher(),
            self());
  }

  /**
   * Cancels the periodic tick, if one was scheduled.
   *
   * @throws Exception if the superclass hook fails
   */
  @Override
  public void postStop() throws Exception {
    super.postStop();
    if (this.timerCancelable != null) {
      this.timerCancelable.cancel();
    }
  }

  /**
   * Dispatches incoming messages: {@link Message#TIMER} triggers a calculation round and
   * {@link RegisterTimeMessage} updates the registrations. Anything else is reported through
   * {@link #unhandled(Object)} instead of being dropped silently.
   *
   * @param message the received message
   */
  @Override
  public void onReceive(Object message) {
    if (message == Message.TIMER) {
      doTimer();
    } else if (message instanceof RegisterTimeMessage) {
      register((RegisterTimeMessage) message);
    } else {
      unhandled(message);
    }
  }

  /**
   * Adds or removes an actor id in the registration set of the message's time type.
   *
   * <p>The set for the time type is created on first use regardless of the register type;
   * register types other than {@code REGISTER} and {@code UNREGISTER} change nothing else.
   *
   * @param message registration request carrying time type, actor id and register type
   */
  private void register(RegisterTimeMessage message) {
    Set<String> actorIds = cacheService.getRegisters()
        .computeIfAbsent(message.getTimeType(), timeType -> new HashSet<>());

    if (message.getRegisterType() == RegisterType.REGISTER) {
      actorIds.add(message.getActorId());
    } else if (message.getRegisterType() == RegisterType.UNREGISTER) {
      actorIds.remove(message.getActorId());
    }
  }

  /**
   * Runs one timer round: for every known time type that yields a non-empty execute time code,
   * sends a {@link CalculateMessage} per registered actor id to the calculation actor.
   */
  private void doTimer() {
    Date now = DateTime.now().toDate();
    // NOTE(review): three times the configured interval is handed to getExecuteTimeCode;
    // presumably a tolerance window in seconds - confirm against TimeManager.
    int delayTime = ServiceProvicer.getCalculationConfig().getInterval() * 3;

    for (TimeType timeType : TimeManager.typeArray) {
      String timeCode = TimeManager.getExecuteTimeCode(now, timeType, delayTime);
      if (StringUtils.isEmpty(timeCode)) {
        continue;
      }

      Set<String> actorIds = cacheService.getRegisters().get(timeType);
      if (actorIds == null || actorIds.isEmpty()) {
        continue;
      }

      for (String actorId : actorIds) {
        calculationActor
            .tell(new CalculateMessage(actorId, timeCode, timeType, ExecuteType.TIMER), getSelf());
      }
    }
  }

  /** Internal messages this actor sends to itself. */
  public enum Message {
    /**
     * Periodic timer tick.
     */
    TIMER
  }
}