使用python实现抓包分析
从抓包中提取http请求并展示报文
以下是个人设计的抓包分析方式,可能还不够完善,欢迎各位大佬指点!
整体思路:先把抓包中的所有数据包统一解析一遍并写入本地数据库,再根据不同场景进行二次提取和分析。
1. 抓包解析
1.1 主函数实现方式
主函数主要负责读取抓包文件,并开启多线程解析其中的每个数据包,最后将所有解析结果整理到一起。
因此需要先声明一个 self.pcapPacket_list = [] 用于存储每个数据包的解析结果,最后汇总成 DataFrame 数据。
1.读取抓包文件
# Load every packet of the capture into memory in one go.
# NOTE(review): the PcapReader is never closed; consider `with PcapReader(...) as reader:`
# so the file handle is released (confirm against the scapy version in use).
packets = PcapReader(self.pcap_file_path).read_all()
2.多线程解析每个抓包的内容
# Submit every packet to a 4-thread pool. `id` is the 1-based position of the packet in
# the capture; it is stored with each row and later used for ordering (ORDER BY id).
# NOTE(review): `id` shadows the builtin of the same name.
id = 0
with ThreadPoolExecutor(max_workers=4) as executor:
for packet in packets:
id += 1
# Each task appends its parsed row to the shared self.pcapPacket_list.
# NOTE(review): the returned Future is discarded, so an exception raised inside
# process_packet is never surfaced and that packet silently goes missing.
executor.submit(process_packet, packet, id, self.pcapPacket_list, self.pcap_file_path)
3.所有线程结束后,进行数据收集,写入本地数据库
# Leaving the `with ThreadPoolExecutor` block waits for all tasks. Then merge the
# per-packet one-row DataFrames and append them to the pcap_info table.
# NOTE(review): pd.concat raises ValueError on an empty list (nothing parsed); the HTTP
# stage guards the same call with `if self.httpResult_list:`.
all_data = pd.concat(self.pcapPacket_list, ignore_index=True)
all_data.to_sql('pcap_info', self.conn, if_exists='append', index=False)
1.2 解析函数实现方式
1.获取本数据包的主要信息,包括源地址端口、目的地址端口、传输协议和抓包时间
# Source IP and source port ("N/A" when the packet has no IP / TCP / UDP layer).
# scapy's IP layer is IPv4 only, so IPv6 packets end up with "N/A" addresses.
src_ip = packet[IP].src if packet.haslayer(IP) else "N/A"
src_port = packet[TCP].sport if packet.haslayer(TCP) else (packet[UDP].sport if packet.haslayer(UDP) else "N/A")
# Destination IP and port, same fallbacks
dst_ip = packet[IP].dst if packet.haslayer(IP) else "N/A"
dst_port = packet[TCP].dport if packet.haslayer(TCP) else (packet[UDP].dport if packet.haslayer(UDP) else "N/A")
# IPv4 protocol number -> display name ("Unknown" for unmapped numbers and for "N/A")
protocol = packet[IP].proto if packet.haslayer(IP) else "N/A"
protocol = get_protocol_name(protocol)
# Capture time as a naive local-time datetime
timestamp = datetime.fromtimestamp(float(packet.time))
packet_time = timestamp.strftime('%Y-%m-%d %H:%M:%S.%f') # 'YYYY-MM-DD HH:MM:SS.ffffff' (%f = microseconds, 6 digits)
2.协议号到协议名称的映射定义如下:
# IP protocol numbers (the IPv4 header's `proto` field) that the report names explicitly.
_IP_PROTOCOL_NAMES = {
    1: "ICMP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    58: "ICMPv6",
    89: "OSPF",
}


def get_protocol_name(protocol):
    """Return the display name for an IP protocol number.

    Args:
        protocol: Protocol number, or any other hashable placeholder (e.g. "N/A").

    Returns:
        The mapped name, or "Unknown" for anything not in the table.
    """
    name = _IP_PROTOCOL_NAMES.get(protocol)
    return "Unknown" if name is None else name
