Skip to content

使用 Python 获取激光雷达数据

激光雷达:LD19 型号;Python 版本:3.8.10;ROS2 版本:humble。

需要特别注意,一定要注意串口的权限,否则无法打开串口(以/dev/ttyACM0为例)。

bash
# 查看设备权限
ls -la /dev/ttyACM0

# 给串口设备所有用户读写权限
sudo chmod 666 /dev/ttyACM0

一、上效果图

1.1 查看当前版本信息

An image

1.2 查看激光雷达连接是否 OK

An image

1.3 查看波特率

An image

An image

1.4 获取激光雷达数据

An image

二、上代码

1. 获取波特率的脚本:find_baudrate.py

python
#!/usr/bin/env python3
import serial
import time

port = '/dev/ttyACM0'
baudrates = [9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600]

print("测试不同波特率,寻找有效的0x54 0x2C包头...")

for baud in baudrates:
    print(f"\n=== 测试波特率: {baud} ===")
    try:
        ser = serial.Serial(port, baud, timeout=1)
        print(f"串口打开成功")

        # 清空缓冲区
        ser.flushInput()

        # 收集数据
        data_collected = bytes()
        start_time = time.time()

        while time.time() - start_time < 3:  # 收集3秒数据
            if ser.in_waiting > 0:
                data = ser.read(ser.in_waiting)
                data_collected += data
            time.sleep(0.01)

        ser.close()

        if len(data_collected) > 0:
            print(f"✅ 收到 {len(data_collected)} 字节数据")

            # 查找0x54 0x2C模式
            found = False
            for i in range(len(data_collected)-1):
                if data_collected[i] == 0x54 and data_collected[i+1] == 0x2C:
                    found = True
                    print(f"   在第 {i} 字节找到 0x54 0x2C 包头!")
                    # 显示包头附近的20字节
                    start = max(0, i-2)
                    end = min(len(data_collected), i+20)
                    print(f"   附近数据: {data_collected[start:end].hex()}")

            if not found:
                print(f"   ❌ 未找到 0x54 0x2C 包头")
                # 显示数据的前20字节
                print(f"   数据头20字节: {data_collected[:20].hex()}")
        else:
            print("❌ 没有收到数据")

    except Exception as e:
        print(f"❌ 错误: {e}")

2. 获取激光雷达数据的脚本:LD14-Python3.py

python
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
from __future__ import print_function
import serial
import time  # 确保这里导入了time模块

# 定义一个函数来解析数据包
def parse_data(data):
    speed = data[1] * 256 + data[0]  # 计算转速
    start_angle = (data[3] * 256 + data[2]) / 100.0  # 计算起始角度
    distances_and_intensities = []  # 初始化距离和光强列表

    # 遍历数据包中的距离和光强数据
    for x in range(4, 40, 3):
        distance = data[x+1] * 256 + data[x]  # 计算距离
        #打印某个角度范围内的距离
        # if  start_angle<100 and start_angle>80:
        #     print(distance)
        intensity = data[x + 2]  # 获取光强
        distances_and_intensities.append((distance, intensity))  # 添加到列表

    end_angle = (data[41] * 256 + data[40]) / 100.0  # 计算结束角度
    timestamp = data[43]*256+data[42]  # 注意:这里不要用time作为变量名,会和time模块冲突
    return speed, start_angle, distances_and_intensities, end_angle, timestamp  # 改为timestamp

# 定义一个函数来打印数据包的内容
def print_data(speed, start_angle, distances_and_intensities, end_angle, last_angle, timestamp):  # 这里也改为timestamp
    if last_angle - start_angle > 100:
        print("*******************************")

    # 打印转速、起始角度和数据点
    print("转速:", speed, end="\t")
    print("起始角度:", start_angle, end="\t")
    print("数据【距离(mm)|光强】*12个点:", end="\t")

    # 打印每个数据点的距离和光强
    for distance, intensity in distances_and_intensities:
        print(distance, "|", intensity, end="\t")
    print("时间戳:", timestamp, end="\t")  # 改为timestamp
    print("结束角度:", end_angle, end="\n\n")

# 主程序开始
if __name__ == '__main__':
    print("*******************************")
    last_angle = 0  # 初始化上一个角度

    # LD19波特率:230400     LD14波特率:115200
    print("正在打开串口 /dev/ttyACM0, 波特率 230400...")

    try:
        # 修改这里:添加更多参数确保连接稳定
        ser = serial.Serial(
            port="/dev/ttyACM0",
            baudrate=230400,
            timeout=5,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False
        )

        print(f"✅ 串口打开成功: {ser.name}")
        print(f"✅ 串口设置: {ser.baudrate} bps")

        # 等待串口稳定
        time.sleep(0.5)

        # 清空缓冲区
        ser.flushInput()
        ser.flushOutput()

        print("开始监听雷达数据...")
        print("如果长时间没有输出,请检查:")
        print("1. 雷达是否上电(有指示灯亮)")
        print("2. 雷达是否开始旋转(如果是机械雷达)")
        print("3. 按 Ctrl+C 终止程序")
        print("-" * 50)

        # 循环读取数据
        read_count = 0
        error_count = 0
        while True:
            try:
                # 添加超时和计数
                read_count += 1
                if read_count % 1000 == 0:
                    print(f"已尝试读取 {read_count} 次...")

                # 读取第一个字节
                data = ser.read(1)

                if len(data) == 0:
                    # 每100次读不到数据时打印一次提示
                    if read_count % 100 == 0:
                        print(f"等待数据中... (尝试 {read_count} 次)")
                    continue

                # 打印接收到的原始字节(调试用,开始时打开)
                if read_count < 50:  # 只在前50次读取时显示
                    print(f"[{read_count}] 收到字节: 0x{data.hex()}")

                if data[0] == 0x54:
                    # 读取第二个字节
                    data2 = ser.read(1)

                    if len(data2) == 0:
                        continue

                    if read_count < 50:  # 只在前50次读取时显示
                        print(f"[{read_count}] 第二个字节: 0x{data2.hex()}")

                    if data2[0] == 0x2C:
                        if read_count < 50:
                            print("✅ 找到数据包头 0x54 0x2C")

                        # 读取剩余45个字节
                        data_rest = ser.read(45)

                        if len(data_rest) < 45:
                            error_count += 1
                            if error_count % 10 == 0:
                                print(f"⚠️  数据不完整,第{error_count}次,只收到 {len(data_rest)} 字节")
                            continue

                        # 解析和打印数据包
                        speed, start_angle, distances_and_intensities, end_angle, timestamp = parse_data(data_rest)
                        print_data(speed, start_angle, distances_and_intensities, end_angle, last_angle, timestamp)
                        last_angle = start_angle
                        error_count = 0  # 重置错误计数

            except KeyboardInterrupt:
                print("\n用户中断程序")
                break
            except Exception as e:
                error_count += 1
                if error_count % 10 == 0:
                    print(f"处理数据时出错({error_count}次): {e}")
                continue

    except Exception as e:
        print(f"❌ 打开串口失败: {e}")
        import traceback
        traceback.print_exc()

    # 确保串口被关闭
    try:
        if 'ser' in locals() and ser.is_open:
            ser.close()
            print("串口已关闭")
    except:
        pass