package com.dingzhuo.compute.engine.actor.indexcalc; import static org.apache.commons.lang3.StringUtils.equalsIgnoreCase; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.UntypedAbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import com.dingzhuo.compute.engine.message.ExecuteType; import com.dingzhuo.compute.engine.message.calculation.CalculateMessage; import com.dingzhuo.compute.engine.message.save.SaveMessage; import com.dingzhuo.compute.engine.utils.ActorUtil; import com.dingzhuo.compute.engine.utils.ServiceProvicer; import com.dingzhuo.energy.common.utils.time.TimeManager; import com.dingzhuo.energy.dataservice.domain.DataItem; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.joda.time.DateTime; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @Component("savePeriodActor") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class SavePeriodActor extends UntypedAbstractActor { public static final String ACTOR_NAME = "savePeriodActor"; LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); private final ActorSystem actorSystem; private Cancellable timer; private ActorSelection calculateActor; private ConcurrentHashMap cacheData = new ConcurrentHashMap<>(); public SavePeriodActor(ActorSystem actorSystem) { this.actorSystem = actorSystem; } @Override public void preStart() throws Exception { super.preStart(); calculateActor = getContext() .actorSelection(ActorUtil.getActorAddress(CalculationIndexActor.ACTOR_NAME)); FiniteDuration interval = Duration.create(10, TimeUnit.SECONDS); FiniteDuration delay = Duration.Zero(); timer = actorSystem.scheduler() .scheduleAtFixedRate(delay, interval, self(), Message.TIMER, actorSystem.dispatcher(), self()); } @Override public void postStop() throws Exception { super.postStop(); if (timer != null) { timer.cancel(); } } @Override public void onReceive(Object message) { if (message instanceof Message) { if (message == Message.TIMER) { doSave(); } } else if (message instanceof SaveMessage) { SaveMessage saveMessage = (SaveMessage) message; String key = String.format("%s:%s", saveMessage.getIndexId(), saveMessage.getTimeCode()); cacheData.put(key, saveMessage); } } private void doSave() { if (!cacheData.isEmpty()) { List saveData = new ArrayList<>(); List needRemoveKeys = new ArrayList<>(); List postIndexMessage = new ArrayList<>(); cacheData.forEach((key, value) -> { needRemoveKeys.add(key); saveData.add(value); value.getPostActorIds().forEach(actorId -> { if (!equalsIgnoreCase(actorId, ActorUtil.buildActorId(value.getIndexId(), value.getTimeType()))) { CalculateMessage message = new CalculateMessage(actorId, value.getTimeCode(), value.getTimeType(), ExecuteType.TIMER); postIndexMessage.add(message); } }); }); for (String key : needRemoveKeys) { cacheData.remove(key); } savePeriodData(saveData); postIndexMessage.forEach(message -> calculateActor.tell(message, getSelf())); } } private void savePeriodData(List saveData) { List dataItems = new ArrayList<>(); saveData.forEach(data -> { String timeCode = data.getTimeCode(); DataItem dataItem = new DataItem(); dataItem.setIndexId(data.getIndexId()); dataItem.setTimeCode(timeCode); dataItem.setTimeType(data.getTimeType()); dataItem.setBeginTime(TimeManager.getBeginTime(timeCode)); dataItem.setEndTime(TimeManager.getEndTime(timeCode)); dataItem.setDataTime(TimeManager.getTime(timeCode)); dataItem.setValue(data.getValue()); dataItems.add(dataItem); }); try { ServiceProvicer.getPeriodDataService().save(dataItems); } catch (Exception ex) { log.error("批量保存失败!" + ex.getMessage()); dataItems.forEach(item -> { try { if (item != null) { ServiceProvicer.getPeriodDataService().save(item); } } catch (Exception singleEx) { log.error("单个保存失败!" + singleEx.getMessage()); } }); } log.error(DateTime.now().toString("yyyy-MM-dd HH:mm:ss.SSS")); } private enum Message { /** * 时间触发 */ TIMER } }