CoderSilence
2025-03-31 1d92915d0a858a43677a4c7d0d67795e043d2b43
first commit
已添加21个文件
已修改1个文件
1288 ■■■■■ 文件已修改
.gitignore 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/MQTTGatewayApplication.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/DruidConfig.java 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/influxdb/InfluxdbConfig.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/mqtt/MqttConfig.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/mybatis/MyBatisPlusConfig.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/properties/DruidProperties.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/redis/RedisConfig.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/handler/MqttMessageHandler.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/influxdb/InfluxdbRepository.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/mapper/CommonMapper.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/model/IndexTemplate.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/model/entity/ElectricPower.java 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/redis/RedisCache.java 183 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/service/IDataService.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/service/impl/DataServiceImpl.java 133 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application.yml 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/mapper/CommonMapper.xml 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/mybatis/mybatis-config.xml 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -1,24 +1,38 @@
# Compiled class file
*.class
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
# Log file
*.log
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
# BlueJ files
*.ctxt
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
# Mobile Tools for Java (J2ME)
.mtj.tmp/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# Re-include the Maven wrapper jar. This exception has to follow "*.jar":
# within one .gitignore the last matching pattern wins, so a negation placed
# before the "*.jar" rule is overridden by it.
!.mvn/wrapper/maven-wrapper.jar
### VS Code ###
.vscode/
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
replay_pid*
### Mac OS ###
.DS_Store
pom.xml
对比新文件
@@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhitan</groupId>
    <artifactId>MQTTGateway</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.15</version>
    </parent>
    <!-- Build settings plus the versions of the dependencies that this POM pins explicitly. -->
    <properties>
        <!-- Compile with Java 8 source and target levels. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- NOTE(review): nothing in this POM references integration.version; the
             spring-integration-mqtt dependency is declared without a version.
             Confirm whether this property is still needed. -->
        <integration.version>3.4.3</integration.version>
        <!-- Referenced by the druid-spring-boot-starter dependency. -->
        <druid.version>1.2.20</druid.version>
        <!-- Referenced by the mybatis-plus-boot-starter dependency. -->
        <mybatis-plus.version>3.5.6</mybatis-plus.version>
        <!-- Referenced by the influxdb-client-java dependency. -->
        <influxdb-client.version>6.6.0</influxdb-client.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- Alibaba database connection pool (Druid) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>com.influxdb</groupId>
            <artifactId>influxdb-client-java</artifactId>
            <version>${influxdb-client.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
            <version>1.8.20</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>26.0.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>ali-public</id>
            <name>ali-public</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.5.15</version>
                <configuration>
                    <fork>true</fork> <!-- å¦‚果没有该配置,devtools不会生效 -->
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <finalName>${project.artifactId}</finalName>
    </build>
</project>
src/main/java/com/zhitan/MQTTGatewayApplication.java
对比新文件
@@ -0,0 +1,17 @@
package com.zhitan;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * application
 */
@SpringBootApplication
@MapperScan(basePackages = {"com.zhitan.mapper"})
public class MQTTGatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQTTGatewayApplication.class, args);
        System.out.println("MQTT Gateway Application Started");
    }
}
src/main/java/com/zhitan/config/DruidConfig.java
对比新文件
@@ -0,0 +1,82 @@
package com.zhitan.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.alibaba.druid.spring.boot.autoconfigure.properties.DruidStatProperties;
import com.alibaba.druid.util.Utils;
import com.zhitan.config.properties.DruidProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.servlet.*;
import javax.sql.DataSource;
import java.io.IOException;
/**
 * druid é…ç½®å¤šæ•°æ®æº
 *
 * @author zhitan
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource masterDataSource(DruidProperties druidProperties) {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(dataSource);
    }
    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
    public DataSource slaveDataSource(DruidProperties druidProperties) {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(dataSource);
    }
    /**
     * åŽ»é™¤ç›‘æŽ§é¡µé¢åº•éƒ¨çš„å¹¿å‘Š
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Bean
    @ConditionalOnProperty(name = "spring.datasource.druid.statViewServlet.enabled", havingValue = "true")
    public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties) {
        // èŽ·å–web监控页面的参数
        DruidStatProperties.StatViewServlet config = properties.getStatViewServlet();
        // æå–common.js的配置路径
        String pattern = config.getUrlPattern() != null ? config.getUrlPattern() : "/druid/*";
        String commonJsPattern = pattern.replaceAll("\\*", "js/common.js");
        final String filePath = "support/http/resources/js/common.js";
        // åˆ›å»ºfilter进行过滤
        Filter filter = new Filter() {
            @Override
            public void init(javax.servlet.FilterConfig filterConfig) throws ServletException {
            }
            @Override
            public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
                    throws IOException, ServletException {
                chain.doFilter(request, response);
                // é‡ç½®ç¼“冲区,响应头不会被重置
                response.resetBuffer();
                // èŽ·å–common.js
                String text = Utils.readFromResource(filePath);
                // æ­£åˆ™æ›¿æ¢banner, é™¤åŽ»åº•éƒ¨çš„å¹¿å‘Šä¿¡æ¯
                text = text.replaceAll("<a.*?banner\"></a><br/>", "");
                text = text.replaceAll("powered.*?shrek.wang</a>", "");
                response.getWriter().write(text);
            }
            @Override
            public void destroy() {
            }
        };
        FilterRegistrationBean registrationBean = new FilterRegistrationBean();
        registrationBean.setFilter(filter);
        registrationBean.addUrlPatterns(commonJsPattern);
        return registrationBean;
    }
}
src/main/java/com/zhitan/config/influxdb/InfluxdbConfig.java
对比新文件
@@ -0,0 +1,17 @@
package com.zhitan.config.influxdb;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * InfluxDB settings, bound from configuration properties with the {@code influxdb} prefix.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "influxdb")
public class InfluxdbConfig {
    // Connection settings: host, org, bucket and token.
    private String host;
    private String org;
    private String bucket;
    private String token;
    // NOTE(review): presumably the measurement name used when building points - confirm against the callers.
    private String measurement;
    // Switch that tells consumers whether InfluxDB should be used at all.
    private boolean enable;
}
src/main/java/com/zhitan/config/mqtt/MqttConfig.java
对比新文件
@@ -0,0 +1,42 @@
package com.zhitan.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Configuration
public class MqttConfig {
    @Value("${spring.mqtt.broker-url}")
    private String brokerUrl;
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.timeout}")
    private int timeout;
    @Value("${spring.mqtt.keep-alive}")
    private int keepAlive;
    // é…ç½®MQTT客户端工厂
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepAlive);
        options.setAutomaticReconnect(true); // è‡ªåŠ¨é‡è¿ž
        factory.setConnectionOptions(options);
        return factory;
    }
}
src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
对比新文件
@@ -0,0 +1,44 @@
package com.zhitan.config.mqtt;
import com.zhitan.handler.MqttMessageHandler;
import com.zhitan.service.IDataService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@Configuration
public class MqttInboundConfig {
    private final MqttPahoClientFactory mqttClientFactory;
    private final IDataService dataService;
    @Value("${spring.mqtt.client-id}")
    private String clientId;
    @Value("${spring.mqtt.default-topic}")
    private String defaultTopic;
    public MqttInboundConfig(MqttPahoClientFactory mqttClientFactory, IDataService dataService) {
        this.mqttClientFactory = mqttClientFactory;
        this.dataService = dataService;
    }
    // è®¢é˜…消息适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter() {
        return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic);
    }
    // å®šä¹‰æ¶ˆæ¯å¤„理流
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(inboundAdapter())
                .handle(new MqttMessageHandler(dataService))
                .get();
    }
}
src/main/java/com/zhitan/config/mybatis/MyBatisPlusConfig.java
对比新文件
@@ -0,0 +1,44 @@
package com.zhitan.config.mybatis;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.BlockAttackInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * mybatis plus配置
 */
