springboot+mqtt+apache apollo,监听信息并可以动态更改topic
一、需求通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。二、实现1、新建一个demo数据库并添加几条数据来进行测试站点设备信息表存放监听数据表2、创建一个springboot项目,开始编写Java代码(1)创建springboot具体创建过程略,可参考文章使用IDEA创建一个springboot项
·
一、需求
通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。
二、实现
1、新建一个demo数据库并添加几条数据来进行测试
站点设备信息表
存放监听数据表
2、创建一个springboot项目,开始编写Java代码
(1)创建springboot
具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)
(2)配置配置文件
server.port=13010 spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull spring.datasource.username = root spring.datasource.password = 123456 spring.datasource.driver-class-name = com.mysql.jdbc.Driver spring.datasource.type = com.zaxxer.hikari.HikariDataSource spring.mqtt.url = tcp://127.0.0.1:61613 consumer.client.id = iot_mqtt spring.mqtt.username = admin spring.mqtt.password = password spring.mqtt.completionTimeout = 3000
(3)pom文件导入相关jar包
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.iotmqtt</groupId> <artifactId>iot-mqtt-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <!--spring-mqtt--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.0</version> </dependency> <!-- swagger --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- mybatis --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.4.1</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.1</version> </dependency> <!--poi--> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>3.16</version> </dependency> <!-- lombok slf4j日志--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
(3)创建MqttConfig,配置mqtt连接,编写topic更新方法
package com.iotmqtt.iotmqttdemo.config; import com.iotmqtt.iotmqttdemo.mapper.DemoMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; @Component @Configuration @EnableAutoConfiguration public class MqttConfig { private Log log = LogFactory.getLog(MqttConfig.class); @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${consumer.client.id}") private String clientId; @Value("${spring.mqtt.completionTimeout}") private static int completionTimeout ; private static MqttClient client; @Autowired DemoMapper demoMapper; /** * 添加topic * @param topic */ public void addToipc(String topic) throws MqttException { client.subscribe(topic); } /** * 删除topic * @param topic */ public void removeTopic(String topic) throws MqttException { client.unsubscribe(topic); } public void init() throws MqttException { //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, //MemoryPersistence设置clientid的保存形式,默认为以内存保存 if(client==null) { client = new MqttClient(hostUrl, clientId, new MqttDefaultFilePersistence()); } MqttConnectOptions options = new MqttConnectOptions(); //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, //这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); //设置连接的用户名 options.setUserName(username); //设置连接的密码 options.setPassword(password.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(20); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //回调 client.setCallback(new MqttCallback() { @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { //subscribe后得到的消息会执行到这里面 process(topicName,message.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish后会执行到这里 } @Override public void connectionLost(Throwable cause) { // //连接丢失后,一般在这里面进行重连 try { init(); }catch (Exception e){ e.printStackTrace(); } } }); //链接 client.connect(options); //订阅 client.subscribe(demoMapper.getTopics()); //取消订阅 //client.unsubscribe(topicStr); } /** * 自己写发布消息的方法,然后循环调用。 */ public void PushMsg(String deviceNum, String msg){ //tpoic String topic = deviceNum; MqttMessage m=new MqttMessage(); m.setRetained(true); m.setPayload(msg.getBytes()); try { log.info("主题:"+topic+"-----内容:"+m); client.publish(topic, m); }catch(Exception e){ System.out.println("发布消息失败-->"+msg); e.printStackTrace(); } } /** * 接受数据后处理数据方法 * @param topicName * @param message */ void process(String topicName,String message){ //处理数据代码 }
}
(4)编写获得站点/设备topic的方法和保存数据的方法
mapper
package com.iotmqtt.iotmqttdemo.mapper; import com.iotmqtt.iotmqttdemo.bean.SiteData; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Select; @Mapper public interface DemoMapper { @Select("select topics from site_info") String[] getTopics(); @Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})") void insert(SiteData siteData); }
(5)创建controller编写更新topic接口
package com.iotmqtt.iotmqttdemo.controller; import com.iotmqtt.iotmqttdemo.config.MqttConfig; import com.iotmqtt.iotmqttdemo.service.DemoService; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping({"/demo"}) public class DemoController { @Autowired DemoService demoService; @Autowired MqttConfig mqttConfig; @ApiOperation(value = "删除topic", notes = "") @ApiImplicitParams({ @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String") }) @GetMapping("/removeTopic") public String removeTopic(String topic) throws MqttException { mqttConfig.removeTopic(topic); return "删除成功"; } @ApiOperation(value = "新增topic", notes = "") @ApiImplicitParams({ @ApiImplicitParam(name = "topic", value = "阿波罗topic", required = false, paramType = "String") }) @GetMapping("/addTopic") public String addTopic(String topic) throws MqttException { mqttConfig.addToipc(topic); return "添加成功"; } }
三、测试代码
1、服务启动后,使用mqtt连接工具发送数据测试功能
(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据
发送数据测试
(2)修改一个站点的topic,调用更新接口再此发送数据测试
修改1站点topic
执行更新topic接口
修改后重新发送数据
(4)检查数据更新情况
可以看到我们数据已经可以正常上来
关注公众号 Z丶learn 回复 mqttdemo 获得源码demo
微信公众号
更多推荐
所有评论(0)