[Python Web Scraping, 2nd Edition] Chapter 4 Notes
Concurrent and Parallel Downloading
Alexa provides a list of the one million most popular websites (http://www.alexa.com/topsites ). A compressed copy of this list can also be downloaded directly from http://s3.amazonaws.com/alexa-static/top-1m.csv.zip , so there is no need to scrape the data from the Alexa site itself.
Parsing the Alexa list
- Download the .zip file.
- Extract the CSV file from the .zip file.
- Parse the CSV file.
- Iterate over each row of the CSV file to extract the domain.
import csv
from zipfile import ZipFile
from io import TextIOWrapper, BytesIO

import requests


class AlexaCallback:
    def __init__(self, max_urls=500):
        self.max_urls = max_urls
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'
        self.urls = []

    def __call__(self):
        resp = requests.get(self.seed_url, stream=True)
        with ZipFile(BytesIO(resp.content)) as zf:
            csv_filename = zf.namelist()[0]
            with zf.open(csv_filename) as csv_file:
                # each row of the CSV is (rank, domain)
                for _, website in csv.reader(TextIOWrapper(csv_file)):
                    self.urls.append('http://' + website)
                    if len(self.urls) == self.max_urls:
                        break
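
A quick usage sketch (it just needs network access to the S3 URL above):

AC = AlexaCallback(max_urls=500)
AC()                 # download and parse the zipped CSV
print(len(AC.urls))  # at most 500
print(AC.urls[:3])   # first few URLs from the list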
Multithreaded Downloading
Within a single process, execution still switches between threads, with each thread running a different part of the program. This means that while one thread is waiting for a page to download, the process can switch to another thread and avoid wasting processor time.
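
As a minimal sketch of the idea (not the book's crawler; the URL list and worker function here are made up for illustration), a small pool of threads can pull URLs from a shared list:

import threading
import requests

urls = ['http://example.com', 'http://example.org']  # hypothetical seed list

def worker():
    while urls:
        try:
            url = urls.pop()
        except IndexError:
            break  # another thread emptied the list first
        resp = requests.get(url)
        print(url, resp.status_code)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

The full multithreaded crawler below follows the same pattern, adding robots.txt checks, a depth limit, and throttling through the Downloader from chapter 3.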
import re
import socket
import threading
import time
from urllib import robotparser
from urllib.parse import urljoin, urlparse

from ch3 import Downloader

SLEEP_TIME = 1
socket.setdefaulttimeout(60)


def get_robots_parser(robots_url):
    " Return the robots parser object using the robots_url "
    try:
        rp = robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()
        return rp
    except Exception as e:
        print('Error finding robots_url:', robots_url, e)


def get_links(html):
    " Return a list of links (using simple regex matching) from the html content "
    # a regular expression to extract all links from the webpage
    webpage_regex = re.compile("""<a[^>]+href=["'](.*?)["']""", re.IGNORECASE)
    # list of all links from the webpage
    return webpage_regex.findall(html)
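
For reference, a small sketch of how the robots parser is used further down (example.com stands in for a real domain):

rp = get_robots_parser('http://example.com/robots.txt')
if rp and rp.can_fetch('wswp', 'http://example.com/index.html'):
    print('allowed to crawl this URL')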
def threaded_crawler(start_url, link_regex, user_agent='wswp', proxies=None,
                     delay=3, max_depth=4, num_retries=2, cache={},
                     max_threads=10, scraper_callback=None):
    """ Crawl from the given start URLs following links matched by link_regex.
        In this implementation, we do not actually scrape any information.
        args:
            start_url (str or list of strs): web site(s) to start crawl
            link_regex (str): regex to match for links
        kwargs:
            user_agent (str): user agent (default: wswp)
            proxies (list of dicts): a list of possible dicts for http / https proxies
                For formatting, see the requests library
            delay (int): seconds to throttle between requests to one domain (default: 3)
            max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
            num_retries (int): # of retries when 5xx error (default: 2)
            cache (dict): cache dict with urls as keys and dicts for responses (default: {})
            scraper_callback: function to be called on url and html content
    """
    if isinstance(start_url, list):
        crawl_queue = start_url
    else:
        crawl_queue = [start_url]
    # keep track of which URLs have been seen before
    seen, robots = {}, {}
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies, cache=cache)

    def process_queue():
        while crawl_queue:
            url = crawl_queue.pop()
            no_robots = False
            if not url or 'http' not in url:
                continue
            domain = '{}://{}'.format(urlparse(url).scheme, urlparse(url).netloc)
            rp = robots.get(domain)
            if not rp and domain not in robots:
                robots_url = '{}/robots.txt'.format(domain)
                rp = get_robots_parser(robots_url)
                if not rp:
                    # issue finding robots.txt, still crawl
                    no_robots = True
                robots[domain] = rp
            elif domain in robots:
                no_robots = True
            # check url passes robots.txt restrictions
            if no_robots or rp.can_fetch(user_agent, url):
                depth = seen.get(url, 0)
                if depth == max_depth:
                    print('Skipping %s due to depth' % url)
                    continue
                html = D(url, num_retries=num_retries)
                if not html:
                    continue
                if scraper_callback:
                    links = scraper_callback(url, html) or []
                else:
                    links = []
                # filter for links matching our regular expression
                for link in get_links(html) + links:
                    if re.match(link_regex, link):
                        if 'http' not in link:
                            if link.startswith('//'):
                                link = '{}:{}'.format(urlparse(url).scheme, link)
                            elif link.startswith('://'):
                                link = '{}{}'.format(urlparse(url).scheme, link)
                            else:
                                link = urljoin(domain, link)
                        if link not in seen:
                            seen[link] = depth + 1
                            crawl_queue.append(link)
            else:
                print('Blocked by robots.txt:', url)

    # wait for all download threads to finish
    threads = []
    print(max_threads)
    while threads or crawl_queue:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit w/ ctrl-c
            thread.start()
            threads.append(thread)
        print(threads)
        for thread in threads:
            thread.join()
        time.sleep(SLEEP_TIME)
import threading
import time
from ch3 import MongoCache

if __name__ == '__main__':
    max_threads = threading.active_count()
    AC = AlexaCallback()
    AC()
    cache = MongoCache()
    # cache.clear()
    start_time = time.time()
    threaded_crawler(AC.urls, link_regex='$^', cache=cache, max_threads=max_threads)
    print('Total time: %ss' % (time.time() - start_time))
threaded_crawler keeps creating new threads in a loop until the thread pool reaches max_threads. During the crawl, once the queue has no more URLs left to crawl, a thread terminates early.
Multiprocess Downloading
To improve performance further, the multithreaded example can be extended to support multiple processes. Currently, the crawl queue is held in local memory, which means other processes cannot contribute to the same crawl. To fix this, the crawl queue is moved into MongoDB. Storing the queue independently means that even crawlers running on different servers can collaborate on the same crawl queue.
import multiprocessing
from urllib.parse import urljoin, urlparse


def clean_link(url, domain, link):
    if link.startswith('//'):
        link = '{}:{}'.format(urlparse(url).scheme, link)
    elif link.startswith('://'):
        link = '{}{}'.format(urlparse(url).scheme, link)
    else:
        link = urljoin(domain, link)
    return link


def get_links(html, link_regex):
    " Return a list of links (using simple regex matching) from the html content "
    # a regular expression to extract all links from the webpage
    webpage_regex = re.compile("""<a[^>]+href=["'](.*?)["']""", re.IGNORECASE)
    # list of all links from the webpage
    links = webpage_regex.findall(html)
    links = (link for link in links if re.match(link_regex, link))
    return links
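
A quick illustration of what clean_link normalizes (the URLs are made up for the example):

# a protocol-relative link gets the page's scheme
clean_link('http://example.com/a', 'http://example.com', '//cdn.example.com/x.js')
# -> 'http://cdn.example.com/x.js'
# a relative link is joined onto the domain
clean_link('http://example.com/a', 'http://example.com', '/about')
# -> 'http://example.com/about'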
def threaded_crawler_rq(start_url, link_regex, user_agent='wswp', proxies=None,
                        delay=3, max_depth=4, num_retries=2, cache={},
                        max_threads=10, scraper_callback=None):
    """ Crawl from the given start URLs following links matched by link_regex.
        In this implementation, we do not actually scrape any information.
        args:
            start_url (str or list of strs): web site(s) to start crawl
            link_regex (str): regex to match for links
        kwargs:
            user_agent (str): user agent (default: wswp)
            proxies (list of dicts): a list of possible dicts for http / https proxies
                For formatting, see the requests library
            delay (int): seconds to throttle between requests to one domain (default: 3)
            max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
            num_retries (int): # of retries when 5xx error (default: 2)
            cache (dict): cache dict with urls as keys and dicts for responses (default: {})
            scraper_callback: function to be called on url and html content
    """
    crawl_queue = MongoQueue()
    crawl_queue.clear()
    crawl_queue.push(start_url)
    # keep track of which robots.txt parsers have been seen before
    robots = {}
    D = Downloader(delay=delay, user_agent=user_agent,
                   proxies=proxies, cache=cache)

    def process_queue():
        while len(crawl_queue):
            url = crawl_queue.pop()
            no_robots = False
            if not url or 'http' not in url:
                continue
            domain = '{}://{}'.format(urlparse(url).scheme,
                                      urlparse(url).netloc)
            rp = robots.get(domain)
            if not rp and domain not in robots:
                robots_url = '{}/robots.txt'.format(domain)
                rp = get_robots_parser(robots_url)
                if not rp:
                    # issue finding robots.txt, still crawl
                    no_robots = True
                robots[domain] = rp
            elif domain in robots:
                no_robots = True
            # check url passes robots.txt restrictions
            if no_robots or rp.can_fetch(user_agent, url):
                depth = crawl_queue.get_depth(url)
                if depth == max_depth:
                    print('Skipping %s due to depth' % url)
                    continue
                html = D(url, num_retries=num_retries)
                if not html:
                    continue
                if scraper_callback:
                    links = scraper_callback(url, html) or []
                else:
                    links = []
                # filter for links matching our regular expression
                for link in list(get_links(html, link_regex)) + links:
                    if 'http' not in link:
                        link = clean_link(url, domain, link)
                    crawl_queue.push(link)
                    crawl_queue.set_depth(link, depth + 1)
            else:
                print('Blocked by robots.txt:', url)

    # wait for all download threads to finish
    threads = []
    while threads or len(crawl_queue):
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit w/ ctrl-c
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        time.sleep(SLEEP_TIME)
def mp_threaded_crawler(*args, **kwargs):
    """ create a multiprocessing threaded crawler """
    processes = []
    # default to one process per CPU core if num_procs is not given
    num_procs = kwargs.pop('num_procs', None)
    if not num_procs:
        num_procs = multiprocessing.cpu_count()
    for _ in range(num_procs):
        proc = multiprocessing.Process(target=threaded_crawler_rq,
                                       args=args, kwargs=kwargs)
        proc.start()
        processes.append(proc)
    # wait for processes to complete
    for proc in processes:
        proc.join()
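
A hedged usage sketch, mirroring the earlier __main__ block (num_procs=4 is an arbitrary choice, and the MongoCache argument is deliberately left out here because of the pickling problem described at the end of this section):

if __name__ == '__main__':
    AC = AlexaCallback()
    AC()
    start_time = time.time()
    mp_threaded_crawler(AC.urls, link_regex='$^', num_procs=4)
    print('Total time: %ss' % (time.time() - start_time))

threaded_crawler_rq relies on the MongoQueue class below to share the crawl queue between processes.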
from datetime import datetime, timedelta
from pymongo import MongoClient, errors


class MongoQueue:
    """
    >>> timeout = 1
    >>> url = 'http://example.webscraping.com'
    >>> q = MongoQueue(timeout=timeout)
    >>> q.clear() # ensure empty queue
    >>> q.push(url) # add test URL
    >>> q.peek() == q.pop() == url # pop back this URL
    True
    >>> q.repair() # immediate repair will do nothing
    >>> q.pop() # another pop should be empty
    >>> q.peek()
    >>> import time; time.sleep(timeout) # wait for timeout
    >>> q.repair() # now repair will release URL
    Released: http://example.webscraping.com
    >>> q.pop() == url # pop URL again
    True
    >>> bool(q) # queue is still active while outstanding
    True
    >>> q.complete(url) # complete this URL
    >>> bool(q) # queue is not complete
    False
    """
    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """
        client: an existing MongoClient (a new one is created if None)
        timeout: the number of seconds before a processing job is considered stalled
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __bool__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if it does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError:
            pass  # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing.
        If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING,
                             'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print('Released:', record['_id'])

    def clear(self):
        self.db.crawl_queue.drop()
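
Note that threaded_crawler_rq also calls len(crawl_queue), crawl_queue.get_depth(url) and crawl_queue.set_depth(link, depth + 1), none of which are defined on MongoQueue above. A hedged sketch of what they might look like (my own guess at the intended behaviour, storing the depth on each queue document; count_documents needs PyMongo 3.7+):

    def __len__(self):
        # number of jobs that are not yet complete
        return self.db.crawl_queue.count_documents(
            {'status': {'$ne': self.COMPLETE}})

    def get_depth(self, url):
        record = self.db.crawl_queue.find_one({'_id': url})
        return record.get('depth', 0) if record else 0

    def set_depth(self, url, depth):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'depth': depth}})

Also note that insert, update and find_and_modify used above are legacy PyMongo calls; recent PyMongo versions replace them with insert_one, update_one and find_one_and_update.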
Running the program produced the error shown above, which I have not yet resolved. My guess is that the pickle module has to serialize the objects' internal member variables, and the custom database storage class MongoQueue is something the process pool's internal packing step cannot handle.
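
For what it's worth, pymongo connection objects (MongoClient) generally cannot be pickled, so any object that holds one, such as a MongoCache passed in through kwargs, will fail if it has to cross a process boundary when the child process is started. A hedged workaround sketch, assuming that is indeed the cause: build the Mongo-backed objects inside each worker process instead of passing them in (crawler_process and mp_threaded_crawler_no_pickle are hypothetical names, not from the book):

# Hypothetical workaround: each worker process builds its own MongoCache
# instead of receiving a pickled one from the parent process.
def crawler_process(start_url, link_regex, **kwargs):
    cache = MongoCache()  # created inside the child process
    threaded_crawler_rq(start_url, link_regex, cache=cache, **kwargs)

def mp_threaded_crawler_no_pickle(start_url, link_regex, num_procs=None, **kwargs):
    processes = []
    num_procs = num_procs or multiprocessing.cpu_count()
    for _ in range(num_procs):
        proc = multiprocessing.Process(target=crawler_process,
                                       args=(start_url, link_regex), kwargs=kwargs)
        proc.start()
        processes.append(proc)
    # wait for all worker processes to finish
    for proc in processes:
        proc.join()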