3. 设计一个唯一键,把同一连接的交互归在一起,方便后续提取数据。这里采用源端口+目的端口+文件名组成唯一键,两个端口需要先比较大小(较小的在前),以保证双向交互得到的唯一键相同。注意:该键不包含 IP 地址,且端口直接拼接、没有分隔符(如 12+345 与 123+45 结果相同),多主机或特殊端口组合时可能冲突。
# Direction-independent connection key: the smaller port goes first, so both directions of
# one conversation get the same key; the file name keeps different captures apart.
# NOTE(review): ports are concatenated without a separator and the IPs are not part of the
# key, so e.g. ports (12, 345) and (123, 45), or two hosts reusing one port pair, collide.
# Packets without TCP/UDP all share the key "N/AN/A" + file_name.
if src_port > dst_port:
uniqueKey = str(dst_port) + str(src_port) + file_name
else:
uniqueKey = str(src_port) + str(dst_port) + file_name
4. 解析tcp层的内容,主要有seq、ack、flag值,seq和ack可以用来后续抓包去重,flag用来判断tcp的动作
# TCP header fields: seq/ack are used later to de-duplicate retransmissions, and the flag
# name tells what the segment does (SYN, ACK, PSH-ACK, ...).
# NOTE(review): flags, seq and ack are only assigned for TCP packets; for UDP or other
# packets the row construction below references names this block never set.
if packet.haslayer(TCP):
tcp_flags = packet[TCP].flags
# Integer value of the flags (only used by the commented-out log line below)
tcp_flags_int = int(tcp_flags)
# Flag letters as rendered by scapy, e.g. 'S', 'SA', 'PA'
flags_str = packet[TCP].sprintf('%TCP.flags%')
# Exact flag-letter strings -> readable names; any other combination becomes 'OTHER'
flag_mapping = {
'SA': 'SYN-ACK',
'S': 'SYN',
'A': 'ACK',
'R': 'RST',
'P': 'PSH',
'F': 'FIN',
'U': 'URG',
'E': 'ECE',
'C': 'CWR',
'PA': 'PSH-ACK',
'FA': 'FIN-ACK',
'RA': 'RST-ACK',
'UPA': 'URG-PSH-ACK',
}
seq = packet[TCP].seq
ack = packet[TCP].ack
# Readable name
flags = flag_mapping.get(flags_str, 'OTHER')
# report_logger.info(f"TCP Flags (Hex): {tcp_flags_int:03X},{flags_str}:{flags}")
5.如果数据包存在负载,则进行下一步的解析:
(1)区分能解码为 UTF-8 的文本数据和二进制数据,无法解码为 UTF-8 的数据存入二进制数据字段(raw_data)
(2)将头和内容进行区分,http存在请求头和内容
(3)整理所有数据写入之前定义的列表中
# The packet carries an application payload (scapy Raw layer)
if packet.haslayer(Raw):
raw_layer = packet[Raw]
raw_info = raw_layer.load
try:
# Text payload: decode as UTF-8 and split into HTTP headers / body
raw_info_utf = raw_layer.load.decode('utf-8')
# NOTE(review): every single quote is rewritten to a double quote, which alters the
# stored payload text -- the reason is not visible here; confirm it is still needed.
raw_info_utf = raw_info_utf.replace("'", '"')
headers, body = parse_http_payload(raw_info_utf)
data = {
'id': id,
'uniqueKey': uniqueKey,
'packetTime': packet_time,
'protocol': protocol,
'srcIp': src_ip,
'srcPort': src_port,
'dstIp': dst_ip,
'dstPort': dst_port,
'flags': flags,
'headers': headers,
'body': body,
'raw_data': None,
'filePath': pcap_file_path,
'extractionTime': '0',
'seq': seq,
'ack': ack
}
# Wrap the row in a one-row DataFrame and append it to the shared list
df = pd.DataFrame([data])
df_list.append(df)
# NOTE(review): a bare except also hides errors raised by parse_http_payload or by the row
# construction (and catches KeyboardInterrupt); `except UnicodeDecodeError:` matches the intent.
except:
# Non-UTF-8 payload: keep the raw bytes in raw_data and mark headers/body as unsupported
data = {
'id': id,
'uniqueKey': uniqueKey,
'packetTime': packet_time,
'protocol': protocol,
'srcIp': src_ip,
'srcPort': src_port,
'dstIp': dst_ip,
'dstPort': dst_port,
'flags': flags,
'headers': 'UnsupportedData',
'body': 'UnsupportedData',
'raw_data': raw_info,
'filePath': pcap_file_path,
'extractionTime': '0',
'seq': seq,
'ack': ack
}
# Wrap the row in a one-row DataFrame and append it to the shared list
df = pd.DataFrame([data])
df_list.append(df)
else:
# No payload (e.g. a bare ACK): store the packet with empty headers/body
data = {
'id': id,
'uniqueKey': uniqueKey,
'packetTime': packet_time,
'protocol': protocol,
'srcIp': src_ip,
'srcPort': src_port,
'dstIp': dst_ip,
'dstPort': dst_port,
'flags': flags,
'headers': '',
'body': '',
'raw_data': None,
'filePath': pcap_file_path,
'extractionTime': '0',
'seq': seq,
'ack': ack
}
# Wrap the row in a one-row DataFrame and append it to the shared list
df = pd.DataFrame([data])
df_list.append(df)
报文头和内容的拆分方式如下(以第一个空行 \r\n\r\n 作为头部与内容的分界):
def parse_http_payload(payload):
    r"""Split an HTTP/RTSP text payload into its header section and body.

    An HTTP message's headers end at the first blank line, i.e. the first
    ``\r\n\r\n``; everything after it (including later blank lines) is body.

    Args:
        payload: The UTF-8-decoded TCP payload (``str``).

    Returns:
        ``(headers, body)``. When the payload contains no ``\r\n\r\n`` (for
        example a continuation segment carrying only body bytes), ``headers``
        is ``''`` and the whole payload is returned as ``body``.
    """
    # str.partition never raises, unlike split-and-unpack; the previous
    # UnicodeDecodeError handler could never fire on an already-decoded str.
    headers, separator, body = payload.partition('\r\n\r\n')
    if not separator:
        return '', payload
    return headers, body
