C# 多线程怎么分配任务?

网页爬虫,用户输入一批网址加入未爬取队列,多线程进行爬取。提取网页里的链接又加入到未爬取队列。循环往复。那我的运行逻辑是使用并发集合ConcurrentQueue,开启线程循环取任务,访问网址提取源码里的A标签链接再加入到未爬取队列。 以下是我的代码框架,我想知道有没有更好的方法。

class Spider
    {
        //未爬取队列
        private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>() ;

        /// <summary>
        /// 设置初始化未爬取的链接
        /// </summary>
        public void SetLink(string[] m_link)
        {
            foreach (var item in m_link)
            {
                _queue.Enqueue(item);
            }
        }

        /// <summary>
        /// 启动爬行
        /// </summary>
        public void Start()
        {
            //启动50个线程循环取任务
            for (int i = 0; i < 50; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        string link;

                        if (_queue.TryDequeue(out link))
                        {
                            GetLink(link);
                        }

                        Thread.Sleep(1);
                    }

                },TaskCreationOptions.LongRunning);
            }
        }

        //访问链接进行提取
        private void GetLink(string link)
        {
            string html = httpRequest(link);//获取源码后进行提取
            

            //...然后把提取到链接又加回到 _queue里
        }
    }

这里可以通过多线程并发来处理队列,且可以控制处理线程的数量,以下是代码实例(缺少重复Uri判断,请自行通过缓存已处理的Uri判断处理)


    /// <summary>
    /// 请求队列
    /// </summary>
    public class RequestQueue
    {
        #region Constructors
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="minThreadCount">最小线程数量</param>
        /// <param name="maxThreadCount">最大线程数量</param>
        /// <param name="maxRequestCount">最大请求数量</param>
        public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount)
        {
            m_queue = new Queue<Uri>();
            m_minThreadCount = minThreadCount;
            m_maxThreadCount = maxThreadCount;
            m_maxRequestCount = maxRequestCount;
            m_totalThreadCount = 0;
            m_activeThreadCount = 0;
            m_stopped = false;
        }
        #endregion

        #region IDisposable Members
        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                lock (m_lock)
                {
                    m_stopped = true;

                    Monitor.PulseAll(m_lock);

                    m_queue.Clear();
                }
            }
        }
        #endregion

        #region Public Members
        /// <summary>
        /// 将处理请求加入计划队列
        /// </summary>
        /// <param name="request">请求处理的Uri</param>
        public void ScheduleIncomingRequest(Uri request)
        {
            bool tooManyRequests = false;

            // 加入队列
            lock (m_lock)
            {
                // 检查请求数量
                if (m_stopped || m_queue.Count >= m_maxRequestCount)
                {
                    tooManyRequests = true;
                }
                else
                {
                    m_queue.Enqueue(request);

                    // 如果有空闲线程,则唤醒空闲线程去处理
                    if (m_activeThreadCount < m_totalThreadCount)
                    {
                        Monitor.Pulse(m_lock);
                    }
                    // 如果没有空闲线程,且线程总数没有超过最大线程数量,则开启一个新线程去处理
                    else if (m_totalThreadCount < m_maxThreadCount)
                    {
                        Thread thread = new Thread(OnProcessRequestQueue);
                        thread.IsBackground = true;
                        thread.Start(null);
                        m_totalThreadCount++;
                        m_activeThreadCount++;  // 增加一个活动线程

                        LogHelper.Info("开启新线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
                    }
                }
            }

            if (tooManyRequests)
            {
                throw new Exception("请求数量过多,请稍后再提交请求");
            }
        }
        #endregion

        #region Private Methods
        /// <summary>
        /// 处理请求队列
        /// </summary>
        private void OnProcessRequestQueue()
        {
            lock (m_lock)
            {
                while (true)
                {
                    // 检查队列是否为空
                    while (m_queue.Count == 0)
                    {
                        m_activeThreadCount--;

                        // 等待请求, 如果超时没有获取到新的请求,且线程数量大于最小线程数量,就释放该线程
                        if (m_stopped || (!Monitor.Wait(m_lock, 30000) && m_totalThreadCount > m_minThreadCount))
                        {
                            m_totalThreadCount--;

                            LogHelper.Info("终止线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);

                            return; // 跳出,终止线程
                        }

                        m_activeThreadCount++;
                    }

                    Uri request = m_queue.Dequeue();

                    Monitor.Exit(m_lock);

                    try
                    {
                        // 这里调用爬虫方法,对Uri进行处理
                        // GetLink(Uri)
                    }
                    catch (Exception ex)
                    {
                        LogHelper.Error(ex, "处理Uri发生未知错误");
                    }
                    finally
                    {
                        Monitor.Enter(m_lock);
                    }
                }
            }
        }
        #endregion

        #region Private Fields
        private object m_lock = new object();
        private Queue<Uri> m_queue;
        private int m_totalThreadCount;
        private int m_activeThreadCount;
        private int m_maxThreadCount;
        private int m_minThreadCount;
        private int m_maxRequestCount;
        private bool m_stopped;
        #endregion
    }

建议使用BurpSuit进行爬网,不仅能爬取所有的链接,还能检测漏洞

递归开启子线程,爬到没有链接继续时就停止。
用task1.continuewith(task2)比较方便,爬出来方便链接分组

东西是这么个东西,你写的没有啥问题。不过在数据结构选择上。 BlockingCollection 代码更直观点

当然这玩意目前是个粗胚,还有很多东西需要处理
1.无论是这个队列,还是 BlockingCollection,你那50个task里需要处理try,不然出现异常后task自己就结束了
2 你目前缺少一些已采集过的判定,这个理论上用redis更好,量不大的情况用内存级的布隆过滤器也行,量大的情况请考虑bloom过滤器+redis去过滤掉已经采集过的地址,避免重复加入相同地址的任务
3.小心处理内存,并行开发的情况。应该使用限流手段。比如你这个50并行控制就不错

你可以使用生产消费模型来处理,一个生产网址,一个爬取网址。使用task来处理,不用考虑并发处理。