
本文旨在介绍如何使用 Python 的 subprocess 模块并发执行多个子进程,并通过线程池来显著提高程序的执行效率。我们将分析常见的使用 subprocess.Popen 和 .communicate() 方法的场景,并提供使用 ThreadPool 并发等待子进程完成的示例代码。
在使用 subprocess 模块执行多个子进程时,一个常见的误解是 Popen 函数会阻塞程序的执行。实际上,Popen 函数是非阻塞的,它会立即返回一个 Popen 对象,允许程序继续执行。然而,Popen 对象的 communicate() 方法是阻塞的,它会等待子进程执行完毕并返回其输出。如果在循环中依次调用 communicate() 方法,实际上会导致子进程按顺序执行,从而降低程序的效率。
为了解决这个问题,可以使用线程池来并发等待子进程完成。线程池可以创建多个线程,每个线程负责等待一个子进程完成。这样,多个子进程可以同时运行,从而提高程序的执行效率。
下面是一个使用线程池并发等待子进程完成的示例代码:
import subprocessimport loggingfrom multiprocessing.pool import ThreadPoollogging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')log = logging.getLogger(__name__)def runShowCommands(cmdTable) -> dict: """return a dictionary of captured output from commands defined in cmdTable.""" procOutput = {} # dict to store the output text from show commands procHandles = {} for cmd, command in cmdTable.items(): try: log.debug(f"running subprocess {cmd} -- {command}") procHandles[cmd] = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) except Exception as e: log.error(f"Error launching subprocess {cmd}: {e}") continue def handle_proc_stdout(handle): try: proc = procHandles[handle] stdout, stderr = proc.communicate(timeout=180) procOutput[handle] = stdout.decode("utf-8") # turn stdout portion into text log.debug(f"subprocess returned {handle}") if stderr: log.error(f"subprocess {handle} returned stderr: {stderr.decode('utf-8')}") except subprocess.TimeoutExpired: log.error(f"subprocess {handle} timed out") proc.kill() # Terminate the process except Exception as e: log.error(f"Error handling subprocess {handle}: {e}") threadpool = ThreadPool() threadpool.map(handle_proc_stdout, procHandles.keys()) threadpool.close() threadpool.join() return procOutputif __name__ == '__main__': cmdTable = { 'himom': "echo hi there momma", 'goodbye': "echo goodbye", 'date': "date", 'sleep': "sleep 2 && echo slept" } output = runShowCommands(cmdTable) for cmd, out in output.items(): print(f"Output from {cmd}:n{out}")
代码解释:
runShowCommands(cmdTable) 函数:
接受一个字典 cmdTable,其中键是命令的名称,值是要执行的命令字符串。创建一个空字典 procOutput 来存储每个命令的输出。创建一个空字典 procHandles 来存储每个 Popen 对象。循环遍历 cmdTable 中的每个命令:使用 subprocess.Popen 启动子进程,并将 stdout 和 stderr 重定向到管道。 shell=True 允许直接执行字符串命令,但要注意安全性。将 Popen 对象存储在 procHandles 字典中,键是命令名称。定义一个内部函数 handle_proc_stdout(handle):此函数负责处理单个子进程的输出。使用 procHandles[handle].communicate(timeout=180) 获取子进程的输出,并设置超时时间为 180 秒。将输出解码为 UTF-8 字符串,并将其存储在 procOutput 字典中。记录子进程返回的消息。处理 TimeoutExpired 异常,如果子进程超时,则记录错误并终止该进程。处理其他异常,如果发生任何其他错误,则记录错误消息。创建一个 ThreadPool 对象。使用 threadpool.map(handle_proc_stdout, procHandles.keys()) 将 handle_proc_stdout 函数应用于 procHandles 字典中的每个键(命令名称)。 这会在线程池中并行执行 handle_proc_stdout 函数。调用 threadpool.close() 以防止向线程池提交更多任务。调用 threadpool.join() 以等待所有线程完成。返回 procOutput 字典。
if __name__ == ‘__main__’: 块:
创建一个示例 cmdTable 字典。调用 runShowCommands(cmdTable) 执行命令并获取输出。循环遍历 output 字典,并打印每个命令的输出。
注意事项:
超时处理: communicate(timeout=180) 设置了超时时间,防止子进程无限期运行。如果子进程在指定时间内未完成,将引发 TimeoutExpired 异常,并且该进程将被终止。错误处理: 代码包含 try…except 块,用于捕获可能发生的异常,例如子进程启动失败或超时。线程安全: 确保在多线程环境中访问和修改共享资源(例如 procOutput 字典)是线程安全的。 在此示例中,由于每个线程都写入不同的键,因此字典的写入操作是线程安全的。资源限制: 创建过多的线程可能会消耗大量系统资源。 线程池的大小应根据系统资源和任务的性质进行调整。安全性: 使用 shell=True 执行命令时,需要注意命令注入的风险。 确保命令字符串来自可信来源,或者对输入进行适当的转义。日志记录: 使用 logging 模块记录程序的运行状态,方便调试和排错。
总结:
通过使用线程池,可以并发执行多个子进程,从而显著提高程序的执行效率。 在处理大量并发任务时,线程池是一种非常有用的技术。 请记住考虑超时处理、错误处理、线程安全性和资源限制等因素,以确保程序的正确性和稳定性。
以上就是使用线程池并发执行子进程以提高效率的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1371780.html
微信扫一扫
支付宝扫一扫