利用swoole搭建mqtt服务器
一个基于 Swoole 的 MQTT 服务端,支持异步操作。下面的示例代码实现了 QoS 0 和 QoS 1(收到 QoS 1 的 PUBLISH 后回复 PUBACK);QoS 2 所需的 PUBREC/PUBREL/PUBCOMP 流程未在示例中实现。仅在 MQTT 5.0 下测试过,其他版本未测试。
环境:PHP 7.2,Swoole 4.x
1. composer安装MQTT库
composer require simps/mqtt
2.在根目录创建Server.php
<?php
use Simps\MQTT\ProtocolV5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Types;
/**
 * Minimal MQTT 5.0 broker on top of Swoole\Server.
 *
 * State kept in Redis (127.0.0.1:6379):
 *   - set  "<fd>"       : topic filters the connection <fd> is subscribed to
 *   - hash "will"[<fd>] : JSON-encoded will message registered by <fd> at CONNECT
 *
 * Routing is an exact string match between the published topic and the stored
 * topic filters; QoS 1 publishes are acknowledged with PUBACK.
 *
 * NOTE(review): the QoS 2 handshake (PUBREC/PUBREL/PUBCOMP), wildcard matching,
 * retained messages and topic aliases are not implemented here, although the
 * CONNACK properties below advertise some of them — confirm what clients
 * should be told.
 */
class Server
{
    /** @var string Address the broker listens on. */
    protected static $host = '0.0.0.0';

    /** @var int TCP port the broker listens on. */
    protected static $port = 1883;

    /** @var \Swoole\Server Underlying Swoole server instance. */
    protected $_mqtt;

    /** @var int Last packet identifier issued for broker-originated QoS > 0 publishes. */
    private $lastMessageId = 0;

