1.添加依赖

<dependency>
    <groupId>org.jetlinks</groupId>
    <artifactId>netty-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>

2.源码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.CharsetUtil;
import org.jetlinks.mqtt.client.*;

public class MqttClientTest {

    public static String username = "admin";
    public static String password = "123456";
    public static String url = "localhost";
    public static int port = 1884;
    public static String clientId = "client9527";
    public static int completionTimeout = 300;
    public static int keepAliveInterval = 20;

    public static MqttClientConfig getMqttClientConfig() {
        MqttClientConfig mqttClientConfig = new MqttClientConfig();
        mqttClientConfig.setClientId(clientId);
        mqttClientConfig.setUsername(username);
        mqttClientConfig.setPassword(password);
        mqttClientConfig.setTimeoutSeconds(completionTimeout);
        mqttClientConfig.setRetryInterval(keepAliveInterval);
        mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1);
        mqttClientConfig.setReconnect(true);
        return mqttClientConfig;
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);

        MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(),null);
        mqttClient.setEventLoop(loop);
        mqttClient.setCallback(new MqttClientCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                cause.printStackTrace();
            }

            @Override
            public void onSuccessfulReconnect() {
                System.out.println("客户端已重连");
            }
        });
        mqttClient.connect(url, port).await();
        System.out.println("客户端连接成功");
        String test = "I am client";
        byte[] bytes = test.getBytes();
        ByteBuf byteBuf = Unpooled.copiedBuffer(bytes);
        mqttClient.publish("test/pub/001",byteBuf, MqttQoS.AT_MOST_ONCE,false);
        System.out.println("消息已发布");
        //#为多层通配符,+为单层通配符
        mqttClient.on("test/sub/#", (topic, payload) -> {
                System.out.println("消息主题:" + topic);
                System.out.println("消息内容:" + payload.toString(CharsetUtil.UTF_8));
            });
    }
}

3.启动测试

客户端连接成功
消息已发布

如下图已经收到主题为test/pub/001的消息
在这里插入图片描述
发送主题为test/sub/001的消息
在这里插入图片描述
查看控制台,消息已收到

消息主题:test/sub/001
消息内容:I am client9526
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