1、mq简介

概述

        MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的消息传输协议,主要用于低带宽、高延迟或不可靠的网络环境。它广泛应用于物联网(IoT)领域,用于设备与设备之间、设备与服务器之间的通信。

历史背景

  • 起源:MQTT协议最初由IBM的Andy Stanford-Clark和Arcom公司的Arlen Nipper于1999年发明,最初用于石油管道的远程监控系统。由于其高效、可靠和低带宽的特点,逐渐被广泛应用于各种工业和物联网场景。

  • 发展:2013年,MQTT协议被OASIS(结构化信息标准促进组织)批准为开放标准,进一步推动了其在全球范围内的普及。

协议特点

  • 轻量级:MQTT协议的头部信息最小只有2字节,相比HTTP等协议,大大减少了传输数据量,非常适合在带宽有限的网络中使用。

  • 低功耗:设备在空闲时可以进入低功耗模式,只有在需要发送或接收消息时才唤醒,这使得它非常适合电池供电的设备。

  • 高可靠性:通过QoS(服务质量)机制,MQTT可以确保消息的可靠传输。QoS分为三个级别:

    • QoS 0(最多一次):消息最多被发送一次,不保证消息一定到达。

    • QoS 1(至少一次):消息至少被发送一次,可能会重复到达。

    • QoS 2(恰好一次):消息恰好被发送一次,不会重复也不会丢失。

  • 双向通信:MQTT支持客户端到服务器的发布/订阅模式,同时也支持服务器到客户端的通信,非常适合物联网场景中的设备控制和数据上报。

工作原理

  • 发布/订阅模式:MQTT基于发布/订阅模式,客户端可以是发布者(Publisher)或订阅者(Subscriber)。发布者将消息发布到特定的主题(Topic),而订阅者则订阅这些主题来接收消息。服务器(Broker)负责将消息从发布者传递到订阅者。

    • 发布者:负责生成消息并将其发送到Broker。

    • 订阅者:负责从Broker接收消息。

    • Broker:作为消息的中转站,负责接收发布者的消息并将其分发给订阅者。

  • 连接建立:客户端通过TCP/IP连接到Broker,并通过CONNECT报文建立连接。连接建立后,客户端可以开始发布或订阅消息。

  • 消息传输:消息通过PUBLISH报文从发布者发送到Broker,再通过PUBLISH报文从Broker发送到订阅者。消息的传输过程受到QoS机制的控制。

应用场景

  • 物联网设备通信:MQTT是物联网中常用的通信协议,适用于智能家居、智能工厂、智能农业等场景。例如,智能家居设备可以通过MQTT将传感器数据发送到服务器,用户可以通过手机APP接收这些数据并控制设备。

  • 工业自动化:在工业自动化领域,MQTT可以用于设备之间的通信和数据采集。例如,生产线上的传感器可以通过MQTT将数据发送到监控系统,实现远程监控和故障诊断。

  • 移动应用:MQTT也可以用于移动应用的实时消息推送。例如,即时通讯应用可以通过MQTT实现消息的快速传输和低延迟推送。

优势与局限性

  • 优势

    • 低带宽和低功耗:适合资源受限的设备和网络环境。

    • 高可靠性:通过QoS机制确保消息传输的可靠性。

    • 简单易用:协议简单,易于实现和部署。

  • 局限性

    • 安全性有限:MQTT本身不提供加密机制,需要结合TLS/SSL等协议来实现安全传输。

    • 不适合高吞吐量场景:在高吞吐量的场景下,MQTT可能会出现性能瓶颈。

未来发展趋势

  • 与5G和边缘计算结合:随着5G和边缘计算的发展,MQTT将在物联网中发挥更大的作用。5G的低延迟和高带宽特性可以弥补MQTT在传输速度上的不足,而边缘计算可以将数据处理靠近设备端,减少对中心服务器的依赖。

  • 与其他协议融合:MQTT可能会与其他物联网协议(如CoAP、AMQP)融合,形成更完整的物联网通信解决方案。

2、bifromq、rocketmq mqtt、Mosquitto比较

功能特性

功能特性 Mosquitto RocketMQ MQTT BifroMQ
协议支持 MQTT 3.1、3.1.1、5.0 MQTT MQTT
客户端连接数 适合大规模连接(十万级) 一般服务器规模较小(万级) 支持大规模连接(千万级)
单客户端消息量 单个客户端消息量少,适合定时收发 单个客户端处理消息量大,注重吞吐量 高吞吐量,适合大规模消息处理
消息持久化 支持消息持久化到磁盘 支持 支持
QoS 支持 支持 QoS 0、1、2 支持 支持
多语言支持 支持多种语言(Java、C、Python 等) 支持 Java、C++、.NET 等 支持多种语言
安全性 支持 TLS/SSL 加密、用户名/密码认证、ACL 支持 RAM 授权 支持
多租户支持 不支持 不支持 支持
动态认证 不支持 不支持 不支持
数据桥接 不支持 不支持 不支持

性能

性能指标 Mosquitto RocketMQ MQTT BifroMQ
吞吐量 QoS 0:120k 消息/秒;QoS 1:80k 消息/秒 高吞吐量,适合大规模消息处理 极高的消息处理能力,低时延
连接数 单节点支持 10 万连接 单节点连接数较少 单节点支持数百万连接
延迟 在某些场景下延迟较高 低延迟 极低的消息时延

使用场景

使用场景 Mosquitto RocketMQ MQTT BifroMQ
物联网设备 适合资源受限的物联网设备 适合需要高吞吐量的物联网场景 适合大规模物联网设备连接
工业自动化 适合轻量级工业自动化 适合对吞吐量要求较高的工业场景 适合大规模工业物联网
智能家居 适合智能家居设备 适合智能家居设备 适合智能家居设备

