个人设计的抓包分析方式,可能比较low,欢迎大佬指点!

整体思路:将所有抓包数据先都解析一遍,再根据不同场景进行二次提取和分析

1. 抓包解析

1.1 主函数实现方式

主函数主要负责识别文件并且开启多线程去分析每个抓包,将所有分析后的数据整理一起。

因此需要先声明一个self.pcapPacket_list = []用于存储每个抓包的内容,最后总结成dataframe数据

1.读取抓包文件

packets = PcapReader(self.pcap_file_path).read_all()

2.多线程解析每个抓包的内容

id = 0
with ThreadPoolExecutor(max_workers=4) as executor:
    for packet in packets:
        id += 1
        executor.submit(process_packet, packet, id, self.pcapPacket_list, self.pcap_file_path)

3.所有线程结束后,进行数据收集,写入本地数据库

all_data = pd.concat(self.pcapPacket_list, ignore_index=True)
all_data.to_sql('pcap_info', self.conn, if_exists='append', index=False)

1.2 解析函数实现方式

1.获取本次抓包的主要信息,主要有源地址端口、目的地址端口、抓包传输协议和抓包时间

# 获取源地址和源端口
src_ip = packet[IP].src if packet.haslayer(IP) else "N/A"
src_port = packet[TCP].sport if packet.haslayer(TCP) else (packet[UDP].sport if packet.haslayer(UDP) else "N/A")
dst_ip = packet[IP].dst if packet.haslayer(IP) else "N/A"
dst_port = packet[TCP].dport if packet.haslayer(TCP) else (packet[UDP].dport if packet.haslayer(UDP) else "N/A")
protocol = packet[IP].proto if packet.haslayer(IP) else "N/A"
protocol = get_protocol_name(protocol)
timestamp = datetime.fromtimestamp(float(packet.time))
packet_time = timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')  # 格式化为 年-月-日 时:分:秒.毫秒

抓包协议定义如下:

def get_protocol_name(protocol):
    protocol_names = {
        1: "ICMP",
        6: "TCP",
        17: "UDP",
        41: "IPv6",
        58: "ICMPv6",
        89: "OSPF"
    }
    return protocol_names.get(protocol, "Unknown")

3. 设计一个唯一键,将同一端口交互放在一起方便后续数据的提取。这边主要采用源端口+目的端口+文件名形成唯一键,两个端口需要做下大小判断,保障双向交互的唯一键一样

if src_port > dst_port:
    uniqueKey = str(dst_port) + str(src_port) + file_name
else:
    uniqueKey = str(src_port) + str(dst_port) + file_name

 4. 解析tcp层的内容,主要有seq、ack、flag值,seq和ack可以用来后续抓包去重,flag用来判断tcp的动作

if packet.haslayer(TCP):
    tcp_flags = packet[TCP].flags
    # 将 tcp_flags 转换为整数
    tcp_flags_int = int(tcp_flags)
    # 使用 Scapy 内置的方法获取标志位字符串
    flags_str = packet[TCP].sprintf('%TCP.flags%')
    # 定义一个字典来映射标志位字符串到描述性名称
    flag_mapping = {
        'SA': 'SYN-ACK',
        'S': 'SYN',
        'A': 'ACK',
        'R': 'RST',
        'P': 'PSH',
        'F': 'FIN',
        'U': 'URG',
        'E': 'ECE',
        'C': 'CWR',
        'PA': 'PSH-ACK',
        'FA': 'FIN-ACK',
        'RA': 'RST-ACK',
        'UPA': 'URG-PSH-ACK',
    }
    seq = packet[TCP].seq
    ack = packet[TCP].ack
    # 获取描述性名称
    flags = flag_mapping.get(flags_str, 'OTHER')
    # report_logger.info(f"TCP Flags (Hex): {tcp_flags_int:03X},{flags_str}:{flags}")

5.如果抓包存在内容则进行下一步的解析:

(1)将能转成utf-8和二进制的数据进行区分,转不成utf-8的数据存入二进制数据字段

(2)将头和内容进行区分,http存在请求头和内容

(3)整理所有数据写入之前定义的列表中

