基于 MQTT 协议的 C# 实现,包括封装类、经典案例和使用 Demo。MQTT是一种轻量级、发布-订阅模式的通信协议,广泛用于物联网设备通信
需求:监控多个物联网设备(如温度传感器)的状态,设备定期向 MQTT 主题发布温度数据,客户端订阅主题并显示数据。主题:sensors/temperature/device1(设备 1 的温度数据)。客户端订阅 sensors/temperature/#(通配符,接收所有设备温度)。QoS(服务质量):支持 0(最多一次)、1(至少一次)、2(仅一次)三种级别。发布/订阅:客户端向主题(Topic
基于 MQTT 协议的 C# 实现,包括封装类、经典案例和使用 Demo。MQTT(Message Queuing Telemetry Transport)是一种轻量级、发布-订阅模式的通信协议,广泛用于物联网设备通信。本实现使用 MQTTnet 库,提供异步支持,并展示一个经典案例:物联网设备状态监控。
1. MQTT 协议简介
MQTT 是一种基于 TCP/IP 的协议,适合低带宽、高延迟或不稳定的网络环境。核心概念:
-
发布/订阅:客户端向主题(Topic)发布消息,其他客户端订阅该主题接收消息。
-
QoS(服务质量):支持 0(最多一次)、1(至少一次)、2(仅一次)三种级别。
-
Broker:消息代理,负责路由消息(如 Mosquitto、EMQX)。
-
轻量高效:适合资源受限的设备。
2. 封装 MQTT 客户端类
以下是封装的 MqttClientHelper 类,基于 MQTTnet 库,支持连接、发布、订阅和断开功能。
MqttClientHelper 类代码
csharp
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
public class MqttClientHelper : IDisposable
{
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _options;
private readonly ILogger _logger;
public event EventHandler<(string Topic, string Message)> MessageReceived; // 消息接收事件
public MqttClientHelper(string broker, int port, string clientId, string username = null, string password = null)
{
// 初始化 MQTT 客户端
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
// 配置连接选项
_options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port)
.WithCredentials(username, password)
.WithClientId(clientId)
.Build();
// 初始化 Serilog
_logger = Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console()
.WriteTo.File("logs/mqtt_log_.txt", rollingInterval: RollingInterval.Day)
.CreateLogger();
// 注册事件
_mqttClient.ConnectedAsync += async e => _logger.Information("已连接到 MQTT Broker: {Broker}:{Port}", broker, port);
_mqttClient.DisconnectedAsync += async e => _logger.Warning("与 MQTT Broker 断开连接: {Reason}", e.Exception?.Message);
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceivedAsync;
}
// 异步连接
public async Task ConnectAsync()
{
try
{
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
}
catch (Exception ex)
{
_logger.Error(ex, "连接 MQTT Broker 失败");
throw;
}
}
// 异步断开连接
public async Task DisconnectAsync()
{
try
{
await _mqttClient.DisconnectAsync();
_logger.Information("已断开 MQTT Broker 连接");
}
catch (Exception ex)
{
_logger.Error(ex, "断开 MQTT Broker 连接失败");
throw;
}
}
// 异步发布消息
public async Task PublishAsync(string topic, string payload, int qos = 1, bool retain = false)
{
try
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(Encoding.UTF8.GetBytes(payload))
.WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
.WithRetainFlag(retain)
.Build();
await _mqttClient.PublishAsync(message, CancellationToken.None);
_logger.Information("发布消息: Topic={Topic}, Payload={Payload}", topic, payload);
}
catch (Exception ex)
{
_logger.Error(ex, "发布消息失败: Topic={Topic}", topic);
throw;
}
}
// 异步订阅主题
public async Task SubscribeAsync(string topic, int qos = 1)
{
try
{
var topicFilter = new MqttTopicFilterBuilder()
.WithTopic(topic)
.WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
.Build();
await _mqttClient.SubscribeAsync(new MqttClientSubscribeOptions
{
TopicFilters = new[] { topicFilter }
});
_logger.Information("已订阅主题: {Topic}", topic);
}
catch (Exception ex)
{
_logger.Error(ex, "订阅主题失败: {Topic}", topic);
throw;
}
}
// 处理接收到的消息
private Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
string topic = e.ApplicationMessage.Topic;
string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
_logger.Information("收到消息: Topic={Topic}, Payload={Payload}", topic, payload);
MessageReceived?.Invoke(this, (topic, payload));
return Task.CompletedTask;
}
// 实现 IDisposable
public void Dispose()
{
try
{
DisconnectAsync().GetAwaiter().GetResult();
_mqttClient?.Dispose();
_logger.Information("MqttClientHelper 已释放");
}
catch (Exception ex)
{
_logger.Error(ex, "释放 MqttClientHelper 失败");
}
}
}
关键特性
-
异步支持:所有操作(连接、发布、订阅、断开)使用 async/await。
-
Serilog 日志:记录连接状态、消息发布/接收和错误信息。
-
事件驱动:通过 MessageReceived 事件通知收到消息。
-
灵活配置:支持 Broker 地址、端口、用户名/密码和 QoS 设置。
NuGet 依赖
在项目中添加以下 NuGet 包:
Install-Package MQTTnet
Install-Package Serilog
Install-Package Serilog.Sinks.Console
Install-Package Serilog.Sinks.File
3. 经典案例:物联网设备状态监控
场景描述
-
需求:监控多个物联网设备(如温度传感器)的状态,设备定期向 MQTT 主题发布温度数据,客户端订阅主题并显示数据。
-
Broker:使用公共 MQTT Broker(如 broker.hivemq.com)或本地 Mosquitto。
-
主题:sensors/temperature/device1(设备 1 的温度数据)。
-
消息格式:JSON 格式,例如 {"deviceId": "device1", "temperature": 25.5}。
实现思路
-
客户端订阅 sensors/temperature/#(通配符,接收所有设备温度)。
-
设备发布温度数据到特定主题。
-
客户端解析 JSON 数据并显示。
4. 使用 Demo
以下是一个控制台程序,展示如何使用 MqttClientHelper 实现设备状态监控。
Demo 代码
csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
// 初始化 MQTT 客户端,连接到公共 Broker
using var mqttClient = new MqttClientHelper(
broker: "broker.hivemq.com",
port: 1883,
clientId: Guid.NewGuid().ToString()
);
// 注册消息接收事件
mqttClient.MessageReceived += (sender, e) =>
{
try
{
var json = JsonDocument.Parse(e.Message);
string deviceId = json.RootElement.GetProperty("deviceId").GetString();
double temperature = json.RootElement.GetProperty("temperature").GetDouble();
Console.WriteLine($"[{DateTime.Now}] 设备: {deviceId}, 温度: {temperature}°C, 主题: {e.Topic}");
}
catch (Exception ex)
{
Console.WriteLine($"解析消息失败: {ex.Message}");
}
};
try
{
// 连接 Broker
await mqttClient.ConnectAsync();
Console.WriteLine("已连接到 MQTT Broker");
// 订阅主题
await mqttClient.SubscribeAsync("sensors/temperature/#", qos: 1);
Console.WriteLine("已订阅 sensors/temperature/#");
// 模拟设备发布消息(仅用于测试)
await mqttClient.PublishAsync(
topic: "sensors/temperature/device1",
payload: "{\"deviceId\": \"device1\", \"temperature\": 25.5}",
qos: 1
);
// 保持程序运行
Console.WriteLine("按任意键退出...");
Console.ReadKey();
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
finally
{
await mqttClient.DisconnectAsync();
}
}
}
运行步骤
-
确保安装了 MQTTnet 和 Serilog NuGet 包。
-
运行程序,连接到 broker.hivemq.com:1883(无需用户名/密码)。
-
程序订阅 sensors/temperature/# 主题,并模拟发布一条温度数据。
-
接收到的消息会显示在控制台,并记录到日志文件(logs/mqtt_log_*.txt)。
示例输出
2025-06-06T08:42:15.1234567+08:00 [INF] 已连接到 MQTT Broker: broker.hivemq.com:1883
2025-06-06T08:42:15.2234567+08:00 [INF] 已订阅主题: sensors/temperature/#
2025-06-06T08:42:15.3234567+08:00 [INF] 发布消息: Topic=sensors/temperature/device1, Payload={"deviceId": "device1", "temperature": 25.5}
[2025-06-06 08:42:15] 设备: device1, 温度: 25.5°C, 主题: sensors/temperature/device1
按任意键退出...
5. WPF UI 集成
以下是一个简单的 WPF 应用程序,展示实时 MQTT 消息接收和发布。
XAML (MainWindow.xaml)
xaml
<Window x:Class="MqttWpfDemo.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
Title="MQTT Monitor" Height="450" Width="800">
<Grid Margin="10">
<Grid.RowDefinitions>
<RowDefinition Height="Auto"/>
<RowDefinition Height="*"/>
<RowDefinition Height="Auto"/>
</Grid.RowDefinitions>
<StackPanel Grid.Row="0" Orientation="Horizontal" Margin="0,0,0,10">
<Button Content="连接" Width="100" Margin="5" Click="Connect_Click"/>
<Button Content="断开" Width="100" Margin="5" Click="Disconnect_Click"/>
<TextBox x:Name="TopicTextBox" Width="200" Margin="5" Text="sensors/temperature/#"/>
<Button Content="订阅" Width="100" Margin="5" Click="Subscribe_Click"/>
</StackPanel>
<TextBox x:Name="MessageTextBox" Grid.Row="1" IsReadOnly="True" VerticalScrollBarVisibility="Auto"
TextWrapping="Wrap" Margin="0,0,0,10"/>
<StackPanel Grid.Row="2" Orientation="Horizontal">
<TextBox x:Name="PublishTopicTextBox" Width="200" Margin="5" Text="sensors/temperature/device1"/>
<TextBox x:Name="PublishMessageTextBox" Width="200" Margin="5" Text="{\"deviceId\": \"device1\", \"temperature\": 25.5}"/>
<Button Content="发布" Width="100" Margin="5" Click="Publish_Click"/>
</StackPanel>
</Grid>
</Window>
C# 代码 (MainWindow.xaml.cs)
csharp
using System;
using System.Windows;
using System.Text.Json;
namespace MqttWpfDemo
{
public partial class MainWindow : Window
{
private MqttClientHelper _mqttClient;
public MainWindow()
{
InitializeComponent();
_mqttClient = new MqttClientHelper("broker.hivemq.com", 1883, Guid.NewGuid().ToString());
_mqttClient.MessageReceived += (sender, e) =>
{
Dispatcher.Invoke(() =>
{
try
{
var json = JsonDocument.Parse(e.Message);
string deviceId = json.RootElement.GetProperty("deviceId").GetString();
double temperature = json.RootElement.GetProperty("temperature").GetDouble();
MessageTextBox.AppendText($"[{DateTime.Now}] 设备: {deviceId}, 温度: {temperature}°C, 主题: {e.Topic}\n");
}
catch
{
MessageTextBox.AppendText($"[{DateTime.Now}] 收到消息: {e.Message} (主题: {e.Topic})\n");
}
});
};
}
private async void Connect_Click(object sender, RoutedEventArgs e)
{
try
{
await _mqttClient.ConnectAsync();
MessageBox.Show("已连接到 MQTT Broker");
}
catch (Exception ex)
{
MessageBox.Show($"连接失败: {ex.Message}");
}
}
private async void Disconnect_Click(object sender, RoutedEventArgs e)
{
try
{
await _mqttClient.DisconnectAsync();
MessageBox.Show("已断开连接");
}
catch (Exception ex)
{
MessageBox.Show($"断开失败: {ex.Message}");
}
}
private async void Subscribe_Click(object sender, RoutedEventArgs e)
{
if (string.IsNullOrEmpty(TopicTextBox.Text)) return;
try
{
await _mqttClient.SubscribeAsync(TopicTextBox.Text, qos: 1);
MessageBox.Show($"已订阅主题: {TopicTextBox.Text}");
}
catch (Exception ex)
{
MessageBox.Show($"订阅失败: {ex.Message}");
}
}
private async void Publish_Click(object sender, RoutedEventArgs e)
{
if (string.IsNullOrEmpty(PublishTopicTextBox.Text) || string.IsNullOrEmpty(PublishMessageTextBox.Text)) return;
try
{
await _mqttClient.PublishAsync(PublishTopicTextBox.Text, PublishMessageTextBox.Text, qos: 1);
MessageTextBox.AppendText($"[{DateTime.Now}] 发布: {PublishMessageTextBox.Text} 到 {PublishTopicTextBox.Text}\n");
}
catch (Exception ex)
{
MessageBox.Show($"发布失败: {ex.Message}");
}
}
protected override void OnClosed(EventArgs e)
{
_mqttClient?.Dispose();
base.OnClosed(e);
}
}
}
UI 功能
-
连接/断开 MQTT Broker。
-
订阅指定主题(如 sensors/temperature/#)。
-
发布消息到指定主题。
-
实时显示接收到的消息,解析 JSON 格式的温度数据。
6. 注意事项
-
Broker 配置:
-
使用公共 Broker(如 broker.hivemq.com)进行测试,或部署本地 Mosquitto/EMQX。
-
如果 Broker 需要认证,设置 username 和 password。
-
-
主题设计:
-
使用层次结构(如 sensors/temperature/device1)便于管理。
-
通配符(# 和 +)适合订阅多个主题。
-
-
QoS 选择:
-
QoS 0:快速但可能丢失消息。
-
QoS 1:确保消息到达,但可能重复。
-
QoS 2:确保消息仅到达一次,适合关键数据。
-
-
错误处理:
-
捕获连接失败、发布/订阅错误。
-
检查网络状态,处理断开重连(MQTTnet 支持自动重连)。
-
-
性能优化:
-
避免订阅过多主题,降低 Broker 负载。
-
使用异步方法防止 UI 阻塞。
-
-
日志管理:
-
Serilog 日志文件路径需确保写入权限。
-
可添加其他 Sink(如数据库)存储日志。
-
-
安全性:
-
生产环境使用 TLS(WithTls 方法)加密通信。
-
配置用户名/密码或客户端证书。
-
7. 扩展建议
-
断开重连:配置 MqttClientOptions 的 AutoReconnectDelay 实现自动重连。
-
复杂协议:支持更复杂的消息格式(如 Protobuf)或自定义协议解析。
-
数据可视化:在 WPF 中添加图表(如 LiveCharts)显示温度趋势。
-
多设备管理:维护设备列表,动态订阅多个设备主题。
8. 总结
-
MqttClientHelper:封装了 MQTT 连接、发布、订阅功能,支持异步和日志记录。
-
经典案例:实现物联网设备状态监控,订阅温度数据并显示。
-
WPF Demo:提供交互式界面,实时显示消息。
-
注意事项:关注 Broker 配置、QoS 选择和错误处理。
如果您需要进一步扩展(如添加 TLS、断开重连或更复杂的 UI),请告诉我!
更多推荐
所有评论(0)