Docker安装emqx

安装emqx

获取 docker 镜像 最新

docker pull emqx/emqx

启动容器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx

1883 连接端口
18083 后台端口 ip:18083
默认用户 admin
默认密码 public
首次登录会提示修改密码

在这里插入图片描述

.net6 连接

添加nuget包

MQTTnet

封装

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using System.Text;
	/// <summary>
    /// mqtt客户端
    /// </summary>
    public class MqttClientBase
    {
        /// <summary>
        /// 客户端
        /// </summary>
        public IMqttClient client;

        /// <summary>
        /// 订阅主题列表
        /// </summary>
        public List<MqttTopicFilter> Topics;



        /// <summary>
        /// 初始化客户端
        /// </summary>
        /// <param name="server">服务器地址</param>
        /// <param name="port">服务器端口 一般为1883</param>
        /// <param name="username">用户名</param>
        /// <param name="password">密码</param>
        /// <param name="cliendID">客户端id</param>
        /// <param name="OnConnected">连接成功事件</param>
        public MqttClientBase(string server,int port,string username,string password,string cliendID, Action<MqttClientConnectedEventArgs>? OnConnected = null)
        {
            //创建客户端
            client = new MqttFactory().CreateMqttClient();

            //订阅主题列表
            Topics = new List<MqttTopicFilter>();

            //客户端参数
            var mqttOptions = new MqttClientOptions()
            {
                ClientId = cliendID,//客户端id
                ChannelOptions = new MqttClientTcpOptions()
                {
                    Server = server,//服务器地址
                    Port = port//服务器端口  一般为1883
                },
                //设置用户和密码
                Credentials = new MqttClientCredentials(username, Encoding.UTF8.GetBytes(password)),
                CleanSession = false,
                //设置心跳
                KeepAlivePeriod = TimeSpan.FromSeconds(30),
            };

            if (OnConnected != null)
            {
                //Mqtt客户端连接成功
                client.ConnectedAsync += (arg) =>
                {
                    OnConnected(arg);
                    return Task.CompletedTask;
                };
            }


            //连接服务器,这里需要等待异步完成连接,否则后面会报错
            var res=client.ConnectAsync(mqttOptions).Result;

        }

        /// <summary>
        /// 连接断开 事件
        /// </summary>
        public void OnDisconnected(Action<MqttClientDisconnectedEventArgs> action)
        {
            //Mqtt客户端连接断开
            client.DisconnectedAsync += (arg) =>
            {
                action(arg);
                return Task.CompletedTask;
            };
        }

        /// <summary>
        /// 接收到消息事件
        /// 参数1     topic       订阅主题
        /// 参数2     data        解析成字符串的内容
        /// 参数3     arg         原始消息内容
        /// </summary>
        public void OnMessage(Action<string, string?, MqttApplicationMessageReceivedEventArgs> action)
        {

            //接收消息
            client.ApplicationMessageReceivedAsync += (arg) => {
                var buff = arg.ApplicationMessage.Payload;
                var data = buff.BToString();
                action(arg.ApplicationMessage.Topic, data ?? null, arg);
                return Task.CompletedTask;
            };
        }



        /// <summary>
        /// 添加订阅主题
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task AddTopic(string topic)
        {
            //将主题名称加入列表
            Topics.Add(new MqttTopicFilter() { Topic = topic });

            //更新订阅
            client.SubscribeAsync(new MqttClientSubscribeOptions()
            {
                TopicFilters = Topics
            });

        }


        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic">主题</param>
        /// <param name="content">内容</param>
        /// <returns></returns>
        public async Task Publish(string topic, string content)
        {

            client.PublishAsync(new MqttApplicationMessage()
            {
                Topic = topic,
                Payload = Encoding.UTF8.GetBytes(content)
            });
        }

    }

使用

创建客户端
MqttClientBase mqtt = new MqttClientBase("ip", 端口, "用户名", "密码", "客户端id");

// 带连接成功事件
//MqttClientBase mqtt = new MqttClientBase("ip", 端口, "用户名", "密码", "客户端id", arg => {
//    Console.WriteLine("连接成功");
//});

添加订阅
mqtt.AddTopic("666");
mqtt.AddTopic("777");
连接断开事件
mqtt.OnDisconnected(arg => {
    Console.WriteLine("连接断开");
});
接收订阅消息事件
mqtt.OnMessage((topic,data,arg) => {
    Console.WriteLine($"{topic}===={data}");
});
发布消息
mqtt.Publish("666",x);
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