subprocess模块实现指令交互

问题遇到的现象和发生背景

平时需要通过一个shell脚本和硬件进行通讯.

遇到的现象和发生背景,请写出第一个错误信息

这里涉及到交互(一问一答).根据返回信息来判断,是否要进行账号密码验证.
用subprocess模块写了一个版本,但是获取返回结果之后就无法继续发指令了.

    script_path = '脚本路径'
    obj = subprocess.Popen([script_path],
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    obj.stdin.flush()
    
  # 预先要输入的一系列指令
    command_list = []
    for _command in command_list:
        command = f"{_command}\n".encode("utf-8")
        obj.stdin.write(command)
        obj.stdin.flush()
        time.sleep(0.1)

    obj.stdout.flush()
     # 获取返回结果
    out, err = obj.communicate()
    print(out.decode('utf-8'))
运行结果及详细报错内容

目前这个版本,可以一口气执行多个指令.
尝试过,每次发一个指令就用communicate()获取一次结果,就执行不下去了.帮忙看看怎么改进代码?

不知道你这个问题是否已经解决, 如果还没有解决的话:
  • 以下回答由chatgpt基于相关博客总结生成:

    问题解决方案

    您写的代码中,使用了obj.communicate()函数来获取返回结果,但是该函数会导致进程阻塞,无法继续执行后续的指令发送。为了实现一问一答的指令交互,您可以使用obj.stdout.readline()来读取子进程的输出结果,并使用obj.stdin.write()来发送指令。下面是您的代码的改进版本,实现了一问一答的交互。

    import subprocess
    import time
    
    script_path = '脚本路径'
    obj = subprocess.Popen([script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    obj.stdin.flush()
    
    # 预先要输入的一系列指令
    command_list = []
    for _command in command_list:
        command = f"{_command}\n".encode("utf-8")
        obj.stdin.write(command)
        obj.stdin.flush()
        response = obj.stdout.readline().decode('utf-8')  # 读取子进程的输出结果
        print(response)
        time.sleep(0.1)
    
    obj.stdout.flush()
    # 获取返回结果
    out, err = obj.communicate()
    print(out.decode('utf-8'))
    

    通过使用obj.stdout.readline()来读取子进程的输出结果,您可以在发送指令后立即获取相应的返回结果,实现了一问一答的交互。

    希望这个解决方案对您有帮助!如果您还有其他问题,请随时提问。


如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^