# 如果存在报文层
if packet.haslayer(Raw):
    raw_layer = packet[Raw]
    raw_info = raw_layer.load
    try:
        raw_info_utf = raw_layer.load.decode('utf-8')
        raw_info_utf = raw_info_utf.replace("'", '"')
        headers, body = parse_http_payload(raw_info_utf)
        data = {
            'id': id,
            'uniqueKey': uniqueKey,
            'packetTime': packet_time,
            'protocol': protocol,
            'srcIp': src_ip,
            'srcPort': src_port,
            'dstIp': dst_ip,
            'dstPort': dst_port,
            'flags': flags,
            'headers': headers,
            'body': body,
            'raw_data': None,
            'filePath': pcap_file_path,
            'extractionTime': '0',
            'seq': seq,
            'ack': ack
        }
        # 创建 DataFrame 并添加到列表中
        df = pd.DataFrame([data])
        df_list.append(df)
    except:
        data = {
            'id': id,
            'uniqueKey': uniqueKey,
            'packetTime': packet_time,
            'protocol': protocol,
            'srcIp': src_ip,
            'srcPort': src_port,
            'dstIp': dst_ip,
            'dstPort': dst_port,
            'flags': flags,
            'headers': 'UnsupportedData',
            'body': 'UnsupportedData',
            'raw_data': raw_info,
            'filePath': pcap_file_path,
            'extractionTime': '0',
            'seq': seq,
            'ack': ack
        }
        # 创建 DataFrame 并添加到列表中
        df = pd.DataFrame([data])
        df_list.append(df)
else:
    data = {
        'id': id,
        'uniqueKey': uniqueKey,
        'packetTime': packet_time,
        'protocol': protocol,
        'srcIp': src_ip,
        'srcPort': src_port,
        'dstIp': dst_ip,
        'dstPort': dst_port,
        'flags': flags,
        'headers': '',
        'body': '',
        'raw_data': None,
        'filePath': pcap_file_path,
        'extractionTime': '0',
        'seq': seq,
        'ack': ack
    }
    # 创建 DataFrame 并添加到列表中
    df = pd.DataFrame([data])
    df_list.append(df)

 报文头和内容解析方式如下:

def parse_http_payload(payload):
    try:
        # 分割头部和内容
        headers, body = payload.split('\r\n\r\n', 1)
        return headers, body
    except UnicodeDecodeError:
        # 如果解码失败,返回原始负载
        return '', payload
    except ValueError:
        # 如果没有找到两个连续的 \r\n\r\n,返回原始负载
        return '', payload

2. 提取HTTP请求

http请求基本都是明文,格式一般如下包含了请求头、请求内容、响应头和响应内容:

POST /VIID/System/Keepalive HTTP/1.1

