使用 Python 协程多并发下载 任务数卡住不动

在使用 python 协程下载图片中,最终协程的任务数 卡在 97 一直循环,不知道哪里出了问题,有大佬知道什么情况吗,困扰我好久
下图是我运行结果,附上代码。
图片说明

# -*- coding: utf-8 -*-
import requests
from lxml import etree
import time
import os
import pandas as pd
import asyncio
import aiohttp
import aiomysql
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # 当前工作数
        self._max_workers = 150  # 最大工作数
        self.overtime = {}  # {url: times,} 记录失败的URL的次数
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()

        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
        return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status,)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("work 的判断")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("队列是否为空....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("队列剩余数量", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()

if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()

https://www.csdn.net/gather_23/MtjakgysMTA3MTAtYmxvZwO0O0OO0O0O.html