模拟串口通讯时,使用自定义的帧协议,并在PC电脑上实现数据的打包发出与解析接收,但代码在接收数据的时候跑不通,直接卡死了,加了try后显示一直都是等待数据超时。各位请看看是什么情况
import time
import queue
from threading import Thread
# 定义帧协议格式
FRAME_LENGTH = 15
FRAME_HEADER = bytes.fromhex('AA55')
FRAME_RESERVE = bytes.fromhex('00000000000000')
def pack_data(raw_data):
# 将原始数据转换成符合帧协议的数据
# 在这个例子中,假设原始数据为 3 个整数
data = list(raw_data)
cs = sum(data) % 256
frame = FRAME_HEADER + bytes([0x01] + data) + FRAME_RESERVE + bytes([cs])
return frame
def unpack_data(frame):
# 解析帧协议格式的数据,返回原始数据
if len(frame) != FRAME_LENGTH or frame[:2] != FRAME_HEADER:
return None
data = tuple(frame[3:6])
return data
class Serial:
def __init__(self, port, baudrate, timeout):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.buffer = b''
def write(self, data):
# 模拟串口发送数据
print('[发送数据]', data.hex())
time.sleep(1) # 模拟发送数据需要的时间
def read(self, size):
# 模拟串口接收数据
while len(self.buffer) < size:
time.sleep(0.1) # 模拟接收数据需要的时间
data = self.buffer[:size]
self.buffer = self.buffer[size:]
print('[接收数据]', data.hex())
return data
def receive(self, data):
# 模拟串口接收数据
self.buffer += data
def receive_data(ser, queue):
while True:
raw_frame = ser.read(FRAME_LENGTH)
if raw_frame:
# 解析接收到的数据,并输出原始数据
data = unpack_data(raw_frame)
if data:
queue.put(data)
# 创建串口对象
ser = Serial(port='/dev/ttyUSB0', baudrate=9600, timeout=1)
# 创建队列对象
data_queue = queue.Queue()
# 启动线程,用于接收数据
thread = Thread(target=receive_data, args=(ser, data_queue))
thread.daemon = True
thread.start()
while True:
# 读取原始数据
raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
try:
# 将原始数据转换成帧协议格式的数据,并发送
frame = pack_data(map(int, raw_data.split()))
ser.write(frame)
# 等待接收数据
try:
data = data_queue.get(block=True, timeout=10)
print('已接收数据:', data)
except queue.Empty:
print('等待数据超时。')
except ValueError:
print('输入的数据不是 3 个整数,请重新输入。')
except Exception as e:
print('串口通信异常:', e)
print('------------------------')
这个程序有两个线程。主线程接收键盘输入,然后用print模拟串口发数据。receive_data线程,调用Serial对象的read接口,模拟串口收数据。现在缺少一个线程调用Serial对象的receive接口,给串口模拟对象喂数据。所以receive_data始终收不到数据。因为Queue.get会阻塞等待,所以一直超时。
解决方式:建议增加一个线程,给Serial对象喂数据。
本文的内容一共分为文件打开,数据合并,数据保存三部分
根据问题描述,你遇到了串口通讯接收数据超时的问题。可能原因有以下几种情况:
数据未发送成功:首先,检查是否成功发送数据到串口。确认你的代码正确配置了串口通讯参数,并且成功发送了数据。
接收超时时间设置过短:检查你设置的接收超时时间是否过短,导致在指定时间内没有接收到数据就触发了超时错误。可以尝试增加超时时间并重新运行程序。
数据传输速率不匹配:串口通讯可能存在数据传输速率不匹配的问题。确认你的代码和硬件之间的数据传输速率是否一致,包括波特率、数据位、停止位、奇偶校验位等。
代码逻辑错误:代码实现中可能存在逻辑错误,导致无法正常接收数据。可以检查接收数据的部分代码,确保正确解析和处理接收到的数据。
解决方法:
确认数据发送成功:在接收数据前,先确认你的代码成功发送了数据到串口。可以添加打印语句或使用串口调试工具来检查数据是否成功发送。
调整接收超时时间:根据具体情况,适当调整接收超时时间,设置一个合理的值,确保在指定时间内能够接收到数据。
校验数据传输速率:确认代码和硬件之间的数据传输速率是否一致。比较串口的配置和代码中的配置是否匹配,包括波特率、数据位、停止位、奇偶校验位等。
检查代码逻辑:仔细检查接收数据的代码逻辑,确保正确解析和处理接收到的数据。可以使用调试工具进行单步调试,观察代码执行的每一步骤,排查可能存在的问题。
如果以上解决方法都未能解决问题,可能需要更多的上下文信息和代码实现来进一步分析和定位问题。
问题点:接收数据异常
分析思路: self.buffer 一直没有接收到数据.
感谢各位朋友,这个是因为代码写的逻辑问题,不是硬件,写的函数没有调用,这是修改之后的能够正常运行的代码
import serial
import time
# 定义帧协议格式
FRAME_LENGTH = 15
FRAME_HEADER = bytes.fromhex('AA55')
FRAME_RESERVE = bytes.fromhex('0000000000000000')
def pack_data(raw_data):
# 将原始数据转换成符合帧协议的数据
# 在这个例子中,假设原始数据为 3 个整数
data = list(raw_data)
cs = sum(data) % 256
frame = FRAME_HEADER + bytes([0x01] + data) + FRAME_RESERVE + bytes([cs])
return frame
def unpack_data(frame):
# 解析帧协议格式的数据,返回原始数据
if len(frame) != FRAME_LENGTH or frame[:2] != FRAME_HEADER :
return None
data = frame[3:6]
checksum = sum(data) & 0xFF
if checksum != frame[-1]:
return None
return tuple(data)
# 创建串口对象
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)
while True:
# 读取原始数据
raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
try:
# 将原始数据转换成帧协议格式的数据,并发送
frame = pack_data(map(int, raw_data.split()))
print('[发送数据]', frame.hex())#bytes
ser.write(frame)
# 等待接收数据
raw_frame = ser.read(FRAME_LENGTH)
print("接受数据",raw_frame.hex())
if raw_frame!=b'':
data = unpack_data(raw_frame)
if data:
print('已接收数据:', data)
except ValueError:
print('输入的数据不是 3 个整数,请重新输入。')
except Exception as e:
print('串口通信异常:', e)
print('------------------------')
采用chatgpt:
在你的代码中,接收数据的函数receive_data并没有被调用,导致接收的数据无法存入缓冲区buffer,造成了死锁情况。你需要在Serial类的receive方法中调用receive_data函数。
修改代码如下:
class Serial:
def __init__(self, port, baudrate, timeout):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.buffer = b''
self.receive_data() # 添加调用接收数据的方法
def write(self, data):
# 模拟串口发送数据
print('[发送数据]', data.hex())
time.sleep(1) # 模拟发送数据需要的时间
def read(self, size):
# 模拟串口接收数据
while len(self.buffer) < size:
time.sleep(0.1) # 模拟接收数据需要的时间
data = self.buffer[:size]
self.buffer = self.buffer[size:]
print('[接收数据]', data.hex())
return data
def receive(self):
# 模拟串口接收数据
while True:
raw_frame = self.read(FRAME_LENGTH)
if raw_frame:
# 解析接收到的数据,并输出原始数据
data = unpack_data(raw_frame)
if data:
data_queue.put(data) # 存入队列
def receive_data():
while True:
data = data_queue.get() # 从队列中获取数据
print('已接收数据:', data)
# 创建串口对象
ser = Serial(port='/dev/ttyUSB0', baudrate=9600, timeout=1)
# 创建队列对象
data_queue = queue.Queue()
# 启动线程,用于接收数据
thread = Thread(target=receive_data)
thread.daemon = True
thread.start()
while True:
# 读取原始数据
raw_data = input('请输入原始数据(3 个整数,以空格分隔):')
try:
# 将原始数据转换成帧协议格式的数据,并发送
frame = pack_data(map(int, raw_data.split()))
ser.write(frame)
# 等待接收数据
try:
data = data_queue.get(block=True, timeout=10)
print('已接收数据:', data)
except queue.Empty:
print('等待数据超时。')
except ValueError:
print('输入的数据不是 3 个整数,请重新输入。')
except Exception as e:
print('串口通信异常:', e)
print('------------------------')
通过这个修改,Serial类在初始化时会调用receive_data方法,不断从队列中获取数据并打印。同时,receive_data函数也需要修改,因为data_queue是全局变量,不需要作为参数传递。
此外,确保你的串口设备连接正确,并且串口的波特率、超时设置与代码中一致。
参考gpt:
根据我的搜索结果,Python串口通讯数据超时的原因可能有以下几种:
您没有设置串口的timeout参数,导致read()方法一直阻塞,等待接收所有数据。您可以在创建serial.Serial()实例时,传入一个具体的timeout值,表示读取的超时时间(以秒为单位)。例如,ser = serial.Serial(port=“COM17”, baudrate=115200, timeout=0.5)表示如果在0.5秒内没有读取到所有数据,则直接返回。
您的串口设备没有正确连接或配置,导致没有发送或接收到数据。您可以检查一下您的串口设备是否正常工作,以及是否与Python代码中的配置参数(如波特率、数据位、校验位等)一致。
您的write()方法发送的数据类型不是bytes类型,导致数据无法被识别。您需要对字符串进行encode编码,例如ser.write(data.encode(“UTF-8”))。