springboot中使用mqtt
通过以上步骤,Spring Boot 即可快速集成 MQTT,实现发布/订阅功能。根据实际需求调整 QoS、持久化和安全配置。
博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌
博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦
🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。
🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟
Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》https://blog.csdn.net/qq_57756904/category_12173599.html
有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。
最后再送一句:最好是学会了,而不是学废了!!
2
在 Spring Boot 中使用 MQTT 可以通过集成 Eclipse Paho 或 HiveMQ 等客户端库实现。以下是完整的整合步骤,包括配置、发布和订阅消息的示例。
1. 添加 MQTT 依赖
在 pom.xml
中添加 Paho MQTT 客户端依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
2. 配置 MQTT 连接
2.1 配置文件(application.yml)
mqtt:
broker-url: tcp://broker.hivemq.com:1883 # 公共MQTT Broker
client-id: spring-boot-client-${random.uuid}
username: (可选)
password: (可选)
default-topic: test/topic # 默认发布/订阅的Topic
2.2 MQTT 配置类
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Configuration
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true); // 是否持久化会话
factory.setConnectionOptions(options);
return factory;
}
}
3. 发布消息(Publisher)
3.1 创建 MQTT 消息网关
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}
3.2 配置出站通道
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
@Configuration
public class MqttOutboundConfig {
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory factory) {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("publisher-client", factory);
handler.setAsync(true); // 异步发送
handler.setDefaultTopic("${mqtt.default-topic}"); // 默认Topic
return handler;
}
}
3.3 使用示例(Controller 发布消息)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/publish")
public String publish(@RequestParam String message, @RequestParam(required = false) String topic) {
mqttGateway.sendToMqtt(message, topic != null ? topic : "test/topic");
return "Message published: " + message;
}
}
4. 订阅消息(Subscriber)
4.1 配置入站通道
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@Configuration
public class MqttInboundConfig {
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {
return new MqttPahoMessageDrivenChannelAdapter(
"subscriber-client",
factory,
"${mqtt.default-topic}");
}
@Bean
public IntegrationFlow mqttInboundFlow(MqttPahoMessageDrivenChannelAdapter adapter) {
return IntegrationFlows.from(adapter)
.handle(message -> {
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
System.out.println("Received: Topic=" + topic + ", Payload=" + payload);
})
.get();
}
}
5. 测试 MQTT 功能
5.1 测试发布消息
curl -X POST "http://localhost:8080/publish?message=HelloMQTT&topic=test/topic"
5.2 查看订阅日志
控制台会输出接收到的消息:
Received: Topic=test/topic, Payload=HelloMQTT
6. 进阶配置
6.1 QoS 和 Retained 消息
// 在发布消息时指定 QoS 和 Retained
mqttGateway.sendToMqtt(
message,
topic,
MqttUtil.buildMqttMessageProperties(1, true) // QoS=1, Retained=true
);
6.2 使用 MQTT 5.0
升级依赖到 org.eclipse.paho.client.mqttv5
,并调整配置类:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv5</artifactId>
<version>1.2.5</version>
</dependency>
7. 常见问题
7.1 连接失败
-
检查 Broker URL 和网络连通性。
-
确认用户名/密码正确(如果有认证)。
7.2 消息未接收
-
检查 Topic 是否匹配(大小写敏感)。
-
确认订阅者 Client ID 唯一(避免冲突)。
7.3 性能优化
-
使用异步发送(
setAsync(true)
)。 -
共享连接(避免为每个发布/订阅创建独立客户端)。
8. 完整代码示例
GitHub 示例项目:spring-boot-mqtt-demo
通过以上步骤,Spring Boot 即可快速集成 MQTT,实现发布/订阅功能。根据实际需求调整 QoS、持久化和安全配置。
3
更多推荐
所有评论(0)