环境:PHP 7.2

Swoole:4.x(另需安装 phpredis 扩展,并在本机 127.0.0.1:6379 运行 Redis,下面的代码用它保存订阅关系和遗嘱消息)

1. 使用 Composer 安装 MQTT 库

composer require simps/mqtt

2. 在项目根目录创建 Server.php

<?php

use Simps\MQTT\ProtocolV5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Types;

/**
 * Minimal MQTT 5 broker built on Swoole\Server, with Redis holding per-connection state.
 *
 * Redis layout used by this class:
 *  - a SET keyed by the connection fd: the topic filters that connection subscribed to;
 *  - the HASH "will": fd => JSON-encoded will message taken from the CONNECT packet.
 *
 * Topic matching is an exact string comparison (SISMEMBER); MQTT wildcards are not expanded.
 *
 * NOTE(review): the server runs in SWOOLE_BASE mode with worker_num = 2 and fans out by
 * iterating $server->connections. Confirm against the Swoole documentation whether that
 * iterator (and send()) covers connections owned by the other worker in BASE mode.
 */
class Server
{
    protected static $host = '0.0.0.0';
    protected static $port = 1883;
    protected static $redisHost = '127.0.0.1';
    protected static $redisPort = 6379;
    protected $_mqtt;

    /** Last packet identifier handed out by nextMessageId() (per worker process). */
    private $lastMessageId = 0;

