Python实现多进程+协程发包方面,新手碰到一些问题

先看一下背景需求和实现构想:

萌新想实现一个需求,在短时间内以尽可能快的速度发包,考虑到有GIL锁的问题,经网上查询后打算用多进程+协程的方案实现,具体思路是:开好几个进程,每个进程里开协程,协程里放置发包函数来完成。

在协程发包这里,网上通用的方法是建立事件循环,将发包函数封装成Task,然后用loop.run_until_complete来循环执行这些任务,直至所有Task完成,然而我这里没有发包数量的限制,无法事先确定要封装的发包Task有多少个,所以打算用loop.run_forever启用一个永久的事件循环,再用run_coroutine_threadsafe来向其中动态地添加发包Task。

 

下面是碰到的问题:

    我打算先做一个简化版本来调试,即先用一个子进程来测试,但我碰到了问题:

    现在我在主函数中创建一个子进程,由于run_coroutine_threadsafe这个函数需要在一个新线程中运行,所以我在子进程中再次创建子线程,在子线程中启用事件循环,在子进程的主线程中动态地添加任务。

    下面是我的代码,为了方便调试,我暂时把动态添加任务的操作注释掉了。

import urllib
from urllib import parse
import urllib.parse
import json, hashlib, hmac, base64, datetime, time, threading
import asyncio
import aiohttp
from aiohttp_requests import requests
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from multiprocessing import Pool
from multiprocessing import Process


# 开启一个时间监听循环
def start_loop(loop):
	print('进入循环')
	asyncio.set_event_loop(loop)
	print("start loop", time.time())
	loop.run_forever()

#发包测试函数
async def test():
    r = await requests.get('https://www.huya.com/cache.php?m=LiveList&do=getLiveListByPage&tagAll=0&page={}')
    print(r.status)

#进程入口函数,每个进程单独运行
#内部会创建新线程和事件循环
def processMain():
	print("进入进程")
    #创建新的事件循环
	loop_detect = asyncio.new_event_loop()
    #创建子线程,在线程中永久启动这个新创建的事件循环
	t_detect = Thread(target=start_loop, args=(loop_detect,))    
    #启动这个新线程
	t_detect.start()
	#while True:
	#    asyncio.run_coroutine_threadsafe(test(), loop_detect)
	#    time.sleep(0.001)

	#等待子线程结束
	t_detect.join()
	print("子进程终结")

# 主程序
if __name__ == "__main__":
	print('启动程序')
	process = Process(target=processMain)
	process.start()
	process.join()
	print("结束")

    点运行后,结果是:

程序一直没有退出,这很奇怪,我做了两个尝试:

1.尝试着把多进程去掉,直接使用协程,也就是直接在主函数里创建子线程,子线程用run_forever启动事件循环,并在主线程里动态添加任务,那个结果是正常的。

2.我仍然使用多进程,然后把上面代码的,loop.run_forever注释掉,这个时候的结果如下:

所以我想问的是,为什么启用了多进程就会有问题,而且就算有问题,为什么会导致连进入循环这样的语句都打印不出来了?

我本来猜测是不是启用子进程后控制台输出也启用了副本,所以看不到输出,但这样的话为什么单独使用多进程,不用协程就能看到子进程的输出?

百思不得其解,有大佬能帮我解答一下么?

看逻辑也没有问题,我把你的代码拿出来执行也没有问题。

run_forever会让事件一直在跑,因为你这边没有协程在loop中,肯定会卡在print("start loop", time.time()),而子进程的主线程等待子进程的子线程结束才能结束,而子线程又在运行loop,那么子线程中loop不结束,子线程不会结束,子线程的主线程也就是子进程不会结束,那么主进程也就是main不会结束,因此需要考虑关闭loop事件,loop可以等待所有协程执行完毕关闭

import urllib
from urllib import parse
import urllib.parse
import json, hashlib, hmac, base64, datetime, time, threading
import asyncio
import aiohttp
from aiohttp_requests import requests
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
from multiprocessing import Pool
from multiprocessing import Process
 
# 开启一个时间监听循环
def start_loop(loop):
    print('进入循环')
    asyncio.set_event_loop(loop)
    print("start loop", time.time())
    task=[asyncio.ensure_future(test()),asyncio.ensure_future(test())]#创建协程任务列表
    loop.run_until_complete(asyncio.wait(task))#该事件开始跑协程任务,协程任务结束,该事件就会结束

#发包测试函数
async def test():
    r = await requests.get('https://www.huya.com/cache.php?m=LiveList&do=getLiveListByPage&tagAll=0&page={}')
    print(r.status)
#进程入口函数,每个进程单独运行
#内部会创建新线程和事件循环
def processMain():
    # 这是子进程的主线程
	print("进入进程")
    #创建新的事件循环
	loop_detect = asyncio.new_event_loop()
    #创建子线程,在线程中永久启动这个新创建的事件循环
	t_detect = Thread(target=start_loop, args=(loop_detect,))    
    #启动这个新线程
	t_detect.start()
	#while True:
	#    asyncio.run_coroutine_threadsafe(test(), loop_detect)
	#    time.sleep(0.001)
	#等待子线程结束 该主线程才能结束
	t_detect.join()
	print("子进程终结")
# 主程序
if __name__ == "__main__":
    # 这是主进程
	print('启动程序')
    # 创建子进程
	process = Process(target=processMain)
	process.start()
    # 等待子进程结束,主进程才结束
	process.join()
	print("结束")

修改的代码和运行结果