package com.dingzhuo.compute.engine.actor.device; import akka.actor.ActorSelection; import akka.actor.UntypedAbstractActor; import com.dingzhuo.compute.engine.message.device.LoadDeviceStatusMessage; import com.dingzhuo.compute.engine.message.device.UnloadDeviceStatusMessage; import com.dingzhuo.compute.engine.utils.ActorUtil; import com.dingzhuo.energy.data.monitoring.device.domain.DeviceFormula; import com.dingzhuo.energy.data.monitoring.device.service.IDeviceFormulaService; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import scala.concurrent.duration.Duration; @Component("loadDeviceStatusActor") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class LoadDeviceStatusActor extends UntypedAbstractActor { public static final String ACTOR_NAME = "loadDeviceStatusActor"; private final IDeviceFormulaService deviceFormulaService; Map loadedDeviceFormula = new HashMap<>(); private ActorSelection deviceStatusTimerActor; public LoadDeviceStatusActor( IDeviceFormulaService deviceFormulaService) { this.deviceFormulaService = deviceFormulaService; } @Override public void preStart() throws Exception { super.preStart(); deviceStatusTimerActor = getContext() .actorSelection(ActorUtil.getActorAddress(DeviceStatusTimerActor.ACTOR_NAME)); this.context().system().scheduler() .scheduleAtFixedRate(Duration.Zero(), Duration.create(5, TimeUnit.MINUTES), this.self(), Message.REFRESH, this.context().system().dispatcher(), null); initDeviceStatus(); refreshDeviceStatusSetting(); } private void initDeviceStatus() { } @Override public void onReceive(Object message) { if (message instanceof Message) { if (message == Message.REFRESH) { refreshDeviceStatusSetting(); } else { this.unhandled(message); } } } private void refreshDeviceStatusSetting() { List formulas = deviceFormulaService.getAllDeviceFormula(); Map newFormulas = formulas.stream() .collect(Collectors.toMap(DeviceFormula::getId, item -> item)); Set needInstall = new HashSet<>(); Set needUninstall = new HashSet<>(); loadedDeviceFormula.forEach((id, alarmItem) -> { if (!newFormulas.containsKey(id)) { needUninstall.add(id); } else { Date nowUpdate = newFormulas.get(id).getUpdateTime(); Date lastUpdate = alarmItem.getUpdateTime(); if (lastUpdate != null && nowUpdate != null && lastUpdate.after(nowUpdate)) { needUninstall.add(id); needInstall.add(id); } } }); newFormulas.forEach((id, formula) -> { if (!loadedDeviceFormula.containsKey(id)) { needInstall.add(id); } }); needUninstall.forEach(id -> { DeviceFormula deviceFormula = loadedDeviceFormula.get(id); loadedDeviceFormula.remove(id); deviceStatusTimerActor.tell(new UnloadDeviceStatusMessage(deviceFormula), getSelf()); }); needInstall.forEach(id -> { DeviceFormula deviceFormula = newFormulas.get(id); deviceStatusTimerActor.tell(new LoadDeviceStatusMessage(deviceFormula), getSelf()); loadedDeviceFormula.put(id, deviceFormula); }); } public enum Message { /** * 检测配置 */ REFRESH } }