A Complete Guide to Web Crawler Performance Optimization

Building an efficient, stable crawler system from scratch

Introduction: A crawler's performance directly affects the efficiency and cost of data collection. This article walks through crawler performance optimization systematically, from basic tuning to advanced architectures, and helps you build a high-performance crawler system.

1. Core Performance Metrics

  • Throughput: the number of requests processed per unit of time
  • Latency: the response time of a single request
  • Concurrency: the number of requests being handled at the same time
  • Success rate: the fraction of requests that complete successfully
  • Resource usage: CPU, memory, and network-bandwidth consumption (a small tracking sketch follows this list)
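As a concrete reference for these metrics, here is a minimal tracking sketch that a crawler could update after each request. The class name and fields are illustrative assumptions, not a standard API.

import time

class CrawlMetrics:
    """Illustrative tracker for throughput, latency, and success rate."""

    def __init__(self):
        self.start = time.monotonic()
        self.latencies = []   # per-request response times in seconds
        self.success = 0
        self.failure = 0

    def record(self, latency, ok):
        self.latencies.append(latency)
        if ok:
            self.success += 1
        else:
            self.failure += 1

    def report(self):
        total = self.success + self.failure
        elapsed = time.monotonic() - self.start
        return {
            'throughput': total / elapsed if elapsed else 0.0,            # requests per second
            'avg_latency': sum(self.latencies) / total if total else 0.0, # seconds
            'success_rate': self.success / total if total else 0.0,
        }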

2. Basic Optimization Strategies

2.1 Connection Pool Management

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Create a Session
session = requests.Session()

# Configure the retry strategy
retry_strategy = Retry(
    total=3,                                     # maximum number of retries
    backoff_factor=1,                            # backoff factor
    status_forcelist=[429, 500, 502, 503, 504],  # status codes that trigger a retry
    allowed_methods=["GET", "POST"]              # HTTP methods allowed to retry
)

# Configure the adapter (connection pool)
adapter = HTTPAdapter(
    max_retries=retry_strategy,
    pool_connections=100,  # number of host pools to keep
    pool_maxsize=100,      # maximum connections per pool
)

session.mount("http://", adapter)
session.mount("https://", adapter)
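With the adapter mounted, every request sent through this session reuses pooled connections and retries transient failures automatically. A minimal usage sketch (the URL list is a placeholder):

urls = [f"https://example.com/page/{i}" for i in range(1, 11)]  # placeholder URLs

for url in urls:
    resp = session.get(url, timeout=10)  # same pooled session for every request
    print(url, resp.status_code)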

2.2 Session Reuse

import requests

# Not recommended: create a new connection for every request
def bad_example(urls):
    for url in urls:
        response = requests.get(url)  # establishes a new connection each time
        print(response.status_code)

# Recommended: reuse a Session
def good_example(urls):
    session = requests.Session()  # create the Session once
    for url in urls:
        response = session.get(url)  # reuses the underlying connection
        print(response.status_code)

2.3 Disabling SSL Verification (Development Only)

# Not recommended in production; can speed things up in development
response = session.get(url, verify=False)

# Or disable verification on the Session itself
session.verify = False

Note: disabling SSL verification introduces security risks; use it only in development and test environments.
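When verification is disabled, urllib3 emits an InsecureRequestWarning for every request. If you have consciously accepted that risk in a development environment, the warning noise can be silenced:

import urllib3

# Suppress the InsecureRequestWarning emitted for every unverified request
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)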

3. Concurrent Request Optimization

3.1 Using threading

import threading
from queue import Queue
import requests

class Worker(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.session = requests.Session()  # each thread keeps its own Session

    def run(self):
        while True:
            url = self.queue.get()
            if url is None:  # sentinel value: stop the worker
                break
            try:
                response = self.session.get(url)
                print(f"{url}: {response.status_code}")
            except Exception as e:
                print(f"Error fetching {url}: {e}")
            finally:
                self.queue.task_done()

# Example URL list (placeholder)
urls = [f"https://example.com/page/{i}" for i in range(1, 51)]

# Build the task queue
queue = Queue()
for url in urls:
    queue.put(url)

# Start the worker threads
threads = []
num_threads = 10
for _ in range(num_threads):
    worker = Worker(queue)
    worker.start()
    threads.append(worker)

# Wait for all tasks to finish
queue.join()

# Stop the worker threads
for _ in range(num_threads):
    queue.put(None)
for thread in threads:
    thread.join()

3.2 Using concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import requests

# Keep one Session per worker thread so pooled connections are actually reused
# (creating a new Session inside every call would defeat connection pooling)
thread_local = threading.local()

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def fetch_url(url):
    session = get_session()
    try:
        response = session.get(url)
        return {'url': url, 'status': response.status_code, 'data': response.text}
    except Exception as e:
        return {'url': url, 'error': str(e)}

# Use a thread pool
with ThreadPoolExecutor(max_workers=20) as executor:
    # Submit every task
    futures = {executor.submit(fetch_url, url): url for url in urls}

    # Handle tasks as they complete
    for future in as_completed(futures):
        result = future.result()
        print(f"Processed: {result['url']}")

4. Asynchronous Crawler Optimization

4.1 Using asyncio + aiohttp

import asyncio
import aiohttp
from aiohttp import ClientSession, TCPConnector

async def fetch_page(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

async def fetch_all(urls, concurrency=100):
    # Configure the connection pool
    connector = TCPConnector(
        limit=concurrency,   # maximum total connections
        limit_per_host=20,   # maximum connections per host
        ttl_dns_cache=300,   # DNS cache lifetime in seconds
    )
    timeout = aiohttp.ClientTimeout(total=30)  # timeout settings

    async with ClientSession(connector=connector, timeout=timeout) as session:
        tasks = [fetch_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Run the asynchronous crawler
async def main():
    urls = ['https://example.com/page/' + str(i) for i in range(1, 1000)]
    results = await fetch_all(urls, concurrency=100)
    print(f"Fetched {len(results)} pages")

asyncio.run(main())
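The TCPConnector caps how many connections are open at once, but asyncio.gather still creates one coroutine per URL up front. For very large URL lists you may also want to bound the number of in-flight requests explicitly. Here is a minimal sketch using asyncio.Semaphore; it is a variation on the example above, and the function names are mine:

async def fetch_bounded(session, url, semaphore):
    # At most `concurrency` requests are in flight at once; the rest wait here
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def fetch_all_bounded(urls, concurrency=100):
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_bounded(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)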

4.2 Rate Limiting

import asyncio
import aiohttp

class RateLimiter:
    def __init__(self, rate_limit, period=1.0):
        self.rate_limit = rate_limit  # requests allowed per period
        self.period = period
        self.semaphore = asyncio.Semaphore(rate_limit)

    async def acquire(self):
        # Take one of `rate_limit` permits and return it after `period` seconds,
        # so at most `rate_limit` acquisitions can succeed within any period.
        await self.semaphore.acquire()
        loop = asyncio.get_running_loop()
        loop.call_later(self.period, self.semaphore.release)

# Use the rate limiter
async def fetch_with_limit(session, url, limiter):
    await limiter.acquire()
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['https://example.com/page/' + str(i) for i in range(1, 101)]
    limiter = RateLimiter(rate_limit=10)  # 10 requests per second
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, limiter) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())

5. Distributed Crawler Architecture

5.1 Using Redis as a Task Queue

import redis
import requests
from multiprocessing import Process, cpu_count

# Redis-backed task queue
class TaskQueue:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.task_queue = 'spider:tasks'
        self.failed_queue = 'spider:failed'
        self.visited_set = 'spider:visited'

    def push_task(self, url):
        if not self.redis.sismember(self.visited_set, url):
            self.redis.lpush(self.task_queue, url)
            self.redis.sadd(self.visited_set, url)
            return True
        return False

    def pop_task(self):
        return self.redis.rpop(self.task_queue)

    def push_failed(self, url):
        self.redis.lpush(self.failed_queue, url)

# Multi-process consumer
def worker():
    task_queue = TaskQueue()  # each process opens its own Redis connection
    while True:
        url = task_queue.pop_task()
        if not url:  # queue is empty
            break
        try:
            response = requests.get(url)
            print(f"Processed: {url}")
        except Exception as e:
            task_queue.push_failed(url)
            print(f"Failed: {url}, Error: {e}")

# Start one worker process per CPU core
if __name__ == '__main__':
    processes = []
    for _ in range(cpu_count()):
        p = Process(target=worker)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
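Producers on any machine can seed the same queue; the visited set deduplicates URLs, and the failed list can be drained back into the task queue for another attempt. A small sketch of both steps, with placeholder seed URLs:

queue = TaskQueue()

# Seed the queue; duplicates are rejected by the visited set
for i in range(1, 101):
    queue.push_task(f'https://example.com/page/{i}')

# Re-enqueue previously failed URLs for another attempt
while True:
    url = queue.redis.rpop('spider:failed')
    if url is None:
        break
    queue.redis.srem('spider:visited', url)  # allow the URL to be queued again
    queue.push_task(url)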

5.2 Using the Scrapy Framework

# settings.py
BOT_NAME = 'myspider'

# Concurrency settings
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8
CONCURRENT_REQUESTS_PER_IP = 8

# Download delay
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True

# Timeout settings
DOWNLOAD_TIMEOUT = 30

# Retry settings
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

# Cache settings
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0
HTTPCACHE_DIR = 'httpcache'

# Middlewares
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 400,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 90,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 110,
}
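These settings apply project-wide; the spider itself stays small. As a minimal sketch of a spider that would run under this configuration, the start URL and CSS selectors below are placeholders, not part of any real project:

import scrapy

class MySpider(scrapy.Spider):
    name = 'myspider'
    start_urls = ['https://example.com/page/1']  # placeholder start URL

    def parse(self, response):
        # Extract data from the current page (selector is a placeholder)
        for title in response.css('h2::text').getall():
            yield {'title': title}

        # Follow pagination; Scrapy schedules these requests
        # under the concurrency settings defined above
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)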

6. Data Processing Optimization

6.1 Streaming Large Data Files

import json

def process_large_json(file_path, output_path):
    # Read and write one record at a time (JSON Lines) so memory stays flat
    with open(file_path, 'r', encoding='utf-8') as infile, \
         open(output_path, 'w', encoding='utf-8') as outfile:
        for line in infile:
            data = json.loads(line)
            # Process the record
            processed_data = transform_data(data)
            # Write the result
            outfile.write(json.dumps(processed_data, ensure_ascii=False) + '\n')

def transform_data(data):
    # Data transformation logic
    return data
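The example above assumes one JSON object per line (JSON Lines). If the input is instead a single huge JSON array, an incremental parser such as the third-party ijson package can stream items without loading the whole file; a minimal sketch, assuming ijson is installed:

import ijson

def process_large_array(file_path):
    with open(file_path, 'rb') as f:
        # Yield one element of the top-level array at a time
        for item in ijson.items(f, 'item'):
            yield transform_data(item)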

6.2 Using Generators to Save Memory

# Not recommended: load all the data at once
def bad_method(urls):
    results = []
    for url in urls:
        data = fetch_data(url)
        results.append(data)
    return results  # holds everything in memory

# Recommended: use a generator
def good_method(urls):
    for url in urls:
        yield fetch_data(url)  # yields items one at a time, keeping memory usage low

# Usage
for data in good_method(urls):
    process_data(data)

7. Monitoring and Logging

import logging
from time import time
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('spider.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('spider')

# Performance-monitoring decorator
def performance_monitor(func):
    def wrapper(*args, **kwargs):
        start_time = time()
        result = func(*args, **kwargs)
        end_time = time()
        logger.info(f"{func.__name__} executed in {end_time - start_time:.2f}s")
        return result
    return wrapper

# Usage example
@performance_monitor
def fetch_page(url):
    response = requests.get(url)
    return response.text

8. Exception Handling and Recovery

import time
import logging
from functools import wraps
import requests

logger = logging.getLogger('spider')

def retry_on_failure(max_retries=3, backoff_factor=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries == max_retries:
                        raise
                    wait_time = backoff_factor ** retries  # exponential backoff
                    logger.warning(f"Retry {retries}/{max_retries} after {wait_time}s: {e}")
                    time.sleep(wait_time)
        return wrapper
    return decorator

# Usage example
@retry_on_failure(max_retries=3)
def fetch_with_retry(url):
    return requests.get(url)

9. Summary of Optimization Recommendations

  • Choose the right concurrency model: threads for small-scale jobs, async or distributed for large-scale jobs
  • Set concurrency sensibly: tune it to the target server's capacity and your network bandwidth
  • Reuse pooled connections: avoid creating and tearing down connections repeatedly
  • Rate limiting: avoid triggering anti-scraping defenses
  • Exception handling: build solid retry and recovery mechanisms
  • Monitoring and logging: watch the crawler in real time so problems are found and fixed early
  • Data caching: cache pages that have already been fetched to avoid duplicate requests (see the sketch after this list)
  • Resource cleanup: close files, connections, and other resources promptly
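To illustrate the caching point above, here is a minimal sketch of a response cache keyed by a URL hash and stored on disk; the cache directory and the decision to cache indefinitely are assumptions for illustration only:

import hashlib
from pathlib import Path
import requests

CACHE_DIR = Path('cache')  # assumed cache location
CACHE_DIR.mkdir(exist_ok=True)

def fetch_cached(session, url):
    key = hashlib.sha256(url.encode('utf-8')).hexdigest()
    cache_file = CACHE_DIR / f"{key}.html"
    if cache_file.exists():
        return cache_file.read_text(encoding='utf-8')  # cache hit: skip the request
    response = session.get(url, timeout=10)
    response.raise_for_status()
    cache_file.write_text(response.text, encoding='utf-8')
    return response.text

# Usage
session = requests.Session()
html = fetch_cached(session, 'https://example.com/page/1')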

Tip: the EasySpider online tool can help you quickly test an endpoint and capture the correct request parameters, giving you accurate reference data when building a high-performance crawler.

Conclusion

Crawler performance optimization is a systems effort that has to be approached from several angles at once. After working through this article, you should be able to:

  • Understand the core performance metrics
  • Apply both basic and advanced optimization strategies
  • Build efficient concurrent and asynchronous crawlers
  • Design a distributed crawler architecture
  • Implement solid monitoring and recovery mechanisms