Accept: */*

Connection: close

Content-Length: 79

Content-Type: application/VIID+JSON;charset=UTF-8

Host: 32.104.63.40

User-Agent: cpp-httplib/0.9

{"KeepaliveObject":{"DeviceID":"******","ProtocolVersion":"2.0"}}

HTTP/1.1 200 OK

Content-Type: application/json

Content-Length: 327

{"ResponseStatusObject":{"RequestURL":"/VIID/System/Keepalive","StatusCode":"0"}}

 2.1 主函数实现方式

1.主函数主要需要先过滤出需要的请求头报文,常见的请求头包括POST、PUT、GET、DELETE、OPTIONS、DESCRIBE、SETUP、PLAY、PATCH、HEAD。

同时需要考虑抓包文件中重传、乱序的影响

pcap_info = pd.read_sql_query(f"SELECT * FROM pcap_info where filePath = '{self.pcap_file_path}' ORDER BY id ASC", self.conn)
# 按照 uniqueKey, seq 和 ack 去重
pcap_info = pcap_info.drop_duplicates(subset=['uniqueKey','flags','seq', 'ack'], keep='last')
# 规避抓包乱序问题导致工具整理内容乱的问题
pcap_info['sum_seq_ack'] = pcap_info['seq'] + pcap_info['ack']
sorted_pcap = pcap_info.sort_values(['sum_seq_ack', 'id'])
sorted_pcap = sorted_pcap.reset_index(drop=True)
# 检查包含关键字的行
filtered_rows = sorted_pcap.loc[sorted_pcap['headers'].str.contains(
    f"^POST\s+.*{re.escape(queryKeywords)}|^PUT\s+.*{re.escape(queryKeywords)}|^GET\s+.*{re.escape(queryKeywords)}|"
    f"^DELETE\s+.*{re.escape(queryKeywords)}|^OPTIONS\s+.*{re.escape(queryKeywords)}|^DESCRIBE\s+.*{re.escape(queryKeywords)}|"
    f"^SETUP\s+.*{re.escape(queryKeywords)}|^PLAY\s+.*{re.escape(queryKeywords)}|"
    f"^PATCH\s+.*{re.escape(queryKeywords)}|^HEAD\s+.*{re.escape(queryKeywords)}", case=False, na=False)]

2.提取头后多线程进行分析

with ThreadPoolExecutor(max_workers=4) as executor:
    for idx in filtered_rows.index:
        executor.submit(process_httpExtract, sorted_pcap, filtered_rows, idx,self.httpResult_list)

3.整理所有的http请求 ,写入本地数据库

if self.httpResult_list:
    all_data = pd.concat(self.httpResult_list, ignore_index=True)
    all_data.to_sql('http_result', self.conn, if_exists='append', index=False)

2.2 解析函数实现方式

 1.解析函数需要包含源地址端口信息、目的地址端口信息、和抓包关联的唯一键信息等

keywords = ''
client_info = filtered_rows.loc[idx, 'srcIp'] + ':' + str(filtered_rows.loc[idx, 'srcPort'])
server_info = filtered_rows.loc[idx, 'dstIp'] + ':' + str(filtered_rows.loc[idx, 'dstPort'])
uniqueKey = filtered_rows.loc[idx, 'uniqueKey']
id = filtered_rows.loc[idx, 'id']
thread_rows = pcap_info.loc[(pcap_info['uniqueKey'] == uniqueKey) & (pcap_info['id'] >= id)]
isHeaders = 0
request_hreaders = ''
request_body = ''
response_hreaders = ''
response_body = ''
request_time = 0.0
response_time = 0.0

2.提取请求头、请求内容、响应头和响应内容

isHeaders = 0
for idm in thread_rows.index:
    # 先设定一个isHeaders用于判断接收到头的次数,如果大于2就退出
    if isHeaders > 2 and thread_rows.loc[idm, 'headers'] != '':
        break
    # 考虑头和内容在同一个tcp包中的情况
    if thread_rows.loc[idm, 'headers'] != '' and thread_rows.loc[idm, 'body'] != '':
        isHeaders += 1
        if isHeaders == 1:
            request_hreaders = thread_rows.loc[idm, 'headers']
            request_body = thread_rows.loc[idm, 'body']
            request_time = thread_rows.loc[idm, 'packetTime']
        elif isHeaders == 2:
            response_hreaders = thread_rows.loc[idm, 'headers']
            response_body = thread_rows.loc[idm, 'body']
            response_time = thread_rows.loc[idm, 'packetTime']
    # 考虑仅有头没有内容的包
    elif thread_rows.loc[idm, 'headers'] != '':
        isHeaders += 1
        if isHeaders == 1:
            request_hreaders = thread_rows.loc[idm, 'headers']
            request_time = thread_rows.loc[idm, 'packetTime']
        elif isHeaders == 2:
            response_hreaders = thread_rows.loc[idm, 'headers']
            response_time = thread_rows.loc[idm, 'packetTime']
    # 考虑仅有内容的包
    elif thread_rows.loc[idm, 'body'] != '':
        if isHeaders == 1:
            request_body += thread_rows.loc[idm, 'body']
        elif isHeaders == 2:
            response_body += thread_rows.loc[idm, 'body']

3.请求头处理,提取出调用的接口,其中通过get_http_dictionary()函数获取接口名称(接口名称是自定义的)

if request_hreaders != '':
    # 查找匹配
    match = re.search(r'(POST|PUT|DELETE|OPTIONS|PATCH|HEAD|GET|DESCRIBE|SETUP|PLAY)\s+(.*?)\s+', request_hreaders)
    if match:
        keywords = match.group(1)
        http_request_hreaders = match.group(2)
        if keywords in ['OPTIONS', 'DESCRIBE', 'SETUP', 'PLAY']:
            if re.findall('playBackMode', http_request_hreaders):
                interface_name = '回放请求'
            else:
                interface_name = '取流请求'
            component = 'rtsp'
        else:
            interface = http_request_hreaders.split('?', 1)[0]
            conn = tool_db.get_conn()
            componentQuery, nameQuery = tool_db.get_http_dictionary(conn, interface)
            if componentQuery == '' or nameQuery == '':
                interface_name = '未定义'
                parts = interface.split('/')
                # 提取第一个非空部分
                component = next((part for part in parts if part), '未定义')
        # onvif交互另外解析关键字
        if re.findall('^/onvif', http_request_hreaders):
            report_logger.info("本次为onvif业务的抓包")
            keyword_match = re.findall('action.*/(\w+)', request_hreaders)
            if keyword_match:
                keywords = keyword_match[0]
    else:
        http_request_hreaders = request_hreaders.split('\n')[0]
        interface_name = '未定义'
        component = '未定义'
        report_logger.info(f'http_request_hreaders:{http_request_hreaders}')
