网页爬虫,用户输入一批网址加入未爬取队列,多线程进行爬取。提取网页里的链接又加入到未爬取队列。循环往复。那我的运行逻辑是使用并发集合ConcurrentQueue,开启线程循环取任务,访问网址提取源码里的A标签链接再加入到未爬取队列。 以下是我的代码框架,我想知道有没有更好的方法。
class Spider
{
//未爬取队列
private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>() ;
/// <summary>
/// 设置初始化未爬取的链接
/// </summary>
public void SetLink(string[] m_link)
{
foreach (var item in m_link)
{
_queue.Enqueue(item);
}
}
/// <summary>
/// 启动爬行
/// </summary>
public void Start()
{
//启动50个线程循环取任务
for (int i = 0; i < 50; i++)
{
Task.Factory.StartNew(() =>
{
while (true)
{
string link;
if (_queue.TryDequeue(out link))
{
GetLink(link);
}
Thread.Sleep(1);
}
},TaskCreationOptions.LongRunning);
}
}
//访问链接进行提取
private void GetLink(string link)
{
string html = httpRequest(link);//获取源码后进行提取
//...然后把提取到链接又加回到 _queue里
}
}
这里可以通过多线程并发来处理队列,且可以控制处理线程的数量,以下是代码实例(缺少重复Uri判断,请自行通过缓存已处理的Uri判断处理)
/// <summary>
/// 请求队列
/// </summary>
public class RequestQueue
{
#region Constructors
/// <summary>
/// 构造函数
/// </summary>
/// <param name="minThreadCount">最小线程数量</param>
/// <param name="maxThreadCount">最大线程数量</param>
/// <param name="maxRequestCount">最大请求数量</param>
public RequestQueue(int minThreadCount, int maxThreadCount, int maxRequestCount)
{
m_queue = new Queue<Uri>();
m_minThreadCount = minThreadCount;
m_maxThreadCount = maxThreadCount;
m_maxRequestCount = maxRequestCount;
m_totalThreadCount = 0;
m_activeThreadCount = 0;
m_stopped = false;
}
#endregion
#region IDisposable Members
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Dispose(true);
}
/// <summary>
/// 释放资源
/// </summary>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
lock (m_lock)
{
m_stopped = true;
Monitor.PulseAll(m_lock);
m_queue.Clear();
}
}
}
#endregion
#region Public Members
/// <summary>
/// 将处理请求加入计划队列
/// </summary>
/// <param name="request">请求处理的Uri</param>
public void ScheduleIncomingRequest(Uri request)
{
bool tooManyRequests = false;
// 加入队列
lock (m_lock)
{
// 检查请求数量
if (m_stopped || m_queue.Count >= m_maxRequestCount)
{
tooManyRequests = true;
}
else
{
m_queue.Enqueue(request);
// 如果有空闲线程,则唤醒空闲线程去处理
if (m_activeThreadCount < m_totalThreadCount)
{
Monitor.Pulse(m_lock);
}
// 如果没有空闲线程,且线程总数没有超过最大线程数量,则开启一个新线程去处理
else if (m_totalThreadCount < m_maxThreadCount)
{
Thread thread = new Thread(OnProcessRequestQueue);
thread.IsBackground = true;
thread.Start(null);
m_totalThreadCount++;
m_activeThreadCount++; // 增加一个活动线程
LogHelper.Info("开启新线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
}
}
}
if (tooManyRequests)
{
throw new Exception("请求数量过多,请稍后再提交请求");
}
}
#endregion
#region Private Methods
/// <summary>
/// 处理请求队列
/// </summary>
private void OnProcessRequestQueue()
{
lock (m_lock)
{
while (true)
{
// 检查队列是否为空
while (m_queue.Count == 0)
{
m_activeThreadCount--;
// 等待请求, 如果超时没有获取到新的请求,且线程数量大于最小线程数量,就释放该线程
if (m_stopped || (!Monitor.Wait(m_lock, 30000) && m_totalThreadCount > m_minThreadCount))
{
m_totalThreadCount--;
LogHelper.Info("终止线程: " + Thread.CurrentThread.ManagedThreadId + ",当前线程总数: " + m_totalThreadCount + ",活动线程数量" + m_activeThreadCount);
return; // 跳出,终止线程
}
m_activeThreadCount++;
}
Uri request = m_queue.Dequeue();
Monitor.Exit(m_lock);
try
{
// 这里调用爬虫方法,对Uri进行处理
// GetLink(Uri)
}
catch (Exception ex)
{
LogHelper.Error(ex, "处理Uri发生未知错误");
}
finally
{
Monitor.Enter(m_lock);
}
}
}
}
#endregion
#region Private Fields
private object m_lock = new object();
private Queue<Uri> m_queue;
private int m_totalThreadCount;
private int m_activeThreadCount;
private int m_maxThreadCount;
private int m_minThreadCount;
private int m_maxRequestCount;
private bool m_stopped;
#endregion
}
建议使用BurpSuit进行爬网,不仅能爬取所有的链接,还能检测漏洞
递归开启子线程,爬到没有链接继续时就停止。
用task1.continuewith(task2)比较方便,爬出来方便链接分组
东西是这么个东西,你写的没有啥问题。不过在数据结构选择上。 BlockingCollection 代码更直观点
当然这玩意目前是个粗胚,还有很多东西需要处理
1.无论是这个队列,还是 BlockingCollection,你那50个task里需要处理try,不然出现异常后task自己就结束了
2 你目前缺少一些已采集过的判定,这个理论上用redis更好,量不大的情况用内存级的布隆过滤器也行,量大的情况请考虑bloom过滤器+redis去过滤掉已经采集过的地址,避免重复加入相同地址的任务
3.小心处理内存,并行开发的情况。应该使用限流手段。比如你这个50并行控制就不错
你可以使用生产消费模型来处理,一个生产网址,一个爬取网址。使用task来处理,不用考虑并发处理。