前言


本文的原文连接是:
https://blog.csdn.net/freewebsys/article/details/108971807
fly-iot飞凡物联专栏:
https://blog.csdn.net/freewebsys/category_12219758.html

未经博主允许不得转载。
博主CSDN地址是:https://blog.csdn.net/freewebsys
博主掘金地址是:https://juejin.cn/user/585379920479288
博主知乎地址是:https://www.zhihu.com/people/freewebsystem

1,关于钩子ExHook


钩子
钩子 (Hooks) 是一种常见的扩展机制,允许开发者在特定的事件点执行自定义代码。
EMQX 支持钩子机制,通过拦截模块间的函数调用、消息传递、事件传递您可以灵活地修改或扩展系统功能。
多语言钩子扩展(ExHook)

https://www.emqx.io/docs/zh/v5/extensions/hooks.html

EMQX 绝大部分功能都是通过钩子实现的,比如:

  • 在消息发布时对消息进行多步的流式处理(编码/解码等)。
  • 在消息发布时根据配置对消息进行缓存。
  • 使用钩子的阻塞机制实现消息的延迟发布。

在这里插入图片描述

EMQX 以一个客户端在其生命周期内事件为基础,预置了大量的钩子:

名称 说明 执行时机
client.connect 处理连接报文 服务端收到客户端的连接报文时
client.connack 下发连接应答 服务端准备下发连接应答报文时
client.connected 成功接入 客户端认证完成并成功接入系统后
client.disconnected 连接断开 客户端连接层在准备关闭时
client.authenticate 连接认证 执行完 client.connect
client.authorize 发布订阅鉴权 执行 发布/订阅 操作前
client.subscribe 订阅主题 收到订阅报文后,执行 client.authorize 鉴权前
client.unsubscribe 取消订阅 收到取消订阅报文后
session.created 会话创建 client.connected 执行完成,且创建新的会话后
session.subscribed 会话订阅主题 完成订阅操作后
session.unsubscribed 会话取消订阅 完成取消订阅操作后
session.resumed 会话恢复 client.connected 执行完成,且成功恢复旧的会话信息后
session.discarded 会话被移除 会话由于被移除而终止后
session.takenover 会话被接管 会话由于被接管而终止后
session.terminated 会话终止 会话由于其他原因被终止后
message.publish 消息发布 服务端在发布(路由)消息前
message.delivered 消息投递 消息准备投递到客户端前
message.acked 消息回执 服务端在收到客户端发回的消息 ACK 后
message.dropped 消息丢弃 发布出的消息被丢弃后

参数说明:

名称 入参 返回
client.connect ConnInfo:客户端连接层参数
Props:MQTT v5.0 连接报文的 Properties 属性
新的 Props
client.connack ConnInfo:客户端连接层参数
Rc:返回码
Props:MQTT v5.0 连接应答报文的 Properties 属性
新的 Props
client.connected ClientInfo:客户端信息参数
ConnInfo: 客户端连接层参数
-
client.disconnected ClientInfo:客户端信息参数
ConnInfo:客户端连接层参数
ReasonCode:错误码
-
client.authenticate ClientInfo:客户端信息参数
AuthNResult:认证结果
新的 AuthNResult
client.authorize ClientInfo:客户端信息参数
Topic:发布/订阅的主题
PubSub:发布或订阅
AuthZResult:授权结果
新的 AuthZResult
client.subscribe ClientInfo:客户端信息参数
Props:MQTT v5.0 订阅报文的 Properties 参数
TopicFilters:需订阅的主题列表
新的 TopicFilters
client.unsubscribe ClientInfo:客户端信息参数
Props:MQTT v5.0 取消订阅报文的 Properties 参数
TopicFilters:需取消订阅的主题列表
新的 TopicFilters
session.created ClientInfo:客户端信息参数
SessInfo:会话信息
-
session.subscribed ClientInfo:客户端信息参数
Topic:订阅的主题
SubOpts:订阅操作的配置选项
-
session.unsubscribed ClientInfo:客户端信息参数
Topic:取消订阅的主题
SubOpts:取消订阅操作的配置选项
-
session.resumed ClientInfo:客户端信息参数
SessInfo:会话信息
-
session.discarded ClientInfo:客户端信息参数
SessInfo:会话信息
-