@Configuration
@EnableTransactionManagement(proxyTargetClass = true)
public class MyBatisPlusConfig {
    /**
     * ä¸€ç¼“和二缓遵循mybatis的规则,需要设置
     * MybatisConfiguration#useDeprecatedExecutor = false é¿å…ç¼“存万一出现问题
     */
    @Bean
    public MybatisPlusInterceptor paginationInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        // ä¹è§‚锁插件
        interceptor.addInnerInterceptor(optimisticLockerInnerInterceptor());
        // é˜»æ–­æ’ä»¶
        interceptor.addInnerInterceptor(blockAttackInnerInterceptor());
        return interceptor;
    }
    /**
     * ä¹è§‚锁插件 https://baomidou.com/guide/interceptor-optimistic-locker.html
     */
    public OptimisticLockerInnerInterceptor optimisticLockerInnerInterceptor() {
        return new OptimisticLockerInnerInterceptor();
    }
    /**
     * å¦‚果是对全表的删除或更新操作,就会终止该操作 https://baomidou.com/guide/interceptor-block-attack.html
     */
    public BlockAttackInnerInterceptor blockAttackInnerInterceptor() {
        return new BlockAttackInnerInterceptor();
    }
}
src/main/java/com/zhitan/config/properties/DruidProperties.java
对比新文件
@@ -0,0 +1,89 @@
package com.zhitan.config.properties;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
 * Druid configuration properties: pool settings read from {@code spring.datasource.druid.*}
 * and applied to a {@link DruidDataSource} by {@link #dataSource(DruidDataSource)}.
 *
 * @author zhitan
 */
@Configuration
public class DruidProperties {
    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;
    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;
    @Value("${spring.datasource.druid.maxActive}")
    private int maxActive;
    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;
    @Value("${spring.datasource.druid.connectTimeout}")
    private int connectTimeout;
    @Value("${spring.datasource.druid.socketTimeout}")
    private int socketTimeout;
    @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;
    @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;
    @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
    private int maxEvictableIdleTimeMillis;
    @Value("${spring.datasource.druid.validationQuery}")
    private String validationQuery;
    @Value("${spring.datasource.druid.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${spring.datasource.druid.testOnBorrow}")
    private boolean testOnBorrow;
    @Value("${spring.datasource.druid.testOnReturn}")
    private boolean testOnReturn;
    /**
     * Applies the injected pool settings to the given data source.
     *
     * @param datasource the Druid data source to configure
     * @return the same data source instance, after the settings have been applied
     */
    public DruidDataSource dataSource(DruidDataSource datasource) {
        /* Initial size, maximum and minimum of the pool */
        datasource.setInitialSize(initialSize);
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);
        /* Maximum time to wait when obtaining a connection */
        datasource.setMaxWait(maxWait);
        /* Driver connect timeout: time limit for establishing a database connection, in milliseconds */
        datasource.setConnectTimeout(connectTimeout);
        /* Network timeout: time limit for waiting on a database operation to complete, in milliseconds */
        datasource.setSocketTimeout(socketTimeout);
        /* Interval between checks for idle connections that need to be closed, in milliseconds */
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        /* Minimum and maximum time a connection may stay in the pool, in milliseconds */
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
        /*
         * SQL used to check whether a connection is valid; it must be a query, commonly select 'x'.
         * If validationQuery is null, testOnBorrow, testOnReturn and testWhileIdle have no effect.
         */
        datasource.setValidationQuery(validationQuery);
        /* Recommended to be true: no performance impact and it keeps connections safe. Checked when a
        connection is requested: if it has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to verify it. */
        datasource.setTestWhileIdle(testWhileIdle);
        /* Run validationQuery when a connection is borrowed; enabling this lowers performance. */
        datasource.setTestOnBorrow(testOnBorrow);
        /* Run validationQuery when a connection is returned; enabling this lowers performance. */
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}
src/main/java/com/zhitan/config/redis/RedisConfig.java
对比新文件
@@ -0,0 +1,42 @@
package com.zhitan.config.redis;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // ä½¿ç”¨ Jackson2JsonRedisSerializer åºåˆ—化值
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);
        // è®¾ç½® Key å’Œ Value çš„序列化规则
        // Key ä½¿ç”¨å­—符串序列化
        template.setKeySerializer(new StringRedisSerializer());
        // Value ä½¿ç”¨ JSON åºåˆ—化
        template.setValueSerializer(serializer);
        // Hash Key ä½¿ç”¨å­—符串序列化
        template.setHashKeySerializer(new StringRedisSerializer());
        // Hash Value ä½¿ç”¨ JSON åºåˆ—
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }
}
src/main/java/com/zhitan/handler/MqttMessageHandler.java
对比新文件
@@ -0,0 +1,40 @@
package com.zhitan.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.service.IDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * mqtt æ¶ˆæ¯å¤„理类
 */
