python bilibili直播弹幕
【代码】python bilibili直播弹幕。
·
原文:AioWebSocket实现python异步接收B站直播弹幕
import ssl
import websocket
import asyncio
import zlib
import json
import _thread as thread
import time
import requests
class WebsocketClient(object):
"""docstring for WebsocketClient"""
def __init__(self, address, send_message):
super(WebsocketClient, self).__init__()
self.address = address
self.send_message = send_message
def on_message(self, ws, message):
printDM(message)
def on_error(self, ws, error):
print("### 报错:", error)
def on_close(self, ws,x,y):
print("### 已关闭###")
print(x,y)
def on_open(self, ws):
print("开始连接")
def run(*args):
self.ws.send(self.send_message)
print(self.send_message)
# self.ws.close()
hb='00 00 00 10 00 10 00 01 00 00 00 02 00 00 00 01'
def sendHeartBeat():
while True:
time.sleep(30)
self.ws.send(bytes.fromhex(hb))
print('[Notice] 发送心跳包,并获取人气值')
thread.start_new_thread(run, ())
thread.start_new_thread(sendHeartBeat, ())
def run(self):
websocket.enableTrace(False)
self.ws = websocket.WebSocketApp(self.address,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.ws.on_open = self.on_open
self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
def color_text(text,front=33,back=40):#自定义文字颜色
return f'\033[1;{front};{back}m'+str(text)+'\033[0m'
#参数文字颜色front:灰色(30)-红色(31)-绿色(32)-黄色(33)-蓝色(34)-粉色(35),
#背景颜色back:红色(41)-绿色(42)-黄色(43)-蓝色(44)-粉色(45)-绿色(46)-灰白(47)
# 将数据包传入,解析数据:
dm_count = 0#弹幕计数
lw_count = 0#礼物计数
def printDM(data):
global dm_count
# 获取数据包的长度,版本和操作类型
packetLen = int(data[:4].hex(), 16)
ver = int(data[6:8].hex(), 16)
op = int(data[8:12].hex(), 16)
# 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断,
if (len(data) > packetLen):
printDM(data[packetLen:])
data = data[:packetLen]
# 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。
if (ver == 2):
data = zlib.decompress(data[16:])
printDM(data)
return
# ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。
if (ver == 1):
if (op == 3):
print('[人气] {}'.format(int(data[16:].hex(), 16)))
return
# ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。
# op 为5意味着这是通知消息,cmd 基本就那几个了。
if (op == 5):
try:
jd = json.loads(data[16:].decode('utf-8', errors='ignore'))
if (jd['cmd'] == 'DANMU_MSG'):
dm_count+=1
print(color_text(f'[弹幕{dm_count}] '), jd['info'][2][1], ': ', jd['info'][1])
elif (jd['cmd'] == 'SEND_GIFT'):
print(color_text(f'[礼物]',31), jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x',
jd['data']['giftName'])
elif (jd['cmd'] == 'LIVE'):
print('[Notice] LIVE Start!')
elif (jd['cmd'] == 'PREPARING'):
print('[Notice] LIVE Ended!')
# else:
# print('[OTHER] ', jd['cmd'])
except Exception as e:
pass
if __name__ == '__main__':
short_id = '82568'#url链接里的id
response = requests.get(f'https://api.live.bilibili.com/room/v1/Room/room_init?id={short_id}').json()
roomid = str(response['data']['room_id'])#真实房间号
route = ['wss://hw-gz-live-comet-02.chat.bilibili.com/sub','wss://tx-bj-live-comet-03.chat.bilibili.com/sub','wss://hw-gz-live-comet-05.chat.bilibili.com/sub']#线路
data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'.format(headerLen=hex(27 + len(roomid))[2:],roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))
ws_client = WebsocketClient(route[0],bytes.fromhex(data_raw))
ws_client.run()
更多推荐
所有评论(0)