一、轻量级内网穿透场景介绍

Socket.IO(或 WebSocket) 可以被用作一种 “内网穿透”方式的实现手段,虽然它的原始用途是“实时通信”。

考虑下面这种场景:

  • 内网设备不能被公网访问(没有公网 IP,无法 NAT 映射

  • 但内网设备可以主动访问公网

这种情况没办法调用到局域网内的服务器,因为没有公网ip,无法或难以实现NAT映射

此时可以

  • 在公网搭建服务端,内网设备作为客户端主动连接 Socket.IO 服务端

  • 后续你只需向服务端发消息,就能实时下发指令到内网设备

这就构成了 伪反向连接 能力,达到内网穿透效果

实现原理:

通信方向 说明
内网客户端主动连接服务端(建立长连接) Socket.IO 是双向的,内网客户端可以接收消息
公网服务端主动发消息到连接 即使服务端在公网,通过 socketio 的 emit 就能推送到内网客户端
整个过程绕过 NAT 限制 因为连接是由内网“发起”的,NAT 是允许的,且连接保持长开

二、socketio介绍

Socket.IO 本质上是:

  • 基于 WebSocket 封装的通信库

  • 初始握手用 HTTP + polling

  • 成功协商后,优先使用 WebSocket 通信

  • 如果浏览器或网络环境不支持 WebSocket,会 fallback 到 HTTP 长轮询(long-polling)

2.1 房间概念

想要理解实时通信,先要搞清楚房间的概念(这是抽象概念,socketio对房间概念进行具体实现)。像qq、微信的群聊,就相当于将一群客户丢到一个具体房间(比如1001),在这个房间内任意对象发消息,所有人都能收到。而私聊就可以理解为单独开一个房间,房间内只有两个对象,这时任意对象说话只有对方能听到。

房间类型

概念 说明
Room(房间) Socket.IO 虚拟逻辑分组,不是实体连接,只用于事件广播
公共房间 多个客户端都能加入,比如 chatroom1group_abc
单独房间 服务端默认会给每个 sid(socket id)分配一个独立房间,可用于点对点通信
Namespace Socket.IO 的大分区机制,默认是 /,房间是在命名空间内的更细一级分组
sid 每个连接唯一 ID,常用于单播消息

房间行为

模拟房间的具体实现动作,比如群聊:加入房间(加入聊天群) -> 广播信息 (发消息) -> 离开房间(退群) -> 关闭房间(删除群)

关键功能 描述
 join room 加入房间
 leave room 离开房间
 close room 关闭房间
 broadcast event 广播事件给房间或所有人
 single event 单播事件给指定连接

2.2 事件行为对照表(服务端 & 客户端)

操作 服务端  客户端  说明
加入房间 sio.enter_room(sid, room) socket.emit('join', {room}) 加入指定房间
离开房间 sio.leave_room(sid, room) socket.emit('leave', {room}) 主动离开房间
关闭房间 sio.close_room(room) 客户端无法关闭房间 所有成员被踢,房间失效(可选)
广播给所有人 sio.emit('event', data) socket.emit(...)(客户端发) 所有连接都收到(跨房间)
广播给房间 sio.emit('event', data, room='room1') socket.emit(...) 房间内广播
单播给某个客户端 sio.emit('event', data, to=sid) socket.emit(...) 单播(服务端 -> 客户端)


三、实例演示

3.1 需求背景

分属异地的几个机房,上海、北京、广州、成都,各机房下都有若干台虚拟机,其命名型如shanghai-2499111,beijing-2929141(sn码),各机房下亦都有一台堡垒机,其作用是查看虚拟机的状态,排错调试用。我们在堡垒机上安装了一个智能脚本,可以根据sn码查询到虚拟机的状态。随着项目推进,机房数量增多,且每日待排错的虚拟机数量也增多,每一次连上堡垒机去调试太耗时。

此时A同学想到一个方法,在中央部署一台服务器,可以远程调用各个机房上的智能脚本,操作中央服务器即操作各个机房堡垒机,但各个机房因网络安全问题不允许向外开放端口(仅允许单方面访问外网),意味着中央没办法找到堡垒机。

此时常规的方式是通过中间件(如redis、rocketmq)采用消息订阅的方式,即中央作为生产者任发布任务,各地区作为消费者消费任务,但这种方式有一些缺点,一是要搭建中间件服务且需要运维成本、二是采用任务模式无法做到实时性,如一些指令集的调用无法达到毫秒级返回(堡垒机,即消费者本质上只能通过定时调用的方式去取任务,如采用1s/次的扫描间隙去取任务,但任然不能满足实时返回的需求,比如我需要对北京的beijing-2929141虚拟机做ls、df -h、free -h调用,还需要先插队列再进行消费执行。第三如果我有批量操作需求,采用任务订阅的方式需要额外的去开发业务代码,这会使整个项目更重。

因此我们决定采用实时通信的方式,通过python socketio创建房间,每一个房间可以对应一个机房(单调用),一个房间也可加入多个(批量调用),亦可采用点对点调用(sid)

3.2 设计思路

下面展示的内容 采用广播给所有客户端的方式,有4个区域(上海、北京、广州、成都),客户端需要做区域判断逻辑,如beijing-2929141则只有北京区域会对消息进行回复。当然也可以为每个区域都创建房间,在前端选择具体区域的房间进行广播。我认为广播所有+客户端区域判断逻辑的实现在操作性上更简易(剩去了选择区域这个步骤)。

服务端基于socketio+flask+h5/css/js进行开发,客户端使用socketio进行开发,每个区域的堡垒机上都需要部署客户端。

主体:

  • 服务端(含socketio服务端和前端页面)
  • 客户端(各个区域)

链路流程:

  1. 所有区域加入房间(启动客户端程序)
  2. server端收到加入房间行为,输出到前端
  3. 用户输入指令,消息广播到所有区域
  4. 服务端监听广播信息事件event=my_broadcast_event,监听到信息后再广播到event=my_response
  5. 客户端监听event=my_response
  6. 自区域判断,如果为本地区域的sn,则执行指令(否则忽视)
  7. 执行完指令的区域广播返回执行结果,前端打印输出

3.3 服务端实现

主要技术:

  • socketio(实现实时通信、内网穿透)
  • flask(实现前端操作页面)

服务端需要实现的功能:

  • 房间功能:加入房间join()、离开房间leave()、关闭房间close()
  • 消息事件:广播消息、单播消息
  • 增强处理(可选):连接、断开连接优雅处理、ping心跳检测

关于房间功能有几点要解释下,服务端本质上来说不会存在加入房间和离开房间的行为,这里两者主要是为了解释用户行为,比如A用户进入后打印一下用户进入的提示,服务端通过 join_room 来标记“某个用户进入了这个房间”,然后根据这个行为触发一些反馈(通知、日志、广播等),离开房间也是类似:只是从该连接的房间列表中移除,不涉及服务端自身状态变化。而关闭房间则是必要的(断绝某个区域的收信),且只能是服务端执行。

后端代码

默认端口1235

vim server.py

from flask_cors import CORS
from flask import Flask, render_template, session, request, copy_current_request_context,redirect,url_for
from flask_socketio import SocketIO, emit, join_room, leave_room, close_room, rooms, disconnect
from threading import Lock

async_mode = None

app = Flask(__name__)
CORS(app)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

#前端首页
@app.route('/')
def index():
    return render_template('index.html', async_mode=socketio.async_mode)

#广播事件(所有房间)
#接收前端请求,然后转发到my_response(客户端监听my_response)
@socketio.event
def my_broadcast_event(message):
    session['receive_count'] = session.get('receive_count', 0) + 1
    print(f'broadcast:{message}')
    emit('my_response',
        {'data': message['data'], 'count': session['receive_count']},
        broadcast=True)

#加入房间
#客户端加入某个房间(逻辑分组),类似频道,join_room(message['room']) 把当前客户端加入房间。
#回传当前加入的房间列表(rooms() 是当前客户端所在的房间集合)。
@socketio.event
def join(message):
    join_room(message['room'])
    a = message['room']
    print(f"{a} in room")
    session['receive_count'] = session.get('receive_count', 0) + 1
    emit('my_response',
        {'data': 'In rooms: ' + ', '.join(rooms()),
        'count': session['receive_count']})

#离开房间
#客户端请求离开房间,使用 leave_room 把其移除。同样发送一个反馈消息,显示当前房间号。
@socketio.event
def leave(message):
    leave_room(message['room'])
    session['receive_count'] = session.get('receive_count', 0) + 1
    emit('my_response',
        {'data': 'In rooms: ' + ', '.join(rooms()),
        'count': session['receive_count']})

#关闭房间
#关闭某个房间(所有客户端将无法再发送/接收该房间的事件),向该房间广播“关闭通知”,再执行 close_room() 关闭操作。
@socketio.on('close_room')
def on_close_room(message):
    session['receive_count'] = session.get('receive_count', 0) + 1
    emit('my_response', {'data': 'Room ' + message['room'] + ' is closing.',
                        'count': session['receive_count']},
        to=message['room'])
    close_room(message['room'])


#处理客户端请求断开连接的事件。
#@copy_current_request_context 是 Flask 提供的装饰器,用于把当前请求上下文复制到新线程中(因为 SocketIO 的回调可能会在新线程中执行)
#can_disconnect() 函数内部调用 disconnect(),用于主动断开当前 SocketIO 客户端连接。
@socketio.event
def disconnect_request():
    @copy_current_request_context
    def can_disconnect():
        disconnect()
    session['receive_count'] = session.get('receive_count', 0) + 1
    #设置 callback=can_disconnect,表示当客户端确认收到这个消息后,再执行断开连接(比较优雅,不会丢消息)。
    emit('my_response',
        {'data': 'Disconnected!', 'count': session['receive_count']},
        callback=can_disconnect)

#处理客户端的 ping(心跳包)
#收到客户端发来的 my_ping 事件时,返回 my_pong,类似于 SocketIO 层的心跳检测,用于确认连接是否活跃。
@socketio.event
def my_ping():
    emit('my_pong')

#处理客户端建立连接时的逻辑
#连接建立时启动后台任务 background_thread(如推送消息等循环任务,用线程锁 thread_lock 确保只启动一个线程(防止多用户连接时重复启动)。
@socketio.event
def connect():
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(background_thread)
    #emit('my_response', {'data': 'Connected', 'count': 0})

def background_thread():
    """Example of how to send server generated events to clients."""
    count = 0
    while True:
        socketio.sleep(10)
        count += 1
        socketio.emit('my_response',
                      {'data': 'Server generated event', 'count': count})

if __name__ == '__main__':
    print("======================================")
    print("🎉 server start success 🎉")
    print("======================================")
    socketio.run(app, host='0.0.0.0',port=1235,debug=True)

前端代码

vim templates/index.html

<!DOCTYPE HTML>
<html>
<head>
    <title>Flask-SocketIO Test</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.5.1/jquery.min.js" integrity="sha512-bLT0Qm9VnAYZDflyKcBaQ2gg0hSYNQrJ8RilYldYQ1FxQYoCLtUjuuRuZo+fjqhx/qtq/1itJ0C2ejDxltZVFg==" crossorigin="anonymous"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.4/socket.io.js" integrity="sha512-aMGMvNYu8Ue4G+fHa359jcPb1u+ytAF+P2SCb+PxrjCdO3n3ZTxJ30zuH39rimUggmTwmh2u7wvQsDTHESnmfQ==" crossorigin="anonymous"></script>
    <script type="text/javascript" charset="utf-8">
        $(document).ready(function() {
            // Connect to the Socket.IO server.
            // The connection URL has the following format, relative to the current page:
            //     http[s]://<domain>:<port>[/<namespace>]
            var socket = io.connect('http://' + document.domain + ':' + location.port);

            // Event handler for new connections.
            // The callback function is invoked when a connection with the
            // server is established.
            socket.on('connect', function() {
                socket.emit('my_event', {data: 'I\'m connected!'});
            });

            // Event handler for server sent data.
            // The callback function is invoked whenever the server emits data
            // to the client. The data is then displayed in the "Received"
            // section of the page.
            socket.on('my_response', function(msg, cb) {
                $('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
                if (cb)
                    cb();
            });

            // Interval function that tests message latency by sending a "ping"
            // message. The server then responds with a "pong" message and the
            // round trip time is measured.
            var ping_pong_times = [];
            var start_time;
            window.setInterval(function() {
                start_time = (new Date).getTime();
                $('#transport').text(socket.io.engine.transport.name);
                socket.emit('my_ping');
            }, 1000);

            // Handler for the "pong" message. When the pong is received, the
            // time from the ping is stored, and the average of the last 30
            // samples is average and displayed.
            socket.on('my_pong', function() {
                var latency = (new Date).getTime() - start_time;
                ping_pong_times.push(latency);
                ping_pong_times = ping_pong_times.slice(-30); // keep last 30 samples
                var sum = 0;
                for (var i = 0; i < ping_pong_times.length; i++)
                    sum += ping_pong_times[i];
                $('#ping-pong').text(Math.round(10 * sum / ping_pong_times.length) / 10);
            });

            // Handlers for the different forms in the page.
            // These accept data from the user and send it to the server in a
            // variety of ways
            $('form#broadcast').submit(function(event) {
                socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()});
                return false;
            });
        });
    </script>
</head>
<body>
    <h1>Flask-SocketIO Test</h1>
    <p>
      Async mode is: <b>{{ async_mode }}</b><br>
      Current transport is: <b><span id="transport"></span></b><br>
      Average ping/pong latency: <b><span id="ping-pong"></span>ms</b>
    </p>
    <h2>Send:</h2>
    <form id="broadcast" method="POST" action='#'>
        <input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
        <input type="submit" value="Broadcast">
    </form>
    <form id="close" method="POST" action="#">
        <input type="text" name="close_room" id="close_room" placeholder="Room Name">
        <input type="submit" value="Close Room">
    </form>
    <h2>Receive:</h2>
    <div id="log"></div>
</body>
</html>

3.4 客户端实现

需要改写下服务器ip信息,ROOM_URL = "http://xxxx:1235"

vim client.py

import socketio
import asyncio
import json
import subprocess

ROOM_URL = "http://xxxx:1235"
sio = socketio.AsyncClient()
AREA = 'beijing'

def do_something(message):
    print("do done.")
    return "指令执行完成"

def judge_head(message):
    if message.startswith(AREA):
        return True
    else:
        return False

def judege_area(message):
    if AREA in message:
        return True
    else:
        return False

# join room
@sio.event
async def connet():
    print(f"{AREA} Connected to server")
    await sio.emit('join', {'room': 'allroom'})  

# leave room
@sio.on("disconnect")
async def on_disconnect():
    print("Disconnected from the server!")

@sio.on("my_response")
async def my_event(data):
    try:
        message = data.get('data')
        if message is None:
            print("No 'data' field in message.")
            return
        print(f"Message from server: {data}")
        message = data['data']
        if judge_head(message) and judege_area(message):
            print(f"Area and head judgement success")

            revived_return = f"已收到{message}实例的请求,正在处理中,系统将在1分钟内返回结果"

            await sio.emit('my_broadcast_event', {
                'data': revived_return, 
                'count': 0
            })
            print("Received valid message, acknowledged.")

            # 2.后台异步处理长耗时任务
            asyncio.create_task(process_content_and_respond(message))

    except Exception as e:
        print(f"Error in 'my_response': {e}")

async def process_content_and_respond(message):
    try:
        result = await asyncio.to_thread(do_something, message)  
        if result:
            await sio.emit('my_broadcast_event', {
                'data': result, 
                'count': 1
            })
            print(f"Sent result: {result}")
    except Exception as e:
        print(f"Error in async processing: {e}")

async def start():
    try:
        await sio.connect(ROOM_URL)  # 连接到服务器
        print(f"Connected to {ROOM_URL}")
        await sio.wait()  # 等待事件循环执行,保持连接
    except Exception as e:
        print(f"Failed to connect: {e}")

if __name__ == '__main__':
    print("======================================")
    print("🎉 beijing_client start success 🎉")
    print("======================================")
    asyncio.run(start())

3.5 效果展示

http:服务器ip:端口 访问到前端

在Broadcast输入区域虚拟机sn 加具体指令(广播事件发给my_broadcast_event)

服务端收到广播消息事件后,emit到my_response 

客户端监听my_response事件,收到消息后进行处理,处理后再emit到广播事件my_broadcast_event(注意这里收到的消息是来自服务端的)

3.6 两个监听事件的分析

注意到上面用到了两个事件,服务端和前端用到了my_broadcast_event事件,客户端用到了my_response事件。

从链路上来说,前端发送到my_broadcast_event -> 服务端监听my_broadcast_event 转发到 my_response -> 客户端处理完结果反馈到my_broadcast_event

也就是说客户端只需要被动回应消息,而不需要主动的发送行为,如果不这么设计,比如统一使用my_broadcast_event事件,这会导致客户端收到 my_broadcast_event 就自动响应,也用同一个事件发出去,可能收回自己消息再触发一轮,甚至可能触发死循环。

两者关系 & 差异对比

特性 my_broadcast_event my_response
意图 主动发出消息、请求广播 被动回应消息、发送反馈
谁发 通常由客户端/前端发起 通常由服务端或客户端响应
谁收 服务端或其他客户端 原始发送方
是否广播 通常广播 通常单发
场景 用户发起请求、问答消息 消息处理完后的回执、确认

参考文档

【websocket】小白快速上手flask-socketio - 天意凉 - 博客园

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