社区与支持

社区与支持 Mosquitto RocketMQ MQTT BifroMQ
社区活跃度 拥有活跃的开源社区 社区活跃度相对较低 社区活跃度正在提升
技术支持 提供丰富的文档和社区支持 提供有限的社区支持 提供技术支持

总结

  • Mosquitto:适合资源受限的物联网设备和轻量级应用场景,具有低资源占用、简单易用和跨平台的特点。

  • RocketMQ MQTT:适合对吞吐量要求较高的物联网场景,支持高吞吐量和顺序消息。

  • BifroMQ:适合大规模物联网设备连接和高吞吐量场景,具有高性能、低时延和多租户支持的特点。

3、bifromq官网

地址:BifroMQ Open Source | BifroMQ.io - Multi-tenancy MQTT Broker

文档地址:BifroMQ | BifroMQ.io - Multi-tenancy MQTT Broker

github地址:https://github.com/baidu/bifromq

4、docker安装

安装文档:快速安装 | BifroMQ.io - Multi-tenancy MQTT Broker

docker run -d --name bifromq -p 1883:1883 bifromq/bifromq:latest

5、demo测试

引入依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>${org.eclipse.paho.client.mqttv3.version}</version>
</dependency>

代码

package com.ybw.rocketmq.demo.mqtt.baidu;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jetbrains.annotations.NotNull;

/**
 * 开源项目-Bifro
 *
 * @author ybw
 * @version V1.0
 * @className BifroMQClient
 * @date 2025/5/13
 **/
@Slf4j
public class BifroMQClient {
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883"; // BifroMQ 服务地址
        String clientId = "JavaClient"; // 客户端ID
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            // 创建 MQTT 客户端实例
            MqttClient client = getMqttClient(broker, clientId, persistence);
            System.out.println("连接成功");

            // 订阅主题
            client.subscribe("test/topic", 0);
            System.out.println("订阅主题成功");

            // 发布消息
            MqttMessage message = new MqttMessage("Hello BifroMQ".getBytes());
            message.setQos(0);
            client.publish("test/topic", message);
            System.out.println("消息发布成功");

            // 等待一段时间接收消息
            Thread.sleep(5000);

            // 断开连接
            client.disconnect();
            System.out.println("断开连接");
        } catch (Exception e) {
            log.error("main error:", e);
        }
    }

    @NotNull
    private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException {
        MqttClient client = new MqttClient(broker, clientId, persistence);

        // 设置连接回调
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("连接丢失");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                System.out.println("收到消息:" + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("消息发送完成");
            }
        });

        // 连接到 BifroMQ
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        client.connect(options);
        return client;
    }
}

6、MQTT服务质量-QoS

        MQTT(Message Queuing Telemetry Transport)协议里有三种服务质量(QoS)级别,它们用来规定消息传递的可靠程度。 

QoS 0:最多一次(At most once)

  • 特性:消息只会发送一次,不会去确认消息是否送达。要是发送过程中出现丢包情况,也不会进行重发。
  • 适用场景:比较适合那种对丢包不太敏感的场景,像环境监测数据的采集,偶尔丢失一两条数据对整体结果影响不大;还有状态更新类的场景,比如设备在线状态的实时更新,错过一次更新后续也能获取到最新状态。
  • 优点:实现起来简单,消耗的网络资源少,消息传输延迟低。
  • 缺点:无法保证消息一定能到达接收端。

QoS 1:至少一次(At least once)

  • 特性:使用 PUBACK 确认机制。发送方会一直重发消息,直到收到接收方的确认回执,不过这可能会导致消息重复。
  • 适用场景:适用于需要保证消息送达,但对消息重复不太敏感的场景,例如传感器数据的收集,偶尔重复一两条数据对分析结果影响较小;还有通知类的场景,如系统告警通知,重要的是能收到通知,多收到一次影响不大。
  • 优点:能保证消息不丢失。
  • 缺点:可能会出现消息重复的情况,需要接收方自行处理重复消息。

QoS 2:恰好一次(Exactly once)

  • 特性:采用四次握手的确认机制(PUBREC、PUBREL、PUBCOMP),能确保消息只被送达一次,不会出现重复。
  • 适用场景:适合对消息准确性要求很高的场景,比如金融交易,每一笔交易都必须准确无误;还有计费系统,多算或者少算费用都会造成严重后果。
  • 优点:能保证消息的唯一性和可靠性。
  • 缺点:实现起来比较复杂,会增加网络延迟,消耗更多的系统资源。

QoS 选择建议

  • 资源受限的设备:如果设备的计算能力、内存或者网络带宽有限,可以选择 QoS 0 或者 QoS 1。
  • 高可靠性需求的场景:像涉及到关键业务数据传输的场景,应优先考虑 QoS 2。
  • 权衡延迟和可靠性:如果希望在保证消息可靠传输的同时,尽量降低延迟,可以选择 QoS 1。

示例对比

  • QoS 0:类似于 UDP 协议,只管发送消息,不考虑消息是否能到达。
  • QoS 1:就好像发送短信,发送方会一直尝试发送,直到收到对方的已读回执,但可能会因为网络问题导致短信重复发送。
  • QoS 2:类似于银行转账,会有一系列的确认流程来确保资金准确无误地到达对方账户。

理解这三种 QoS 级别对于设计可靠的 MQTT 应用至关重要,你要根据具体的业务需求来权衡选择合适的 QoS 级别。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