基于 MQTT 协议的 C# 实现,包括封装类、经典案例和使用 Demo。MQTT(Message Queuing Telemetry Transport)是一种轻量级、发布-订阅模式的通信协议,广泛用于物联网设备通信。本实现使用 MQTTnet 库,提供异步支持,并展示一个经典案例:物联网设备状态监控。


1. MQTT 协议简介

MQTT 是一种基于 TCP/IP 的协议,适合低带宽、高延迟或不稳定的网络环境。核心概念:

  • 发布/订阅:客户端向主题(Topic)发布消息,其他客户端订阅该主题接收消息。

  • QoS(服务质量):支持 0(最多一次)、1(至少一次)、2(仅一次)三种级别。

  • Broker:消息代理,负责路由消息(如 Mosquitto、EMQX)。

  • 轻量高效:适合资源受限的设备。


2. 封装 MQTT 客户端类

以下是封装的 MqttClientHelper 类,基于 MQTTnet 库,支持连接、发布、订阅和断开功能。

MqttClientHelper 类代码

csharp

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Serilog;

public class MqttClientHelper : IDisposable
{
    private readonly IMqttClient _mqttClient;
    private readonly MqttClientOptions _options;
    private readonly ILogger _logger;

    public event EventHandler<(string Topic, string Message)> MessageReceived; // 消息接收事件

    public MqttClientHelper(string broker, int port, string clientId, string username = null, string password = null)
    {
        // 初始化 MQTT 客户端
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        // 配置连接选项
        _options = new MqttClientOptionsBuilder()
            .WithTcpServer(broker, port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .Build();

        // 初始化 Serilog
        _logger = Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Debug()
            .WriteTo.Console()
            .WriteTo.File("logs/mqtt_log_.txt", rollingInterval: RollingInterval.Day)
            .CreateLogger();

        // 注册事件
        _mqttClient.ConnectedAsync += async e => _logger.Information("已连接到 MQTT Broker: {Broker}:{Port}", broker, port);
        _mqttClient.DisconnectedAsync += async e => _logger.Warning("与 MQTT Broker 断开连接: {Reason}", e.Exception?.Message);
        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceivedAsync;
    }

    // 异步连接
    public async Task ConnectAsync()
    {
        try
        {
            await _mqttClient.ConnectAsync(_options, CancellationToken.None);
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "连接 MQTT Broker 失败");
            throw;
        }
    }

    // 异步断开连接
    public async Task DisconnectAsync()
    {
        try
        {
            await _mqttClient.DisconnectAsync();
            _logger.Information("已断开 MQTT Broker 连接");
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "断开 MQTT Broker 连接失败");
            throw;
        }
    }

    // 异步发布消息
    public async Task PublishAsync(string topic, string payload, int qos = 1, bool retain = false)
    {
        try
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(Encoding.UTF8.GetBytes(payload))
                .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                .WithRetainFlag(retain)
                .Build();

            await _mqttClient.PublishAsync(message, CancellationToken.None);
            _logger.Information("发布消息: Topic={Topic}, Payload={Payload}", topic, payload);
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "发布消息失败: Topic={Topic}", topic);
            throw;
        }
    }

    // 异步订阅主题
    public async Task SubscribeAsync(string topic, int qos = 1)
    {
        try
        {
            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                .Build();

            await _mqttClient.SubscribeAsync(new MqttClientSubscribeOptions
            {
                TopicFilters = new[] { topicFilter }
            });
            _logger.Information("已订阅主题: {Topic}", topic);
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "订阅主题失败: {Topic}", topic);
            throw;
        }
    }

    // 处理接收到的消息
    private Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        string topic = e.ApplicationMessage.Topic;
        string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
        _logger.Information("收到消息: Topic={Topic}, Payload={Payload}", topic, payload);
        MessageReceived?.Invoke(this, (topic, payload));
        return Task.CompletedTask;
    }

    // 实现 IDisposable
    public void Dispose()
    {
        try
        {
            DisconnectAsync().GetAwaiter().GetResult();
            _mqttClient?.Dispose();
            _logger.Information("MqttClientHelper 已释放");
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "释放 MqttClientHelper 失败");
        }
    }
}

关键特性

  1. 异步支持:所有操作(连接、发布、订阅、断开)使用 async/await。

  2. Serilog 日志:记录连接状态、消息发布/接收和错误信息。

  3. 事件驱动:通过 MessageReceived 事件通知收到消息。

  4. 灵活配置:支持 Broker 地址、端口、用户名/密码和 QoS 设置。

NuGet 依赖

在项目中添加以下 NuGet 包:

Install-Package MQTTnet
Install-Package Serilog
Install-Package Serilog.Sinks.Console
Install-Package Serilog.Sinks.File

3. 经典案例:物联网设备状态监控

