liulingling.177216
2024-08-26 349f1cfc5fa77fbc636d542df0d8050fddec48c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.dingzhuo.compute.engine.actor.indexcalc;
 
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.UntypedAbstractActor;
import akka.cluster.sharding.ClusterSharding;
import com.dingzhuo.compute.engine.function.FunctionEngine;
import com.dingzhuo.compute.engine.message.calculation.CalculateMessage;
import com.dingzhuo.compute.engine.message.calculation.LinkMessage;
import com.dingzhuo.compute.engine.message.calculation.LoadCalcIndexMessage;
import com.dingzhuo.compute.engine.message.calculation.UnlinkMessage;
import com.dingzhuo.compute.engine.message.calculation.UnloadCalcIndexMessage;
import com.dingzhuo.compute.engine.message.save.SaveMessage;
import com.dingzhuo.compute.engine.message.timer.RegisterTimeMessage;
import com.dingzhuo.compute.engine.message.timer.RegisterType;
import com.dingzhuo.compute.engine.utils.ActorUtil;
import com.dingzhuo.compute.engine.utils.CacheService;
import com.dingzhuo.energy.common.utils.StringUtils;
import com.dingzhuo.energy.data.model.domain.IndexStorage;
import com.greenpineyu.fel.context.FelContext;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
 
/**
 * @author fanxinfu
 */
@Component("indexCalcActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class CalculationIndexActor extends UntypedAbstractActor {
 
  public static final String ACTOR_NAME = "indexCalcActor";
  private final Logger logger = LoggerFactory.getLogger(this.getClass());
  private final CacheService cacheService;
  private ActorSelection calculateActor;
  private ActorSelection timerActor;
  private ActorSelection saveActor;
 
  public CalculationIndexActor(CacheService cacheService) {
    this.cacheService = cacheService;
  }
 
  @Override
  public void preStart() throws Exception {
    super.preStart();
    calculateActor = getContext()
        .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME));
    timerActor = getContext().actorSelection(ActorUtil.getActorAddress(TimerActor.ACTOR_NAME));
    saveActor = getContext().actorSelection(ActorUtil.getActorAddress(SavePeriodActor.ACTOR_NAME));
  }
 
  @Override
  public void onReceive(Object message) {
    if (message instanceof LoadCalcIndexMessage) {
      IndexStorage storage = ((LoadCalcIndexMessage) message).getIndexStorage();
      loadIndex(storage);
    } else if (message instanceof UnloadCalcIndexMessage) {
      String actorId = ((UnloadCalcIndexMessage) message).getActorId();
      unloadIndex(actorId);
    } else if (message instanceof LinkMessage) {
      LinkMessage linkMessage = (LinkMessage) message;
      cacheService.cachePostIndex(linkMessage.getActorId(), linkMessage.getPostActorId());
    } else if (message instanceof UnlinkMessage) {
      UnlinkMessage unlinkMessage = (UnlinkMessage) message;
      cacheService.removePostIndexCache(unlinkMessage.getActorId(), unlinkMessage.getPostActorId());
    } else if (message instanceof CalculateMessage) {
      calculate((CalculateMessage) message);
    }
  }
 
  private void calculate(CalculateMessage message) {
    try {
      IndexStorage indexStorage = cacheService.getIndexStorageCache(message.getActorId());
      if (StringUtils.isBlank(indexStorage.getCalcText())) {
        return;
      }
 
      List<String> postActorIds = cacheService.getPostActorIds(message.getActorId());
      FelContext calcContext = FunctionEngine.getInstance().getContext();
      calcContext.set("timeType", message.getTimeType().name());
      calcContext.set("timeCode", message.getTimeCode());
      Object value = FunctionEngine.getInstance().eval(indexStorage.getCalcText(), calcContext);
      SaveMessage saveMessage = new SaveMessage();
      saveMessage.setIndexId(indexStorage.getIndexId());
      saveMessage.setTimeType(message.getTimeType());
      saveMessage.setTimeCode(message.getTimeCode());
      if (value != null) {
        saveMessage.setValue(Double.parseDouble(String.valueOf(value)));
      } else {
        saveMessage.setValue(0d);
      }
      saveMessage.getPostActorIds().addAll(postActorIds);
 
      saveActor.tell(saveMessage, getSelf());
    } catch (Exception ex) {
      logger.error(ex.getMessage(), ex);
    }
  }
 
  private void unloadIndex(String actorId) {
    IndexStorage indexStorage = cacheService.getIndexStorageCache(actorId);
    RegisterTimeMessage message =
        new RegisterTimeMessage(actorId, indexStorage.getTimeType(), RegisterType.UNREGISTER);
    timerActor.tell(message, getSelf());
    indexStorage.getParamIndex().forEach(param -> {
      String preActorId = ActorUtil.buildActorId(param, indexStorage.getTimeType());
      calculateActor.tell(new UnlinkMessage(preActorId, actorId), getSelf());
    });
    cacheService.removeIndexCache(actorId);
  }
 
  private void loadIndex(IndexStorage storage) {
    cacheService.cacheIndexStorage(storage);
    String actorId = ActorUtil.buildActorId(storage);
    storage.getParamIndex().forEach(param -> {
      String preActorId = ActorUtil.buildActorId(param, storage.getTimeType());
      calculateActor.tell(new LinkMessage(preActorId, actorId), getSelf());
    });
 
    RegisterTimeMessage message =
        new RegisterTimeMessage(actorId, storage.getTimeType(), RegisterType.REGISTER);
    timerActor.tell(message, getSelf());
  }
}