else:
    interface_name = '未定义'
    component = '未定义'
    http_request_hreaders = ''

 4.请求内容处理,部分透传接口提取关键信息

if request_body != '':
    request_body_replace = request_body.replace('\n', '')
    match = re.findall('[\[|{].*[}|\]]', request_body_replace)
    if match:
        http_request_body = match[0]
        try:
            requestBody = json.loads(http_request_body)
            # 判断是否存在code字段
            if 'method' in requestBody:
                keywords = requestBody['method']
        except Exception:
            pass
        report_logger.info(f'http_request_body:{http_request_body}')
    else:
        http_request_body = request_body
        report_logger.info(f'http_request_body:{http_request_body}')
else:
    http_request_body = ''

 3.响应头处理,需要提取响应的结果常见有200、400、502、302等,方便后续展示。如果响应报文中存在特殊错误码,优先展示错误码

if response_hreaders != '':
    # 使用正则表达式提取状态码
    match = re.search(r'(HTTP|RTSP)/\d+\.\d+ (\d{3})', response_hreaders.split('\n')[0])
    # 取流情况需要从头提取错误码
    errorCodeMatch = re.findall('errcode=(\d+)', response_hreaders)
    if errorCodeMatch:
        errorCode = errorCodeMatch[0]
        errorCode_hex = f"0x{int(errorCode):08X}"
        http_response_hreaders = str(errorCode_hex)
    elif match:
        http_response_hreaders = match.group(2)
        # 使用json加载responseBody
        try:
            responseBody = json.loads(http_response_body)
            # 判断是否存在code字段
            if 'code' in responseBody:
                if responseBody['code'] != '0':
                    http_response_hreaders = responseBody['code']
        except Exception as e:
            pass
    else:
        http_response_hreaders = 'unknown'
    report_logger.info(f'http_response_hreaders:{http_response_hreaders}')
else:
    http_response_hreaders = ''

4.响应内容处理,这里能提取json报文优先提取,大部分响应内容都是符合json格式的,但是需要去掉不必要的换行和空格

if response_body != '':
    # 去掉响应内容中的换行和空格
    response_body = re.sub('[\n|\s]', '', response_body)
    match = re.findall('[\[|{].*[}|\]]', response_body, re.MULTILINE)
    if match:
        http_response_body = match[0]
        report_logger.info(f'http_response_body:{http_response_body}')
    else:
        http_response_body = response_body
        report_logger.info(f'http_response_body:{http_response_body}')
else:
    http_response_body = ''

5.响应耗时计算

