博主介绍:✌全网粉丝5W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌

博主作品:《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis-plus+MySQL+Vue等前后端分离项目,可以在左边的分类专栏找到更多项目。《Uniapp项目案例》有几个有uniapp教程,企业实战开发。《微服务实战》专栏是本人的实战经验总结,《Spring家族及微服务系列》专注Spring、SpringMVC、SpringBoot、SpringCloud系列、Nacos等源码解读、热门面试题、架构设计等。除此之外还有不少文章等你来细细品味,更多惊喜等着你哦

🍅uniapp微信小程序🍅面试题软考题免费使用,还可以使用微信支付,扫码加群。由于维护成本问题得不到解决,可能将停止线上维护。

🍅文末获取联系🍅精彩专栏推荐订阅👇🏻👇🏻 不然下次找不到哟

Java项目案例《100套》
https://blog.csdn.net/qq_57756904/category_12173599.html
uniapp小程序《100套》

https://blog.csdn.net/qq_57756904/category_12173599.html

有需求代码永远写不完,而方法才是破解之道,抖音有实战视频课程,某马某千等培训都是2万左右,甚至广东有本科院校单单一年就得3万4年就12万学费,而且还没有包括吃饭的钱。所以很划算了。另外博客左侧有源码阅读专栏,对于求职有很大帮助,当然对于工作也是有指导意义等。在大城市求职,你面试来回一趟多多少少都在12块左右,而且一般不会一次性就通过,还得面试几家。而如果你对源码以及微服务等有深度认识,这无疑给你的面试添砖加瓦更上一层楼。

最后再送一句:最好是学会了,而不是学废了!!

2

在 Spring Boot 中使用 MQTT 可以通过集成 Eclipse Paho 或 HiveMQ 等客户端库实现。以下是完整的整合步骤,包括配置、发布和订阅消息的示例。


1. 添加 MQTT 依赖

在 pom.xml 中添加 Paho MQTT 客户端依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

2. 配置 MQTT 连接

2.1 配置文件(application.yml)

mqtt:
  broker-url: tcp://broker.hivemq.com:1883  # 公共MQTT Broker
  client-id: spring-boot-client-${random.uuid}
  username: (可选)
  password: (可选)
  default-topic: test/topic  # 默认发布/订阅的Topic

2.2 MQTT 配置类

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true); // 是否持久化会话
        factory.setConnectionOptions(options);
        return factory;
    }
}

3. 发布消息(Publisher)

3.1 创建 MQTT 消息网关

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}

3.2 配置出站通道

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MqttOutboundConfig {

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("publisher-client", factory);
        handler.setAsync(true); // 异步发送
        handler.setDefaultTopic("${mqtt.default-topic}"); // 默认Topic
        return handler;
    }
}

3.3 使用示例(Controller 发布消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @PostMapping("/publish")
    public String publish(@RequestParam String message, @RequestParam(required = false) String topic) {
        mqttGateway.sendToMqtt(message, topic != null ? topic : "test/topic");
        return "Message published: " + message;
    }
}

4. 订阅消息(Subscriber)

4.1 配置入站通道

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

@Configuration
public class MqttInboundConfig {

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {
        return new MqttPahoMessageDrivenChannelAdapter(
                "subscriber-client", 
                factory, 
                "${mqtt.default-topic}");
    }

    @Bean
    public IntegrationFlow mqttInboundFlow(MqttPahoMessageDrivenChannelAdapter adapter) {
        return IntegrationFlows.from(adapter)
                .handle(message -> {
                    String payload = message.getPayload().toString();
                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                    System.out.println("Received: Topic=" + topic + ", Payload=" + payload);
                })
                .get();
    }
}

5. 测试 MQTT 功能

5.1 测试发布消息

curl -X POST "http://localhost:8080/publish?message=HelloMQTT&topic=test/topic"

5.2 查看订阅日志

控制台会输出接收到的消息:

Received: Topic=test/topic, Payload=HelloMQTT

6. 进阶配置

6.1 QoS 和 Retained 消息

// 在发布消息时指定 QoS 和 Retained
mqttGateway.sendToMqtt(
    message, 
    topic, 
    MqttUtil.buildMqttMessageProperties(1, true) // QoS=1, Retained=true
);

6.2 使用 MQTT 5.0

升级依赖到 org.eclipse.paho.client.mqttv5,并调整配置类:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv5</artifactId>
    <version>1.2.5</version>
</dependency>

7. 常见问题

7.1 连接失败

  • 检查 Broker URL 和网络连通性。

  • 确认用户名/密码正确(如果有认证)。

7.2 消息未接收

  • 检查 Topic 是否匹配(大小写敏感)。

  • 确认订阅者 Client ID 唯一(避免冲突)。

7.3 性能优化

  • 使用异步发送(setAsync(true))。

  • 共享连接(避免为每个发布/订阅创建独立客户端)。


8. 完整代码示例

GitHub 示例项目:spring-boot-mqtt-demo


通过以上步骤,Spring Boot 即可快速集成 MQTT,实现发布/订阅功能。根据实际需求调整 QoS、持久化和安全配置。

3

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