    /**
     * Configures the Swoole server, registers the event callbacks and starts it.
     *
     * start() is called here, so constructing the object runs the server.
     */
    public function __construct()
    {
        try {
            @ini_set('default_socket_timeout', -1);
            require_once __DIR__ . '/vendor/autoload.php';

            $this->_mqtt = new Swoole\Server(self::$host, self::$port, SWOOLE_BASE);
            $this->_mqtt->set([
                'open_mqtt_protocol' => true, // let Swoole frame incoming data as MQTT packets
                'worker_num' => 2,
                'reactor_num' => 4,
                'reload_async' => true, // workers wait for pending async events before exiting on reload
                'dispatch_mode' => 3, // packet dispatch strategy (Swoole's default is 2)
                'daemonize' => false, // stay in the foreground
                'open_cpu_affinity' => true, // enable CPU affinity
                'package_max_length' => 2 * 1024 * 1024,
            ]);
            $this->_mqtt->on('connect', [$this, 'onConnect']);
            $this->_mqtt->on('receive', [$this, 'onReceive']);
            $this->_mqtt->on('close', [$this, 'onClose']);
            $this->_mqtt->start();
        } catch (Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Logs that a TCP connection was accepted.
     */
    public function onConnect($server, $fd, $from_id)
    {
        echo "客户端{$fd}进来了.\n";
    }

    /**
     * Decodes one MQTT packet and dispatches it to the matching handler.
     *
     * A Redis connection is opened per packet and always closed again, including on
     * early returns and on exceptions. Any Throwable is reported on stdout and swallowed
     * so that a bad packet cannot take the worker down.
     *
     * @param Swoole\Server $server
     * @param int           $fd      connection that sent the data
     * @param int           $from_id reactor id (unused)
     * @param string        $data    raw packet bytes
     *
     * @return false|null false when a CONNECT packet is rejected, null otherwise
     */
    public function onReceive(Swoole\Server $server, $fd, $from_id, $data)
    {
        $redis = null;
        try {
            $data = ProtocolV5::unpack($data);
            if (!is_array($data) || !isset($data['type'])) {
                return null;
            }

            $redis = $this->openRedis();
            switch ($data['type']) {
                case Types::CONNECT:
                    return $this->handleConnect($server, $fd, $data, $redis);
                case Types::PINGREQ:
                    $this->handlePing($server, $fd);
                    break;
                case Types::DISCONNECT:
                    $this->handleDisconnect($server, $fd, $redis);
                    break;
                case Types::PUBLISH:
                    $this->handlePublish($server, $fd, $data, $redis);
                    break;
                case Types::SUBSCRIBE:
                    $this->handleSubscribe($server, $fd, $data, $redis);
                    break;
                case Types::UNSUBSCRIBE:
                    $this->handleUnsubscribe($server, $fd, $data, $redis);
                    break;
            }
        } catch (\Throwable $e) {
            echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
        } finally {
            if ($redis !== null) {
                $this->closeRedis($redis);
            }
        }

        return null;
    }

    /**
     * Placeholder task callback. It is not registered with the server in the constructor.
     */
    public function onTask()
    {
        //file_put_contents(__DIR__.'/data.log', $data);
    }

    /**
     * Removes the connection's stored will and subscriptions, then logs the departure.
     *
     * Redis failures are reported but do not propagate out of the callback.
     */
    public function onClose($server, $fd)
    {
        $redis = null;
        try {
            $redis = $this->openRedis();
            $redis->hDel('will', (string) $fd);
            $redis->del((string) $fd);
        } catch (\Throwable $e) {
            echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
        } finally {
            if ($redis !== null) {
                $this->closeRedis($redis);
            }
        }

        echo "客户端{$fd}离开了.\n";
    }

    /**
     * Validates a CONNECT packet, stores the will (if any) and replies with CONNACK.
     *
     * @return false|null false when the packet is rejected
     */
    private function handleConnect(Swoole\Server $server, $fd, array $data, Redis $redis)
    {
        // NOTE(review): rejected clients get no CONNACK and the socket stays open;
        // the original had $server->close($fd) commented out here, so that is preserved.
        if (($data['protocol_name'] ?? null) != 'MQTT') {
            return false;
        }
        // Username and password are both mandatory (empty() also covers "not set").
        if (empty($data['user_name']) || empty($data['password'])) {
            return false;
        }

        if (!empty($data['will']['topic'])) {
            $redis->hSet('will', (string) $fd, json_encode([
                'message' => $data['will']['message'],
                'qos' => $data['will']['qos'],
                'topic' => $data['will']['topic'],
                'retain' => $data['will']['retain'],
            ]));
        }

        // Start from an empty subscription set for this fd.
        $redis->del((string) $fd);
        $server->send($fd, ProtocolV5::pack([
            'type' => Types::CONNACK,
            'code' => 0,
            'session_present' => 0,
            'properties' => [
                'maximum_packet_size' => 1048576,
                'retain_available' => true,
                'shared_subscription_available' => true,
                'subscription_identifier_available' => true,
                'topic_alias_maximum' => 65535,
                'wildcard_subscription_available' => true,
            ],
        ]));

        return null;
    }

    /**
     * Answers a PINGREQ with PINGRESP.
     *
     * The subscription set is intentionally left alone: deleting it here would drop
     * every subscription on the first keep-alive ping.
     */
    private function handlePing(Swoole\Server $server, $fd)
    {
        if ($server->exist($fd)) {
            $server->send($fd, ProtocolV5::pack(['type' => Types::PINGRESP]));
        } else {
            $server->close($fd);
        }
    }

    /**
     * Handles DISCONNECT: forwards the stored will to subscribers of its topic and
     * clears the connection's state.
     *
     * No PUBACK is sent: DISCONNECT carries no packet identifier and acknowledges nothing.
     *
     * NOTE(review): MQTT normally publishes the will on an abnormal disconnect and
     * discards it on a clean DISCONNECT; the original publishes it here, which is kept.
     */
    private function handleDisconnect(Swoole\Server $server, $fd, Redis $redis)
    {
        if (!$server->exist($fd)) {
            return;
        }

        $stored = $redis->hGet('will', (string) $fd);
        $will = empty($stored) ? null : json_decode($stored, true);
        if (is_array($will) && isset($will['topic'])) {
            $qos = $will['qos'] ?? 0;
            $this->forwardToSubscribers($server, $redis, $fd, [
                'type' => Types::PUBLISH,
                'topic' => $will['topic'],
                'message' => $will['message'] ?? '',
                'dup' => 0,
                'qos' => $qos,
                'retain' => $will['retain'] ?? 0,
                // QoS > 0 requires a non-zero packet identifier.
                'message_id' => $qos > 0 ? $this->nextMessageId() : 0,
            ]);
        }

        $redis->hDel('will', (string) $fd);
        $redis->del((string) $fd);
    }

    /**
     * Forwards a PUBLISH to every other connection subscribed to its topic and
     * acknowledges QoS 1 publishes with PUBACK.
     */
    private function handlePublish(Swoole\Server $server, $fd, array $data, Redis $redis)
    {
        $this->forwardToSubscribers($server, $redis, $fd, [
            'type' => $data['type'],
            'topic' => $data['topic'],
            'message' => $data['message'],
            'dup' => $data['dup'],
            'qos' => $data['qos'],
            'retain' => $data['retain'],
            'message_id' => $data['message_id'] ?? '',
        ]);

        if ($data['qos'] === 1) {
            $server->send($fd, ProtocolV5::pack([
                'type' => Types::PUBACK,
                'message_id' => $data['message_id'] ?? '',
            ]));
        }
    }

    /**
     * Stores the requested topics for this connection and replies with SUBACK.
     *
     * Each entry of $data['topics'] is read as topic => options with a 'qos' option.
     * Topics whose QoS is not accepted are answered with QOS_NOT_SUPPORTED and not stored.
     *
     * NOTE(review): the set is cleared first (kept from the original), so each SUBSCRIBE
     * packet replaces the previous subscriptions instead of adding to them.
     */
    private function handleSubscribe(Swoole\Server $server, $fd, array $data, Redis $redis)
    {
        $key = (string) $fd;
        $codes = [];
        $redis->del($key);
        foreach ($data['topics'] as $topic => $option) {
            $qos = $option['qos'];
            if (is_numeric($qos) && $qos < 3) {
                // Cast: PHP turns numeric-string array keys into integers.
                $redis->sAdd($key, (string) $topic);
                $codes[] = $qos;
            } else {
                $codes[] = \Simps\MQTT\Hex\ReasonCode::QOS_NOT_SUPPORTED;
            }
        }

        $server->send($fd, ProtocolV5::pack([
            'type' => Types::SUBACK,
            'message_id' => $data['message_id'] ?? '',
            'codes' => $codes,
        ]));
    }

    /**
     * Replies with UNSUBACK and clears the connection's subscriptions.
     *
     * NOTE(review): this drops every subscription of the connection, not only the
     * topics listed in the packet (kept from the original).
     */
    private function handleUnsubscribe(Swoole\Server $server, $fd, array $data, Redis $redis)
    {
        $server->send($fd, ProtocolV5::pack([
            'type' => Types::UNSUBACK,
            'message_id' => $data['message_id'] ?? '',
        ]));
        $redis->del((string) $fd);
    }

    /**
     * Sends one PUBLISH packet to every connection, except the sender, whose
     * subscription set contains the packet's topic.
     */
    private function forwardToSubscribers(Swoole\Server $server, Redis $redis, $senderFd, array $packet)
    {
        $payload = ProtocolV5::pack($packet); // identical for every receiver, so pack once
        foreach ($server->connections as $subFd) {
            if ($subFd == $senderFd) {
                continue;
            }
            if ($redis->sIsMember((string) $subFd, $packet['topic']) !== false) {
                $server->send($subFd, $payload);
            }
        }
    }

    /**
     * Returns the next packet identifier, cycling through 1..65535.
     */
    private function nextMessageId()
    {
        $this->lastMessageId = $this->lastMessageId % 65535 + 1;

        return $this->lastMessageId;
    }

    /**
     * Opens a Redis connection.
     *
     * @throws \RuntimeException when connect() reports failure
     */
    private function openRedis()
    {
        $redis = new Redis();
        if (!$redis->connect(self::$redisHost, self::$redisPort)) {
            throw new \RuntimeException('Unable to connect to Redis at ' . self::$redisHost . ':' . self::$redisPort);
        }

        return $redis;
    }

    /**
     * Closes a Redis connection; a failure to close is reported, never thrown.
     */
    private function closeRedis(Redis $redis)
    {
        try {
            $redis->close();
        } catch (\Throwable $e) {
            echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
        }
    }
}

// Entry point: the constructor configures the Swoole server and calls start() on it.
new Server();

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