if request_time != 0.0 and response_time != 0.0:
    report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
    request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
    response_time_float = datetime.strptime(response_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
    responseTime = response_time_float - request_time_float
    responseTime = round(responseTime, 3)
else:
    responseTime = 0.000

 6.数据整理

data = {
    'id': int(id),
    'uniqueKey': uniqueKey,
    'responseTime': str(responseTime),
    'beginTime': request_time,
    'clientInfo': client_info,
    'serverInfo': server_info,
    'requestHreaders': http_request_hreaders,
    'requestBody': http_request_body,
    'responseHeaders': http_response_hreaders,
    'responseBody': http_response_body,
    'originalRequestHreaders': request_hreaders,
    'originalResponseHeaders': response_hreaders,
    'keywords': keywords,
    'component': component,
    'interfaceName': interface_name
}
# 创建 DataFrame 并添加到列表中
df = pd.DataFrame([data])
httpResult_list.append(df)

3. 抓包信息展示

3.1 报表的设计

1.抓包主要展示内容有:'所属组件', '请求接口', '接口名称', '响应结果', '查看详情', '响应耗时', '请求时间', '关键信息'。

2.由于http请求包含取流,取流一条链路中存在多个请求头,因此我们只需要提取第一个请求的OPTION,过滤掉其他的'DESCRIBE', 'SETUP', 'PLAY'

data_info = data_info.loc[~data_info['keywords'].isin(['DESCRIBE', 'SETUP', 'PLAY'])].reset_index(drop=True)

3.取流请求和http请求需要分开处理,展示不同的颜色

if keywords == 'OPTIONS':
    uniqueKey = data_info.loc[row, 'uniqueKey']
    responseHeaders = tool_db.excute_sql(
        f"select responseHeaders from http_result where uniqueKey = '{uniqueKey}' ORDER BY id ASC")[-1]
    announce = tool_db.excute_sql(
        f"SELECT headers FROM pcap_info where headers like 'ANNOUNCE%' and uniqueKey = '{uniqueKey}'")[-1]
    # 取流中服务端会通过ANNOUNCE上报报错信息
    if announce != '':
        errorCodeMatch = re.findall('errcode=(\d+)', announce)
        if errorCodeMatch:
            errorCode = errorCodeMatch[0]
            errorCode_hex = f"0x{int(errorCode):08X}"
            responseHeaders = str(errorCode_hex)
    keywords = tool_db.excute_sql(f"select keywords from http_result where uniqueKey = '{uniqueKey}' ORDER BY id ASC")[
        -1]

    if re.findall('^20\d$', responseHeaders):
        item = QTableWidgetItem('成功')
        item.setForeground(QColor("#1DB84C"))  # 设置绿色
        result_item.setForeground(QColor(0, 128, 0))
    elif re.findall('^30\d$', responseHeaders):
        item = QTableWidgetItem('重定向')
        item.setForeground(QColor("#FFA45A"))  # 设置橙色
        result_item.setForeground(QColor("#FF832B"))
    elif re.findall('401', responseHeaders):
        item = QTableWidgetItem('未鉴权')
        item.setForeground(QColor("#EC4E51"))  # 设置红色
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('404', responseHeaders):
        item = QTableWidgetItem('接口不通')
        item.setForeground(QColor("#EC4E51"))  # 设置红色
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('^40\d$', responseHeaders):
        item = QTableWidgetItem('失败')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('^50\d$', responseHeaders):
        item = QTableWidgetItem('异常')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    elif responseHeaders == '':
        item = QTableWidgetItem('无响应')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    else:
        item = QTableWidgetItem(responseHeaders)
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
else:
    responseHeaders = data_info.loc[row, 'responseHeaders']
    if re.findall('^20\d$', responseHeaders):
        item = QTableWidgetItem('成功')
        item.setForeground(QColor("#1DB84C"))  # 设置绿色
        result_item.setForeground(QColor(0, 128, 0))
    elif re.findall('^30\d$', responseHeaders):
        item = QTableWidgetItem('重定向')
        item.setForeground(QColor("#FFA45A"))  # 设置橙色
        result_item.setForeground(QColor("#FF832B"))
    elif re.findall('401', responseHeaders):
        item = QTableWidgetItem('未鉴权')
        item.setForeground(QColor("#EC4E51"))  # 设置红色
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('404', responseHeaders):
        item = QTableWidgetItem('接口不通')
        item.setForeground(QColor("#EC4E51"))  # 设置红色
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('^40\d$', responseHeaders):
        item = QTableWidgetItem('失败')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    elif re.findall('^50\d$', responseHeaders):
        item = QTableWidgetItem('异常')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    elif responseHeaders == '':
        item = QTableWidgetItem('无响应')
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))
    else:
        item = QTableWidgetItem(responseHeaders)
        item.setForeground(QColor("#EC4E51"))
        result_item.setForeground(QColor(255, 0, 0))

 其他信息都是在http提取时填入数据库,直接获取使用即可,这边不再赘述

3.2 抓包内容的展示

1.抓包展示这边参考wireshark的红蓝格式

requestHreaders = data_info.loc[idx, 'originalRequestHreaders']
headers_list = requestHreaders.splitlines()
for line in headers_list:
    escaped_line = html.escape(line) # 需要转换格式,如果报文中存在<>会导致http内容展示缺失
    html_content += f"<p><span style='color:{red_font_color}'>{escaped_line}</span></p>"
html_content += f"<p><pre> </pre></p>"
requestBody = data_info.loc[idx, 'requestBody']
html_content += f"<p><span style='color:{red_font_color}'>{html.escape(requestBody)}</span></p>"
responseHeaders = data_info.loc[idx, 'originalResponseHeaders']
headers_list = responseHeaders.splitlines()
for line in headers_list:
    html_content += f"<p><span style='color:{blue_font_color}'>{line}</span></p>"
html_content += f"<p><pre> </pre></p>"
responseBody = data_info.loc[idx, 'responseBody']
responseBodyList = responseBody.splitlines()
for line in responseBodyList:
    escaped_line = html.escape(line)
    html_content += f"<p><span style='color:{blue_font_color}'>{escaped_line}</span></p>"

2.可以对抓包中存在的报错提供一些建议,或者特殊内容处理,这边不再展开。

4.提取MQTT信令

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅(Publish/Subscribe)消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计,尤其适用于物联网(IoT)场景。

4.1 mqtt注册信息解析

