一、需求

        通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。

二、实现

1、新建一个demo数据库并添加几条数据来进行测试

 

站点设备信息表

 

存放监听数据表

2、创建一个springboot项目,开始编写Java代码

(1)创建springboot

            具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)

(2)配置配置文件

server.port=13010
spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username = root
spring.datasource.password = 123456
spring.datasource.driver-class-name = com.mysql.jdbc.Driver
spring.datasource.type = com.zaxxer.hikari.HikariDataSource
spring.mqtt.url = tcp://127.0.0.1:61613
consumer.client.id = iot_mqtt
spring.mqtt.username = admin
spring.mqtt.password = password
spring.mqtt.completionTimeout = 3000

(3)pom文件导入相关jar包

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.iotmqtt</groupId>
    <artifactId>iot-mqtt-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
        <!--spring-mqtt-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- mybatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.1</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.1</version>
        </dependency>
        <!--poi-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.16</version>
        </dependency>
        <!-- lombok slf4j日志-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

(3)创建MqttConfig,配置mqtt连接,编写topic更新方法

package com.iotmqtt.iotmqttdemo.config;
import com.iotmqtt.iotmqttdemo.mapper.DemoMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@Configuration
@EnableAutoConfiguration
public class MqttConfig {

    private Log log = LogFactory.getLog(MqttConfig.class);
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    @Value("${consumer.client.id}")
    private String clientId;
    @Value("${spring.mqtt.completionTimeout}")
    private static int completionTimeout ;
    private static MqttClient client;
    @Autowired
    DemoMapper demoMapper;
    /**
     * 添加topic
     * @param topic
     */
    public void addToipc(String topic) throws MqttException {
        client.subscribe(topic);
    }

    /**
     * 删除topic
     * @param topic
     */
    public void removeTopic(String topic) throws MqttException {
        client.unsubscribe(topic);
    }

    public void init() throws MqttException {
        //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
        if(client==null) {
            client = new MqttClient(hostUrl, clientId, new MqttDefaultFilePersistence());
        }
        MqttConnectOptions options = new MqttConnectOptions();
        //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        //这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        //设置连接的用户名
        options.setUserName(username);
        //设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(20);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        //回调
        client.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String topicName, MqttMessage message) throws Exception {
                //subscribe后得到的消息会执行到这里面
                process(topicName,message.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                //publish后会执行到这里
            }

            @Override
            public void connectionLost(Throwable cause) {
                // //连接丢失后,一般在这里面进行重连
                try {
                    init();
                }catch (Exception e){
                    e.printStackTrace();
                }

            }
        });
        //链接
        client.connect(options);
        //订阅
        client.subscribe(demoMapper.getTopics());
        //取消订阅
        //client.unsubscribe(topicStr);
    }

    /**
     * 自己写发布消息的方法,然后循环调用。
     */
    public void PushMsg(String deviceNum, String msg){
        //tpoic
        String topic = deviceNum;
        MqttMessage m=new MqttMessage();
        m.setRetained(true);
        m.setPayload(msg.getBytes());
        try {
            log.info("主题:"+topic+"-----内容:"+m);
            client.publish(topic, m);
        }catch(Exception e){
            System.out.println("发布消息失败-->"+msg);
            e.printStackTrace();
        }

    }

    /**
     * 接受数据后处理数据方法
     * @param topicName
     * @param message
     */
    void process(String topicName,String message){
        //处理数据代码
    }

}

(4)编写获得站点/设备topic的方法和保存数据的方法

mapper

package com.iotmqtt.iotmqttdemo.mapper;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface DemoMapper {

    @Select("select topics from site_info")
    String[] getTopics();
    @Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})")
    void insert(SiteData siteData);
}

 

(5)创建controller编写更新topic接口

package com.iotmqtt.iotmqttdemo.controller;
import com.iotmqtt.iotmqttdemo.config.MqttConfig;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping({"/demo"})
public class DemoController {

    @Autowired
    DemoService demoService;
    @Autowired
    MqttConfig mqttConfig;
    @ApiOperation(value = "删除topic", notes = "")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
    })
    @GetMapping("/removeTopic")
    public String removeTopic(String topic) throws MqttException {
        mqttConfig.removeTopic(topic);
        return "删除成功";
    }

    @ApiOperation(value = "新增topic", notes = "")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String")
    })
    @GetMapping("/addTopic")
    public String addTopic(String topic) throws MqttException {
        mqttConfig.addToipc(topic);
        return "添加成功";
    }

}

三、测试代码

1、服务启动后,使用mqtt连接工具发送数据测试功能

(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据

 

发送数据测试

(2)修改一个站点的topic,调用更新接口再此发送数据测试

 

修改1站点topic

 

执行更新topic接口

修改后重新发送数据

(4)检查数据更新情况

 

可以看到我们数据已经可以正常上来

 

关注公众号 Z丶learn 回复 mqttdemo 获得源码demo

微信公众号

 

 

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