2,使用python开发ExHook服务


项目代码参考,需要4.3 + 以上的版本支持,我这里使用的是5.1的版本:

https://github.com/emqx/emqx-extension-examples

安装python的 gRPC 和 gRPC Tools:

git clone https://github.com/emqx/emqx-extension-examples
cd exhook-svr-python
python3 -m pip install grpcio grpcio-tools

编译 *.proto 并生产服务代码:

python3 -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/exhook.proto

运行服务,端口 9000

python3 exhook_server.py

python 的代码空实现,直接展示数据:

# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC exhook server."""

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value

import grpc

import exhook_pb2
import exhook_pb2_grpc

class HookProvider(exhook_pb2_grpc.HookProviderServicer):

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),

                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),

                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        print("OnMessagePublish:", request)
        nmsg = request.message
        #nmsg.payload = b"hardcode payload by exhook-svr-python :)"

        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()

    print("Started gRPC server on [::]:9000")

    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

3,通过emqx的管理界面配置ExHook的gRPC服务


配置服务和端口地址:
在这里插入图片描述
服务可以访问成功,并且可以注册显示钩子的数量是 19 说明代码运行成功。
在这里插入图片描述
19个钩子服务
在这里插入图片描述
当启动钩子,设备上下线的时候可以触发gRPC服务的调用:

4,创建一个设备,当设备上线下线,展示消息日志


当钩子启用的触发的消息:

OnProviderLoaded: broker {
  version: "5.1.2"
  sysdescr: "EMQX"
  uptime: 35319623
  datetime: "2023-07-30T10:32:07.659816096+00:00"
}
meta {
  node: "emqx@node.emqx.io"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

当设备连接出发的消息:

OnClientConnect: conninfo {
  node: "emqx@node.emqx.io"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  proto_name: "MQTT"
  proto_ver: "5"
  keepalive: 60
}
props {
  name: "Session-Expiry-Interval"
  value: "0"
}
meta {
  node: "emqx@node.emqx.io"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

当设备鉴权触发信息:

OnClientAuthenticate: clientinfo {
  node: "emqx@node.emqx.io"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "emqx@node.emqx.io"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

当设备连接成功:

OnClientConnected: clientinfo {
  node: "emqx@node.emqx.io"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
meta {
  node: "emqx@node.emqx.io"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

当消息发送触发信息:

OnMessageDelivered: clientinfo {
  node: "emqx@node.emqx.io"
  clientid: "emqx_OTIxNz"
  peerhost: "172.18.0.1"
  sockport: 8083
  protocol: "mqtt"
  anonymous: true
}
message {
  node: "emqx@node.emqx.io"
  id: "000601B1E4DB27A06EE4010006430002"
  from: "emqx_OTIxNz"
  topic: "testtopic/1"
  payload: "{ \"msg\": \"hello\" }"
  timestamp: 1690713420670
  headers {
    key: "peerhost"
    value: "172.18.0.1"
  }
  headers {
    key: "protocol"
    value: "mqtt"
  }
  headers {
    key: "username"
    value: ""
  }
}
meta {
  node: "emqx@node.emqx.io"
  version: "5.1.2"
  sysdescr: "EMQX"
  cluster_name: "emqxcl"
}

其他还有很多消息,只是简单的看了几个消息。

5,总结和ActorCloud数据进行更新


剩下的事情就是把数据入库了。找到那个设备,然后把设备的在线状态更新下:

postgresql:

update devices set "deviceStatus" = 1 where "deviceName" =  '11111111' ;

然后设备的在线状态就同步了;

在这里插入图片描述
继续把同步代码补齐,然后连接数据库,修改数据。

本文的原文连接是:
https://blog.csdn.net/freewebsys/article/details/108971807

在这里插入图片描述

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