1 搭建MQTT服务器

1.1 Dockerfile 内容

FROM rabbitmq:3.11.6-management
COPY install_rabbitmq_plus.sh /usr/local/
RUN  chmod 777 /usr/local/install_rabbitmq_plus.sh 
RUN  /bin/sh /usr/local/install_rabbitmq_plus.sh 

1.2 容器中需要安装插件的命令  放在 install_rabbitmq_plus.sh 文件中

rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt

1.3 构件容器需要用到的yml配置 docker-compose-RabbitMQ.yaml

version: "3.6"
services:
  #服务
  rabbitmq:
    build:          #镜像构建
      context:  .   #构建镜像时所在的资源路径
      dockerfile: Dockerfile    #构建镜像时需要的dockerfile文件路径
    ports:
      - 5672:5672
      - 15672:15672
      # mqtt端口
      - 15675:15675
      - 1883:1883
    volumes:
      - ./data/rabbitmq:/var/lib/rabbitmq
    environment:
      - TZ=Asia/Shanghai
      - RABBITMQ_DEFAULT_USER=root
      - RABBITMQ_DEFAULT_PASS=1234
    restart: always

 1.4 docker-compose 启动RabbitMQ以及MQTT插件服务

docker-compose -f docker-compose-RabbitMQ.yaml up --build  -d

2  .net core 实现


using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
public class XJ_MQTT
    {
        public MqttClient GetClient()
        {
            MqttClient mqttClient = new MQTTnet.MqttFactory().CreateMqttClient() as MqttClient;
            mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync1;
            mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
            mqttClient.ConnectingAsync += MqttClient_ConnectingAsync;
            mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
            mqttClient.InspectPacketAsync += MqttClient_InspectPacketAsync;
            try
            {
                MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder();
                optionsBuilder.WithTcpServer("mqtt的服务器IP", 1883);
                string id = Guid.NewGuid().ToString("N");
                optionsBuilder.WithClientId(id);
                optionsBuilder.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);
                optionsBuilder.WithCredentials("root", "1234");
                mqttClient.ConnectAsync(optionsBuilder.Build()).Wait();
            }
            catch (Exception e)
            {
                Console.WriteLine($"连接到MQTT服务器失败" + Environment.NewLine + e.Message + Environment.NewLine);

            }
            return mqttClient;
        }

        private Task MqttClient_InspectPacketAsync(MQTTnet.Diagnostics.InspectMqttPacketEventArgs arg)
        {

            Console.WriteLine($"MqttClient_InspectPacketAsync :{UTF8Encoding.UTF8.GetString(arg.Buffer)}" + Environment.NewLine);
            return Task.CompletedTask;
        }

        private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine("已断开MQTT服务器" + Environment.NewLine);
            return Task.CompletedTask;
        }

        private Task MqttClient_ConnectingAsync(MqttClientConnectingEventArgs arg)
        {

            Console.WriteLine("链接MQTT服务器中...." + Environment.NewLine);
            return Task.CompletedTask;
        }

        private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine("已连接到MQTT服务器" + Environment.NewLine);
            return Task.CompletedTask;
        }

        private Task MqttClient_ApplicationMessageReceivedAsync1(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine($">>ClientId:{arg.ClientId} \r\nTopic:{arg.ApplicationMessage.Topic}\r\nPayload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}{Environment.NewLine}" + Environment.NewLine);
            return Task.CompletedTask;
        }

    }

2.1 实现发布 

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;


var mq = new XJ_MQTT();
var client = mq.GetClient();
while (true)
{
    string input = Console.ReadLine();
    client.PublishStringAsync("testTpic", "payload" + Guid.NewGuid().ToString("N") + ":" + input);
}

2.2 实现订阅

using MQTTnet.Packets;
var mq = new XJ_MQTT();
var client = mq.GetClient();
client.SubscribeAsync(new MQTTnet.Client.MqttClientSubscribeOptions()
{
    TopicFilters = new List<MqttTopicFilter>() { new MqttTopicFilter() { Topic = "testTpic" } }
});
while (true)
    Console.ReadLine();

3 运行效果

 

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