import tkinter as tk
from tkinter import ttk
import subprocess
import threading
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import time
from datetime import datetime
import re
import sys


# 全局变量,用于保存测试数据
test_data = {
    'time': [],
    'rate': []
}
udp_data = []

# 全局变量来跟踪进程和运行状态
iperf3_process = None
running = False

# 执行iperf的线程
def start_test():
	global iperf3_process, running
	#清空全局变量--字典的数据
	#清空历史数据
	for row_labels in table_labels:
		for label in row_labels:
			label.config(text="")
	# 获取协议类型(TCP或UDP)	
	protocol = protocol_combobox.get()
	# 启动新线程执行iperf3
	thread = threading.Thread(target=run_iperf3, args=(protocol, ))
	thread.daemon = True
	thread.start()

def stop_test():
	global iperf3_process, running
	#running = False
	if running:
		running = False
		if iperf3_process is not None:
			# 优雅地终止iperf3进程
			iperf3_process.terminate()
			# 等待进程实际退出(可选,但推荐)
			iperf3_process.wait()

# 更新图表函数
def update_graph():
	ax.clear()
	ax.plot(test_data['time'], test_data['rate'], label="speed")
	ax.set_title("speeds vs times")
	ax.set_xlabel("times (s)")
	ax.set_ylabel("speeds (Mbps)")
	canvas.draw()
	#print (test_data)

# 解析iperf3 UDP输出行
def parse_iperfudp_line(line, is_sender):
	parts = line.split()
	interval = parts[2]  # Interval(sec)
	transfer = parts[4] + parts[5]  # Transfer(GBytes)
	bitrate = parts[6] + parts[7]  # Bitrate(Mbits/sec)
	jitter = parts[8]  # Jitter(ms)
	lost_total = parts[10] + parts[11]  # Lost/Total(%)
	datagrams = parts[12]  # Datagrams(sender/receiver)
	#print (interval, transfer, bitrate, jitter, lost_total, datagrams)
	return (interval, transfer, bitrate, jitter, lost_total, datagrams)
	
# 更新UDP表格
def update_table():
	
	# udp_data 使用列表推导式去掉小括号并合并成一个list
	data = [item for tup in udp_data for item in tup]
	# 遍历table_labels,更新每个单元格的文本
	for i, label_row in enumerate(table_labels):
		for j, label in enumerate(label_row):
			index = i * 6 + j  # 计算数据索引
			label.config(text=data[index])
		
# 线程执行 Iperf3 命令的函数
def run_iperf3(protocol):
	global iperf3_process, running
	running = True
	command = command_entry.get().split()  # 获取命令
	if protocol == 'TCP':
		iperf3_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

		# 提取iperf3 命令command -t后面的测试时间值
		# 找到'-t'的索引
		t_index = command.index('-t') if '-t' in command else None
		# 检查是否有'-t'并且其后有元素
		if t_index is not None and t_index + 1 < len(command):
			# 获取'-t'之后的元素
			t_value = command[t_index + 1]
			t_time = float(t_value)
			#print(f"-t 后面的参数是: {t_time}")
		#else:
			#print("-t 参数未找到或其后没有参数")
		#print (t_time)
		# 读取iperf3的输出
		while running:
			
			for line in iperf3_process.stdout:
				if not running:
					break
				#print (line)
				if "[SUM]" in line and "bits/sec" in line:
					try:			
						#以下提取时间戳
						parts = line.split()	#把process.stdout的信息分割成字符串
						print (parts)
						split_time = parts[1].split('-')	#分割时间取后面的时间值
						time = split_time[1]
						print (time)
						time_float = float(time)  # 时间戳
						print (time_float)
						timestamp = int(time_float)  # 时间戳
						print (timestamp)
						#print (timestamp)
						
						#以下提取吞吐量的速率值
						p5 = parts[5]	#取出[[[SUM]],[2.00-3.00],[sec],[1.20],[GBytes],[[10.3],[Gbits/sec]]中的10.3
						p6 = parts[6].split('/')	#分割[[[SUM]],[2.00-3.00],[sec],[1.20],[GBytes],[[10.3],[Gbits/sec]]中的[Gbits/sec]
						p66 = p6[0]		#取出分割后的Gbits
						tp = p5 + p66	#组成 xxKbits、xxMbits、xxGbits、bits字符
						# 提取数值和单位
						num, unit = tp[:-5], tp[-5:]  # 假设单位总是占5个字符,如'Gbits', 'Mbits'等
						num = float(num)  # 将数值转换为浮点数					
						# 根据单位进行转换
						if unit == 'Gbits':
							converted_value = num * 1000  # Gbits -> Mbits
						elif unit == 'Mbits':
							converted_value = num  # Mbits -> Mbits (不变)
						elif unit == 'Kbits':
							converted_value = num / 1000  # Kbits -> Mbits
						elif unit == 'bits':
							converted_value = num / 1_000_000  # bits -> Mbits
						else:
							raise ValueError(f"Unknown unit: {unit}")
						
						#把time和速率值添加到全局字典中
						#去掉iperf输出的后2条数据的值
						if time_float <= t_time:
							test_data['time'].append(timestamp)
							test_data['rate'].append(converted_value)
							
						# 如果iperf标准输出信息的时间大于测试时间,则不往test_data字典写值
						else:
							print("Reached {timestamp} = {t_time}, stopping appending.")
							break
										
						#test_data['time'].append(timestamp)
						#test_data['rate'].append(converted_value) 
						print (test_data)
						update_graph()
						
					except Exception as e:
						print(f"Parsing error: {e}")
						
			if iperf3_process.poll() is not None:
				break	
		print("Iperf3 TCP Test Finished")
		
	elif protocol == 'UDP':
		iperf3_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
		# 读取iperf3的输出
		# 清空GUI表格上的测试数据
		test_data["time"].clear()
		test_data["rate"].clear()
		udp_data.clear() 
		while running:			
			for line in iperf3_process.stdout:
				if not running:
					break
				#print (line)
				if "sender" in line or "receiver" in line:
					try:			
						#提取出sender的数据
						if "sender" in line:
							data = parse_iperfudp_line(line, is_sender=True)
							udp_data.append(data)
							#print (udp_data)
						else:
							data = parse_iperfudp_line(line, is_sender=False)
							udp_data.append(data)							
					except Exception as e:
						print(f"Parsing error: {e}")	
			if iperf3_process.poll() is not None:
				break
			#更新UDP表格
			update_table()
			
		print("Iperf3 UDP Test Finished")


	# 完成后清理资源
	iperf3_process.stdout.close()
	iperf3_process.stderr.close()