场景描述

  • 需求:监控多个物联网设备(如温度传感器)的状态,设备定期向 MQTT 主题发布温度数据,客户端订阅主题并显示数据。

  • Broker:使用公共 MQTT Broker(如 broker.hivemq.com)或本地 Mosquitto。

  • 主题:sensors/temperature/device1(设备 1 的温度数据)。

  • 消息格式:JSON 格式,例如 {"deviceId": "device1", "temperature": 25.5}。

实现思路

  • 客户端订阅 sensors/temperature/#(通配符,接收所有设备温度)。

  • 设备发布温度数据到特定主题。

  • 客户端解析 JSON 数据并显示。


4. 使用 Demo

以下是一个控制台程序,展示如何使用 MqttClientHelper 实现设备状态监控。

Demo 代码

csharp

using System;
using System.Text.Json;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // 初始化 MQTT 客户端,连接到公共 Broker
        using var mqttClient = new MqttClientHelper(
            broker: "broker.hivemq.com",
            port: 1883,
            clientId: Guid.NewGuid().ToString()
        );

        // 注册消息接收事件
        mqttClient.MessageReceived += (sender, e) =>
        {
            try
            {
                var json = JsonDocument.Parse(e.Message);
                string deviceId = json.RootElement.GetProperty("deviceId").GetString();
                double temperature = json.RootElement.GetProperty("temperature").GetDouble();
                Console.WriteLine($"[{DateTime.Now}] 设备: {deviceId}, 温度: {temperature}°C, 主题: {e.Topic}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"解析消息失败: {ex.Message}");
            }
        };

        try
        {
            // 连接 Broker
            await mqttClient.ConnectAsync();
            Console.WriteLine("已连接到 MQTT Broker");

            // 订阅主题
            await mqttClient.SubscribeAsync("sensors/temperature/#", qos: 1);
            Console.WriteLine("已订阅 sensors/temperature/#");

            // 模拟设备发布消息(仅用于测试)
            await mqttClient.PublishAsync(
                topic: "sensors/temperature/device1",
                payload: "{\"deviceId\": \"device1\", \"temperature\": 25.5}",
                qos: 1
            );

            // 保持程序运行
            Console.WriteLine("按任意键退出...");
            Console.ReadKey();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
        finally
        {
            await mqttClient.DisconnectAsync();
        }
    }
}

运行步骤

  1. 确保安装了 MQTTnet 和 Serilog NuGet 包。

  2. 运行程序,连接到 broker.hivemq.com:1883(无需用户名/密码)。

  3. 程序订阅 sensors/temperature/# 主题,并模拟发布一条温度数据。

  4. 接收到的消息会显示在控制台,并记录到日志文件(logs/mqtt_log_*.txt)。

示例输出

2025-06-06T08:42:15.1234567+08:00 [INF] 已连接到 MQTT Broker: broker.hivemq.com:1883
2025-06-06T08:42:15.2234567+08:00 [INF] 已订阅主题: sensors/temperature/#
2025-06-06T08:42:15.3234567+08:00 [INF] 发布消息: Topic=sensors/temperature/device1, Payload={"deviceId": "device1", "temperature": 25.5}
[2025-06-06 08:42:15] 设备: device1, 温度: 25.5°C, 主题: sensors/temperature/device1
按任意键退出...

5. WPF UI 集成

以下是一个简单的 WPF 应用程序,展示实时 MQTT 消息接收和发布。

XAML (MainWindow.xaml)

xaml

<Window x:Class="MqttWpfDemo.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="MQTT Monitor" Height="450" Width="800">
    <Grid Margin="10">
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="*"/>
            <RowDefinition Height="Auto"/>
        </Grid.RowDefinitions>

        <StackPanel Grid.Row="0" Orientation="Horizontal" Margin="0,0,0,10">
            <Button Content="连接" Width="100" Margin="5" Click="Connect_Click"/>
            <Button Content="断开" Width="100" Margin="5" Click="Disconnect_Click"/>
            <TextBox x:Name="TopicTextBox" Width="200" Margin="5" Text="sensors/temperature/#"/>
            <Button Content="订阅" Width="100" Margin="5" Click="Subscribe_Click"/>
        </StackPanel>

        <TextBox x:Name="MessageTextBox" Grid.Row="1" IsReadOnly="True" VerticalScrollBarVisibility="Auto" 
                 TextWrapping="Wrap" Margin="0,0,0,10"/>

        <StackPanel Grid.Row="2" Orientation="Horizontal">
            <TextBox x:Name="PublishTopicTextBox" Width="200" Margin="5" Text="sensors/temperature/device1"/>
            <TextBox x:Name="PublishMessageTextBox" Width="200" Margin="5" Text="{\"deviceId\": \"device1\", \"temperature\": 25.5}"/>
            <Button Content="发布" Width="100" Margin="5" Click="Publish_Click"/>
        </StackPanel>
    </Grid>
</Window>

C# 代码 (MainWindow.xaml.cs)

csharp

using System;
using System.Windows;
using System.Text.Json;

namespace MqttWpfDemo
{
    public partial class MainWindow : Window
    {
        private MqttClientHelper _mqttClient;

