package com.dingzhuo.compute.engine.actor.alarm; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.UntypedAbstractActor; import com.dingzhuo.compute.engine.message.alarm.AlarmJudgeMessage; import com.dingzhuo.compute.engine.message.alarm.AlarmRegisterMessage; import com.dingzhuo.compute.engine.message.alarm.AlarmType; import com.dingzhuo.compute.engine.message.timer.RegisterType; import com.dingzhuo.compute.engine.utils.ActorUtil; import com.dingzhuo.compute.engine.utils.CacheService; import com.dingzhuo.energy.dataservice.domain.TagValue; import com.dingzhuo.energy.dataservice.service.RealtimeDatabaseService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @Component("alarmTimerActor") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class AlarmTimerActor extends UntypedAbstractActor { public static final String ACTOR_NAME = "alarmTimerActor"; private final ActorSystem actorSystem; private Cancellable timer; private Map> actorIds = new HashMap<>(); private List realtimeTags = new ArrayList<>(); private ActorSelection realtimeAlarmActor; private ActorSelection periodAlarmActor; private final RealtimeDatabaseService realtimeDatabaseService; private CacheService cacheService; public AlarmTimerActor(ActorSystem actorSystem, RealtimeDatabaseService realtimeDatabaseService, CacheService cacheService) { this.actorSystem = actorSystem; this.realtimeDatabaseService = realtimeDatabaseService; this.cacheService = cacheService; } @Override public void preStart() { realtimeAlarmActor = getContext() .actorSelection(ActorUtil.getActorAddress(RealtimeAlarmActor.ACTOR_NAME)); periodAlarmActor = getContext() .actorSelection(ActorUtil.getActorAddress(PeriodAlarmActor.ACTOR_NAME)); FiniteDuration interval = Duration.create(30, TimeUnit.SECONDS); FiniteDuration delay = Duration.Zero(); timer = actorSystem.scheduler() .scheduleAtFixedRate(delay, interval, self(), Message.TIMER, actorSystem.dispatcher(), self()); } @Override public void postStop() throws Exception { super.postStop(); if (timer != null) { timer.cancel(); } } @Override public void onReceive(Object message) { if (message == Message.TIMER) { timer(); } else if (message instanceof AlarmRegisterMessage) { registerAlarm((AlarmRegisterMessage) message); } } private void registerAlarm(AlarmRegisterMessage registerMessage) { AlarmType alarmType = registerMessage.getAlarmType(); if (!actorIds.containsKey(alarmType)) { actorIds.put(alarmType, new ArrayList<>()); } if (registerMessage.getRegisterType() == RegisterType.REGISTER) { actorIds.get(alarmType).add(registerMessage.getActorId()); if (alarmType == AlarmType.LIVE) { realtimeTags.add(registerMessage.getAlarmItem().getIndexCode()); } } else if (registerMessage.getRegisterType() == RegisterType.UNREGISTER) { actorIds.get(alarmType).remove(registerMessage.getActorId()); if (alarmType == AlarmType.LIVE) { realtimeTags.remove(registerMessage.getAlarmItem().getIndexCode()); } } } private void timer() { if (!actorIds.isEmpty()) { List realtimeAlarmIds = actorIds.get(AlarmType.LIVE); List periodAlarmIds = actorIds.get(AlarmType.PERIOD); if (!realtimeTags.isEmpty()) { List tagValues = realtimeDatabaseService.retrieve(realtimeTags); cacheService.cacheTagValues(tagValues); } if (realtimeAlarmIds != null && !realtimeAlarmIds.isEmpty()) { realtimeAlarmIds .forEach(id -> realtimeAlarmActor.tell(new AlarmJudgeMessage(id), getSender())); } if (periodAlarmIds != null && !periodAlarmIds.isEmpty()) { periodAlarmIds.forEach(id -> periodAlarmActor.tell(new AlarmJudgeMessage(id), getSender())); } } } private enum Message { /** * 时间触发 */ TIMER } }