2. 提取HTTP请求
HTTP 请求基本都是明文,一次完整的交互一般包含请求头、请求内容、响应头和响应内容(头部与内容之间以空行分隔),格式如下:
POST /VIID/System/Keepalive HTTP/1.1
Accept: */*
Connection: close
Content-Length: 79
Content-Type: application/VIID+JSON;charset=UTF-8
Host: 32.104.63.40
User-Agent: cpp-httplib/0.9
{"KeepaliveObject":{"DeviceID":"******","ProtocolVersion":"2.0"}}
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 327
{"ResponseStatusObject":{"RequestURL":"/VIID/System/Keepalive","StatusCode":"0"}}
2.1 主函数实现方式
1.主函数需要先过滤出包含请求行的报文,常见的请求方法包括 POST、PUT、GET、DELETE、OPTIONS、DESCRIBE、SETUP、PLAY、PATCH、HEAD(其中 OPTIONS、DESCRIBE、SETUP、PLAY 也用于 RTSP 取流)。
同时需要考虑抓包文件中重传和乱序的影响:重传包按 uniqueKey、flags、seq、ack 去重,乱序则按 seq+ack 之和重新排序。
# NOTE(review): the file path is interpolated into the SQL text, so a path containing a single
# quote breaks the query; prefer a bound parameter in the placeholder style of self.conn's driver.
pcap_info = pd.read_sql_query(f"SELECT * FROM pcap_info where filePath = '{self.pcap_file_path}' ORDER BY id ASC", self.conn)
# Drop retransmissions: rows with identical uniqueKey, flags, seq and ack (keep the last copy)
pcap_info = pcap_info.drop_duplicates(subset=['uniqueKey', 'flags', 'seq', 'ack'], keep='last')
# Work around out-of-order capture: order by seq + ack, ties broken by capture order (id)
pcap_info['sum_seq_ack'] = pcap_info['seq'] + pcap_info['ack']
sorted_pcap = pcap_info.sort_values(['sum_seq_ack', 'id'])
sorted_pcap = sorted_pcap.reset_index(drop=True)
# Keep rows whose headers start with an HTTP/RTSP request line whose remainder (first line,
# since '.' does not cross newlines) contains queryKeywords. A raw f-string keeps "\s" a regex
# escape instead of an invalid string escape (SyntaxWarning on Python 3.12+).
request_methods = ('POST', 'PUT', 'GET', 'DELETE', 'OPTIONS', 'DESCRIBE', 'SETUP', 'PLAY', 'PATCH', 'HEAD')
request_line_pattern = rf"^(?:{'|'.join(request_methods)})\s+.*{re.escape(queryKeywords)}"
filtered_rows = sorted_pcap.loc[sorted_pcap['headers'].str.contains(request_line_pattern, case=False, na=False)]
2.过滤出请求行后,使用多线程逐条分析
# Analyse each matching request row in a 4-thread pool; every task appends its result
# DataFrame to the shared self.httpResult_list.
# NOTE(review): the Futures are discarded, so an exception inside process_httpExtract is
# silently lost and that request is missing from the report.
with ThreadPoolExecutor(max_workers=4) as executor:
for idx in filtered_rows.index:
executor.submit(process_httpExtract, sorted_pcap, filtered_rows, idx,self.httpResult_list)
3.整理所有的 HTTP 请求,写入本地数据库
# Persist the extracted request/response pairs; skipped when nothing matched
# (pd.concat would raise ValueError on an empty list).
if self.httpResult_list:
all_data = pd.concat(self.httpResult_list, ignore_index=True)
all_data.to_sql('http_result', self.conn, if_exists='append', index=False)
2.2 解析函数实现方式
1.解析函数需要包含源地址端口信息、目的地址端口信息以及与抓包关联的唯一键信息等
# Per-request context; `keywords` is filled in later from the request line or body.
keywords = ''
client_info = filtered_rows.loc[idx, 'srcIp'] + ':' + str(filtered_rows.loc[idx, 'srcPort'])
server_info = filtered_rows.loc[idx, 'dstIp'] + ':' + str(filtered_rows.loc[idx, 'dstPort'])
uniqueKey = filtered_rows.loc[idx, 'uniqueKey']
id = filtered_rows.loc[idx, 'id']
# All packets of the same connection captured at or after the request row (by capture id).
# NOTE(review): `pcap_info` is presumably the frame passed as the first argument
# (sorted_pcap at the call site); the function header is not in this excerpt -- confirm.
thread_rows = pcap_info.loc[(pcap_info['uniqueKey'] == uniqueKey) & (pcap_info['id'] >= id)]
isHeaders = 0
request_hreaders = ''
request_body = ''
response_hreaders = ''
response_body = ''
# 0.0 means "not seen yet"; once found these hold packetTime strings
request_time = 0.0
response_time = 0.0
2.提取请求头、请求内容、响应头和响应内容
isHeaders = 0
for idm in thread_rows.index:
# isHeaders counts header-bearing packets: 1 = request, 2 = response.
# NOTE(review): the test is `> 2`, so a third header packet (e.g. the next request on a
# kept-alive connection) is counted but ignored, and the loop only stops at the fourth;
# body segments in between are ignored as well because they match neither 1 nor 2.
# NOTE(review): rows of non-UTF-8 payloads carry headers 'UnsupportedData' (non-empty),
# so they are counted as header packets here.
if isHeaders > 2 and thread_rows.loc[idm, 'headers'] != '':
break
# Headers and body in the same TCP segment
if thread_rows.loc[idm, 'headers'] != '' and thread_rows.loc[idm, 'body'] != '':
isHeaders += 1
if isHeaders == 1:
request_hreaders = thread_rows.loc[idm, 'headers']
request_body = thread_rows.loc[idm, 'body']
request_time = thread_rows.loc[idm, 'packetTime']
elif isHeaders == 2:
response_hreaders = thread_rows.loc[idm, 'headers']
response_body = thread_rows.loc[idm, 'body']
response_time = thread_rows.loc[idm, 'packetTime']
# Headers only, no body in this segment
elif thread_rows.loc[idm, 'headers'] != '':
isHeaders += 1
if isHeaders == 1:
request_hreaders = thread_rows.loc[idm, 'headers']
request_time = thread_rows.loc[idm, 'packetTime']
elif isHeaders == 2:
response_hreaders = thread_rows.loc[idm, 'headers']
response_time = thread_rows.loc[idm, 'packetTime']
# Body-only continuation segment: append it to the current message's body
elif thread_rows.loc[idm, 'body'] != '':
if isHeaders == 1:
request_body += thread_rows.loc[idm, 'body']
elif isHeaders == 2:
response_body += thread_rows.loc[idm, 'body']
3.请求头处理,提取出调用的接口,其中通过get_http_dictionary()函数获取接口名称(接口名称是自定义的)
# Classify the request: method -> keywords, request target -> interface name / component.
if request_hreaders != '':
# Request line: METHOD followed by the request target
match = re.search(r'(POST|PUT|DELETE|OPTIONS|PATCH|HEAD|GET|DESCRIBE|SETUP|PLAY)\s+(.*?)\s+', request_hreaders)
if match:
keywords = match.group(1)
http_request_hreaders = match.group(2)
# RTSP methods: playback if the URL mentions playBackMode, otherwise live streaming
if keywords in ['OPTIONS', 'DESCRIBE', 'SETUP', 'PLAY']:
if re.findall('playBackMode', http_request_hreaders):
interface_name = '回放请求'
else:
interface_name = '取流请求'
component = 'rtsp'
else:
# Request target without the query string, looked up in the interface dictionary
interface = http_request_hreaders.split('?', 1)[0]
# NOTE(review): a connection is obtained on every call and not closed here -- confirm
# whether tool_db.get_conn() caches/reuses it.
conn = tool_db.get_conn()
componentQuery, nameQuery = tool_db.get_http_dictionary(conn, interface)
# NOTE(review): nothing in this excerpt assigns interface_name/component from
# nameQuery/componentQuery when the lookup succeeds -- confirm the full source does.
if componentQuery == '' or nameQuery == '':
interface_name = '未定义'
parts = interface.split('/')
# Component falls back to the first non-empty path segment
component = next((part for part in parts if part), '未定义')
# ONVIF (SOAP over HTTP): use the action name from the headers as the keyword.
# NOTE(review): 'action.*/(\w+)' is not a raw string (SyntaxWarning on Python 3.12+).
if re.findall('^/onvif', http_request_hreaders):
report_logger.info("本次为onvif业务的抓包")
keyword_match = re.findall('action.*/(\w+)', request_hreaders)
if keyword_match:
keywords = keyword_match[0]
else:
# No recognised method: show the first header line as-is
http_request_hreaders = request_hreaders.split('\n')[0]
interface_name = '未定义'
component = '未定义'
report_logger.info(f'http_request_hreaders:{http_request_hreaders}')
else:
interface_name = '未定义'
component = '未定义'
http_request_hreaders = ''
4.请求内容处理,部分透传接口提取关键信息
# Request body: keep the outermost JSON object/array (first '[' or '{' through the last
# '}' or ']') after removing newlines.
# NOTE(review): inside [...] '|' is a literal pipe, so a '|' can also start/end the match.
if request_body != '':
request_body_replace = request_body.replace('\n', '')
match = re.findall('[\[|{].*[}|\]]', request_body_replace)
if match:
http_request_body = match[0]
try:
requestBody = json.loads(http_request_body)
# JSON-RPC style body: use its 'method' field as the keyword
if 'method' in requestBody:
keywords = requestBody['method']
# Best effort: a body that is not valid JSON leaves keywords unchanged
except Exception:
pass
report_logger.info(f'http_request_body:{http_request_body}')
else:
http_request_body = request_body
report_logger.info(f'http_request_body:{http_request_body}')
else:
http_request_body = ''
5.响应头处理:需要提取响应结果,常见的有 200、400、502、302 等,方便后续展示。如果响应报文中存在特殊错误码,优先展示错误码
if response_hreaders != '':
    # Status code from the status line, e.g. "HTTP/1.1 200 OK" or "RTSP/1.0 454 ..."
    match = re.search(r'(HTTP|RTSP)/\d+\.\d+ (\d{3})', response_hreaders.split('\n')[0])
    # Streaming (RTSP) servers report failures in an errcode= header; it wins and is shown as hex
    errorCodeMatch = re.findall(r'errcode=(\d+)', response_hreaders)
    if errorCodeMatch:
        errorCode = errorCodeMatch[0]
        errorCode_hex = f"0x{int(errorCode):08X}"
        http_response_hreaders = str(errorCode_hex)
    elif match:
        http_response_hreaders = match.group(2)
        # A JSON body whose "code" is not 0 overrides the HTTP status. The raw response_body
        # is parsed here because the cleaned http_response_body is only built in the next step.
        json_match = re.search(r'[\[{].*[}\]]', response_body, re.DOTALL)
        try:
            responseBody = json.loads(json_match.group(0)) if json_match else None
        except ValueError:  # json.JSONDecodeError is a ValueError: body is not JSON after all
            responseBody = None
        # str(): "code" may be a JSON number or string, and the report matches strings
        if isinstance(responseBody, dict) and str(responseBody.get('code', '0')) != '0':
            http_response_hreaders = str(responseBody['code'])
    else:
        http_response_hreaders = 'unknown'
    report_logger.info(f'http_response_hreaders:{http_response_hreaders}')
