python 利用socketio(WebSocket协议)实现轻量级穿透方案
基于 WebSocket 封装的通信库;初始握手用;成功协商后,优先使用 WebSocket 通信;如果浏览器或网络环境不支持 WebSocket,会 fallback 到HTTP 长轮询(long-polling);
一、轻量级内网穿透场景介绍
Socket.IO(或 WebSocket) 可以被用作一种 “内网穿透”方式的实现手段,虽然它的原始用途是“实时通信”。
考虑下面这种场景:
内网设备不能被公网访问(没有公网 IP,无法 NAT 映射)
但内网设备可以主动访问公网
这种情况没办法调用到局域网内的服务器,因为没有公网ip,无法或难以实现NAT映射
此时可以
在公网搭建服务端,内网设备作为客户端主动连接 Socket.IO 服务端
后续你只需向服务端发消息,就能实时下发指令到内网设备
这就构成了 伪反向连接 能力,达到内网穿透效果。
实现原理:
通信方向 | 说明 |
---|---|
内网客户端主动连接服务端(建立长连接) | Socket.IO 是双向的,内网客户端可以接收消息 |
公网服务端主动发消息到连接 | 即使服务端在公网,通过 socketio 的 emit 就能推送到内网客户端 |
整个过程绕过 NAT 限制 | 因为连接是由内网“发起”的,NAT 是允许的,且连接保持长开 |
二、socketio介绍
Socket.IO 本质上是:
-
基于 WebSocket 封装的通信库;
-
初始握手用 HTTP + polling;
-
成功协商后,优先使用 WebSocket 通信;
-
如果浏览器或网络环境不支持 WebSocket,会 fallback 到 HTTP 长轮询(long-polling);
2.1 房间概念
想要理解实时通信,先要搞清楚房间的概念(这是抽象概念,socketio对房间概念进行具体实现)。像qq、微信的群聊,就相当于将一群客户丢到一个具体房间(比如1001),在这个房间内任意对象发消息,所有人都能收到。而私聊就可以理解为单独开一个房间,房间内只有两个对象,这时任意对象说话只有对方能听到。
房间类型
概念 | 说明 |
---|---|
Room(房间) | Socket.IO 虚拟逻辑分组,不是实体连接,只用于事件广播 |
公共房间 | 多个客户端都能加入,比如 chatroom1 、group_abc |
单独房间 | 服务端默认会给每个 sid (socket id)分配一个独立房间,可用于点对点通信 |
Namespace | Socket.IO 的大分区机制,默认是 / ,房间是在命名空间内的更细一级分组 |
sid | 每个连接唯一 ID,常用于单播消息 |
房间行为
模拟房间的具体实现动作,比如群聊:加入房间(加入聊天群) -> 广播信息 (发消息) -> 离开房间(退群) -> 关闭房间(删除群)
关键功能 | 描述 |
---|---|
join room | 加入房间 |
leave room | 离开房间 |
close room | 关闭房间 |
broadcast event | 广播事件给房间或所有人 |
single event | 单播事件给指定连接 |
2.2 事件行为对照表(服务端 & 客户端)
操作 | 服务端 | 客户端 | 说明 |
---|---|---|---|
加入房间 | sio.enter_room(sid, room) |
socket.emit('join', {room}) |
加入指定房间 |
离开房间 | sio.leave_room(sid, room) |
socket.emit('leave', {room}) |
主动离开房间 |
关闭房间 | sio.close_room(room) |
客户端无法关闭房间 | 所有成员被踢,房间失效(可选) |
广播给所有人 | sio.emit('event', data) |
socket.emit(...) (客户端发) |
所有连接都收到(跨房间) |
广播给房间 | sio.emit('event', data, room='room1') |
socket.emit(...) |
房间内广播 |
单播给某个客户端 | sio.emit('event', data, to=sid) |
socket.emit(...) |
单播(服务端 -> 客户端) |
三、实例演示
3.1 需求背景
分属异地的几个机房,上海、北京、广州、成都,各机房下都有若干台虚拟机,其命名型如shanghai-2499111,beijing-2929141(sn码),各机房下亦都有一台堡垒机,其作用是查看虚拟机的状态,排错调试用。我们在堡垒机上安装了一个智能脚本,可以根据sn码查询到虚拟机的状态。随着项目推进,机房数量增多,且每日待排错的虚拟机数量也增多,每一次连上堡垒机去调试太耗时。
此时A同学想到一个方法,在中央部署一台服务器,可以远程调用各个机房上的智能脚本,操作中央服务器即操作各个机房堡垒机,但各个机房因网络安全问题不允许向外开放端口(仅允许单方面访问外网),意味着中央没办法找到堡垒机。
此时常规的方式是通过中间件(如redis、rocketmq)采用消息订阅的方式,即中央作为生产者任发布任务,各地区作为消费者消费任务,但这种方式有一些缺点,一是要搭建中间件服务且需要运维成本、二是采用任务模式无法做到实时性,如一些指令集的调用无法达到毫秒级返回(堡垒机,即消费者本质上只能通过定时调用的方式去取任务,如采用1s/次的扫描间隙去取任务,但任然不能满足实时返回的需求,比如我需要对北京的beijing-2929141虚拟机做ls、df -h、free -h调用,还需要先插队列再进行消费执行。第三如果我有批量操作需求,采用任务订阅的方式需要额外的去开发业务代码,这会使整个项目更重。
因此我们决定采用实时通信的方式,通过python socketio创建房间,每一个房间可以对应一个机房(单调用),一个房间也可加入多个(批量调用),亦可采用点对点调用(sid)
3.2 设计思路
下面展示的内容 采用广播给所有客户端的方式,有4个区域(上海、北京、广州、成都),客户端需要做区域判断逻辑,如beijing-2929141则只有北京区域会对消息进行回复。当然也可以为每个区域都创建房间,在前端选择具体区域的房间进行广播。我认为广播所有+客户端区域判断逻辑的实现在操作性上更简易(剩去了选择区域这个步骤)。
服务端基于socketio+flask+h5/css/js进行开发,客户端使用socketio进行开发,每个区域的堡垒机上都需要部署客户端。
主体:
- 服务端(含socketio服务端和前端页面)
- 客户端(各个区域)
链路流程:
- 所有区域加入房间(启动客户端程序)
- server端收到加入房间行为,输出到前端
- 用户输入指令,消息广播到所有区域
- 服务端监听广播信息事件event=my_broadcast_event,监听到信息后再广播到event=my_response
- 客户端监听event=my_response
- 自区域判断,如果为本地区域的sn,则执行指令(否则忽视)
- 执行完指令的区域广播返回执行结果,前端打印输出
3.3 服务端实现
主要技术:
- socketio(实现实时通信、内网穿透)
- flask(实现前端操作页面)
服务端需要实现的功能:
- 房间功能:加入房间join()、离开房间leave()、关闭房间close()
- 消息事件:广播消息、单播消息
- 增强处理(可选):连接、断开连接优雅处理、ping心跳检测
关于房间功能有几点要解释下,服务端本质上来说不会存在加入房间和离开房间的行为,这里两者主要是为了解释用户行为,比如A用户进入后打印一下用户进入的提示,服务端通过 join_room
来标记“某个用户进入了这个房间”,然后根据这个行为触发一些反馈(通知、日志、广播等),离开房间也是类似:只是从该连接的房间列表中移除,不涉及服务端自身状态变化。而关闭房间则是必要的(断绝某个区域的收信),且只能是服务端执行。
后端代码
默认端口1235
vim server.py
from flask_cors import CORS
from flask import Flask, render_template, session, request, copy_current_request_context,redirect,url_for
from flask_socketio import SocketIO, emit, join_room, leave_room, close_room, rooms, disconnect
from threading import Lock
async_mode = None
app = Flask(__name__)
CORS(app)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()
#前端首页
@app.route('/')
def index():
return render_template('index.html', async_mode=socketio.async_mode)
#广播事件(所有房间)
#接收前端请求,然后转发到my_response(客户端监听my_response)
@socketio.event
def my_broadcast_event(message):
session['receive_count'] = session.get('receive_count', 0) + 1
print(f'broadcast:{message}')
emit('my_response',
{'data': message['data'], 'count': session['receive_count']},
broadcast=True)
#加入房间
#客户端加入某个房间(逻辑分组),类似频道,join_room(message['room']) 把当前客户端加入房间。
#回传当前加入的房间列表(rooms() 是当前客户端所在的房间集合)。
@socketio.event
def join(message):
join_room(message['room'])
a = message['room']
print(f"{a} in room")
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': 'In rooms: ' + ', '.join(rooms()),
'count': session['receive_count']})
#离开房间
#客户端请求离开房间,使用 leave_room 把其移除。同样发送一个反馈消息,显示当前房间号。
@socketio.event
def leave(message):
leave_room(message['room'])
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response',
{'data': 'In rooms: ' + ', '.join(rooms()),
'count': session['receive_count']})
#关闭房间
#关闭某个房间(所有客户端将无法再发送/接收该房间的事件),向该房间广播“关闭通知”,再执行 close_room() 关闭操作。
@socketio.on('close_room')
def on_close_room(message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my_response', {'data': 'Room ' + message['room'] + ' is closing.',
'count': session['receive_count']},
to=message['room'])
close_room(message['room'])
#处理客户端请求断开连接的事件。
#@copy_current_request_context 是 Flask 提供的装饰器,用于把当前请求上下文复制到新线程中(因为 SocketIO 的回调可能会在新线程中执行)
#can_disconnect() 函数内部调用 disconnect(),用于主动断开当前 SocketIO 客户端连接。
@socketio.event
def disconnect_request():
@copy_current_request_context
def can_disconnect():
disconnect()
session['receive_count'] = session.get('receive_count', 0) + 1
#设置 callback=can_disconnect,表示当客户端确认收到这个消息后,再执行断开连接(比较优雅,不会丢消息)。
emit('my_response',
{'data': 'Disconnected!', 'count': session['receive_count']},
callback=can_disconnect)
#处理客户端的 ping(心跳包)
#收到客户端发来的 my_ping 事件时,返回 my_pong,类似于 SocketIO 层的心跳检测,用于确认连接是否活跃。
@socketio.event
def my_ping():
emit('my_pong')
#处理客户端建立连接时的逻辑
#连接建立时启动后台任务 background_thread(如推送消息等循环任务,用线程锁 thread_lock 确保只启动一个线程(防止多用户连接时重复启动)。
@socketio.event
def connect():
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(background_thread)
#emit('my_response', {'data': 'Connected', 'count': 0})
def background_thread():
"""Example of how to send server generated events to clients."""
count = 0
while True:
socketio.sleep(10)
count += 1
socketio.emit('my_response',
{'data': 'Server generated event', 'count': count})
if __name__ == '__main__':
print("======================================")
print("🎉 server start success 🎉")
print("======================================")
socketio.run(app, host='0.0.0.0',port=1235,debug=True)
前端代码
vim templates/index.html
<!DOCTYPE HTML>
<html>
<head>
<title>Flask-SocketIO Test</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.5.1/jquery.min.js" integrity="sha512-bLT0Qm9VnAYZDflyKcBaQ2gg0hSYNQrJ8RilYldYQ1FxQYoCLtUjuuRuZo+fjqhx/qtq/1itJ0C2ejDxltZVFg==" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.4/socket.io.js" integrity="sha512-aMGMvNYu8Ue4G+fHa359jcPb1u+ytAF+P2SCb+PxrjCdO3n3ZTxJ30zuH39rimUggmTwmh2u7wvQsDTHESnmfQ==" crossorigin="anonymous"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function() {
// Connect to the Socket.IO server.
// The connection URL has the following format, relative to the current page:
// http[s]://<domain>:<port>[/<namespace>]
var socket = io.connect('http://' + document.domain + ':' + location.port);
// Event handler for new connections.
// The callback function is invoked when a connection with the
// server is established.
socket.on('connect', function() {
socket.emit('my_event', {data: 'I\'m connected!'});
});
// Event handler for server sent data.
// The callback function is invoked whenever the server emits data
// to the client. The data is then displayed in the "Received"
// section of the page.
socket.on('my_response', function(msg, cb) {
$('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
if (cb)
cb();
});
// Interval function that tests message latency by sending a "ping"
// message. The server then responds with a "pong" message and the
// round trip time is measured.
var ping_pong_times = [];
var start_time;
window.setInterval(function() {
start_time = (new Date).getTime();
$('#transport').text(socket.io.engine.transport.name);
socket.emit('my_ping');
}, 1000);
// Handler for the "pong" message. When the pong is received, the
// time from the ping is stored, and the average of the last 30
// samples is average and displayed.
socket.on('my_pong', function() {
var latency = (new Date).getTime() - start_time;
ping_pong_times.push(latency);
ping_pong_times = ping_pong_times.slice(-30); // keep last 30 samples
var sum = 0;
for (var i = 0; i < ping_pong_times.length; i++)
sum += ping_pong_times[i];
$('#ping-pong').text(Math.round(10 * sum / ping_pong_times.length) / 10);
});
// Handlers for the different forms in the page.
// These accept data from the user and send it to the server in a
// variety of ways
$('form#broadcast').submit(function(event) {
socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()});
return false;
});
});
</script>
</head>
<body>
<h1>Flask-SocketIO Test</h1>
<p>
Async mode is: <b>{{ async_mode }}</b><br>
Current transport is: <b><span id="transport"></span></b><br>
Average ping/pong latency: <b><span id="ping-pong"></span>ms</b>
</p>
<h2>Send:</h2>
<form id="broadcast" method="POST" action='#'>
<input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
<input type="submit" value="Broadcast">
</form>
<form id="close" method="POST" action="#">
<input type="text" name="close_room" id="close_room" placeholder="Room Name">
<input type="submit" value="Close Room">
</form>
<h2>Receive:</h2>
<div id="log"></div>
</body>
</html>
3.4 客户端实现
需要改写下服务器ip信息,ROOM_URL = "http://xxxx:1235"
vim client.py
import socketio
import asyncio
import json
import subprocess
ROOM_URL = "http://xxxx:1235"
sio = socketio.AsyncClient()
AREA = 'beijing'
def do_something(message):
print("do done.")
return "指令执行完成"
def judge_head(message):
if message.startswith(AREA):
return True
else:
return False
def judege_area(message):
if AREA in message:
return True
else:
return False
# join room
@sio.event
async def connet():
print(f"{AREA} Connected to server")
await sio.emit('join', {'room': 'allroom'})
# leave room
@sio.on("disconnect")
async def on_disconnect():
print("Disconnected from the server!")
@sio.on("my_response")
async def my_event(data):
try:
message = data.get('data')
if message is None:
print("No 'data' field in message.")
return
print(f"Message from server: {data}")
message = data['data']
if judge_head(message) and judege_area(message):
print(f"Area and head judgement success")
revived_return = f"已收到{message}实例的请求,正在处理中,系统将在1分钟内返回结果"
await sio.emit('my_broadcast_event', {
'data': revived_return,
'count': 0
})
print("Received valid message, acknowledged.")
# 2.后台异步处理长耗时任务
asyncio.create_task(process_content_and_respond(message))
except Exception as e:
print(f"Error in 'my_response': {e}")
async def process_content_and_respond(message):
try:
result = await asyncio.to_thread(do_something, message)
if result:
await sio.emit('my_broadcast_event', {
'data': result,
'count': 1
})
print(f"Sent result: {result}")
except Exception as e:
print(f"Error in async processing: {e}")
async def start():
try:
await sio.connect(ROOM_URL) # 连接到服务器
print(f"Connected to {ROOM_URL}")
await sio.wait() # 等待事件循环执行,保持连接
except Exception as e:
print(f"Failed to connect: {e}")
if __name__ == '__main__':
print("======================================")
print("🎉 beijing_client start success 🎉")
print("======================================")
asyncio.run(start())
3.5 效果展示
http:服务器ip:端口 访问到前端
在Broadcast输入区域虚拟机sn 加具体指令(广播事件发给my_broadcast_event)
服务端收到广播消息事件后,emit到my_response
客户端监听my_response事件,收到消息后进行处理,处理后再emit到广播事件my_broadcast_event(注意这里收到的消息是来自服务端的)
3.6 两个监听事件的分析
注意到上面用到了两个事件,服务端和前端用到了my_broadcast_event事件,客户端用到了my_response事件。
从链路上来说,前端发送到my_broadcast_event -> 服务端监听my_broadcast_event 转发到 my_response -> 客户端处理完结果反馈到my_broadcast_event
也就是说客户端只需要被动回应消息,而不需要主动的发送行为,如果不这么设计,比如统一使用my_broadcast_event事件,这会导致
客户端收到 my_broadcast_event
就自动响应,也用同一个事件发出去,可能收回自己消息再触发一轮,甚至可能触发死循环。
两者关系 & 差异对比
特性 | my_broadcast_event |
my_response |
---|---|---|
意图 | 主动发出消息、请求广播 | 被动回应消息、发送反馈 |
谁发 | 通常由客户端/前端发起 | 通常由服务端或客户端响应 |
谁收 | 服务端或其他客户端 | 原始发送方 |
是否广播 | 通常广播 | 通常单发 |
场景 | 用户发起请求、问答消息 | 消息处理完后的回执、确认 |
参考文档
更多推荐
所有评论(0)