def parse_connect(data,fixed_header):

    offset = fixed_header["header_length"]

    # 解析协议名(UTF-8字符串)
    protocol_name_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    protocol_name = data[offset:offset + protocol_name_len].decode('utf-8')
    offset += protocol_name_len

    # 协议版本(1字节)
    protocol_level = data[offset]
    offset += 1

    # 连接标志(1字节)
    connect_flags = data[offset]
    clean_session = connect_flags & 0x02
    will_flag = connect_flags & 0x04
    username_flag = bool(connect_flags & 0x80)
    password_flag = bool(connect_flags & 0x40)
    offset += 1

    # 心跳间隔(2字节)
    keep_alive = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2

    # 客户端ID(UTF-8字符串)
    client_id_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    client_id = data[offset:offset + client_id_len].decode('utf-8')
    offset += client_id_len
    username = parse_utf8_str(data, offset) if username_flag else None
    username_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += username_len
    offset += 2
    return {
        "protocol_name": protocol_name,
        "protocol_level": protocol_level,
        "clean_session": clean_session,
        "keep_alive": keep_alive,
        "client_id": client_id,
        "will_flag": will_flag,
        "username": username,
        "password": parse_utf8_str(data, offset) if password_flag else None
    }

def parse_utf8_str(data, offset):
    str_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    return data[offset:offset + str_len].decode('utf-8')

 4.2 推送消息解析

def parse_publish(data,fixed_header):
    offset = fixed_header["header_length"]

    # 主题名(UTF-8字符串)
    topic_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    topic = data[offset:offset + topic_len].decode('utf-8')
    offset += topic_len


    # 消息内容(offset+2去掉前面无用的两个字节)
    payload = data[offset+2:offset + fixed_header["remaining_length"] - (offset - fixed_header["header_length"])].decode('utf-8')
    seq_match = re.findall(r'Seq.*?(\w+)', payload)
    seq = seq_match[0] if seq_match else 'PSH-ACK'
    return {
        "topic": topic,
        "seq": seq,
        "payload": payload
    }

4.3 从抓包中解析mqtt

mqtt和tcp解析方式不一样,所以和提取http方式要单独开来。结合上面两个方法,将注册和消息推送单独解析。

注意:消息推送中单独解析了seq字段放到flgs是为了后面整理用上

# 处理非http数据,按照mqtt方式处理
try:
    headers = 'UnsupportedData'
    body = 'UnsupportedData'
    fixed_header = parse_fixed_header(raw_info)
    if fixed_header["packet_type"] == 3:  # PUBLISH类型为3
        protocol = 'MQTT'
        publish_info = parse_publish(raw_info, fixed_header)
        headers = publish_info['topic']
        body = str(publish_info['payload'])
        flags = publish_info['seq']
    elif fixed_header["packet_type"] == 1:  # CONNECT类型为1
        protocol = 'MQTT'
        connect_info = parse_connect(raw_info, fixed_header)
        headers = 'ConnectCommand'
        body = str(connect_info)
    elif fixed_header["packet_type"] == 2:  # Connect Ack类型为2
        protocol = 'MQTT'
        headers = 'ConnectAck'
        body = ''
    elif fixed_header["packet_type"] == 8:  # Subscribe类型为8
        protocol = 'MQTT'
        headers = 'Subscribe'
        body = ''
    elif fixed_header["packet_type"] == 9:  # Subscribe Ack类型为9
        protocol = 'MQTT'
        headers = 'SubscribeAck'
        body = ''
    elif fixed_header["packet_type"] == 12:  # ping request类型为12
        protocol = 'MQTT'
        headers = 'ping'
        body = ''
    elif fixed_header["packet_type"] == 13:  # ping response类型为13
        protocol = 'MQTT'
        headers = 'pong'
        body = ''
    elif fixed_header["packet_type"] == 14:  # Disconnect类型为13
        protocol = 'MQTT'
        headers = 'Disconnect'
        body = ''
    else:
        report_logger.debug(f"Unknown packet type: {id, fixed_header['packet_type']}")
    data = {
        'id': id,
        'uniqueKey': uniqueKey,
        'packetTime': packet_time,
        'protocol': protocol,
        'srcIp': src_ip,
        'srcPort': src_port,
        'dstIp': dst_ip,
        'dstPort': dst_port,
        'flags': flags,
        'headers': headers,
        'body': body,
        'raw_data': raw_info,
        'filePath': pcap_file_path,
        'extractionTime': '0',
        'seq': seq,
        'ack': ack
    }
    # 创建 DataFrame 并添加到列表中
    df = pd.DataFrame([data])
    df_list.append(df)