else:
    http_response_hreaders = ''
6.响应内容处理:能提取出 JSON 报文的优先提取,大部分响应内容都符合 JSON 格式,但需要去掉多余的换行和空格
if response_body != '':
    # Remove all whitespace (line breaks, indentation) so the JSON is displayed compactly.
    # Plain r'\s': inside a character class '|' is a literal pipe, so '[\n|\s]' would also
    # delete every '|' in the body.
    # NOTE(review): this also removes spaces inside JSON string values.
    response_body = re.sub(r'\s', '', response_body)
    # Keep the outermost JSON object/array: first '[' or '{' through the last '}' or ']'
    match = re.findall(r'[\[{].*[}\]]', response_body)
    if match:
        http_response_body = match[0]
    else:
        http_response_body = response_body
    report_logger.info(f'http_response_body:{http_response_body}')
else:
    http_response_body = ''
7.响应耗时计算
# Response latency in seconds, rounded to milliseconds. 0.0 is the "not seen" sentinel;
# once set, both values are packetTime strings in the parser's '%Y-%m-%d %H:%M:%S.%f' format.
if request_time != 0.0 and response_time != 0.0:
report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
request_time_float = datetime.strptime(request_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
response_time_float = datetime.strptime(response_time, '%Y-%m-%d %H:%M:%S.%f').timestamp()
responseTime = response_time_float - request_time_float
responseTime = round(responseTime, 3)
else:
responseTime = 0.000
8.数据整理
# One result row per request/response pair. The keys are the http_result column names
# ('Hreaders' spelling included), so they cannot be renamed here alone.
# responseTime is stored as a string on this path (the MQTT path stores a float).
data = {
'id': int(id),
'uniqueKey': uniqueKey,
'responseTime': str(responseTime),
'beginTime': request_time,
'clientInfo': client_info,
'serverInfo': server_info,
'requestHreaders': http_request_hreaders,
'requestBody': http_request_body,
'responseHeaders': http_response_hreaders,
'responseBody': http_response_body,
'originalRequestHreaders': request_hreaders,
'originalResponseHeaders': response_hreaders,
'keywords': keywords,
'component': component,
'interfaceName': interface_name
}
# Wrap the row in a one-row DataFrame and append it to the shared result list
df = pd.DataFrame([data])
httpResult_list.append(df)
3. 抓包信息展示
3.1 报表的设计
1.抓包主要展示内容有:'所属组件', '请求接口', '接口名称', '响应结果', '查看详情', '响应耗时', '请求时间', '关键信息'。
2.由于取流(RTSP)也走这套提取逻辑,而一条取流链路中存在多个请求头,因此只需保留第一个请求 OPTIONS,过滤掉其他的 'DESCRIBE'、'SETUP'、'PLAY'
# One row per RTSP session: drop DESCRIBE/SETUP/PLAY rows so only OPTIONS (and all HTTP rows) remain
data_info = data_info.loc[~data_info['keywords'].isin(['DESCRIBE', 'SETUP', 'PLAY'])].reset_index(drop=True)
3.取流请求和http请求需要分开处理,展示不同的颜色
# Map a response status to a label (item) and a colour (item and result_item, the latter
# defined outside this excerpt).
# NOTE(review): patterns such as '^20\d$' are not raw strings (SyntaxWarning on Python 3.12+).
# NOTE(review): the 401/404 checks are substring searches, so e.g. a hex error code that
# contains "401" is labelled 未鉴权. The same chain is duplicated in both branches and
# could be a single helper.
if keywords == 'OPTIONS':
# RTSP session (shown through its OPTIONS row): the verdict comes from the last response
# of the whole session, and an errcode reported by the server in ANNOUNCE overrides it.
uniqueKey = data_info.loc[row, 'uniqueKey']
# NOTE(review): uniqueKey is interpolated into the SQL text; prefer bound parameters.
# NOTE(review): [-1] raises IndexError if the query returns no rows (e.g. a session
# without ANNOUNCE) -- depends on what tool_db.excute_sql returns; confirm.
responseHeaders = tool_db.excute_sql(
f"select responseHeaders from http_result where uniqueKey = '{uniqueKey}' ORDER BY id ASC")[-1]
announce = tool_db.excute_sql(
f"SELECT headers FROM pcap_info where headers like 'ANNOUNCE%' and uniqueKey = '{uniqueKey}'")[-1]
# The streaming server reports errors via ANNOUNCE; show its errcode as hex
if announce != '':
errorCodeMatch = re.findall('errcode=(\d+)', announce)
if errorCodeMatch:
errorCode = errorCodeMatch[0]
errorCode_hex = f"0x{int(errorCode):08X}"
responseHeaders = str(errorCode_hex)
# Keyword of the session's last request (highest id)
keywords = tool_db.excute_sql(f"select keywords from http_result where uniqueKey = '{uniqueKey}' ORDER BY id ASC")[
-1]
if re.findall('^20\d$', responseHeaders):
item = QTableWidgetItem('成功')
item.setForeground(QColor("#1DB84C")) # green
result_item.setForeground(QColor(0, 128, 0))
elif re.findall('^30\d$', responseHeaders):
item = QTableWidgetItem('重定向')
item.setForeground(QColor("#FFA45A")) # orange
result_item.setForeground(QColor("#FF832B"))
elif re.findall('401', responseHeaders):
item = QTableWidgetItem('未鉴权')
item.setForeground(QColor("#EC4E51")) # red
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('404', responseHeaders):
item = QTableWidgetItem('接口不通')
item.setForeground(QColor("#EC4E51")) # red
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('^40\d$', responseHeaders):
item = QTableWidgetItem('失败')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('^50\d$', responseHeaders):
item = QTableWidgetItem('异常')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
elif responseHeaders == '':
item = QTableWidgetItem('无响应')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
else:
# Any other value (e.g. a hex error code) is shown verbatim in red
item = QTableWidgetItem(responseHeaders)
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
else:
# HTTP request: the stored responseHeaders value of this row decides the verdict
responseHeaders = data_info.loc[row, 'responseHeaders']
if re.findall('^20\d$', responseHeaders):
item = QTableWidgetItem('成功')
item.setForeground(QColor("#1DB84C")) # green
result_item.setForeground(QColor(0, 128, 0))
elif re.findall('^30\d$', responseHeaders):
item = QTableWidgetItem('重定向')
item.setForeground(QColor("#FFA45A")) # orange
result_item.setForeground(QColor("#FF832B"))
elif re.findall('401', responseHeaders):
item = QTableWidgetItem('未鉴权')
item.setForeground(QColor("#EC4E51")) # red
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('404', responseHeaders):
item = QTableWidgetItem('接口不通')
item.setForeground(QColor("#EC4E51")) # red
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('^40\d$', responseHeaders):
item = QTableWidgetItem('失败')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
elif re.findall('^50\d$', responseHeaders):
item = QTableWidgetItem('异常')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
elif responseHeaders == '':
item = QTableWidgetItem('无响应')
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
else:
item = QTableWidgetItem(responseHeaders)
item.setForeground(QColor("#EC4E51"))
result_item.setForeground(QColor(255, 0, 0))
其他信息都是在http提取时填入数据库,直接获取使用即可,这边不再赘述
3.2 抓包内容的展示
1.报文展示参考 Wireshark 的红蓝配色:请求为红色,响应为蓝色
# Render the request in red and the response in blue. Every line is HTML-escaped: a literal
# '<' or '>' in the payload would otherwise be parsed as markup and vanish from the view.
requestHreaders = data_info.loc[idx, 'originalRequestHreaders']
for line in requestHreaders.splitlines():
    html_content += f"<p><span style='color:{red_font_color}'>{html.escape(line)}</span></p>"
# Blank separator between headers and body
html_content += f"<p><pre> </pre></p>"
requestBody = data_info.loc[idx, 'requestBody']
html_content += f"<p><span style='color:{red_font_color}'>{html.escape(requestBody)}</span></p>"
responseHeaders = data_info.loc[idx, 'originalResponseHeaders']
for line in responseHeaders.splitlines():
    # Response headers need escaping just like the request lines
    html_content += f"<p><span style='color:{blue_font_color}'>{html.escape(line)}</span></p>"
html_content += f"<p><pre> </pre></p>"
responseBody = data_info.loc[idx, 'responseBody']
for line in responseBody.splitlines():
    html_content += f"<p><span style='color:{blue_font_color}'>{html.escape(line)}</span></p>"
2.还可以针对抓包中出现的报错给出处理建议,或对特殊内容做额外处理,这里不再展开。
4.提取MQTT信令
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅(Publish/Subscribe)消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计,尤其适用于物联网(IoT)场景。
4.1 mqtt注册信息解析
def parse_connect(data, fixed_header):
    """Decode an MQTT CONNECT packet (protocol levels 3/4 and 5).

    Args:
        data: Raw TCP payload (bytes) starting at the MQTT fixed header.
        fixed_header: Parsed fixed header; only ``header_length`` (the number of
            bytes taken by the fixed header) is used.

    Returns:
        dict with ``protocol_name``, ``protocol_level``, ``clean_session`` and
        ``will_flag`` (the masked flag bits, i.e. 0/2 and 0/4), ``keep_alive``
        (seconds), ``client_id``, and ``username``/``password`` (``None`` when
        the corresponding flag is not set).

    Raises:
        IndexError: if the packet is truncated inside a fixed-size field.
        UnicodeDecodeError: if a decoded field is not valid UTF-8.
    """

    def read_u16(pos):
        # 2-byte big-endian integer
        return int.from_bytes(data[pos:pos + 2], 'big'), pos + 2

    def read_field(pos):
        # Length-prefixed field: 2-byte big-endian length, then that many bytes
        length, pos = read_u16(pos)
        return data[pos:pos + length], pos + length

    def skip_properties(pos):
        # MQTT 5 property block: a Variable Byte Integer length, then that many bytes
        length = 0
        shift = 0
        while True:
            byte = data[pos]
            pos += 1
            length += (byte & 0x7F) << shift
            if byte < 0x80:
                return pos + length
            shift += 7

    offset = fixed_header["header_length"]
    # Variable header: protocol name, level, connect flags, keep-alive
    raw_name, offset = read_field(offset)
    protocol_name = raw_name.decode('utf-8')
    protocol_level = data[offset]
    connect_flags = data[offset + 1]
    offset += 2
    keep_alive, offset = read_u16(offset)
    clean_session = connect_flags & 0x02
    will_flag = connect_flags & 0x04
    username_flag = bool(connect_flags & 0x80)
    password_flag = bool(connect_flags & 0x40)
    is_v5 = protocol_level == 5
    if is_v5:
        offset = skip_properties(offset)

    # Payload order is fixed: Client ID, [Will Properties (v5)], Will Topic, Will Message,
    # User Name, Password -- each optional field is present only when its flag is set, so
    # the offset must only advance past fields that are actually there.
    raw_client_id, offset = read_field(offset)
    client_id = raw_client_id.decode('utf-8')
    if will_flag:
        if is_v5:
            offset = skip_properties(offset)
        _, offset = read_field(offset)  # Will Topic
        _, offset = read_field(offset)  # Will Message
    username = None
    if username_flag:
        raw_username, offset = read_field(offset)
        username = raw_username.decode('utf-8')
    password = None
    if password_flag:
        raw_password, offset = read_field(offset)
        password = raw_password.decode('utf-8')
    return {
        "protocol_name": protocol_name,
        "protocol_level": protocol_level,
        "clean_session": clean_session,
        "keep_alive": keep_alive,
        "client_id": client_id,
        "will_flag": will_flag,
        "username": username,
        "password": password,
    }
def parse_utf8_str(data, offset):
    """Read an MQTT UTF-8 string field: a 2-byte big-endian length, then the text.

    Args:
        data: Packet bytes.
        offset: Index of the 2-byte length prefix.

    Returns:
        The decoded string (shorter than declared if *data* is truncated).
    """
    text_start = offset + 2
    text_end = text_start + int.from_bytes(data[offset:text_start], 'big')
    return data[text_start:text_end].decode('utf-8')
4.2 推送消息解析
def parse_publish(data, fixed_header):
    """Decode the topic, payload and application ``Seq`` of an MQTT PUBLISH packet.

    Args:
        data: Raw TCP payload (bytes) starting at the MQTT fixed header.
        fixed_header: Parsed fixed header; uses ``header_length`` (bytes taken by
            the fixed header) and ``remaining_length`` (variable header + payload).

    Returns:
        dict with ``topic``, ``seq`` (the first word after ``Seq`` in the payload,
        or ``'PSH-ACK'`` when there is none) and ``payload`` (UTF-8 text). Bytes
        after this packet (e.g. a following MQTT packet) are not included.

    Raises:
        UnicodeDecodeError: if the topic or the payload is not valid UTF-8.
    """
    offset = fixed_header["header_length"]
    packet_end = offset + fixed_header["remaining_length"]
    # Topic name: 2-byte big-endian length + UTF-8 text
    topic_len = int.from_bytes(data[offset:offset + 2], 'big')
    offset += 2
    topic = data[offset:offset + topic_len].decode('utf-8')
    offset += topic_len
    # QoS is in bits 2-1 of the first fixed-header byte. Only QoS 1/2 PUBLISH packets carry
    # the 2-byte Packet Identifier before the payload; with QoS 0 the payload follows the topic.
    qos = (data[0] >> 1) & 0x03
    if qos > 0:
        offset += 2
    payload = data[offset:packet_end].decode('utf-8')
    seq_match = re.findall(r'Seq.*?(\w+)', payload)
    seq = seq_match[0] if seq_match else 'PSH-ACK'
    return {
        "topic": topic,
        "seq": seq,
        "payload": payload
    }
4.3 从抓包中解析mqtt
MQTT 是二进制协议,解析方式和 HTTP 这类文本协议不一样,所以要和提取 HTTP 的流程分开。结合上面两个方法,将注册和消息推送单独解析。
注意:消息推送中单独解析出的 seq 字段放到了 flags 中,是为了后续配对请求和响应时使用
# Payload not handled as HTTP text: try to decode it as an MQTT control packet.
# NOTE(review): only the first MQTT packet of the TCP payload is decoded -- confirm whether
# segments can carry several packets (parse_fixed_header is not visible here).
try:
headers = 'UnsupportedData'
body = 'UnsupportedData'
fixed_header = parse_fixed_header(raw_info)
if fixed_header["packet_type"] == 3: # PUBLISH = 3
protocol = 'MQTT'
publish_info = parse_publish(raw_info, fixed_header)
headers = publish_info['topic']
body = str(publish_info['payload'])
# The application Seq replaces the TCP flag name so request/response can be paired by it
flags = publish_info['seq']
elif fixed_header["packet_type"] == 1: # CONNECT = 1
protocol = 'MQTT'
connect_info = parse_connect(raw_info, fixed_header)
headers = 'ConnectCommand'
body = str(connect_info)
elif fixed_header["packet_type"] == 2: # CONNACK = 2
protocol = 'MQTT'
headers = 'ConnectAck'
body = ''
elif fixed_header["packet_type"] == 8: # SUBSCRIBE = 8
protocol = 'MQTT'
headers = 'Subscribe'
body = ''
elif fixed_header["packet_type"] == 9: # SUBACK = 9
protocol = 'MQTT'
headers = 'SubscribeAck'
body = ''
elif fixed_header["packet_type"] == 12: # PINGREQ = 12
protocol = 'MQTT'
headers = 'ping'
body = ''
elif fixed_header["packet_type"] == 13: # PINGRESP = 13
protocol = 'MQTT'
headers = 'pong'
body = ''
elif fixed_header["packet_type"] == 14: # DISCONNECT = 14
protocol = 'MQTT'
headers = 'Disconnect'
body = ''
else:
# NOTE(review): other types (e.g. PUBACK = 4, UNSUBSCRIBE = 10) are only logged; the row
# keeps 'UnsupportedData' and its original (non-MQTT) protocol name.
report_logger.debug(f"Unknown packet type: {id, fixed_header['packet_type']}")
data = {
'id': id,
'uniqueKey': uniqueKey,
'packetTime': packet_time,
'protocol': protocol,
'srcIp': src_ip,
'srcPort': src_port,
'dstIp': dst_ip,
'dstPort': dst_port,
'flags': flags,
'headers': headers,
'body': body,
'raw_data': raw_info,
'filePath': pcap_file_path,
'extractionTime': '0',
'seq': seq,
'ack': ack
}
# Wrap the row in a one-row DataFrame and append it to the shared list
df = pd.DataFrame([data])
df_list.append(df)
# Not decodable as MQTT either (or truncated): store the raw bytes as unsupported data
except Exception as e:
data = {
'id': id,
'uniqueKey': uniqueKey,
'packetTime': packet_time,
'protocol': protocol,
'srcIp': src_ip,
'srcPort': src_port,
'dstIp': dst_ip,
'dstPort': dst_port,
'flags': flags,
'headers': 'UnsupportedData',
'body': 'UnsupportedData',
'raw_data': raw_info,
'filePath': pcap_file_path,
'extractionTime': '0',
'seq': seq,
'ack': ack
}
# Wrap the row in a one-row DataFrame and append it to the shared list
df = pd.DataFrame([data])
df_list.append(df)
4.4整理mqtt消息
抓包整理后,需要提取成可以展示的内容,报表展示参考第三章即可。整理时需要注意以下几点:
(1)注册-注册响应、订阅-订阅响应、心跳交互都是成双成对
(2)消息推送中请求和响应的seq是一致的,同时注意可能存在重复
def process_mqttExtract(self, pcapInfo):
    """Pair MQTT requests with their responses and append them to ``http_result``.

    ``pcapInfo`` is a DataFrame of parsed MQTT packets (columns read here: id,
    uniqueKey, packetTime, srcIp, srcPort, dstIp, dstPort, flags, headers, body).

    * Rows whose ``flags`` is ``'PSH-ACK'`` are control packets identified by
      ``headers``: ConnectCommand is paired with the next ConnectAck, Subscribe
      with SubscribeAck and ping with pong on the same ``uniqueKey``; any other
      header is reported as a request that needs no response.
    * Every other row is treated as a PUBLISH whose ``flags`` holds the message
      ``Seq``; it is paired with the next row on the same ``uniqueKey`` that
      carries the same ``Seq``.

    A matched response row is consumed so it is not reported again as a request.
    The ``keywords`` column of the result holds a per-connection device id: the
    CONNECT user name, or otherwise the ``/iot/<id>`` segment of a PUBLISH topic.

    Returns:
        True when at least one row was written to ``http_result`` through
        ``self.conn``; False when there was nothing to write or an error occurred
        (the error is logged).
    """
    import ast

    logger.debug("process_mqttExtract")
    time_format = '%Y-%m-%d %H:%M:%S.%f'
    interface_names = {
        "ConnectCommand": "设备注册",
        "ConnectAck": "注册响应",
        "Subscribe": "设备订阅",
        "SubscribeAck": "订阅响应",
        "ping": "心跳包",
        "pong": "心跳响应",
        "Disconnect": "设备注销"
    }
    # Control request header -> (response header, response-body label, log text when missing)
    control_pairs = {
        'ConnectCommand': ('ConnectAck', '响应时间', '未找到注册响应'),
        'Subscribe': ('SubscribeAck', '订阅响应时间', '未找到订阅响应'),
        'ping': ('pong', '心跳响应时间', '未找到心跳响应'),
    }

    def first_later(frame, key, after_id, column, value):
        """Return the first row of *frame* on connection *key* after *after_id* whose *column* equals *value*, else None."""
        candidates = frame.loc[(frame['uniqueKey'] == key) & (frame['id'] > after_id) & (frame[column] == value)]
        return None if candidates.empty else candidates.iloc[0]

    def elapsed(idx, request_time, response_time):
        """Seconds from request to response, rounded to milliseconds (0.0 if a time is missing)."""
        if request_time != 0.0 and response_time != 0.0:
            report_logger.debug(f"{idx},request_time:{request_time},response_time:{response_time}")
            start = datetime.strptime(request_time, time_format).timestamp()
            end = datetime.strptime(response_time, time_format).timestamp()
            return round(end - start, 3)
        return 0.000

    def connect_username(body):
        """Return the user name stored in a CONNECT body, or None if it cannot be read.

        The body is the str() of the dict built by parse_connect: a Python literal,
        not JSON (e.g. ``None`` has no JSON spelling), so it is read with
        ast.literal_eval, which only evaluates literals and never runs code.
        """
        try:
            info = ast.literal_eval(body)
        except (ValueError, SyntaxError, TypeError):
            report_logger.info(f"无法解析注册报文:'{body}'")
            return None
        return info.get('username') if isinstance(info, dict) else None

    try:
        processed_ids = set()
        httpResult_list = []
        devices = {}
        for idx in pcapInfo.index:
            if idx in processed_ids:  # already consumed as another row's response
                continue
            keywords = ''
            row = pcapInfo.loc[idx]
            report_logger.debug(f"正在处理包('{row['id']}')")
            client_info = row['srcIp'] + ':' + str(row['srcPort'])
            server_info = row['dstIp'] + ':' + str(row['dstPort'])
            uniqueKey = row['uniqueKey']
            packet_id = row['id']
            request_time = row['packetTime']
            request_headers = row['headers']
            request_body = row['body']
            responseTime = 0.000
            response_headers = ""
            response_body = ""
            origin_response_headers = ""
            target_row = None

            if row['flags'] == 'PSH-ACK':
                # Control packet: CONNECT / SUBSCRIBE / PINGREQ / DISCONNECT ...
                interfaceName = interface_names.get(request_headers, '未定义')
                if request_headers == 'ConnectCommand' and uniqueKey not in devices:
                    username = connect_username(request_body)
                    if username is not None:
                        devices[uniqueKey] = username
                pair = control_pairs.get(request_headers)
                if pair is None:
                    response_headers = "200"
                    response_body = "设备发起断开无需响应"
                    origin_response_headers = "200"
                else:
                    ack_header, time_label, missing_message = pair
                    target_row = first_later(pcapInfo, uniqueKey, packet_id, 'headers', ack_header)
                    if target_row is None:
                        report_logger.info(missing_message)
                    else:
                        response_body = f"{time_label}:{target_row['packetTime']}"
            else:
                # PUBLISH: flags carries the application-level Seq used for pairing
                device_match = re.findall(r"/iot/(\w+)", request_headers)
                devices.setdefault(uniqueKey, device_match[0] if device_match else '')
                seq = row['flags']
                interfaceName = 'MQTT主题接口'
                report_logger.info(f"本次seq为:'{seq}'")
                target_row = first_later(pcapInfo, uniqueKey, packet_id, 'flags', seq)
                if target_row is None:
                    report_logger.info(f"未找到响应包:'{seq}'")
                else:
                    response_body = target_row['body']

            if target_row is not None:
                response_headers = '200'
                origin_response_headers = target_row['headers']
                responseTime = elapsed(idx, request_time, target_row['packetTime'])
                # Consume the response so it is not reported again as a request
                report_logger.info(f"删除处理过的记录:'{target_row['id']}'")
                pcapInfo = pcapInfo.loc[pcapInfo['id'] != int(target_row['id'])]
                processed_ids.add(target_row.name)

            httpResult_list.append(pd.DataFrame([{
                'id': int(packet_id),
                'uniqueKey': uniqueKey,
                'responseTime': responseTime,
                'beginTime': request_time,
                'clientInfo': client_info,
                'serverInfo': server_info,
                'requestHreaders': request_headers,
                'requestBody': request_body,
                'responseHeaders': response_headers,
                'responseBody': response_body,
                'originalRequestHreaders': request_headers,
                'originalResponseHeaders': origin_response_headers,
                'keywords': keywords,
                'component': 'mqtt',
                'interfaceName': interfaceName
            }]))

        if not httpResult_list:
            return False
        all_data = pd.concat(httpResult_list, ignore_index=True)
        report_logger.info(f"devices:'{devices}'")
        # Each connection's device id becomes the keyword of all its rows
        all_data['keywords'] = all_data['uniqueKey'].map(devices).fillna(all_data['keywords'])
        all_data.to_sql('http_result', self.conn, if_exists='append', index=False)
        return True
    except Exception as e:
        logger.error(f'异常:{str(e)}')
        logger.error(traceback.format_exc())
        return False
更多推荐
所有评论(0)