    /**
     * Configures the Swoole server, registers the event callbacks and starts it.
     *
     * Exceptions raised while setting up or starting are printed, not rethrown.
     */
    public function __construct()
    {
        try {
            @ini_set('default_socket_timeout', -1);
            require_once __DIR__ . '/vendor/autoload.php';
            $this->_mqtt = new Swoole\Server(self::$host, self::$port, SWOOLE_BASE);
            $this->_mqtt->set([
                'open_mqtt_protocol' => true, // enable MQTT packet framing
                'worker_num' => 2,
                'reactor_num' => 4,
                'reload_async' => true, // workers wait for pending async events before exiting on reload
                'dispatch_mode' => 3, // packet dispatch strategy (default: 2)
                'daemonize' => false, // stay in the foreground
                'open_cpu_affinity' => true, // enable CPU affinity
                'package_max_length' => 2 * 1024 * 1024,
            ]);
            $this->_mqtt->on('connect', [$this, 'onConnect']);
            $this->_mqtt->on('receive', [$this, 'onReceive']);
            $this->_mqtt->on('close', [$this, 'onClose']);
            $this->_mqtt->start();
        } catch (Exception $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Swoole "connect" callback: logs the new connection.
     *
     * @param \Swoole\Server $server
     * @param int            $fd      connection descriptor
     * @param int            $from_id reactor id supplied by Swoole
     */
    public function onConnect($server, $fd, $from_id)
    {
        echo "客户端{$fd}进来了.\n";
    }

    /**
     * Swoole "receive" callback: decodes one MQTT 5 packet and dispatches on its type.
     *
     * @param \Swoole\Server $server
     * @param int            $fd      connection the packet arrived on
     * @param int            $from_id reactor id supplied by Swoole
     * @param string         $data    raw MQTT packet
     *
     * @return false|null false when a CONNECT is rejected, null otherwise
     */
    public function onReceive(Swoole\Server $server, $fd, $from_id, $data)
    {
        $redis = null;
        try {
            $data = ProtocolV5::unpack($data);
            if (!is_array($data) || !isset($data['type'])) {
                return;
            }

            switch ($data['type']) {
                case Types::CONNECT:
                    $redis = $this->connectRedis();
                    if (!$this->handleConnect($server, $redis, $fd, $data)) {
                        return false;
                    }
                    break;

                case Types::PINGREQ:
                    // Heartbeat only: must not touch the client's subscriptions.
                    if ($server->exist($fd)) {
                        $server->send($fd, ProtocolV5::pack(['type' => Types::PINGRESP]));
                    }
                    break;

                case Types::DISCONNECT:
                    $redis = $this->connectRedis();
                    // Reason code 0x04 = "Disconnect with Will Message"; any other
                    // DISCONNECT discards the will without publishing it.
                    if ((int) ($data['code'] ?? 0) === 0x04) {
                        $this->publishWill($server, $redis, $fd);
                    }
                    $this->forgetClient($redis, $fd);
                    break;

                case Types::PUBLISH:
                    $redis = $this->connectRedis();
                    $this->fanOut($server, $redis, $fd, [
                        'type' => $data['type'],
                        'topic' => $data['topic'],
                        'message' => $data['message'],
                        'dup' => $data['dup'],
                        'qos' => $data['qos'],
                        'retain' => $data['retain'],
                        'message_id' => $data['message_id'] ?? '',
                    ]);
                    // QoS 1: acknowledge receipt to the publisher.
                    if ($data['qos'] === 1) {
                        $server->send($fd, ProtocolV5::pack([
                            'type' => Types::PUBACK,
                            'message_id' => $data['message_id'] ?? '',
                        ]));
                    }
                    break;

                case Types::SUBSCRIBE:
                    $redis = $this->connectRedis();
                    $codes = $this->handleSubscribe($redis, $fd, $data['topics'] ?? []);
                    $server->send($fd, ProtocolV5::pack([
                        'type' => Types::SUBACK,
                        'message_id' => $data['message_id'] ?? '',
                        'codes' => $codes,
                    ]));
                    break;

                case Types::UNSUBSCRIBE:
                    $redis = $this->connectRedis();
                    $this->handleUnsubscribe($redis, $fd, $data['topics'] ?? []);
                    $server->send($fd, ProtocolV5::pack([
                        'type' => Types::UNSUBACK,
                        'message_id' => $data['message_id'] ?? '',
                    ]));
                    break;
            }
        } catch (\Throwable $e) {
            $this->logError($e);
        } finally {
            // Single release point so no branch (including early returns) leaks the connection.
            if ($redis !== null) {
                $redis->close();
            }
        }
    }

    /**
     * Placeholder task callback; it is not registered on the server and does nothing.
     */
    public function onTask()
    {
    }

    /**
     * Swoole "close" callback: publishes a still-registered will, then drops the
     * connection's stored state.
     *
     * A will is still registered here only when the client went away without a
     * DISCONNECT that discarded it.
     *
     * @param \Swoole\Server $server
     * @param int            $fd     connection being closed
     */
    public function onClose($server, $fd)
    {
        $redis = null;
        try {
            $redis = $this->connectRedis();
            $this->publishWill($server, $redis, $fd);
            $this->forgetClient($redis, $fd);
        } catch (\Throwable $e) {
            $this->logError($e);
        } finally {
            if ($redis !== null) {
                $redis->close();
            }
        }
        echo "客户端{$fd}离开了.\n";
    }

    /**
     * Opens a Redis connection to 127.0.0.1:6379.
     *
     * @return Redis connected client; the caller is responsible for close()
     *
     * @throws RuntimeException when connect() reports failure
     */
    private function connectRedis()
    {
        $redis = new Redis();
        if (!$redis->connect('127.0.0.1', 6379)) {
            throw new RuntimeException('Unable to connect to Redis at 127.0.0.1:6379');
        }

        return $redis;
    }

    /**
     * Validates a CONNECT packet, stores its will and replies with CONNACK.
     *
     * @return bool false when the connection was rejected (and closed)
     */
    private function handleConnect($server, Redis $redis, $fd, array $data)
    {
        if (($data['protocol_name'] ?? '') !== 'MQTT') {
            $this->rejectConnection($server, $fd, 0x84); // Unsupported Protocol Version
            return false;
        }
        // Both user name and password must be present and non-empty.
        if (($data['user_name'] ?? '') === '' || ($data['password'] ?? '') === '') {
            $this->rejectConnection($server, $fd, 0x86); // Bad User Name or Password
            return false;
        }

        $key = (string) $fd;
        if (!empty($data['will']['topic'])) {
            $redis->hSet('will', $key, json_encode([
                // base64 keeps arbitrary (non-UTF-8) payload bytes JSON-safe.
                'message' => base64_encode((string) ($data['will']['message'] ?? '')),
                'qos' => (int) ($data['will']['qos'] ?? 0),
                'topic' => $data['will']['topic'],
                'retain' => (int) ($data['will']['retain'] ?? 0),
            ]));
        } else {
            // A reused fd must not inherit a previous client's will.
            $redis->hDel('will', $key);
        }
        // Start from an empty subscription set for this fd.
        $redis->del($key);

        $server->send($fd, ProtocolV5::pack([
            'type' => Types::CONNACK,
            'code' => 0,
            'session_present' => 0,
            'properties' => [
                'maximum_packet_size' => 1048576,
                'retain_available' => true,
                'shared_subscription_available' => true,
                'subscription_identifier_available' => true,
                'topic_alias_maximum' => 65535,
                'wildcard_subscription_available' => true,
            ],
        ]));

        return true;
    }

    /**
     * Sends a failing CONNACK and closes the connection, so a rejected client
     * cannot keep sending packets.
     *
     * @param int $reasonCode MQTT 5 CONNACK reason code
     */
    private function rejectConnection($server, $fd, $reasonCode)
    {
        try {
            $server->send($fd, ProtocolV5::pack([
                'type' => Types::CONNACK,
                'code' => $reasonCode,
                'session_present' => 0,
                'properties' => [],
            ]));
        } finally {
            // Close even if building or sending the CONNACK failed.
            $server->close($fd);
        }
    }

    /**
     * Records the requested topic filters for $fd and builds the SUBACK reason codes.
     *
     * Subscriptions accumulate across SUBSCRIBE packets; they are cleared on
     * CONNECT, DISCONNECT and close.
     *
     * @param array $topics topic filter => options array (or plain QoS value)
     *
     * @return int[] one reason code per requested filter, in request order
     */
    private function handleSubscribe(Redis $redis, $fd, array $topics)
    {
        $codes = [];
        foreach ($topics as $topic => $option) {
            $qos = is_array($option) ? ($option['qos'] ?? null) : $option;
            if (is_numeric($qos) && $qos >= 0 && $qos < 3) {
                $redis->sAdd((string) $fd, (string) $topic);
                $codes[] = (int) $qos;
            } else {
                $codes[] = \Simps\MQTT\Hex\ReasonCode::QOS_NOT_SUPPORTED;
            }
        }

        return $codes;
    }

    /**
     * Removes only the listed topic filters from $fd's subscription set.
     *
     * @param array $topics list of filters; string keys are also accepted as filters
     */
    private function handleUnsubscribe(Redis $redis, $fd, array $topics)
    {
        foreach ($topics as $key => $value) {
            $topic = is_string($key) ? $key : $value;
            if (is_string($topic) && $topic !== '') {
                $redis->sRem((string) $fd, $topic);
            }
        }
    }

    /**
     * Publishes and then removes the will stored for $fd, if there is one.
     */
    private function publishWill($server, Redis $redis, $fd)
    {
        $key = (string) $fd;
        $stored = $redis->hGet('will', $key);
        if (!is_string($stored) || $stored === '') {
            return;
        }
        // Remove first so the will can never be published twice.
        $redis->hDel('will', $key);

        $will = json_decode($stored, true);
        if (!is_array($will) || !isset($will['topic'])) {
            return;
        }
        $message = base64_decode((string) ($will['message'] ?? ''), true);
        $qos = (int) ($will['qos'] ?? 0);

        $this->fanOut($server, $redis, $fd, [
            'type' => Types::PUBLISH,
            'topic' => $will['topic'],
            'message' => $message === false ? '' : $message,
            'dup' => 0,
            'qos' => $qos,
            'retain' => (int) ($will['retain'] ?? 0),
            // The will has no packet id of its own; issue one when QoS needs it.
            'message_id' => $qos > 0 ? $this->nextMessageId() : 0,
        ]);
    }

    /**
     * Sends a PUBLISH packet to every other connection whose subscription set
     * contains exactly $packet['topic'].
     *
     * @param int   $senderFd connection that originated the message (skipped)
     * @param array $packet   PUBLISH fields accepted by ProtocolV5::pack()
     */
    private function fanOut($server, Redis $redis, $senderFd, array $packet)
    {
        $encoded = null;
        foreach ($server->connections as $subFd) {
            if ((int) $subFd === (int) $senderFd) {
                continue;
            }
            if (!$redis->sIsMember((string) $subFd, $packet['topic'])) {
                continue;
            }
            // Encode once, and only when at least one subscriber matches.
            if ($encoded === null) {
                $encoded = ProtocolV5::pack($packet);
            }
            $server->send($subFd, $encoded);
        }
    }

    /**
     * Deletes the will and the subscription set stored for $fd.
     */
    private function forgetClient(Redis $redis, $fd)
    {
        $key = (string) $fd;
        $redis->hDel('will', $key);
        $redis->del($key);
    }

    /**
     * Returns the next packet identifier, cycling through 1..65535.
     *
     * @return int
     */
    private function nextMessageId()
    {
        $this->lastMessageId = $this->lastMessageId % 65535 + 1;

        return $this->lastMessageId;
    }

    /**
     * Prints an error in red on the console.
     */
    private function logError(\Throwable $e)
    {
        echo "\033[0;31mError: {$e->getMessage()}\033[0m\r\n";
    }
}
// Entry point: the Server constructor configures the Swoole server, registers
// the connect/receive/close callbacks and calls start(), so instantiating it
// is all that is needed to run the broker.
new Server();
更多推荐
所有评论(0)