        public MainWindow()
        {
            InitializeComponent();
            _mqttClient = new MqttClientHelper("broker.hivemq.com", 1883, Guid.NewGuid().ToString());
            _mqttClient.MessageReceived += (sender, e) =>
            {
                Dispatcher.Invoke(() =>
                {
                    try
                    {
                        var json = JsonDocument.Parse(e.Message);
                        string deviceId = json.RootElement.GetProperty("deviceId").GetString();
                        double temperature = json.RootElement.GetProperty("temperature").GetDouble();
                        MessageTextBox.AppendText($"[{DateTime.Now}] 设备: {deviceId}, 温度: {temperature}°C, 主题: {e.Topic}\n");
                    }
                    catch
                    {
                        MessageTextBox.AppendText($"[{DateTime.Now}] 收到消息: {e.Message} (主题: {e.Topic})\n");
                    }
                });
            };
        }

        private async void Connect_Click(object sender, RoutedEventArgs e)
        {
            try
            {
                await _mqttClient.ConnectAsync();
                MessageBox.Show("已连接到 MQTT Broker");
            }
            catch (Exception ex)
            {
                MessageBox.Show($"连接失败: {ex.Message}");
            }
        }

        private async void Disconnect_Click(object sender, RoutedEventArgs e)
        {
            try
            {
                await _mqttClient.DisconnectAsync();
                MessageBox.Show("已断开连接");
            }
            catch (Exception ex)
            {
                MessageBox.Show($"断开失败: {ex.Message}");
            }
        }

        private async void Subscribe_Click(object sender, RoutedEventArgs e)
        {
            if (string.IsNullOrEmpty(TopicTextBox.Text)) return;

            try
            {
                await _mqttClient.SubscribeAsync(TopicTextBox.Text, qos: 1);
                MessageBox.Show($"已订阅主题: {TopicTextBox.Text}");
            }
            catch (Exception ex)
            {
                MessageBox.Show($"订阅失败: {ex.Message}");
            }
        }

        private async void Publish_Click(object sender, RoutedEventArgs e)
        {
            if (string.IsNullOrEmpty(PublishTopicTextBox.Text) || string.IsNullOrEmpty(PublishMessageTextBox.Text)) return;

            try
            {
                await _mqttClient.PublishAsync(PublishTopicTextBox.Text, PublishMessageTextBox.Text, qos: 1);
                MessageTextBox.AppendText($"[{DateTime.Now}] 发布: {PublishMessageTextBox.Text} 到 {PublishTopicTextBox.Text}\n");
            }
            catch (Exception ex)
            {
                MessageBox.Show($"发布失败: {ex.Message}");
            }
        }

        protected override void OnClosed(EventArgs e)
        {
            _mqttClient?.Dispose();
            base.OnClosed(e);
        }
    }
}

UI 功能

  • 连接/断开 MQTT Broker。

  • 订阅指定主题(如 sensors/temperature/#)。

  • 发布消息到指定主题。

  • 实时显示接收到的消息,解析 JSON 格式的温度数据。


6. 注意事项

  1. Broker 配置:

    • 使用公共 Broker(如 broker.hivemq.com)进行测试,或部署本地 Mosquitto/EMQX。

    • 如果 Broker 需要认证,设置 username 和 password。

  2. 主题设计:

    • 使用层次结构(如 sensors/temperature/device1)便于管理。

    • 通配符(# 和 +)适合订阅多个主题。

  3. QoS 选择:

    • QoS 0:快速但可能丢失消息。

    • QoS 1:确保消息到达,但可能重复。

    • QoS 2:确保消息仅到达一次,适合关键数据。

  4. 错误处理:

    • 捕获连接失败、发布/订阅错误。

    • 检查网络状态,处理断开重连(MQTTnet 支持自动重连)。

  5. 性能优化:

    • 避免订阅过多主题,降低 Broker 负载。

    • 使用异步方法防止 UI 阻塞。

  6. 日志管理:

    • Serilog 日志文件路径需确保写入权限。

    • 可添加其他 Sink(如数据库)存储日志。

  7. 安全性:

    • 生产环境使用 TLS(WithTls 方法)加密通信。

    • 配置用户名/密码或客户端证书。


7. 扩展建议

  • 断开重连:配置 MqttClientOptions 的 AutoReconnectDelay 实现自动重连。

  • 复杂协议:支持更复杂的消息格式(如 Protobuf)或自定义协议解析。

  • 数据可视化:在 WPF 中添加图表(如 LiveCharts)显示温度趋势。

  • 多设备管理:维护设备列表,动态订阅多个设备主题。


8. 总结

  • MqttClientHelper:封装了 MQTT 连接、发布、订阅功能,支持异步和日志记录。

  • 经典案例:实现物联网设备状态监控,订阅温度数据并显示。

  • WPF Demo:提供交互式界面,实时显示消息。

  • 注意事项:关注 Broker 配置、QoS 选择和错误处理。

如果您需要进一步扩展(如添加 TLS、断开重连或更复杂的 UI),请告诉我!

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