@Slf4j
public class MqttMessageHandler implements MessageHandler {
    private final IDataService dataService;
    public MqttMessageHandler(IDataService dataService) {
        this.dataService = dataService;
    }
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String payload = (String) message.getPayload();
        System.out.println("Received message from topic " + topic + ": " + payload);
//        dataService.writeTimeSeriesData(payload);
//
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            // å°† JSON å­—符串转换为 SensorData å¯¹è±¡
            ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class);
            dataService.writeTimeSeriesData(electricPower);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}
src/main/java/com/zhitan/influxdb/InfluxdbRepository.java
对比新文件
@@ -0,0 +1,76 @@
package com.zhitan.influxdb;
import com.influxdb.LogLevel;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.write.Point;
import com.zhitan.config.influxdb.InfluxdbConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
 * influxdb的基础服务
 *
 * @author Silence
 * @version 1.0
 */
@Slf4j
@Repository
public class InfluxdbRepository {
    protected InfluxdbConfig config;
    protected InfluxDBClient client;
    @Autowired
    public InfluxdbRepository(InfluxdbConfig config) {
        this.config = config;
        init();
    }
    /**
     * åˆå§‹åŒ–
     */
    private void init() {
        if (config.isEnable()) {
            if (null == client) {
                client = InfluxDBClientFactory.create(config.getHost(), config.getToken().toCharArray(),
                                config.getOrg(), config.getBucket())
                        .enableGzip()
                        .setLogLevel(LogLevel.BASIC);
            }
            if (!client.ping()) {
                log.error("实时库连接失败");
            } else {
                log.info("实时库连接成功");
            }
        } else {
            log.debug("时序库不可用");
        }
    }
    /**
     * å†™å…¥å•个点位
     */
    public void writePoint(Point point) {
        if (null == point) {
            return;
        }
        WriteApiBlocking writeApi = client.getWriteApiBlocking();
        writeApi.writePoint(point);
    }
    /**
     * å†™å…¥å¤šä¸ªç‚¹ä½
     */
    public void writePoints(List<Point> points) {
        if (null == points || points.isEmpty()) {
            return;
        }
        WriteApiBlocking writeApi = client.getWriteApiBlocking();
        writeApi.writePoints(points);
    }
}
src/main/java/com/zhitan/mapper/CommonMapper.java
对比新文件
@@ -0,0 +1,17 @@
package com.zhitan.mapper;
import com.zhitan.model.IndexTemplate;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
 * Common MyBatis mapper.
 * NOTE(review): the SQL behind this interface is not visible here; presumably it lives in
 * mapper/CommonMapper.xml - confirm.
 */
