
Designing a web crawler with the ProcessPoolExecutor module

Just as the concurrent.futures module offers ThreadPoolExecutor, which makes it easy to create and manipulate multiple threads, processes are handled by the ProcessPoolExecutor class. ProcessPoolExecutor, also contained in the concurrent.futures package, is the class we will use to implement our parallel web crawler. To carry out this case study, we create a Python module named process_pool_executor_web_crawler.py.
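As a minimal, self-contained sketch (it is not part of the crawler module, and the cpu_bound function is purely illustrative), the snippet below shows that ProcessPoolExecutor exposes the same submit/result interface as ThreadPoolExecutor, so code written against one can usually be moved to the other by changing a single class name:

import concurrent.futures


def cpu_bound(n):
    # A CPU-bound task that benefits from running in a separate process.
    return sum(i * i for i in range(n))


if __name__ == '__main__':
    # Swapping ThreadPoolExecutor in here would leave the rest of the code untouched.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(cpu_bound, n) for n in (100_000, 200_000)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())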

The code starts with the imports already known from the previous examples, such as requests, the Manager module, and so on. Regarding the definition of the tasks, little changes compared with the threaded example from the previous chapter, except that the data is now sent in through function arguments; refer to the signatures below (a short sketch after the two signatures shows why this explicit passing is necessary):

The group_urls_task function is defined as follows:

def group_urls_task(urls, result_dict, html_link_regex)

The crawl_task function is defined as follows:

def crawl_task(url, html_link_regex)
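Unlike threads, worker processes do not share the parent's memory, which is why the queue, the result dictionary, and the compiled regular expression all have to be handed to each task explicitly. A minimal sketch of this difference, assuming nothing beyond the standard library (the counter variable and the bump function are purely illustrative):

from concurrent.futures import ProcessPoolExecutor

counter = 0


def bump():
    # Each worker process operates on its own copy of the module globals,
    # so this increment never reaches the parent process.
    global counter
    counter += 1
    return counter


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = [executor.submit(bump).result() for _ in range(3)]
    print(results)  # values observed inside the worker processes
    print(counter)  # still 0 in the parent process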

Now let us look at the chunk of code in which we find small but relevant changes. Entering the main chunk, we declare an object of the Manager type, which will now allow us to share the queue and not only the dictionary that holds the processing results. To define this queue, named urls, which contains the URLs to be crawled, we will use a Manager.Queue object. For the result dictionary we will use a Manager.dict object, aiming at a dictionary managed by a proxy. The following code illustrates these definitions:

if __name__ == '__main__':
    manager = Manager()
    urls = manager.Queue()
    urls.put("http://br.bing.com/")
    urls.put("https://github.com")
    result_dict = manager.dict()
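To make the proxy behaviour concrete, here is a standalone sketch (it is not taken from process_pool_executor_web_crawler.py, and the fill function is hypothetical) showing that a Manager.dict proxy passed to a child process reflects the child's writes back in the parent:

from multiprocessing import Manager, Process


def fill(shared_dict, url):
    # The write goes through the manager's proxy, so the parent sees it.
    shared_dict[url] = None


if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()
    worker = Process(target=fill, args=(shared_dict, "http://br.bing.com/"))
    worker.start()
    worker.join()
    print(dict(shared_dict))  # {'http://br.bing.com/': None}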

Then, we define the regular expression to be used in the crawling stage and obtain the number of processors of the machine that runs the program, as shown in the following code:

html_link_regex = re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
number_of_cpus = cpu_count()
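As a quick check of what the regular expression captures, the following self-contained snippet (the sample HTML string is made up for illustration) extracts the href value from an anchor tag:

import re

html_link_regex = re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')

sample_html = '<a class="navbar" href="https://github.com">GitHub</a>'
print(html_link_regex.findall(sample_html))  # ['https://github.com']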

In the final chunks of code we can notice the consistency of the API presented by the concurrent.futures module. The next block is exactly the one used in the ThreadPoolExecutor example from the previous chapter. Still, changing the class to ProcessPoolExecutor is enough to alter the internal behaviour and work around the GIL problem for CPU-bound tasks, without breaking the code. Check the following two chunks of code; both create a ProcessPoolExecutor with the number of workers limited to the number of processors in the machine. The first executor groups the URLs in the dictionary with a standard value of None. The second one carries out the crawling stage.

The chunk of code for the first executor is as follows:

with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:
    for i in range(urls.qsize()):
        group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)
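It is worth noting that leaving the with block implicitly calls shutdown(wait=True) on the executor, so every group_urls_task has finished and result_dict is fully populated before the crawling stage starts. A minimal sketch of this guarantee (the slow_task function is purely illustrative):

import concurrent.futures
import time


def slow_task(i):
    # Simulate a task that takes a little while to complete.
    time.sleep(0.1)
    return i


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(slow_task, i) for i in range(4)]
    # Reaching this line means shutdown(wait=True) has run and every
    # submitted task has already completed.
    print(sorted(f.result() for f in futures))  # [0, 1, 2, 3]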

The chunk of code for the second executor is as follows:

with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:
    future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}
    for future in concurrent.futures.as_completed(future_tasks):
        result_dict[future.result()[0]] = future.result()[1]

With concurrent.futures, switching from the multithreaded scheme to the multiprocess one is reasonably simple.

The output of running the program is as follows:

$ python process_pool_executor_web_crawler.py
[SpawnProcess-4] putting url [http://www.google.com] in dictionary...
[SpawnProcess-4] putting url [http://br.bing.com/] in dictionary...
[SpawnProcess-4] putting url [https://duckduckgo.com/] in dictionary...
[SpawnProcess-4] putting url [http://br.search.yahoo.com/] in dictionary...
[SpawnProcess-2] putting url [https://github.com/] in dictionary...
[SpawnProcess-11] crawling url [https://duckduckgo.com/] ...
[SpawnProcess-10] crawling url [http://www.google.com] ...
[SpawnProcess-8] crawling url [https://github.com/] ...
[SpawnProcess-7] crawling url [http://br.bing.com/] ...
[SpawnProcess-9] crawling url [http://br.search.yahoo.com/] ...
[http://www.google.com] with links: [http://www.google.com.hk/imghp?hl=zh-TW&tab=wi...
[http://br.bing.com/] with links: [javascript:void(0);...
[https://duckduckgo.com/] with links: [/about...
[http://br.search.yahoo.com/] with links: [https://br.yahoo.com/...
[https://github.com/] with links: [#start-of-content...

Complete example

Translator's note: the complete source of process_pool_executor_web_crawler.py follows.

import sys
import re
import queue 
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager, cpu_count, current_process

import requests


def group_urls_task(urls, result_dict, html_link_regex):
    try:
        url = urls.get(True, 0.05)  # block=True waits for an item; 0.05 is the timeout in seconds before queue.Empty is raised
        result_dict[url] = None
        print("[%s] putting url [%s] in dictionary..." % (current_process().name, url))
    except queue.Empty:
        print('Nothing to be done, queue is empty')

def crawl_task(url, html_link_regex):
    links = []
    try:
        request_data = requests.get(url)
        print("[%s] crawling url [%s] ..." % (current_process().name, url))
        links = html_link_regex.findall(request_data.text)
    except Exception:
        print(f"error: {sys.exc_info()[0]}")
    return (url, links)

if __name__ == "__main__":

    manager = Manager()
    urls = manager.Queue()
    urls.put('http://www.google.com')
    urls.put('http://br.bing.com/')
    urls.put('https://duckduckgo.com/')
    urls.put('https://github.com/')
    urls.put('http://br.search.yahoo.com/')
    result_dict = manager.dict()

    html_link_regex = re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
    number_of_cpus = cpu_count()

    with ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:
        for i in range(urls.qsize()):
            group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)

    with ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:
        future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}
        for future in as_completed(future_tasks):
            result_dict[future.result()[0]] = future.result()[1]

    for url, links in result_dict.items():
        print(f"[{url}] with links: [{links[0]}...")
