关于#python#的问题:模拟串口通讯时,使用自定义的帧协议,并在PC电脑上实现数据的打包发出与解析接收,但代码在接收数据的时候跑不通,加了try后显示一直都是等待数据超时

模拟串口通讯时,使用自定义的帧协议,并在PC电脑上实现数据的打包发出与解析接收,但代码在接收数据的时候跑不通,直接卡死了,加了try后显示一直都是等待数据超时。各位请看看是什么情况

import time
import queue
from threading import Thread

# 定义帧协议格式
FRAME_LENGTH = 15
FRAME_HEADER = bytes.fromhex('AA55')
FRAME_RESERVE = bytes.fromhex('00000000000000')

def pack_data(raw_data):
    # 将原始数据转换成符合帧协议的数据
    # 在这个例子中,假设原始数据为 3 个整数
    data = list(raw_data)
    cs = sum(data) % 256
    frame = FRAME_HEADER + bytes([0x01] + data) + FRAME_RESERVE + bytes([cs])
    return frame

def unpack_data(frame):
    # 解析帧协议格式的数据,返回原始数据
    if len(frame) != FRAME_LENGTH or frame[:2] != FRAME_HEADER:
        return None
    data = tuple(frame[3:6])
    return data

class Serial:
    def __init__(self, port, baudrate, timeout):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.buffer = b''
    
    def write(self, data):
        # 模拟串口发送数据
        print('[发送数据]', data.hex())
        time.sleep(1)  # 模拟发送数据需要的时间
    
    def read(self, size):
        # 模拟串口接收数据
        while len(self.buffer) < size:
            time.sleep(0.1)  # 模拟接收数据需要的时间
        data = self.buffer[:size]
        self.buffer = self.buffer[size:]
        print('[接收数据]', data.hex())
        return data
    
    def receive(self, data):
        # 模拟串口接收数据
        self.buffer += data

def receive_data(ser, queue):
    while True:
        raw_frame = ser.read(FRAME_LENGTH)
        if raw_frame:
            # 解析接收到的数据,并输出原始数据
            data = unpack_data(raw_frame)
            if data:
                queue.put(data)

# 创建串口对象
ser = Serial(port='/dev/ttyUSB0', baudrate=9600, timeout=1)
# 创建队列对象
data_queue = queue.Queue()

# 启动线程,用于接收数据
thread = Thread(target=receive_data, args=(ser, data_queue))
thread.daemon = True
thread.start()

while True:
    # 读取原始数据
    raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
    try:
        # 将原始数据转换成帧协议格式的数据,并发送
        frame = pack_data(map(int, raw_data.split()))
        ser.write(frame)
        # 等待接收数据
        try:
            data = data_queue.get(block=True, timeout=10)
            print('已接收数据:', data)
        except queue.Empty:
            print('等待数据超时。')
    except ValueError:
        print('输入的数据不是 3 个整数,请重新输入。')
    except Exception as e:
        print('串口通信异常:', e)
    print('------------------------')

img

这个程序有两个线程。主线程接收键盘输入,然后用print模拟串口发数据。receive_data线程,调用Serial对象的read接口,模拟串口收数据。现在缺少一个线程调用Serial对象的receive接口,给串口模拟对象喂数据。所以receive_data始终收不到数据。因为Queue.get会阻塞等待,所以一直超时。

