package com.dingzhuo.compute.engine.actor.indexcalc; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.UntypedAbstractActor; import akka.cluster.sharding.ClusterSharding; import com.dingzhuo.compute.engine.function.FunctionEngine; import com.dingzhuo.compute.engine.message.calculation.CalculateMessage; import com.dingzhuo.compute.engine.message.calculation.LinkMessage; import com.dingzhuo.compute.engine.message.calculation.LoadCalcIndexMessage; import com.dingzhuo.compute.engine.message.calculation.UnlinkMessage; import com.dingzhuo.compute.engine.message.calculation.UnloadCalcIndexMessage; import com.dingzhuo.compute.engine.message.save.SaveMessage; import com.dingzhuo.compute.engine.message.timer.RegisterTimeMessage; import com.dingzhuo.compute.engine.message.timer.RegisterType; import com.dingzhuo.compute.engine.utils.ActorUtil; import com.dingzhuo.compute.engine.utils.CacheService; import com.dingzhuo.energy.common.utils.StringUtils; import com.dingzhuo.energy.data.model.domain.IndexStorage; import com.greenpineyu.fel.context.FelContext; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; /** * @author fanxinfu */ @Component("indexCalcActor") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class CalculationIndexActor extends UntypedAbstractActor { public static final String ACTOR_NAME = "indexCalcActor"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final CacheService cacheService; private ActorSelection calculateActor; private ActorSelection timerActor; private ActorSelection saveActor; public CalculationIndexActor(CacheService cacheService) { this.cacheService = cacheService; } @Override public void preStart() throws Exception { super.preStart(); calculateActor = getContext() .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME)); timerActor = getContext().actorSelection(ActorUtil.getActorAddress(TimerActor.ACTOR_NAME)); saveActor = getContext().actorSelection(ActorUtil.getActorAddress(SavePeriodActor.ACTOR_NAME)); } @Override public void onReceive(Object message) { if (message instanceof LoadCalcIndexMessage) { IndexStorage storage = ((LoadCalcIndexMessage) message).getIndexStorage(); loadIndex(storage); } else if (message instanceof UnloadCalcIndexMessage) { String actorId = ((UnloadCalcIndexMessage) message).getActorId(); unloadIndex(actorId); } else if (message instanceof LinkMessage) { LinkMessage linkMessage = (LinkMessage) message; cacheService.cachePostIndex(linkMessage.getActorId(), linkMessage.getPostActorId()); } else if (message instanceof UnlinkMessage) { UnlinkMessage unlinkMessage = (UnlinkMessage) message; cacheService.removePostIndexCache(unlinkMessage.getActorId(), unlinkMessage.getPostActorId()); } else if (message instanceof CalculateMessage) { calculate((CalculateMessage) message); } } private void calculate(CalculateMessage message) { try { IndexStorage indexStorage = cacheService.getIndexStorageCache(message.getActorId()); if (StringUtils.isBlank(indexStorage.getCalcText())) { return; } List postActorIds = cacheService.getPostActorIds(message.getActorId()); FelContext calcContext = FunctionEngine.getInstance().getContext(); calcContext.set("timeType", message.getTimeType().name()); calcContext.set("timeCode", message.getTimeCode()); Object value = FunctionEngine.getInstance().eval(indexStorage.getCalcText(), calcContext); SaveMessage saveMessage = new SaveMessage(); saveMessage.setIndexId(indexStorage.getIndexId()); saveMessage.setTimeType(message.getTimeType()); saveMessage.setTimeCode(message.getTimeCode()); if (value != null) { saveMessage.setValue(Double.parseDouble(String.valueOf(value))); } else { saveMessage.setValue(0d); } saveMessage.getPostActorIds().addAll(postActorIds); saveActor.tell(saveMessage, getSelf()); } catch (Exception ex) { logger.error(ex.getMessage(), ex); } } private void unloadIndex(String actorId) { IndexStorage indexStorage = cacheService.getIndexStorageCache(actorId); RegisterTimeMessage message = new RegisterTimeMessage(actorId, indexStorage.getTimeType(), RegisterType.UNREGISTER); timerActor.tell(message, getSelf()); indexStorage.getParamIndex().forEach(param -> { String preActorId = ActorUtil.buildActorId(param, indexStorage.getTimeType()); calculateActor.tell(new UnlinkMessage(preActorId, actorId), getSelf()); }); cacheService.removeIndexCache(actorId); } private void loadIndex(IndexStorage storage) { cacheService.cacheIndexStorage(storage); String actorId = ActorUtil.buildActorId(storage); storage.getParamIndex().forEach(param -> { String preActorId = ActorUtil.buildActorId(param, storage.getTimeType()); calculateActor.tell(new LinkMessage(preActorId, actorId), getSelf()); }); RegisterTimeMessage message = new RegisterTimeMessage(actorId, storage.getTimeType(), RegisterType.REGISTER); timerActor.tell(message, getSelf()); } }