平时需要通过一个shell脚本和硬件进行通讯.
这里涉及到交互(一问一答).根据返回信息来判断,是否要进行账号密码验证.
用subprocess模块写了一个版本,但是获取返回结果之后就无法继续发指令了.
script_path = '脚本路径'
obj = subprocess.Popen([script_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
obj.stdin.flush()
# 预先要输入的一系列指令
command_list = []
for _command in command_list:
command = f"{_command}\n".encode("utf-8")
obj.stdin.write(command)
obj.stdin.flush()
time.sleep(0.1)
obj.stdout.flush()
# 获取返回结果
out, err = obj.communicate()
print(out.decode('utf-8'))
目前这个版本,可以一口气执行多个指令.
尝试过,每次发一个指令就用communicate()获取一次结果,就执行不下去了.帮忙看看怎么改进代码?
您写的代码中,使用了obj.communicate()
函数来获取返回结果,但是该函数会导致进程阻塞,无法继续执行后续的指令发送。为了实现一问一答的指令交互,您可以使用obj.stdout.readline()
来读取子进程的输出结果,并使用obj.stdin.write()
来发送指令。下面是您的代码的改进版本,实现了一问一答的交互。
import subprocess
import time
script_path = '脚本路径'
obj = subprocess.Popen([script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
obj.stdin.flush()
# 预先要输入的一系列指令
command_list = []
for _command in command_list:
command = f"{_command}\n".encode("utf-8")
obj.stdin.write(command)
obj.stdin.flush()
response = obj.stdout.readline().decode('utf-8') # 读取子进程的输出结果
print(response)
time.sleep(0.1)
obj.stdout.flush()
# 获取返回结果
out, err = obj.communicate()
print(out.decode('utf-8'))
通过使用obj.stdout.readline()
来读取子进程的输出结果,您可以在发送指令后立即获取相应的返回结果,实现了一问一答的交互。
希望这个解决方案对您有帮助!如果您还有其他问题,请随时提问。