public interface CommonMapper {
    /**
     * Fetches the index (measurement point) templates.
     *
     * @return the list of index templates
     */
    List<IndexTemplate> getIndexTemplate();
}
src/main/java/com/zhitan/model/IndexTemplate.java
对比新文件
@@ -0,0 +1,26 @@
package com.zhitan.model;
import lombok.Data;
/**
 * Index template: the row type returned by {@code CommonMapper#getIndexTemplate()}.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class IndexTemplate {
    /**
     * code
     */
    private String code;
    /**
     * name
     */
    private String name;
    /**
     * deviceType
     */
    private Integer deviceType;
    /**
     * gatewayKey
     */
    private String gatewayKey;
}
src/main/java/com/zhitan/model/entity/ElectricPower.java
对比新文件
@@ -0,0 +1,84 @@
package com.zhitan.model.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
/**
 * Data structure of an electric meter reading.
 * <p>
 * Each field is mapped to the JSON property named in its {@code @JsonProperty} annotation;
 * accessors are generated by Lombok's {@code @Data}.
 * NOTE(review): the physical meaning and units of the individual values are not stated in this
 * file - confirm against the device protocol before documenting them.
 */
@Data
public class ElectricPower {
    @JsonProperty("SN")
    private String sn;
    @JsonProperty("Pt")
    private double pt;
    @JsonProperty("Ua")
    private double ua;
    @JsonProperty("Ub")
    private double ub;
    @JsonProperty("Uc")
    private double uc;
    @JsonProperty("Uab")
    private double uab;
    @JsonProperty("Ubc")
    private double ubc;
    @JsonProperty("Uca")
    private double uca;
    @JsonProperty("Ia")
    private double ia;
    @JsonProperty("Ib")
    private double ib;
    @JsonProperty("Ic")
    private double ic;
    @JsonProperty("Pw")
    private double pw;
    @JsonProperty("Pwa")
    private double pwa;
    @JsonProperty("Pwb")
    private double pwb;
    @JsonProperty("Pwc")
    private double pwc;
    @JsonProperty("Pq")
    private double pq;
    @JsonProperty("Pqa")
    private double pqa;
    @JsonProperty("Pqb")
    private double pqb;
    @JsonProperty("Pqc")
    private double pqc;
    @JsonProperty("Q")
    private double q;
    @JsonProperty("Qa")
    private double qa;
    @JsonProperty("Qb")
    private double qb;
    @JsonProperty("Qc")
    private double qc;
    // Kept as a string; it is not parsed into a date/time type in this class.
    @JsonProperty("Time")
    private String time;
    @JsonProperty("Type")
    private int type;
}
src/main/java/com/zhitan/redis/RedisCache.java
对比新文件
@@ -0,0 +1,183 @@
package com.zhitan.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * spring redis å·¥å…·ç±»
 *
 * @author ruoyi
 **/
