先看一下背景需求和实现构想:
萌新想实现一个需求,在短时间内以尽可能快的速度发包,考虑到有GIL锁的问题,经网上查询后打算用多进程+协程的方案实现,具体思路是:开好几个进程,每个进程里开协程,协程里放置发包函数来完成。
在协程发包这里,网上通用的方法是建立事件循环,将发包函数封装成Task,然后用loop.run_until_complete来循环执行这些任务,直至所有Task完成,然而我这里没有发包数量的限制,无法事先确定要封装的发包Task有多少个,所以打算用loop.run_forever启用一个永久的事件循环,再用run_coroutine_threadsafe来向其中动态地添加发包Task。
下面是碰到的问题:
我打算先做一个简化版本来调试,即先用一个子进程来测试,但我碰到了问题:
现在我在主函数中创建一个子进程,由于run_coroutine_threadsafe这个函数需要在一个新线程中运行,所以我在子进程中再次创建子线程,在子线程中启用事件循环,在子进程的主线程中动态地添加任务。
下面是我的代码,为了方便调试,我暂时把动态添加任务的操作注释掉了。
import urllib
from urllib import parse
import urllib.parse
import json, hashlib, hmac, base64, datetime, time, threading
import asyncio
import aiohttp
from aiohttp_requests import requests
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from multiprocessing import Pool
from multiprocessing import Process
# 开启一个时间监听循环
def start_loop(loop):
print('进入循环')
asyncio.set_event_loop(loop)
print("start loop", time.time())
loop.run_forever()
#发包测试函数
async def test():
r = await requests.get('https://www.huya.com/cache.php?m=LiveList&do=getLiveListByPage&tagAll=0&page={}')
print(r.status)
#进程入口函数,每个进程单独运行
#内部会创建新线程和事件循环
def processMain():
print("进入进程")
#创建新的事件循环
loop_detect = asyncio.new_event_loop()
#创建子线程,在线程中永久启动这个新创建的事件循环
t_detect = Thread(target=start_loop, args=(loop_detect,))
#启动这个新线程
t_detect.start()
#while True:
# asyncio.run_coroutine_threadsafe(test(), loop_detect)
# time.sleep(0.001)
#等待子线程结束
t_detect.join()
print("子进程终结")
# 主程序
if __name__ == "__main__":
print('启动程序')
process = Process(target=processMain)
process.start()
process.join()
print("结束")
点运行后,结果是:
程序一直没有退出,这很奇怪,我做了两个尝试:
1.尝试着把多进程去掉,直接使用协程,也就是直接在主函数里创建子线程,子线程用run_forever启动事件循环,并在主线程里动态添加任务,那个结果是正常的。
2.我仍然使用多进程,然后把上面代码的,loop.run_forever注释掉,这个时候的结果如下:
所以我想问的是,为什么启用了多进程就会有问题,而且就算有问题,为什么会导致连进入循环这样的语句都打印不出来了?
我本来猜测是不是启用子进程后控制台输出也启用了副本,所以看不到输出,但这样的话为什么单独使用多进程,不用协程就能看到子进程的输出?
百思不得其解,有大佬能帮我解答一下么?
看逻辑也没有问题,我把你的代码拿出来执行也没有问题。
run_forever会让事件一直在跑,因为你这边没有协程在loop中,肯定会卡在print("start loop", time.time()),而子进程的主线程等待子进程的子线程结束才能结束,而子线程又在运行loop,那么子线程中loop不结束,子线程不会结束,子线程的主线程也就是子进程不会结束,那么主进程也就是main不会结束,因此需要考虑关闭loop事件,loop可以等待所有协程执行完毕关闭
import urllib
from urllib import parse
import urllib.parse
import json, hashlib, hmac, base64, datetime, time, threading
import asyncio
import aiohttp
from aiohttp_requests import requests
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from multiprocessing import Pool
from multiprocessing import Process
# 开启一个时间监听循环
def start_loop(loop):
print('进入循环')
asyncio.set_event_loop(loop)
print("start loop", time.time())
task=[asyncio.ensure_future(test()),asyncio.ensure_future(test())]#创建协程任务列表
loop.run_until_complete(asyncio.wait(task))#该事件开始跑协程任务,协程任务结束,该事件就会结束
#发包测试函数
async def test():
r = await requests.get('https://www.huya.com/cache.php?m=LiveList&do=getLiveListByPage&tagAll=0&page={}')
print(r.status)
#进程入口函数,每个进程单独运行
#内部会创建新线程和事件循环
def processMain():
# 这是子进程的主线程
print("进入进程")
#创建新的事件循环
loop_detect = asyncio.new_event_loop()
#创建子线程,在线程中永久启动这个新创建的事件循环
t_detect = Thread(target=start_loop, args=(loop_detect,))
#启动这个新线程
t_detect.start()
#while True:
# asyncio.run_coroutine_threadsafe(test(), loop_detect)
# time.sleep(0.001)
#等待子线程结束 该主线程才能结束
t_detect.join()
print("子进程终结")
# 主程序
if __name__ == "__main__":
# 这是主进程
print('启动程序')
# 创建子进程
process = Process(target=processMain)
process.start()
# 等待子进程结束,主进程才结束
process.join()
print("结束")
修改的代码和运行结果