liulingling.177216
2024-08-26 349f1cfc5fa77fbc636d542df0d8050fddec48c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.dingzhuo.compute.engine.actor.indexcalc;
 
import static org.apache.commons.lang3.StringUtils.equalsIgnoreCase;
 
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.dingzhuo.compute.engine.message.ExecuteType;
import com.dingzhuo.compute.engine.message.calculation.CalculateMessage;
import com.dingzhuo.compute.engine.message.save.SaveMessage;
import com.dingzhuo.compute.engine.utils.ActorUtil;
import com.dingzhuo.compute.engine.utils.ServiceProvicer;
import com.dingzhuo.energy.common.utils.time.TimeManager;
import com.dingzhuo.energy.dataservice.domain.DataItem;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
 
@Component("savePeriodActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SavePeriodActor extends UntypedAbstractActor {
 
  public static final String ACTOR_NAME = "savePeriodActor";
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  private final ActorSystem actorSystem;
  private Cancellable timer;
  private ActorSelection calculateActor;
  private ConcurrentHashMap<String, SaveMessage> cacheData = new ConcurrentHashMap<>();
 
  public SavePeriodActor(ActorSystem actorSystem) {
    this.actorSystem = actorSystem;
  }
 
  @Override
  public void preStart() throws Exception {
    super.preStart();
    calculateActor = getContext()
        .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME));
    FiniteDuration interval = Duration.create(10, TimeUnit.SECONDS);
    FiniteDuration delay = Duration.Zero();
    timer = actorSystem.scheduler()
        .scheduleAtFixedRate(delay, interval, self(), Message.TIMER, actorSystem.dispatcher(),
            self());
  }
 
  @Override
  public void postStop() throws Exception {
    super.postStop();
    if (timer != null) {
      timer.cancel();
    }
  }
 
  @Override
  public void onReceive(Object message) {
    if (message instanceof Message) {
      if (message == Message.TIMER) {
        doSave();
      }
    } else if (message instanceof SaveMessage) {
      SaveMessage saveMessage = (SaveMessage) message;
      String key = String.format("%s:%s", saveMessage.getIndexId(), saveMessage.getTimeCode());
      cacheData.put(key, saveMessage);
    }
  }
 
  private void doSave() {
    if (!cacheData.isEmpty()) {
      List<SaveMessage> saveData = new ArrayList<>();
      List<String> needRemoveKeys = new ArrayList<>();
      List<CalculateMessage> postIndexMessage = new ArrayList<>();
      cacheData.forEach((key, value) -> {
        needRemoveKeys.add(key);
        saveData.add(value);
        value.getPostActorIds().forEach(actorId -> {
          if (!equalsIgnoreCase(actorId,
              ActorUtil.buildActorId(value.getIndexId(), value.getTimeType()))) {
            CalculateMessage message =
                new CalculateMessage(actorId, value.getTimeCode(),
                    value.getTimeType(),
                    ExecuteType.TIMER);
            postIndexMessage.add(message);
          }
        });
      });
 
      for (String key : needRemoveKeys) {
        cacheData.remove(key);
      }
 
      savePeriodData(saveData);
      postIndexMessage.forEach(message -> calculateActor.tell(message, getSelf()));
    }
  }
 
  private void savePeriodData(List<SaveMessage> saveData) {
    List<DataItem> dataItems = new ArrayList<>();
    saveData.forEach(data -> {
      String timeCode = data.getTimeCode();
      DataItem dataItem = new DataItem();
      dataItem.setIndexId(data.getIndexId());
      dataItem.setTimeCode(timeCode);
      dataItem.setTimeType(data.getTimeType());
      dataItem.setBeginTime(TimeManager.getBeginTime(timeCode));
      dataItem.setEndTime(TimeManager.getEndTime(timeCode));
      dataItem.setDataTime(TimeManager.getTime(timeCode));
      dataItem.setValue(data.getValue());
      dataItems.add(dataItem);
    });
 
    try {
      ServiceProvicer.getPeriodDataService().save(dataItems);
    } catch (Exception ex) {
      log.error("批量保存失败!" + ex.getMessage());
      dataItems.forEach(item -> {
        try {
          if (item != null) {
            ServiceProvicer.getPeriodDataService().save(item);
          }
        } catch (Exception singleEx) {
          log.error("单个保存失败!" + singleEx.getMessage());
        }
      });
    }
 
    log.error(DateTime.now().toString("yyyy-MM-dd HH:mm:ss.SSS"));
  }
 
  private enum Message {
    /**
     * 时间触发
     */
    TIMER
  }
}