except Exception as e:
    data = {
        'id': id,
        'uniqueKey': uniqueKey,
        'packetTime': packet_time,
        'protocol': protocol,
        'srcIp': src_ip,
        'srcPort': src_port,
        'dstIp': dst_ip,
        'dstPort': dst_port,
        'flags': flags,
        'headers': 'UnsupportedData',
        'body': 'UnsupportedData',
        'raw_data': raw_info,
        'filePath': pcap_file_path,
        'extractionTime': '0',
        'seq': seq,
        'ack': ack
    }
    # 创建 DataFrame 并添加到列表中
    df = pd.DataFrame([data])
    df_list.append(df)

4.4整理mqtt消息

抓包整理后,需要提取成可以展示的内容,这里关于报表展示参考第三章节内容即可。整理需要注意一下几点:

(1)注册-注册响应、订阅-订阅响应、心跳交互都是成双成对

(2)消息推送中请求和响应的seq是一致的,同时注意可能存在重复

def process_mqttExtract(self, pcapInfo):
logger.debug(f"process_mqttExtract")
try:
    processed_ids = set()
    httpResult_list = []
    devices = {}
    for idx in pcapInfo.index:
        if idx in processed_ids:  # 跳过已处理的记录
            continue
        keywords = ''
        report_logger.debug(f"正在处理包('{pcapInfo.loc[idx, 'id']}')")
        client_info = pcapInfo.loc[idx, 'srcIp'] + ':' + str(pcapInfo.loc[idx, 'srcPort'])
        server_info = pcapInfo.loc[idx, 'dstIp'] + ':' + str(pcapInfo.loc[idx, 'dstPort'])
        uniqueKey = pcapInfo.loc[idx, 'uniqueKey']
        id = pcapInfo.loc[idx, 'id']
        request_time = pcapInfo.loc[idx, 'packetTime']
        responseTime = 0.000
        interFaces = {
            "ConnectCommand": "设备注册",
            "ConnectAck": "注册响应",
            "Subscribe": "设备订阅",
            "SubscribeAck": "订阅响应",
            "ping": "心跳包",
            "pong": "心跳响应",
            "Disconnect": "设备注销"
        }
        # 注册、心跳流程包
        if pcapInfo.loc[idx, 'flags'] == 'PSH-ACK':
            request_headers = pcapInfo.loc[idx, 'headers']
            request_body = pcapInfo.loc[idx, 'body']

            interfaceName = interFaces.get(request_headers, '未定义')
            if request_headers == 'ConnectCommand':
                data_dict = json.loads(request_body.replace("'", '"'))
                if uniqueKey in devices:
                    pass
                else:
                    devices[uniqueKey] = data_dict['username']
                responsePacket = pcapInfo.loc[
                    (pcapInfo['uniqueKey'] == uniqueKey) & (pcapInfo['id'] > id) & (
                                pcapInfo['headers'] == 'ConnectAck')]
                if not responsePacket.empty:
                    target_row = responsePacket.iloc[0]
                    response_headers = '200'
                    origin_response_headers = target_row['headers']
                    response_time = target_row['packetTime']
                    response_body = f"响应时间:{response_time}"
                    if request_time != 0.0 and response_time != 0.0:
                        report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
                        request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        response_time_float = datetime.strptime(response_time,
                                                                '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        responseTime = response_time_float - request_time_float
                        responseTime = round(responseTime, 3)
                    # 删除处理过的记录
                    report_logger.info(f"删除处理过的记录:'{target_row['id']}'")
                    pcapInfo = pcapInfo.loc[pcapInfo['id'] != int(target_row['id'])]
                    processed_ids.add(target_row.name)
                else:
                    report_logger.info(f"未找到注册响应")
                    response_headers = ""
                    response_body = ""
                    origin_response_headers = ""
            elif request_headers == 'Subscribe':
                responsePacket = pcapInfo.loc[
                    (pcapInfo['uniqueKey'] == uniqueKey) & (pcapInfo['id'] > id) & (
                                pcapInfo['headers'] == 'SubscribeAck')]
                if not responsePacket.empty:
                    target_row = responsePacket.iloc[0]
                    response_headers = '200'
                    origin_response_headers = target_row['headers']
                    response_time = target_row['packetTime']
                    response_body = f"订阅响应时间:{response_time}"
                    if request_time != 0.0 and response_time != 0.0:
                        report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
                        request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        response_time_float = datetime.strptime(response_time,
                                                                '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        responseTime = response_time_float - request_time_float
                        responseTime = round(responseTime, 3)
                    # 删除处理过的记录
                    report_logger.info(f"删除处理过的记录:'{target_row['id']}'")
                    pcapInfo = pcapInfo.loc[pcapInfo['id'] != int(target_row['id'])]
                    processed_ids.add(target_row.name)
                else:
                    report_logger.info(f"未找到注册响应")
                    response_headers = ""
                    response_body = ""
                    origin_response_headers = ""
            elif request_headers == 'ping':
                responsePacket = pcapInfo.loc[
                    (pcapInfo['uniqueKey'] == uniqueKey) & (pcapInfo['id'] > id) & (pcapInfo['headers'] == 'pong')]
                if not responsePacket.empty:
                    target_row = responsePacket.iloc[0]
                    response_headers = '200'
                    origin_response_headers = target_row['headers']
                    response_time = target_row['packetTime']
                    response_body = f"心跳响应时间:{response_time}"
                    if request_time != 0.0 and response_time != 0.0:
                        report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
                        request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        response_time_float = datetime.strptime(response_time,
                                                                '%Y-%m-%d %H:%M:%S.%f').timestamp()
                        responseTime = response_time_float - request_time_float
                        responseTime = round(responseTime, 3)
                    # 删除处理过的记录
                    report_logger.info(f"删除处理过的记录:'{target_row['id']}'")
                    pcapInfo = pcapInfo.loc[pcapInfo['id'] != int(target_row['id'])]
                    processed_ids.add(target_row.name)
                else:
                    report_logger.info(f"未找到注册响应")
                    response_headers = ""
                    response_body = ""
                    origin_response_headers = ""
            else:
                response_headers = "200"
                response_body = "设备发起断开无需响应"
                origin_response_headers = "200"
        else:
            # 接口调用情况下
            request_headers = pcapInfo.loc[idx, 'headers']
            deviceIdMatch = re.findall("/iot/(\w+)", request_headers)
            deviceId = deviceIdMatch[0] if deviceIdMatch else ''
            if uniqueKey in devices:
                pass
            else:
                devices[uniqueKey] = deviceId
            request_body = pcapInfo.loc[idx, 'body']
            seq = pcapInfo.loc[idx, 'flags']
            interfaceName = 'MQTT主题接口'
            # 找到大于本次idx,最小的idx,并且flags相等的数据
            report_logger.info(f"本次seq为:'{seq}'")
            responsePacket = pcapInfo.loc[
                (pcapInfo['uniqueKey'] == uniqueKey) & (pcapInfo['id'] > id) & (pcapInfo['flags'] == seq)]
            if not responsePacket.empty:
                target_row = responsePacket.iloc[0]
                response_headers = '200'
                origin_response_headers = target_row['headers']
                response_body = target_row['body']
                response_time = target_row['packetTime']
                if request_time != 0.0 and response_time != 0.0:
                    report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
                    request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
                    response_time_float = datetime.strptime(response_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
                    responseTime = response_time_float - request_time_float
                    responseTime = round(responseTime, 3)
                # 删除处理过的记录
                report_logger.info(f"删除处理过的记录:'{target_row['id']}'")
                pcapInfo = pcapInfo.loc[pcapInfo['id'] != int(target_row['id'])]
                processed_ids.add(target_row.name)
            else:
                report_logger.info(f"未找到响应包:'{seq}'")
                response_headers = ""
                response_body = ""
                origin_response_headers = ""

        data = {
            'id': int(id),
            'uniqueKey': uniqueKey,
            'responseTime': responseTime,
            'beginTime': request_time,
            'clientInfo': client_info,
            'serverInfo': server_info,
            'requestHreaders': request_headers,
            'requestBody': request_body,
            'responseHeaders': response_headers,
            'responseBody': response_body,
            'originalRequestHreaders': request_headers,
            'originalResponseHeaders': origin_response_headers,
            'keywords': keywords,
            'component': 'mqtt',
            'interfaceName': interfaceName
        }
        df = pd.DataFrame([data])
        httpResult_list.append(df)
    if httpResult_list:
        all_data = pd.concat(httpResult_list, ignore_index=True)
        report_logger.info(f"devices:'{devices}'")
        all_data['keywords'] = all_data['uniqueKey'].map(devices).fillna(all_data['keywords'])
        all_data.to_sql('http_result', self.conn, if_exists='append', index=False)
        return True
    else:
        return False
except Exception as e:
    logger.error(f'异常:{str(e)}')
    logger.error(traceback.format_exc())
    return False

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