解决方式:建议增加一个线程,给Serial对象喂数据。

  • 你可以参考下这个问题的回答, 看看是否对你有帮助, 链接: https://ask.csdn.net/questions/7611690
  • 你也可以参考下这篇文章:python 小编程练习——统计字符串中每个单词的词频并返回词频最大的单词,若词频相等,则返回按首字母排序后的第一个单词
  • 除此之外, 这篇博客: python如何将两个索引相同的列表的数据合并成一个新列表及绘图注意事项中的 在进行测试的过程中,我们的样品数量有很多,但是不同的样品测试完就形成了一个独立的文件,不同样品会形成不同的文件,因此,有时在进行数据处理之前,需要将这些数据列表进行合并统一分析,本文上传我的方法: 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • 本文的内容一共分为文件打开,数据合并,数据保存三部分

  • 您还可以看一下 莫明作老师的树莓派基础学习&树莓派项目实战演示&Python语言课程中的 树莓派安装系统后自动连网(不需网线 不需路由器)小节, 巩固相关知识点
  • 以下回答由chatgpt基于相关博客总结生成:

    根据问题描述,你遇到了串口通讯接收数据超时的问题。可能原因有以下几种情况:

    1. 数据未发送成功:首先,检查是否成功发送数据到串口。确认你的代码正确配置了串口通讯参数,并且成功发送了数据。

    2. 接收超时时间设置过短:检查你设置的接收超时时间是否过短,导致在指定时间内没有接收到数据就触发了超时错误。可以尝试增加超时时间并重新运行程序。

    3. 数据传输速率不匹配:串口通讯可能存在数据传输速率不匹配的问题。确认你的代码和硬件之间的数据传输速率是否一致,包括波特率、数据位、停止位、奇偶校验位等。

    4. 代码逻辑错误:代码实现中可能存在逻辑错误,导致无法正常接收数据。可以检查接收数据的部分代码,确保正确解析和处理接收到的数据。

    解决方法:

    1. 确认数据发送成功:在接收数据前,先确认你的代码成功发送了数据到串口。可以添加打印语句或使用串口调试工具来检查数据是否成功发送。

    2. 调整接收超时时间:根据具体情况,适当调整接收超时时间,设置一个合理的值,确保在指定时间内能够接收到数据。

    3. 校验数据传输速率:确认代码和硬件之间的数据传输速率是否一致。比较串口的配置和代码中的配置是否匹配,包括波特率、数据位、停止位、奇偶校验位等。

    4. 检查代码逻辑:仔细检查接收数据的代码逻辑,确保正确解析和处理接收到的数据。可以使用调试工具进行单步调试,观察代码执行的每一步骤,排查可能存在的问题。

    如果以上解决方法都未能解决问题,可能需要更多的上下文信息和代码实现来进一步分析和定位问题。

Python 串口数据打包与解析
可以参考下

Python 串口数据打包与解析_python串口数据解析_马里奥的蘑菇云的博客-CSDN博客 Python 串口数据打包与解析介绍从字符串到字节流对字节流的解析介绍通常用python写好上位机要与板子通过串口通信的时候(比如命令,参数等),对于这之间的数据格式转换是有特定要求的,比如上位机到板子之间是以字节流进行数据传输,而用户输入到上位机的数据通常是字符串;当接收数据时,又需要将字节流进行相应格式的组合解析。从字符串到字节流法1:使用bytes(arg)函数,因为参数arg需要时列表[ ]类型,所以需要将传入的数据进行转换,直接上代码,如下:解析:input():获取用户输入的参数_python串口数据解析 https://blog.csdn.net/weixin_47409662/article/details/119489425

实现串口通信数据帧打包与解析,串口通信可靠传输,屡试不爽的数据封包与状态机数据解析程序_简单|纯粹的博客-CSDN博客 串口通信数据链路层驱动编写,实现串口通信数据帧封包与状态机数据解析,实现可靠传输。_串口通信数据帧 https://blog.csdn.net/xhl9434826546/article/details/126097374

问题点:接收数据异常
分析思路: self.buffer 一直没有接收到数据.

感谢各位朋友,这个是因为代码写的逻辑问题,不是硬件,写的函数没有调用,这是修改之后的能够正常运行的代码

import serial
import time

# 定义帧协议格式
FRAME_LENGTH = 15
FRAME_HEADER = bytes.fromhex('AA55')
FRAME_RESERVE = bytes.fromhex('0000000000000000')

def pack_data(raw_data):
    # 将原始数据转换成符合帧协议的数据
    # 在这个例子中,假设原始数据为 3 个整数
    data = list(raw_data)
    cs = sum(data) % 256
    frame = FRAME_HEADER + bytes([0x01] + data) + FRAME_RESERVE + bytes([cs])
    return frame

