多传感器融合(.bag)
其中filtered_images、filtered_poses.txt、lidar 记录条数都是867,实现了多传感器融合。2、Coco-LIC算法计算时间戳位姿。基于点云数据对其图像数据。基于点云数据对其位姿数据。1、解析.bag文件。
·
测试,还有不少问题,选vip就希望你们别被误导
目录
1、解析.bag文件
import os
import rosbag
import cv2
import csv
import open3d as o3d
import numpy as np
from cv_bridge import CvBridge
def save_images_from_bag(bag_file, image_topic, output_dir, timestamp_file):
"""保存图像数据为 JPG 文件并记录时间戳"""
bag = rosbag.Bag(bag_file, 'r')
bridge = CvBridge()
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamps = []
for i, (topic, msg, t) in enumerate(bag.read_messages(topics=[image_topic])):
cv_img = bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
img_filename = os.path.join(output_dir, f"frame_{i:04d}.jpg")
cv2.imwrite(img_filename, cv_img)
# 记录时间戳和文件名
timestamps.append(f"{t.to_sec()} {img_filename}\n")
# 保存时间戳到文件
with open(timestamp_file, 'w') as f:
f.writelines(timestamps)
bag.close()
print(f"Saved {i + 1} images to {output_dir}")
print(f"Timestamps saved to {timestamp_file}")
def save_imu_from_bag(bag_file, imu_topic, output_file):
"""保存IMU数据为 CSV 文件"""
bag = rosbag.Bag(bag_file, 'r')
with open(output_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['time', 'orientation_x', 'orientation_y', 'orientation_z', 'orientation_w',
'angular_velocity_x', 'angular_velocity_y', 'angular_velocity_z',
'linear_acceleration_x', 'linear_acceleration_y', 'linear_acceleration_z'])
for i, (topic, msg, t) in enumerate(bag.read_messages(topics=[imu_topic])):
writer.writerow([t.to_sec(),
msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w,
msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z,
msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z])
bag.close()
print(f"Saved IMU data to {output_file}")
def save_lidar_data_from_bag(bag_file, lidar_topic, output_dir, timestamp_file):
"""保存激光雷达数据为 PCD 文件并记录时间戳"""
bag = rosbag.Bag(bag_file, 'r')
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamps = []
for i, (topic, msg, t) in enumerate(bag.read_messages(topics=[lidar_topic])):
# 自定义解析消息,基于 `livox_ros_driver/CustomMsg` 结构
points = []
for point in msg.points:
points.append([point.x, point.y, point.z])
points = np.array(points)
# 保存点云数据为 PCD 文件
point_cloud = o3d.geometry.PointCloud()
point_cloud.points = o3d.utility.Vector3dVector(points)
pcd_filename = os.path.join(output_dir, f"frame_{i:04d}.pcd")
o3d.io.write_point_cloud(pcd_filename, point_cloud)
# 记录时间戳和文件名
timestamps.append(f"{t.to_sec()} {pcd_filename}\n")
# 保存时间戳到文件
with open(timestamp_file, 'w') as f:
f.writelines(timestamps)
bag.close()
print(f"Saved {i + 1} lidar frames to {output_dir}")
print(f"Timestamps saved to {timestamp_file}")
def process_bag_file(bag_file, output_base_dir):
"""整合的处理函数,用于解析并保存图像、IMU 和激光雷达数据"""
image_topic = '/camera/image_color'
imu_topic = '/livox/imu'
lidar_topic = '/livox/lidar'
image_output_dir = os.path.join(output_base_dir, 'images')
imu_output_file = os.path.join(output_base_dir, 'imu_data.csv')
lidar_output_dir = os.path.join(output_base_dir, 'lidar')
image_timestamp_file = os.path.join(image_output_dir, 'image_timestamps.txt')
lidar_timestamp_file = os.path.join(lidar_output_dir, 'lidar_timestamps.txt')
save_images_from_bag(bag_file, image_topic, image_output_dir, image_timestamp_file)
save_imu_from_bag(bag_file, imu_topic, imu_output_file)
save_lidar_data_from_bag(bag_file, lidar_topic, lidar_output_dir, lidar_timestamp_file)
if __name__ == "__main__":
bag_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/lic/degenerate_seq_00.bag'
output_base_dir = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag'
process_bag_file(bag_file, output_base_dir)
2、Coco-LIC算法计算时间戳位姿
https://github.com/APRIL-ZJU/Coco-LIC
3、多传感器融合
基于点云数据对其位姿数据
import numpy as np
def load_lidar_timestamps(timestamp_file):
"""加载LiDAR的时间戳"""
lidar_timestamps = []
with open(timestamp_file, 'r') as f:
for line in f:
timestamp = float(line.split()[0])
lidar_timestamps.append(timestamp)
return np.array(lidar_timestamps)
def load_pose_data(pose_file):
"""加载所有的位姿数据"""
pose_data = []
with open(pose_file, 'r') as f:
for line in f:
data = line.strip().split()
timestamp = float(data[0])
pose_data.append((timestamp, line.strip()))
return pose_data
def find_closest_poses(lidar_timestamps, pose_data):
"""根据LiDAR时间戳找到最接近的位姿数据"""
pose_timestamps = np.array([p[0] for p in pose_data])
closest_poses = []
for lidar_time in lidar_timestamps:
closest_index = np.argmin(np.abs(pose_timestamps - lidar_time))
closest_poses.append(pose_data[closest_index][1])
return closest_poses
def save_filtered_poses(filtered_poses, output_file):
"""保存过滤后的位姿数据"""
with open(output_file, 'w') as f:
for pose in filtered_poses:
f.write(f"{pose}\n")
def filter_poses_by_lidar_timestamps(lidar_timestamp_file, pose_file, output_pose_file):
"""主函数,结合LiDAR时间戳过滤位姿数据"""
# 加载LiDAR时间戳和位姿数据
lidar_timestamps = load_lidar_timestamps(lidar_timestamp_file)
pose_data = load_pose_data(pose_file)
# 找到与LiDAR时间戳最接近的位姿
filtered_poses = find_closest_poses(lidar_timestamps, pose_data)
# 保存过滤后的位姿数据
save_filtered_poses(filtered_poses, output_pose_file)
print(f"Filtered poses saved to {output_pose_file}")
if __name__ == "__main__":
lidar_timestamp_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/lidar/lidar_timestamps.txt'
pose_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/degenerate_seq_00_LICO.txt' # 位姿数据的文件路径
output_pose_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/filtered_poses.txt' # 保存过滤后位姿的文件路径
filter_poses_by_lidar_timestamps(lidar_timestamp_file, pose_file, output_pose_file)
基于点云数据对其图像数据
import os
import shutil
import numpy as np
def load_timestamps(timestamp_file):
"""加载时间戳文件并返回时间戳和对应文件名的列表"""
timestamps = []
filenames = []
with open(timestamp_file, 'r') as f:
for line in f:
timestamp, filename = line.strip().split()
timestamps.append(float(timestamp))
filenames.append(filename)
return np.array(timestamps), filenames
def find_closest_images(lidar_timestamps, image_timestamps, image_filenames):
"""根据LiDAR时间戳找到最接近的图像时间戳及对应文件名"""
closest_images = []
for lidar_time in lidar_timestamps:
closest_index = np.argmin(np.abs(image_timestamps - lidar_time))
closest_images.append(image_filenames[closest_index])
return closest_images
def save_filtered_images(filtered_images, output_dir):
"""保存筛选后的图像到指定目录"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for image_file in filtered_images:
shutil.copy(image_file, output_dir)
def filter_images_by_lidar_timestamps(lidar_timestamp_file, image_timestamp_file, output_dir):
"""主函数,结合LiDAR时间戳筛选图像数据"""
# 加载LiDAR和图像的时间戳文件
lidar_timestamps, _ = load_timestamps(lidar_timestamp_file)
image_timestamps, image_filenames = load_timestamps(image_timestamp_file)
# 找到与LiDAR时间戳最接近的图像
filtered_images = find_closest_images(lidar_timestamps, image_timestamps, image_filenames)
# 保存筛选后的图像
save_filtered_images(filtered_images, output_dir)
print(f"Filtered images saved to {output_dir}")
if __name__ == "__main__":
lidar_timestamp_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/lidar/lidar_timestamps.txt'
image_timestamp_file = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/images/image_timestamps.txt'
output_dir = '/media/kj/2B9747BF3C0EC4D0/SLAMData/parse_lic_bag/filtered_images'
filter_images_by_lidar_timestamps(lidar_timestamp_file, image_timestamp_file, output_dir)
其中filtered_images、filtered_poses.txt、lidar 记录条数都是867,实现了多传感器融合
更多推荐
所有评论(0)