原文:AioWebSocket实现python异步接收B站直播弹幕

在这里插入图片描述

import ssl
import websocket
import asyncio
import zlib
import json
import _thread as thread
import time
import requests
class WebsocketClient(object):
    """docstring for WebsocketClient"""

    def __init__(self, address, send_message):
        super(WebsocketClient, self).__init__()
        self.address = address
        self.send_message = send_message

    def on_message(self, ws, message):
        printDM(message)

    def on_error(self, ws, error):
        print("### 报错:", error)

    def on_close(self, ws,x,y):
        print("### 已关闭###")
        print(x,y)

    def on_open(self, ws):
        print("开始连接")

        def run(*args):
            self.ws.send(self.send_message)
            print(self.send_message)
            # self.ws.close()
        hb='00 00 00 10 00 10 00 01  00 00 00 02 00 00 00 01'
        def sendHeartBeat():
            while True:
                time.sleep(30)
                self.ws.send(bytes.fromhex(hb))
                print('[Notice] 发送心跳包,并获取人气值')

        thread.start_new_thread(run, ())
        thread.start_new_thread(sendHeartBeat, ())

    def run(self):
        websocket.enableTrace(False)
        self.ws = websocket.WebSocketApp(self.address,
                                         on_message=self.on_message,
                                         on_error=self.on_error,
                                         on_close=self.on_close)
        self.ws.on_open = self.on_open
        self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


def color_text(text,front=33,back=40):#自定义文字颜色
              return f'\033[1;{front};{back}m'+str(text)+'\033[0m' 
              #参数文字颜色front:灰色(30)-红色(31)-绿色(32)-黄色(33)-蓝色(34)-粉色(35),
              #背景颜色back:红色(41)-绿色(42)-黄色(43)-蓝色(44)-粉色(45)-绿色(46)-灰白(47)

# 将数据包传入,解析数据:
dm_count = 0#弹幕计数
lw_count = 0#礼物计数
def printDM(data):
    global dm_count
    # 获取数据包的长度,版本和操作类型
    packetLen = int(data[:4].hex(), 16)
    ver = int(data[6:8].hex(), 16)
    op = int(data[8:12].hex(), 16)

    # 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断,
    if (len(data) > packetLen):
        printDM(data[packetLen:])
        data = data[:packetLen]

    # 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。
    if (ver == 2):
        data = zlib.decompress(data[16:])
        printDM(data)
        return

    # ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。
    if (ver == 1):
        if (op == 3):
            print('[人气]  {}'.format(int(data[16:].hex(), 16)))
        return

    # ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。
    # op 为5意味着这是通知消息,cmd 基本就那几个了。
    
    if (op == 5):
        try:
            jd = json.loads(data[16:].decode('utf-8', errors='ignore'))
            if (jd['cmd'] == 'DANMU_MSG'):
                dm_count+=1
                print(color_text(f'[弹幕{dm_count}] '), jd['info'][2][1], ': ', jd['info'][1])
            elif (jd['cmd'] == 'SEND_GIFT'):
                print(color_text(f'[礼物]',31), jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x',
                      jd['data']['giftName'])
            elif (jd['cmd'] == 'LIVE'):
                print('[Notice] LIVE Start!')
            elif (jd['cmd'] == 'PREPARING'):
                print('[Notice] LIVE Ended!')
            # else:
            #     print('[OTHER] ', jd['cmd'])
        except Exception as e:
            pass


if __name__ == '__main__':
    short_id = '82568'#url链接里的id
    response = requests.get(f'https://api.live.bilibili.com/room/v1/Room/room_init?id={short_id}').json()
    roomid = str(response['data']['room_id'])#真实房间号
    route = ['wss://hw-gz-live-comet-02.chat.bilibili.com/sub','wss://tx-bj-live-comet-03.chat.bilibili.com/sub','wss://hw-gz-live-comet-05.chat.bilibili.com/sub']#线路

    data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'.format(headerLen=hex(27 + len(roomid))[2:],roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))

    ws_client = WebsocketClient(route[0],bytes.fromhex(data_raw))
    ws_client.run()

    
    
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