# 创建主应用窗口
root = tk.Tk()
root.title("Iperf3 Test GUI")
root.geometry("1000x800")

# 创建协议选择下拉框 (TCP/UDP)
ttk.Label(root, text="Protocol:").grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
protocol_combobox = ttk.Combobox(root, values=["TCP", "UDP"], state="readonly")
protocol_combobox.set("TCP")  # 默认选择TCP
protocol_combobox.grid(row=0, column=1, padx=10, pady=10, sticky=tk.W)
ttk.Label(root, text="iperf命令行如,TCP:iperf3 -c 127.0.0.1 -P 4 --forceflush -t 10;UDP:iperf3 -u -c 127.0.0.1 -b 10M --forceflush -t 10。").grid(row=1, column=0, padx=2, pady=2, sticky=tk.W)

# 创建命令行框,用于显示/编辑 Iperf3 命令
command_label = tk.Label(root, text="Iperf3 client:")
command_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)

#iperf3 client 命令行输入框
command_entry = tk.Entry(root, width=50)
command_entry.insert(0, "iperf3 -c 127.0.0.1 -P 4 --forceflush -t 10")
command_entry.grid(row=2, column=1, padx=10, pady=10, sticky=tk.W)

#开始测试按钮
start_button = tk.Button(root, text="Start", command=start_test)
start_button.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)

#停止测试按钮
stop_button = tk.Button(root, text="Stop", command=stop_test)
stop_button.grid(row=3, column=1, padx=10, pady=10, sticky=tk.W)

# 创建绘制图形的区域
fig, ax = plt.subplots(figsize=(6, 4))
ax.set_title("speeds vs times")
ax.set_xlabel("times (s)")
ax.set_ylabel("speeds (Mbps)")
line, = ax.plot([], [], label="speed")

canvas = FigureCanvasTkAgg(fig, master=root)
canvas.get_tk_widget().grid(row=6, column=0, padx=10, pady=10)

#创建UDP测试结果的表格
#UDP表格标签
udp_label = tk.Label(root, text="UDP test result:")
udp_label.grid(row=10, column=0, padx=10, pady=10, sticky=tk.W)
# 表格框架
table_frame = tk.Frame(root)
table_frame.grid(row=11, column=0, columnspan=4)
# 表格头部
header = ["Interval(sec)", "Transfer", "Bitrate", "Jitter(ms)", "Lost/Total(%)", "Datagrams"]
for col, text in enumerate(header):
    label = tk.Label(table_frame, text=text, font=("Arial", 10, "bold"), relief="solid", width=15)
    label.grid(row=0, column=col)
# 表格内容区域
table_labels = []
for row in range(1, 3):  # 两行显示数据
	#定义单元格集合列表
	row_labels = []
	for col in range(6):
		# 创建单元格
		label = tk.Label(table_frame, text="", relief="solid", width=17)
		# 将单元格布局在网格中
		label.grid(row=row, column=col)
		# 把单元格添加到row_labels列表
		row_labels.append(label)
	# 把row_labels添加到table_labels列表
	table_labels.append(row_labels)

# 运行主循环
root.mainloop()

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