.gitignore
@@ -1,24 +1,38 @@ # Compiled class file *.class target/ !.mvn/wrapper/maven-wrapper.jar !**/src/main/**/target/ !**/src/test/**/target/ # Log file *.log ### IntelliJ IDEA ### .idea/modules.xml .idea/jarRepositories.xml .idea/compiler.xml .idea/libraries/ *.iws *.iml *.ipr # BlueJ files *.ctxt ### Eclipse ### .apt_generated .classpath .factorypath .project .settings .springBeans .sts4-cache # Mobile Tools for Java (J2ME) .mtj.tmp/ ### NetBeans ### /nbproject/private/ /nbbuild/ /dist/ /nbdist/ /.nb-gradle/ build/ !**/src/main/**/build/ !**/src/test/**/build/ # Package Files # *.jar *.war *.nar *.ear *.zip *.tar.gz *.rar ### VS Code ### .vscode/ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* replay_pid* ### Mac OS ### .DS_Store pom.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,143 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhitan</groupId> <artifactId>MQTTGateway</artifactId> <version>1.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.15</version> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <integration.version>3.4.3</integration.version> <druid.version>1.2.20</druid.version> <mybatis-plus.version>3.5.6</mybatis-plus.version> <influxdb-client.version>6.6.0</influxdb-client.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <artifactId>spring-boot-starter-tomcat</artifactId> <groupId>org.springframework.boot</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- é¿éæ°æ®åºè¿æ¥æ± --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>${mybatis-plus.version}</version> </dependency> <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>${influxdb-client.version}</version> </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib</artifactId> <version>1.8.20</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.jetbrains</groupId> <artifactId>annotations</artifactId> <version>26.0.2</version> <scope>compile</scope> </dependency> </dependencies> <repositories> <repository> <id>central</id> <url>https://maven.aliyun.com/repository/central</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>ali-public</id> <name>ali-public</name> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.5.15</version> <configuration> <fork>true</fork> <!-- å¦ææ²¡æè¯¥é ç½®ï¼devtoolsä¸ä¼çæ --> </configuration> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> <finalName>${project.artifactId}</finalName> </build> </project> src/main/java/com/zhitan/MQTTGatewayApplication.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,17 @@ package com.zhitan; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * application */ @SpringBootApplication @MapperScan(basePackages = {"com.zhitan.mapper"}) public class MQTTGatewayApplication { public static void main(String[] args) { SpringApplication.run(MQTTGatewayApplication.class, args); System.out.println("MQTT Gateway Application Started"); } } src/main/java/com/zhitan/config/DruidConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,82 @@ package com.zhitan.config; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import com.alibaba.druid.spring.boot.autoconfigure.properties.DruidStatProperties; import com.alibaba.druid.util.Utils; import com.zhitan.config.properties.DruidProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.servlet.*; import javax.sql.DataSource; import java.io.IOException; /** * druid é ç½®å¤æ°æ®æº * * @author zhitan */ @Configuration public class DruidConfig { @Bean @ConfigurationProperties("spring.datasource.druid.master") public DataSource masterDataSource(DruidProperties druidProperties) { DruidDataSource dataSource = DruidDataSourceBuilder.create().build(); return druidProperties.dataSource(dataSource); } @Bean @ConfigurationProperties("spring.datasource.druid.slave") @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true") public DataSource slaveDataSource(DruidProperties druidProperties) { DruidDataSource dataSource = DruidDataSourceBuilder.create().build(); return druidProperties.dataSource(dataSource); } /** * å»é¤çæ§é¡µé¢åºé¨ç广å */ @SuppressWarnings({"rawtypes", "unchecked"}) @Bean @ConditionalOnProperty(name = "spring.datasource.druid.statViewServlet.enabled", havingValue = "true") public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties) { // è·åwebçæ§é¡µé¢çåæ° DruidStatProperties.StatViewServlet config = properties.getStatViewServlet(); // æåcommon.jsçé ç½®è·¯å¾ String pattern = config.getUrlPattern() != null ? config.getUrlPattern() : "/druid/*"; String commonJsPattern = pattern.replaceAll("\\*", "js/common.js"); final String filePath = "support/http/resources/js/common.js"; // å建filterè¿è¡è¿æ»¤ Filter filter = new Filter() { @Override public void init(javax.servlet.FilterConfig filterConfig) throws ServletException { } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { chain.doFilter(request, response); // éç½®ç¼å²åºï¼ååºå¤´ä¸ä¼è¢«éç½® response.resetBuffer(); // è·åcommon.js String text = Utils.readFromResource(filePath); // æ£åæ¿æ¢banner, é¤å»åºé¨ç广åä¿¡æ¯ text = text.replaceAll("<a.*?banner\"></a><br/>", ""); text = text.replaceAll("powered.*?shrek.wang</a>", ""); response.getWriter().write(text); } @Override public void destroy() { } }; FilterRegistrationBean registrationBean = new FilterRegistrationBean(); registrationBean.setFilter(filter); registrationBean.addUrlPatterns(commonJsPattern); return registrationBean; } } src/main/java/com/zhitan/config/influxdb/InfluxdbConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,17 @@ package com.zhitan.config.influxdb; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "influxdb") public class InfluxdbConfig { private String host; private String org; private String bucket; private String token; private String measurement; private boolean enable; } src/main/java/com/zhitan/config/mqtt/MqttConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,42 @@ package com.zhitan.config.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @Configuration public class MqttConfig { @Value("${spring.mqtt.broker-url}") private String brokerUrl; @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.timeout}") private int timeout; @Value("${spring.mqtt.keep-alive}") private int keepAlive; // é ç½®MQTT客æ·ç«¯å·¥å @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{brokerUrl}); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepAlive); options.setAutomaticReconnect(true); // èªå¨éè¿ factory.setConnectionOptions(options); return factory; } } src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,44 @@ package com.zhitan.config.mqtt; import com.zhitan.handler.MqttMessageHandler; import com.zhitan.service.IDataService; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; @Configuration public class MqttInboundConfig { private final MqttPahoClientFactory mqttClientFactory; private final IDataService dataService; @Value("${spring.mqtt.client-id}") private String clientId; @Value("${spring.mqtt.default-topic}") private String defaultTopic; public MqttInboundConfig(MqttPahoClientFactory mqttClientFactory, IDataService dataService) { this.mqttClientFactory = mqttClientFactory; this.dataService = dataService; } // è®¢é æ¶æ¯éé å¨ @Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter() { return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic); } // å®ä¹æ¶æ¯å¤çæµ @Bean public IntegrationFlow mqttInFlow() { return IntegrationFlows.from(inboundAdapter()) .handle(new MqttMessageHandler(dataService)) .get(); } } src/main/java/com/zhitan/config/mybatis/MyBatisPlusConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,44 @@ package com.zhitan.config.mybatis; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.BlockAttackInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.annotation.EnableTransactionManagement; /** * mybatis plusé ç½® */ @Configuration @EnableTransactionManagement(proxyTargetClass = true) public class MyBatisPlusConfig { /** * ä¸ç¼åäºç¼éµå¾ªmybatisçè§å,éè¦è®¾ç½® * MybatisConfiguration#useDeprecatedExecutor = false é¿å ç¼åä¸ä¸åºç°é®é¢ */ @Bean public MybatisPlusInterceptor paginationInterceptor() { MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); // ä¹è§éæä»¶ interceptor.addInnerInterceptor(optimisticLockerInnerInterceptor()); // 黿æä»¶ interceptor.addInnerInterceptor(blockAttackInnerInterceptor()); return interceptor; } /** * ä¹è§éæä»¶ https://baomidou.com/guide/interceptor-optimistic-locker.html */ public OptimisticLockerInnerInterceptor optimisticLockerInnerInterceptor() { return new OptimisticLockerInnerInterceptor(); } /** * 妿æ¯å¯¹å ¨è¡¨çå é¤ææ´æ°æä½ï¼å°±ä¼ç»æ¢è¯¥æä½ https://baomidou.com/guide/interceptor-block-attack.html */ public BlockAttackInnerInterceptor blockAttackInnerInterceptor() { return new BlockAttackInnerInterceptor(); } } src/main/java/com/zhitan/config/properties/DruidProperties.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,89 @@ package com.zhitan.config.properties; import com.alibaba.druid.pool.DruidDataSource; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; /** * druid é ç½®å±æ§ * * @author zhitan */ @Configuration public class DruidProperties { @Value("${spring.datasource.druid.initialSize}") private int initialSize; @Value("${spring.datasource.druid.minIdle}") private int minIdle; @Value("${spring.datasource.druid.maxActive}") private int maxActive; @Value("${spring.datasource.druid.maxWait}") private int maxWait; @Value("${spring.datasource.druid.connectTimeout}") private int connectTimeout; @Value("${spring.datasource.druid.socketTimeout}") private int socketTimeout; @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}") private int timeBetweenEvictionRunsMillis; @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}") private int minEvictableIdleTimeMillis; @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}") private int maxEvictableIdleTimeMillis; @Value("${spring.datasource.druid.validationQuery}") private String validationQuery; @Value("${spring.datasource.druid.testWhileIdle}") private boolean testWhileIdle; @Value("${spring.datasource.druid.testOnBorrow}") private boolean testOnBorrow; @Value("${spring.datasource.druid.testOnReturn}") private boolean testOnReturn; public DruidDataSource dataSource(DruidDataSource datasource) { /* é ç½®åå§å大å°ãæå°ãæå¤§ */ datasource.setInitialSize(initialSize); datasource.setMaxActive(maxActive); datasource.setMinIdle(minIdle); /* é ç½®è·åè¿æ¥çå¾ è¶ æ¶çæ¶é´ */ datasource.setMaxWait(maxWait); /* é 置驱å¨è¿æ¥è¶ æ¶æ¶é´ï¼æ£æµæ°æ®åºå»ºç«è¿æ¥çè¶ æ¶æ¶é´ï¼å使¯æ¯«ç§ */ datasource.setConnectTimeout(connectTimeout); /* é ç½®ç½ç»è¶ æ¶æ¶é´ï¼çå¾ æ°æ®åºæä½å®æçç½ç»è¶ æ¶æ¶é´ï¼å使¯æ¯«ç§ */ datasource.setSocketTimeout(socketTimeout); /* é ç½®é´éå¤ä¹ æè¿è¡ä¸æ¬¡æ£æµï¼æ£æµéè¦å ³éç空é²è¿æ¥ï¼å使¯æ¯«ç§ */ datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); /* é ç½®ä¸ä¸ªè¿æ¥å¨æ± 䏿å°ãæå¤§çåçæ¶é´ï¼å使¯æ¯«ç§ */ datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis); /* * ç¨æ¥æ£æµè¿æ¥æ¯å¦ææçsqlï¼è¦æ±æ¯ä¸ä¸ªæ¥è¯¢è¯å¥ï¼å¸¸ç¨select 'x'ã * 妿validationQuery为nullï¼testOnBorrowãtestOnReturnãtestWhileIdleé½ä¸ä¼èµ·ä½ç¨ã */ datasource.setValidationQuery(validationQuery); /* 建议é 置为trueï¼ä¸å½±åæ§è½ï¼å¹¶ä¸ä¿è¯å®å ¨æ§ãç³è¯·è¿æ¥çæ¶åæ£æµï¼ å¦æç©ºé²æ¶é´å¤§äºtimeBetweenEvictionRunsMillisï¼æ§è¡validationQueryæ£æµè¿æ¥æ¯å¦ææã */ datasource.setTestWhileIdle(testWhileIdle); /* ç³è¯·è¿æ¥æ¶æ§è¡validationQueryæ£æµè¿æ¥æ¯å¦ææï¼åäºè¿ä¸ªé ç½®ä¼é使§è½ã */ datasource.setTestOnBorrow(testOnBorrow); /* å½è¿è¿æ¥æ¶æ§è¡validationQueryæ£æµè¿æ¥æ¯å¦ææï¼åäºè¿ä¸ªé ç½®ä¼é使§è½ã */ datasource.setTestOnReturn(testOnReturn); return datasource; } } src/main/java/com/zhitan/config/redis/RedisConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,42 @@ package com.zhitan.config.redis; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); // ä½¿ç¨ Jackson2JsonRedisSerializer åºååå¼ Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(objectMapper); // 设置 Key å Value çåºååè§å // Key 使ç¨å符串åºåå template.setKeySerializer(new StringRedisSerializer()); // Value ä½¿ç¨ JSON åºåå template.setValueSerializer(serializer); // Hash Key 使ç¨å符串åºåå template.setHashKeySerializer(new StringRedisSerializer()); // Hash Value ä½¿ç¨ JSON åºå template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } } src/main/java/com/zhitan/handler/MqttMessageHandler.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,40 @@ package com.zhitan.handler; import com.fasterxml.jackson.databind.ObjectMapper; import com.zhitan.model.entity.ElectricPower; import com.zhitan.service.IDataService; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * mqtt æ¶æ¯å¤çç±» */ @Slf4j public class MqttMessageHandler implements MessageHandler { private final IDataService dataService; public MqttMessageHandler(IDataService dataService) { this.dataService = dataService; } @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); String payload = (String) message.getPayload(); System.out.println("Received message from topic " + topic + ": " + payload); // dataService.writeTimeSeriesData(payload); // ObjectMapper objectMapper = new ObjectMapper(); try { // å° JSON å符串转æ¢ä¸º SensorData 对象 ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class); dataService.writeTimeSeriesData(electricPower); } catch (Exception e) { log.error(e.getMessage()); } } } src/main/java/com/zhitan/influxdb/InfluxdbRepository.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,76 @@ package com.zhitan.influxdb; import com.influxdb.LogLevel; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; import java.util.List; /** * influxdbçåºç¡æå¡ * * @author Silence * @version 1.0 */ @Slf4j @Repository public class InfluxdbRepository { protected InfluxdbConfig config; protected InfluxDBClient client; @Autowired public InfluxdbRepository(InfluxdbConfig config) { this.config = config; init(); } /** * åå§å */ private void init() { if (config.isEnable()) { if (null == client) { client = InfluxDBClientFactory.create(config.getHost(), config.getToken().toCharArray(), config.getOrg(), config.getBucket()) .enableGzip() .setLogLevel(LogLevel.BASIC); } if (!client.ping()) { log.error("宿¶åºè¿æ¥å¤±è´¥"); } else { log.info("宿¶åºè¿æ¥æå"); } } else { log.debug("æ¶åºåºä¸å¯ç¨"); } } /** * åå ¥å个ç¹ä½ */ public void writePoint(Point point) { if (null == point) { return; } WriteApiBlocking writeApi = client.getWriteApiBlocking(); writeApi.writePoint(point); } /** * åå ¥å¤ä¸ªç¹ä½ */ public void writePoints(List<Point> points) { if (null == points || points.isEmpty()) { return; } WriteApiBlocking writeApi = client.getWriteApiBlocking(); writeApi.writePoints(points); } } src/main/java/com/zhitan/mapper/CommonMapper.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,17 @@ package com.zhitan.mapper; import com.zhitan.model.IndexTemplate; import org.apache.ibatis.annotations.Mapper; import java.util.List; /** * common mapper */ public interface CommonMapper { /** * è·åç¹ä½æ¨¡æ¿ */ List<IndexTemplate> getIndexTemplate(); } src/main/java/com/zhitan/model/IndexTemplate.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,26 @@ package com.zhitan.model; import lombok.Data; /** * æ¨¡æ¿ */ @Data public class IndexTemplate { /** * code */ private String code; /** * name */ private String name; /** * deviceType */ private Integer deviceType; /** * gatewayKey */ private String gatewayKey; } src/main/java/com/zhitan/model/entity/ElectricPower.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,84 @@ package com.zhitan.model.entity; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; /** * çµè¡¨çæ°æ®ç»æ */ @Data public class ElectricPower { @JsonProperty("SN") private String sn; @JsonProperty("Pt") private double pt; @JsonProperty("Ua") private double ua; @JsonProperty("Ub") private double ub; @JsonProperty("Uc") private double uc; @JsonProperty("Uab") private double uab; @JsonProperty("Ubc") private double ubc; @JsonProperty("Uca") private double uca; @JsonProperty("Ia") private double ia; @JsonProperty("Ib") private double ib; @JsonProperty("Ic") private double ic; @JsonProperty("Pw") private double pw; @JsonProperty("Pwa") private double pwa; @JsonProperty("Pwb") private double pwb; @JsonProperty("Pwc") private double pwc; @JsonProperty("Pq") private double pq; @JsonProperty("Pqa") private double pqa; @JsonProperty("Pqb") private double pqb; @JsonProperty("Pqc") private double pqc; @JsonProperty("Q") private double q; @JsonProperty("Qa") private double qa; @JsonProperty("Qb") private double qb; @JsonProperty("Qc") private double qc; @JsonProperty("Time") private String time; @JsonProperty("Type") private int type; } src/main/java/com/zhitan/redis/RedisCache.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,183 @@ package com.zhitan.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.*; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; /** * spring redis å·¥å ·ç±» * * @author ruoyi **/ @SuppressWarnings(value = {"unchecked", "rawtypes"}) @Component public class RedisCache { private final RedisTemplate redisTemplate; @Autowired public RedisCache(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * ç¼ååºæ¬ç对象ï¼IntegerãStringãå®ä½ç±»ç * * @param key ç¼åçé®å¼ * @param value ç¼åçå¼ * @return ç¼åç对象 */ public <T> ValueOperations<String, T> setCacheObject(String key, T value) { ValueOperations<String, T> operation = redisTemplate.opsForValue(); operation.set(key, value); return operation; } /** * ç¼ååºæ¬ç对象ï¼IntegerãStringãå®ä½ç±»ç * * @param key ç¼åçé®å¼ * @param value ç¼åçå¼ * @param timeout æ¶é´ * @param timeUnit æ¶é´é¢ç²åº¦ * @return ç¼åç对象 */ public <T> ValueOperations<String, T> setCacheObject(String key, T value, Integer timeout, TimeUnit timeUnit) { ValueOperations<String, T> operation = redisTemplate.opsForValue(); operation.set(key, value, timeout, timeUnit); return operation; } /** * è·å¾ç¼åçåºæ¬å¯¹è±¡ã * * @param key ç¼åé®å¼ * @return ç¼åé®å¼å¯¹åºçæ°æ® */ public <T> T getCacheObject(String key) { ValueOperations<String, T> operation = redisTemplate.opsForValue(); return operation.get(key); } /** * å é¤å个对象 * * @param key */ public void deleteObject(String key) { redisTemplate.delete(key); } /** * å é¤éå对象 * * @param collection */ public void deleteObject(Collection collection) { redisTemplate.delete(collection); } /** * ç¼åListæ°æ® * * @param key ç¼åçé®å¼ * @param dataList å¾ ç¼åçListæ°æ® * @return ç¼åç对象 */ public <T> ListOperations<String, T> setCacheList(String key, List<T> dataList) { ListOperations listOperation = redisTemplate.opsForList(); if (null != dataList) { int size = dataList.size(); for (int i = 0; i < size; i++) { listOperation.leftPush(key, dataList.get(i)); } } return listOperation; } /** * è·å¾ç¼åçlist对象 * * @param key ç¼åçé®å¼ * @return ç¼åé®å¼å¯¹åºçæ°æ® */ public <T> List<T> getCacheList(String key) { List<T> dataList = new ArrayList<T>(); ListOperations<String, T> listOperation = redisTemplate.opsForList(); Long size = listOperation.size(key); for (int i = 0; i < size; i++) { dataList.add(listOperation.index(key, i)); } return dataList; } /** * ç¼åSet * * @param key ç¼åé®å¼ * @param dataSet ç¼åçæ°æ® * @return ç¼åæ°æ®ç对象 */ public <T> BoundSetOperations<String, T> setCacheSet(String key, Set<T> dataSet) { BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key); Iterator<T> it = dataSet.iterator(); while (it.hasNext()) { setOperation.add(it.next()); } return setOperation; } /** * è·å¾ç¼åçset * * @param key * @return */ public <T> Set<T> getCacheSet(String key) { Set<T> dataSet = new HashSet<T>(); BoundSetOperations<String, T> operation = redisTemplate.boundSetOps(key); dataSet = operation.members(); return dataSet; } /** * ç¼åMap * * @param key * @param dataMap * @return */ public <T> HashOperations<String, String, T> setCacheMap(String key, Map<String, T> dataMap) { HashOperations hashOperations = redisTemplate.opsForHash(); if (null != dataMap) { for (Map.Entry<String, T> entry : dataMap.entrySet()) { hashOperations.put(key, entry.getKey(), entry.getValue()); } } return hashOperations; } /** * è·å¾ç¼åçMap * * @param key * @return */ public <T> Map<String, T> getCacheMap(String key) { Map<String, T> map = redisTemplate.opsForHash().entries(key); return map; } /** * è·å¾ç¼åçåºæ¬å¯¹è±¡å表 * * @param pattern å符串åç¼ * @return 对象å表 */ public Collection<String> keys(String pattern) { return redisTemplate.keys(pattern); } } src/main/java/com/zhitan/service/IDataService.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,24 @@ package com.zhitan.service; import com.zhitan.model.entity.ElectricPower; import org.jetbrains.annotations.NotNull; /** * æ°æ®service */ public interface IDataService { /** * åå ¥æ°æ®-éå卿åå ¥keyå¯ä¸ç * * @param jsonString jsonæ ¼å¼çæ°æ® */ void writeTimeSeriesData(@NotNull String jsonString); /** * åå ¥çµåç¸å ³æ°æ®-åºå®æ ¼å¼ï¼å¯èªå®ä¹ä¿®æ¹ * * @param electricPower åºå®æ ¼å¼çæ°æ® */ void writeTimeSeriesData(@NotNull ElectricPower electricPower); } src/main/java/com/zhitan/service/impl/DataServiceImpl.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,133 @@ package com.zhitan.service.impl; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import com.zhitan.model.IndexTemplate; import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; import com.zhitan.redis.RedisCache; import com.zhitan.service.IDataService; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.lang.reflect.Field; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * æ°æ®service */ @Slf4j @Service public class DataServiceImpl implements IDataService { private final String TAG = "tag"; private final String FIELD_VALUE = "value"; private final InfluxdbRepository repository; private final InfluxdbConfig influxdbConfig; private final CommonMapper commonMapper; private final RedisCache redisCache; @Autowired public DataServiceImpl(InfluxdbRepository repository, InfluxdbConfig influxdbConfig, CommonMapper commonMapper, RedisCache redisCache) { this.repository = repository; this.influxdbConfig = influxdbConfig; this.commonMapper = commonMapper; this.redisCache = redisCache; } /** * åå ¥æ¶åºæ°æ® * * @param jsonString jsonæ°æ® */ @Override public void writeTimeSeriesData(@NotNull String jsonString) { if (jsonString.isEmpty()) { return; } List<Point> points = new ArrayList<>(); JsonObject jsonObject = JsonParser.parseString(jsonString).getAsJsonObject(); for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) { String key = entry.getKey(); JsonElement value = entry.getValue(); JsonPrimitive primitive = value.getAsJsonPrimitive(); Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, key) .time(Instant.now(), WritePrecision.S); if (primitive.isJsonPrimitive()) { if (primitive.isNumber()) { point.addField(FIELD_VALUE, value.getAsDouble()); points.add(point); } else if (primitive.isString()) { //point.addField(FIELD_VALUE, value.getAsString()); } else if (primitive.isBoolean()) { //point.addField(FIELD_VALUE, value.getAsBoolean()); } } } repository.writePoints(points); } /** * åå ¥çµåç¸å ³æ°æ®-åºå®æ ¼å¼ï¼å¯èªå®ä¹ä¿®æ¹ * * @param electricPower åºå®æ ¼å¼çæ°æ® */ @Override public void writeTimeSeriesData(@NotNull ElectricPower electricPower) { List<IndexTemplate> templates = getIndexTemplate(); // è·åç±»ä¸ææå£°æçåæ®µ Field[] fields = electricPower.getClass().getDeclaredFields(); List<Point> points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); // è®¾ç½®åæ®µå¯è®¿é®ï¼å 许访é®ç§æå段 field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // è·ååæ®µå¼ double value = field.getDouble(electricPower); point.addField(FIELD_VALUE, value); points.add(point); } catch (IllegalAccessException e) { log.error("è·å屿§å¼å¤±è´¥:{}", e.getMessage()); } } } } repository.writePoints(points); } /** * è·åç¹ä½æ¨¡æ¿ */ protected List<IndexTemplate> getIndexTemplate() { String TEMPLATE_KEY = "template"; List<IndexTemplate> result = redisCache.getCacheList(TEMPLATE_KEY); if (result == null || result.isEmpty()) { result = commonMapper.getIndexTemplate(); redisCache.setCacheList(TEMPLATE_KEY, result); } return result; } } src/main/resources/application-dev.yml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,78 @@ spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver druid: master: url: jdbc:postgresql://localhost:5432/energy username: postgres password: postgres # åå§è¿æ¥æ° initialSize: 5 # æå°è¿æ¥æ± æ°é minIdle: 10 # æå¤§è¿æ¥æ± æ°é maxActive: 20 # é ç½®è·åè¿æ¥çå¾ è¶ æ¶çæ¶é´ maxWait: 60000 # é ç½®è¿æ¥è¶ æ¶æ¶é´ connectTimeout: 30000 # é ç½®ç½ç»è¶ æ¶æ¶é´ socketTimeout: 60000 # é ç½®é´éå¤ä¹ æè¿è¡ä¸æ¬¡æ£æµï¼æ£æµéè¦å ³éç空é²è¿æ¥ï¼å使¯æ¯«ç§ timeBetweenEvictionRunsMillis: 60000 # é ç½®ä¸ä¸ªè¿æ¥å¨æ± 䏿å°çåçæ¶é´ï¼å使¯æ¯«ç§ minEvictableIdleTimeMillis: 300000 # é ç½®ä¸ä¸ªè¿æ¥å¨æ± 䏿大çåçæ¶é´ï¼å使¯æ¯«ç§ maxEvictableIdleTimeMillis: 900000 # é ç½®æ£æµè¿æ¥æ¯å¦ææ validationQuery: SELECT 1 testWhileIdle: true testOnBorrow: false testOnReturn: false webStatFilter: enabled: true statViewServlet: enabled: true # 设置ç½ååï¼ä¸å¡«åå 许ææè®¿é® allow: url-pattern: /druid/* # æ§å¶å°ç®¡çç¨æ·ååå¯ç login-username: admin login-password: 123456 filter: stat: enabled: true # æ ¢SQLè®°å½ log-slow-sql: true slow-sql-millis: 1000 merge-sql: true wall: config: multi-statement-allow: true mqtt: # èªå®ä¹ client-id: 202503181042 # å ¬å ±MQTTæå¡å¨ï¼ç产ç¯å¢éè¦æ¿æ¢ä¸ºèªå·±ç broker-url: tcp://broker.emqx.io username: password: #é»è®¤è®¢é çä¸»é¢ default-topic: zhitan timeout: 30 keep-alive: 60 redis: database: 0 host: localhost port: 6379 password: influxdb: host: "http://localhost:8086" #ä¿®æ¹ä¸ºèªå·±çæ¶åºåºè®¿é®org org: "org" #ä¿®æ¹ä¸ºèªå·±çæ¶åºåºbucket bucket: "bucket" #ä¿®æ¹ä¸ºèªå·±çæ¶åºåºè®¿é®token token: "token" measurement: data enable: true src/main/resources/application.yml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,24 @@ spring: profiles: active: dev # æå¡æ¨¡å devtools: restart: # çé¨ç½²å¼å ³ enabled: false # æ¥å¿é ç½® logging: level: com.zhitan: debug org.springframework: warn mybatis-plus: type-aliases-package: com.zhitan.entity mapper-locations: classpath*:/mapper/*Mapper.xml # å è½½å ¨å±çé ç½®æä»¶ config-location: classpath:/mybatis/mybatis-config.xml global-config: db-config: logic-delete-field: del_flag # å ¨å±é»è¾å é¤å段å logic-delete-value: 2 # é»è¾å·²å é¤å¼ logic-not-delete-value: 0 # é»è¾æªå é¤å¼ src/main/resources/mapper/CommonMapper.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,11 @@ <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.zhitan.mapper.CommonMapper"> <select id="getIndexTemplate" resultType="com.zhitan.model.IndexTemplate"> SELECT code, "name", device_type, gateway_key FROM daq_template; </select> </mapper> src/main/resources/mybatis/mybatis-config.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,20 @@ <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> <configuration> <!-- å ¨å±åæ° --> <settings> <!-- ä½¿å ¨å±çæ å°å¨å¯ç¨æç¦ç¨ç¼å --> <setting name="cacheEnabled" value="true"/> <!-- å 许JDBC æ¯æèªå¨çæä¸»é® --> <setting name="useGeneratedKeys" value="true"/> <!-- é ç½®é»è®¤çæ§è¡å¨.SIMPLEå°±æ¯æ®éæ§è¡å¨;REUSEæ§è¡å¨ä¼éç¨é¢å¤çè¯å¥(prepared statements);BATCHæ§è¡å¨å°éç¨è¯å¥å¹¶æ§è¡æ¹éæ´æ° --> <setting name="defaultExecutorType" value="SIMPLE"/> <!-- æå® MyBatis æç¨æ¥å¿çå ·ä½å®ç° --> <setting name="logImpl" value="SLF4J"/> <!-- 使ç¨é©¼å³°å½åæ³è½¬æ¢å段 --> <setting name="mapUnderscoreToCamelCase" value="true"/> </settings> </configuration>