本人是爬虫初学者,尝试用副协程启动一个协程,另一个副线程启动一个多线程任务,中间用到了消费者生产者模式。但是在实际中代码跑起来,运行会中止不动,原因怎么也差不多出来。如果把代码简化成最简单的版本又可以跑成功。代码展示如下,望有高人能解惑
以下网上的简单demo,可以跑通:
import asyncio
#线程
def thread_running_loop(lp:asyncio.AbstractEventLoop):
print('loop running')
asyncio.set_event_loop(lp) #在此线程中设置一个新的事件循环,默认情况下事件循环是主线程的
lp.run_forever() #一直运行
#随意的一个协程
async def xxx(arg):
print('ready to work arg:' , arg)
await asyncio.sleep(1)
print('done ', arg)
#创建一个新的事件循环给子线程
newlp = asyncio.new_event_loop()
t = threading.Thread(target=thread_running_loop,args=(newlp,))
t.start()
#添加5个协程. 并指定事件循环 (第2个参数)
for i in range(5):
asyncio.run_coroutine_threadsafe(xxx(i),newlp)
以下是我实际任务中的代码:
import asyncio
import csv
import time
import aiofiles
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import requests
import queue
from threading import Thread
from pay_agent_getip import IpGet
import random
def thread_runing_loop(loop:asyncio.AbstractEventLoop):
print('循环事件启动')
asyncio.set_event_loop(loop)
loop.run_forever
def download_onePage(url,d,queue_dic_csv:queue.Queue,i,ip_lst):
resp_json = ''
while 'prodName' not in str(resp_json):
try:
if p in ip_lst:
ip_lst = IpPoolFlush(ip_lst,p)
p = random.choice(ip_lst)
print(p)
with requests.post(url, data=d, proxies=p) as resp:
resp_json = resp.json()
print(resp_json)
lst = resp_json['list']
for kind in lst:
data_row = [kind['prodName'], kind['lowPrice'], kind['highPrice'], kind['avgPrice']]
queue_dic_csv.put(data_row)
print(f'完成第{i}页数据的爬取,队列中现有{queue_dic_csv.qsize()}条数据....')
break
else:
p = random.choice(ip_lst)
print(p)
with requests.post(url, data=d, proxies=p) as resp:
resp_json = resp.json()
print(resp_json)
lst = resp_json['list']
for kind in lst:
data_row = [kind['prodName'], kind['lowPrice'], kind['highPrice'], kind['avgPrice']]
queue_dic_csv.put(data_row)
print(f'完成第{i}页数据的爬取,队列中现有{queue_dic_csv.qsize()}条数据...')
break
except:
pass
def MainMutiThreading(url,queue_dic_csv,ip_lst):
with ThreadPoolExecutor(3) as t:
# 线程的多少计算公式:cpu核心数*(1/cpu利用率)=cpu核心数*(1/(cpu耗时/cpu耗时+io耗时))=cpu核心数*(1+io耗时/cpu耗时)
# 然后通过目标服务器qps最大上限等比例调整,如果超限,可能会导致服务器策略性拒绝请求而使得爬虫卡住
for i in range(1, 100):
d = {
'limit': '',
'current': f"{i}",
'pubDateStartTime': "",
'pubDateEndTime': "",
'prodPcatid': "",
'prodCatid': "",
'prodName': "",
"Use-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36",
'referer': 'http://www.xinfadi.com.cn/priceDetail.html'
}
t.submit(download_onePage, url, d, queue_dic_csv,i,ip_lst)
def IpPoolCreat():
ip_lst =[IpGet() for i in range(2)]
return ip_lst
def IpPoolFlush(ip_lst,p):
ip_lst.remove(p)
ip_lst.append(IpGet())
print(f'代理池刷新{ip_lst}')
return ip_lst
async def Consumer(queue_dic_csv:queue.Queue):
while True :
print(f'队列已产生数据:{queue_dic_csv.qsize()}个,开始下载到本地...')
async with aiofiles.open('data_mutithread_async_queue.csv',mode='a') as f:
writer = csv.writer(f)
await writer.writerow(queue_dic_csv.get())
print(f'写入一条数据,队列中还剩{queue_dic_csv.qsize()}条数据...')
# def AsyCreatorTasks(queue_dic_csv):
# task = [Consumer(queue_dic_csv)]
# asyncio.run_coroutine_threadsafe(asyncio.wait(task))
# task = [asyncio.create_task(Consumer(queue_dic_csv))]
# await asyncio.wait(task) # !!!!挂起前面也要挂起
# asyncio.run(task)
def Main():
url = 'http://www.xinfadi.com.cn/getPriceData.html'
queue_dic_csv = queue.Queue()
ip_lst = IpPoolCreat()
print(f'代理池已建成:{ip_lst}')
newloop = asyncio.new_event_loop() # 这个括号不要忘记了
t2 = Thread(target=thread_runing_loop,args=(newloop,))
t2.start()
asyncio.run_coroutine_threadsafe(Consumer(queue_dic_csv),newloop)
t1 = Thread(target=MainMutiThreading, args=(url, queue_dic_csv, ip_lst,))
t1.start()
if __name__ =='__main__':
time1 = time.time()
Main()
time2 = time.time()
spend_time = time2 - time1
print(f'全部数据爬取完成!总共用时间{spend_time}秒...')
请各位不要纠结于协程本地处理数据的效率问题,我知道协程处理cpu密集型任务没什么含义,这个练习代码的目的主要还是像搞清楚各模块相互搭配使用的根本原理。所以用到了协程、多线程、线程池以及队列,相互交叉。当然这也造成了很多的耦合问题,但是就是想把这些产生的耦合问题梳理清楚 。
你 要说清楚,是哪里中止不动了?
你想多个线程访问同一个网站,会被阻止的。
你需要另写一个控制台程序,可以接收参数,然后你的程序调用这个程序;
或者你可以使用多进程任务模式,引用multiprocessing,而不是threading