def unpack_data(frame):
    # 解析帧协议格式的数据,返回原始数据
    if len(frame) != FRAME_LENGTH or frame[:2] != FRAME_HEADER :
        return None
    data = frame[3:6]
    checksum = sum(data) & 0xFF
    if checksum != frame[-1]:
        return None
    return tuple(data)

# 创建串口对象
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)

while True:
    # 读取原始数据
    raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
    try:
        # 将原始数据转换成帧协议格式的数据,并发送
        frame = pack_data(map(int, raw_data.split()))
        print('[发送数据]', frame.hex())#bytes
        ser.write(frame)
        # 等待接收数据
        raw_frame = ser.read(FRAME_LENGTH)
        print("接受数据",raw_frame.hex())
        if raw_frame!=b'':
            data = unpack_data(raw_frame)
            if data:
                    print('已接收数据:', data)
    except ValueError:
        print('输入的数据不是 3 个整数,请重新输入。')
    except Exception as e:
        print('串口通信异常:', e)
    print('------------------------')

    

采用chatgpt:
在你的代码中,接收数据的函数receive_data并没有被调用,导致接收的数据无法存入缓冲区buffer,造成了死锁情况。你需要在Serial类的receive方法中调用receive_data函数。

修改代码如下:

class Serial:
    def __init__(self, port, baudrate, timeout):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.buffer = b''
        self.receive_data()  # 添加调用接收数据的方法
    
    def write(self, data):
        # 模拟串口发送数据
        print('[发送数据]', data.hex())
        time.sleep(1)  # 模拟发送数据需要的时间
    
    def read(self, size):
        # 模拟串口接收数据
        while len(self.buffer) < size:
            time.sleep(0.1)  # 模拟接收数据需要的时间
        data = self.buffer[:size]
        self.buffer = self.buffer[size:]
        print('[接收数据]', data.hex())
        return data
    
    def receive(self):
        # 模拟串口接收数据
        while True:
            raw_frame = self.read(FRAME_LENGTH)
            if raw_frame:
                # 解析接收到的数据,并输出原始数据
                data = unpack_data(raw_frame)
                if data:
                    data_queue.put(data)  # 存入队列

def receive_data():
    while True:
        data = data_queue.get()  # 从队列中获取数据
        print('已接收数据:', data)

# 创建串口对象
ser = Serial(port='/dev/ttyUSB0', baudrate=9600, timeout=1)
# 创建队列对象
data_queue = queue.Queue()

# 启动线程,用于接收数据
thread = Thread(target=receive_data)
thread.daemon = True
thread.start()

while True:
    # 读取原始数据
    raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
    try:
        # 将原始数据转换成帧协议格式的数据,并发送
        frame = pack_data(map(int, raw_data.split()))
        ser.write(frame)
        # 等待接收数据
        try:
            data = data_queue.get(block=True, timeout=10)
            print('已接收数据:', data)
        except queue.Empty:
            print('等待数据超时。')
    except ValueError:
        print('输入的数据不是 3 个整数,请重新输入。')
    except Exception as e:
        print('串口通信异常:', e)
    print('------------------------')

通过这个修改,Serial类在初始化时会调用receive_data方法,不断从队列中获取数据并打印。同时,receive_data函数也需要修改,因为data_queue是全局变量,不需要作为参数传递。

此外,确保你的串口设备连接正确,并且串口的波特率、超时设置与代码中一致。

参考gpt:
根据我的搜索结果,Python串口通讯数据超时的原因可能有以下几种:
您没有设置串口的timeout参数,导致read()方法一直阻塞,等待接收所有数据。您可以在创建serial.Serial()实例时,传入一个具体的timeout值,表示读取的超时时间(以秒为单位)。例如,ser = serial.Serial(port=“COM17”, baudrate=115200, timeout=0.5)表示如果在0.5秒内没有读取到所有数据,则直接返回。
您的串口设备没有正确连接或配置,导致没有发送或接收到数据。您可以检查一下您的串口设备是否正常工作,以及是否与Python代码中的配置参数(如波特率、数据位、校验位等)一致。
您的write()方法发送的数据类型不是bytes类型,导致数据无法被识别。您需要对字符串进行encode编码,例如ser.write(data.encode(“UTF-8”))。