docker安装部署emqx以及mqtt(emqx)Java代码连接客户端案例
docker命令直接拉取镜像

docker pull emqx/emqx:5.3.0


2.运行以下命令启动 Docker 容器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.3.0


3.创建挂载目录(需要持久化数据)

mkdir -p /home/admin/emqx


4.复制容器文件到挂载目录

docker cp emqx:/opt/emqx/bin /home/admin/emqx
docker cp emqx:/opt/emqx/etc /home/admin/emqx
docker cp emqx:/opt/emqx/lib /home/admin/emqx
docker cp emqx:/opt/emqx/data /home/admin/emqx
docker cp emqx:/opt/emqx/log /home/admin/emqx



5.修改权限

chown -R 1000:1000 /home/admin/emqx/
chmod -R 777 /home/admin/emqx/


6.删除掉之前运行的emqx容器

docker rm -f emqx


7.重新运行


8.直接浏览器输入服务地址http://xxx:18083/
默认账户密码admin/public

9.emqx客户端连接java代码案例
maven依赖       

 <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>


创建 MQTT 连接
本文将使用 EMQX 提供的 免费公共 MQTT 服务器,该服务基于 EMQX 的 MQTT 云平台 创建。服务器接入信息如下:

Broker: broker.emqx.io(中国用户可以使用 broker-cn.emqx.io)
TCP Port: 1883
SSL/TLS Port: 888
普通 TCP 连接
设置 MQTT Broker 基本连接参数,用户名、密码为非必选参数。

String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";


然后创建 MQTT 客户端并连接。

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);


参数说明:

MqttClient: 同步调用客户端,使用阻塞方法通信。
MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息,使其能够传递到指定的 QoS。
MqttConnectOptions: 连接选项,用于指定连接的参数,下面列举一些常见的方法。
setUserName: 设置用户名
setPassword: 设置密码
setCleanSession: 设置是否清除会话
setKeepAliveInterval: 设置心跳间隔
setConnectionTimeout: 设置连接超时时间
setAutomaticReconnect: 设置是否自动重连
qos:

0-表示消息最多只能传递一次(零次或一次)。消息不会持久化到磁盘,也不会在网络上得到确认。此QoS是最快的,但只应用于没有价值的消息-请注意,如果服务器无法处理消息(例如,存在授权问题),则会进行MqttCallback。交付完成(IMqttDeliveryToken)。也被称为“火和遗忘”。
1-表示消息应至少传递一次(一次或多次)。只有当消息可以持久化时,才能安全地传递消息,因此应用程序必须使用MqttConnectOptions提供持久化方法。如果未指定持久性机制,则在客户端发生故障时将不会传递消息。该消息将在网络上得到确认。这是默认的QoS。
2-表示消息应传递一次。该消息将被持久化到磁盘,并将在整个网络中进行两阶段确认。只有能够持久化,该消息才能安全传递,因此应用程序必须使用MqttConnectOptions提供持久化方法。如果未指定持久性机制,则在客户端发生故障时将不会传递消息。

完整代码

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttSample {
    public static void main(String[] args) {
        String topic = "test/topic";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = MqttClient.generateClientId();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName("emqx_user");
        connOpts.setPassword("emqx_password".toCharArray());
        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            client.setCallback(new SampleCallback());
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);
            client.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(topic, message);
            System.out.println("Message published");
            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }

}
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