@SuppressWarnings(value = {"unchecked", "rawtypes"})
@Component
public class RedisCache {
    private final RedisTemplate redisTemplate;
    @Autowired
    public RedisCache(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /**
     * ç¼“存基本的对象,Integer、String、实体类等
     *
     * @param key   ç¼“存的键值
     * @param value ç¼“存的值
     * @return ç¼“存的对象
     */
    public <T> ValueOperations<String, T> setCacheObject(String key, T value) {
        ValueOperations<String, T> operation = redisTemplate.opsForValue();
        operation.set(key, value);
        return operation;
    }
    /**
     * ç¼“存基本的对象,Integer、String、实体类等
     *
     * @param key      ç¼“存的键值
     * @param value    ç¼“存的值
     * @param timeout  æ—¶é—´
     * @param timeUnit æ—¶é—´é¢—粒度
     * @return ç¼“存的对象
     */
    public <T> ValueOperations<String, T> setCacheObject(String key, T value, Integer timeout, TimeUnit timeUnit) {
        ValueOperations<String, T> operation = redisTemplate.opsForValue();
        operation.set(key, value, timeout, timeUnit);
        return operation;
    }
    /**
     * èŽ·å¾—ç¼“å­˜çš„åŸºæœ¬å¯¹è±¡ã€‚
     *
     * @param key ç¼“存键值
     * @return ç¼“存键值对应的数据
     */
    public <T> T getCacheObject(String key) {
        ValueOperations<String, T> operation = redisTemplate.opsForValue();
        return operation.get(key);
    }
    /**
     * åˆ é™¤å•个对象
     *
     * @param key
     */
    public void deleteObject(String key) {
        redisTemplate.delete(key);
    }
    /**
     * åˆ é™¤é›†åˆå¯¹è±¡
     *
     * @param collection
     */
    public void deleteObject(Collection collection) {
        redisTemplate.delete(collection);
    }
    /**
     * ç¼“å­˜List数据
     *
     * @param key      ç¼“存的键值
     * @param dataList å¾…缓存的List数据
     * @return ç¼“存的对象
     */
    public <T> ListOperations<String, T> setCacheList(String key, List<T> dataList) {
        ListOperations listOperation = redisTemplate.opsForList();
        if (null != dataList) {
            int size = dataList.size();
            for (int i = 0; i < size; i++) {
                listOperation.leftPush(key, dataList.get(i));
            }
        }
        return listOperation;
    }
    /**
     * èŽ·å¾—ç¼“å­˜çš„list对象
     *
     * @param key ç¼“存的键值
     * @return ç¼“存键值对应的数据
     */
    public <T> List<T> getCacheList(String key) {
        List<T> dataList = new ArrayList<T>();
        ListOperations<String, T> listOperation = redisTemplate.opsForList();
        Long size = listOperation.size(key);
        for (int i = 0; i < size; i++) {
            dataList.add(listOperation.index(key, i));
        }
        return dataList;
    }
    /**
     * ç¼“å­˜Set
     *
     * @param key     ç¼“存键值
     * @param dataSet ç¼“存的数据
     * @return ç¼“存数据的对象
     */
    public <T> BoundSetOperations<String, T> setCacheSet(String key, Set<T> dataSet) {
        BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
        Iterator<T> it = dataSet.iterator();
        while (it.hasNext()) {
            setOperation.add(it.next());
        }
        return setOperation;
    }
    /**
     * èŽ·å¾—ç¼“å­˜çš„set
     *
     * @param key
     * @return
     */
    public <T> Set<T> getCacheSet(String key) {
        Set<T> dataSet = new HashSet<T>();
        BoundSetOperations<String, T> operation = redisTemplate.boundSetOps(key);
        dataSet = operation.members();
        return dataSet;
    }
    /**
     * ç¼“å­˜Map
     *
     * @param key
     * @param dataMap
     * @return
     */
    public <T> HashOperations<String, String, T> setCacheMap(String key, Map<String, T> dataMap) {
        HashOperations hashOperations = redisTemplate.opsForHash();
        if (null != dataMap) {
            for (Map.Entry<String, T> entry : dataMap.entrySet()) {
                hashOperations.put(key, entry.getKey(), entry.getValue());
            }
        }
        return hashOperations;
    }
    /**
     * èŽ·å¾—ç¼“å­˜çš„Map
     *
     * @param key
     * @return
     */
    public <T> Map<String, T> getCacheMap(String key) {
        Map<String, T> map = redisTemplate.opsForHash().entries(key);
        return map;
    }
    /**
     * èŽ·å¾—ç¼“å­˜çš„åŸºæœ¬å¯¹è±¡åˆ—è¡¨
     *
     * @param pattern å­—符串前缀
     * @return å¯¹è±¡åˆ—表
     */
    public Collection<String> keys(String pattern) {
        return redisTemplate.keys(pattern);
    }
}
src/main/java/com/zhitan/service/IDataService.java
对比新文件
@@ -0,0 +1,24 @@
package com.zhitan.service;
import com.zhitan.model.entity.ElectricPower;
import org.jetbrains.annotations.NotNull;
/**
 * Data service.
 */
public interface IDataService {
    /**
     * Write data - suited to dynamically written data whose keys are unique.
     *
     * @param jsonString data in JSON format
     */
    void writeTimeSeriesData(@NotNull String jsonString);
    /**
     * Write electric-power data - fixed format, can be customized.
     *
     * @param electricPower data in the fixed format
     */
    void writeTimeSeriesData(@NotNull ElectricPower electricPower);
}
src/main/java/com/zhitan/service/impl/DataServiceImpl.java
对比新文件
@@ -0,0 +1,133 @@
package com.zhitan.service.impl;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.config.influxdb.InfluxdbConfig;
import com.zhitan.model.IndexTemplate;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.influxdb.InfluxdbRepository;
import com.zhitan.mapper.CommonMapper;
import com.zhitan.redis.RedisCache;
import com.zhitan.service.IDataService;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Data service implementation.
 */
@Slf4j
@Service
public class DataServiceImpl implements IDataService {
    private final String TAG = "tag";
    private final String FIELD_VALUE = "value";
    private final InfluxdbRepository repository;
    private final InfluxdbConfig influxdbConfig;
    private final CommonMapper commonMapper;
    private final RedisCache redisCache;
    @Autowired
    public DataServiceImpl(InfluxdbRepository repository, InfluxdbConfig influxdbConfig,
                           CommonMapper commonMapper, RedisCache redisCache) {
        this.repository = repository;
        this.influxdbConfig = influxdbConfig;
        this.commonMapper = commonMapper;
        this.redisCache = redisCache;
    }
    /**
     * å†™å…¥æ—¶åºæ•°æ®
     *
     * @param jsonString json数据
     */
    @Override
    public void writeTimeSeriesData(@NotNull String jsonString) {
        if (jsonString.isEmpty()) {
            return;
        }
        List<Point> points = new ArrayList<>();
        JsonObject jsonObject = JsonParser.parseString(jsonString).getAsJsonObject();
        for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
            String key = entry.getKey();
            JsonElement value = entry.getValue();
            JsonPrimitive primitive = value.getAsJsonPrimitive();
            Point point = Point
                    .measurement(influxdbConfig.getMeasurement())
                    .addTag(TAG, key)
                    .time(Instant.now(), WritePrecision.S);
            if (primitive.isJsonPrimitive()) {
                if (primitive.isNumber()) {
                    point.addField(FIELD_VALUE, value.getAsDouble());
                    points.add(point);
                } else if (primitive.isString()) {
                    //point.addField(FIELD_VALUE, value.getAsString());
                } else if (primitive.isBoolean()) {
                    //point.addField(FIELD_VALUE, value.getAsBoolean());
                }
            }
        }
        repository.writePoints(points);
    }
    /**
     * å†™å…¥ç”µåŠ›ç›¸å…³æ•°æ®-固定格式,可自定义修改
     *
     * @param electricPower å›ºå®šæ ¼å¼çš„æ•°æ®
     */
    @Override
    public void writeTimeSeriesData(@NotNull ElectricPower electricPower) {
        List<IndexTemplate> templates = getIndexTemplate();
        // èŽ·å–ç±»ä¸­æ‰€æœ‰å£°æ˜Žçš„å­—æ®µ
        Field[] fields = electricPower.getClass().getDeclaredFields();
        List<Point> points = new ArrayList<>();
        for (Field field : fields) {
            IndexTemplate indexTemplate = templates.stream().filter(template ->
                            field.getName().equalsIgnoreCase(template.getGatewayKey()))
                    .findFirst().orElse(null);
            if (indexTemplate != null) {
                Point point = Point
                        .measurement(influxdbConfig.getMeasurement())
                        .addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode())
                        .time(Instant.now(), WritePrecision.S);
                // è®¾ç½®å­—段可访问,允许访问私有字段
                field.setAccessible(true);
                if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
                    try {
                        // èŽ·å–å­—æ®µå€¼
                        double value = field.getDouble(electricPower);
                        point.addField(FIELD_VALUE, value);
                        points.add(point);
                    } catch (IllegalAccessException e) {
                        log.error("获取属性值失败:{}", e.getMessage());
                    }
                }
            }
        }
        repository.writePoints(points);
    }
    /**
     * èŽ·å–ç‚¹ä½æ¨¡æ¿
     */
    protected List<IndexTemplate> getIndexTemplate() {
        String TEMPLATE_KEY = "template";
        List<IndexTemplate> result = redisCache.getCacheList(TEMPLATE_KEY);
        if (result == null || result.isEmpty()) {
            result = commonMapper.getIndexTemplate();
            redisCache.setCacheList(TEMPLATE_KEY, result);
        }
        return result;
    }
}
src/main/resources/application-dev.yml
对比新文件
@@ -0,0 +1,78 @@
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    druid:
      master:
        url: jdbc:postgresql://localhost:5432/energy
        username: postgres
        password: postgres
      # Initial number of connections
      initialSize: 5
      # Minimum number of connections kept in the pool
      minIdle: 10
      # Maximum number of connections in the pool
      maxActive: 20
      # Maximum time to wait when acquiring a connection
      maxWait: 60000
      # Connection timeout
      connectTimeout: 30000
      # Network (socket) timeout
      socketTimeout: 60000
      # Interval between checks for idle connections that should be closed, in milliseconds
      timeBetweenEvictionRunsMillis: 60000
      # Minimum time a connection stays in the pool, in milliseconds
      minEvictableIdleTimeMillis: 300000
      # Maximum time a connection stays in the pool, in milliseconds
      maxEvictableIdleTimeMillis: 900000
      # Query used to check whether a connection is still valid
      validationQuery: SELECT 1
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      webStatFilter:
        enabled: true
      statViewServlet:
        enabled: true
        # Access whitelist; leaving it empty allows access from anywhere
        allow:
        url-pattern: /druid/*
        # Username and password of the monitoring console
        login-username: admin
        login-password: 123456
      filter:
        stat:
          enabled: true
          # Slow SQL logging
          log-slow-sql: true
          slow-sql-millis: 1000
          merge-sql: true
        wall:
          config:
            multi-statement-allow: true
  mqtt:
    # Custom client id
    client-id: 202503181042
    # Public MQTT broker; replace it with your own in production
    broker-url: tcp://broker.emqx.io
    username:
    password:
    # Topic subscribed to by default
    default-topic: zhitan
    timeout: 30
    keep-alive: 60
  redis:
    database: 0
    host: localhost
    port: 6379
    password:
influxdb:
  host: "http://localhost:8086"
  # Change to the org of your own time-series database
  org: "org"
  # Change to the bucket of your own time-series database
  bucket: "bucket"
  # Change to the access token of your own time-series database
  token: "token"
  measurement: data
  enable: true
src/main/resources/application.yml
对比新文件
@@ -0,0 +1,24 @@
spring:
  profiles:
    active: dev
  # Service module
  devtools:
    restart:
      # Hot-deploy (restart) switch
      enabled: false
# Logging configuration
logging:
  level:
    com.zhitan: debug
    org.springframework: warn
mybatis-plus:
  # Must name a package that actually contains the model classes. The project's
  # models live under com.zhitan.model (IndexTemplate) and com.zhitan.model.entity
  # (ElectricPower); there is no com.zhitan.entity package.
  type-aliases-package: com.zhitan.model
  mapper-locations: classpath*:/mapper/*Mapper.xml
  # Load the global MyBatis configuration file
  config-location: classpath:/mybatis/mybatis-config.xml
  global-config:
    db-config:
      logic-delete-field: del_flag # Global logical-delete column name
      logic-delete-value: 2 # Value meaning "logically deleted"
      logic-not-delete-value: 0 # Value meaning "not deleted"
src/main/resources/mapper/CommonMapper.xml
对比新文件
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.zhitan.mapper.CommonMapper">
    <!--
        Loads all rows of daq_template as IndexTemplate objects.
        The statement deliberately has no trailing semicolon: a terminator inside a
        mapped statement breaks the SQL as soon as it is wrapped or extended
        (for example by a count or pagination rewrite) and is rejected by some drivers.
    -->
    <select id="getIndexTemplate" resultType="com.zhitan.model.IndexTemplate">
        SELECT code,
               "name",
               device_type,
               gateway_key
        FROM daq_template
    </select>
</mapper>
src/main/resources/mybatis/mybatis-config.xml
对比新文件
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <!-- Global settings -->
    <settings>
        <!-- Globally enable or disable caching for all mappers -->
        <setting name="cacheEnabled" value="true"/>
        <!-- Allow JDBC support for auto-generated primary keys -->
        <setting name="useGeneratedKeys" value="true"/>
        <!-- Default executor. SIMPLE is the plain executor; REUSE reuses prepared statements; BATCH reuses statements and performs batch updates -->
        <setting name="defaultExecutorType" value="SIMPLE"/>
        <!-- Logging implementation MyBatis should use -->
        <setting name="logImpl" value="SLF4J"/>
        <!-- Map underscore column names to camelCase properties -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
    </settings>
</configuration>