简介:爬虫的性能直接影响数据采集的效率和成本。本文将系统性地介绍爬虫性能优化的各个方面,从基础优化到高级架构,帮助你构建高性能的爬虫系统。
一、性能优化的核心指标
- 吞吐量(Throughput):单位时间内处理的请求数量
- 延迟(Latency):单个请求的响应时间
- 并发数(Concurrency):同时处理的请求数量
- 成功率(Success Rate):成功请求占总请求的比例
- 资源占用:CPU、内存、网络带宽的使用情况
二、基础优化策略
2.1 连接池管理
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Shared Session configured once: retries + connection pooling.
session = requests.Session()

# Retry transient failures (throttling and 5xx) with exponential backoff.
retry_strategy = Retry(
    total=3,                                      # give up after 3 retries
    backoff_factor=1,                             # 1s, 2s, 4s ... between attempts
    status_forcelist=[429, 500, 502, 503, 504],   # status codes worth retrying
    allowed_methods=["GET", "POST"],              # HTTP methods allowed to retry
)

# One adapter serves both schemes. Note: pool_connections is the number of
# cached per-host pools, pool_maxsize the connections kept alive per pool.
adapter = HTTPAdapter(
    max_retries=retry_strategy,
    pool_connections=100,
    pool_maxsize=100,
)
for scheme in ("http://", "https://"):
    session.mount(scheme, adapter)
2.2 Session 复用
# Anti-pattern: every requests.get() opens (and tears down) its own
# TCP/TLS connection.
def bad_example(urls):
    for url in urls:
        print(requests.get(url).status_code)


# Preferred: one Session keeps connections alive and reuses them
# across requests to the same host.
def good_example(urls):
    session = requests.Session()
    for url in urls:
        print(session.get(url).status_code)
2.3 禁用 SSL 验证(开发环境)
# Not recommended in production; may speed things up in development
response = session.get(url, verify=False)
# Or disable certificate verification for the whole Session
session.verify = False
注意:禁用 SSL 验证会带来安全风险,仅在开发测试环境使用。
三、并发请求优化
3.1 使用 threading
import threading
from queue import Queue
import requests


class Worker(threading.Thread):
    """Worker thread: pulls URLs off a shared queue until it sees None."""

    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        # One Session per thread: each thread still benefits from
        # connection reuse without sharing a Session across threads.
        self.session = requests.Session()

    def run(self):
        while True:
            url = self.queue.get()
            if url is None:  # poison pill -> shut this worker down
                break
            try:
                print(f"{url}: {self.session.get(url).status_code}")
            except Exception as e:
                print(f"Error fetching {url}: {e}")
            finally:
                self.queue.task_done()


# Fill the task queue.
queue = Queue()
for url in urls:
    queue.put(url)

# Spin up the worker pool.
num_threads = 10
threads = [Worker(queue) for _ in range(num_threads)]
for worker in threads:
    worker.start()

# Block until every queued URL has been marked done...
queue.join()

# ...then send one poison pill per worker and wait for them to exit.
for _ in range(num_threads):
    queue.put(None)
for thread in threads:
    thread.join()
3.2 使用 concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import threading

# FIX: the original created a brand-new Session inside fetch_url on every
# call, which throws away connection pooling entirely (contradicting
# section 2.2). Keep one Session per worker thread instead.
_thread_local = threading.local()


def _get_session():
    # Lazily create a Session the first time each thread needs one.
    if not hasattr(_thread_local, "session"):
        _thread_local.session = requests.Session()
    return _thread_local.session


def fetch_url(url):
    """Fetch one URL; return a dict with either the body or the error."""
    try:
        response = _get_session().get(url)
        return {'url': url, 'status': response.status_code, 'data': response.text}
    except Exception as e:
        return {'url': url, 'error': str(e)}


# Fan the URLs out over a 20-thread pool.
with ThreadPoolExecutor(max_workers=20) as executor:
    # Submit everything up front...
    futures = {executor.submit(fetch_url, url): url for url in urls}
    # ...and handle results as they finish.
    for future in as_completed(futures):
        result = future.result()
        print(f"Processed: {result['url']}")
四、异步爬虫优化
4.1 使用 asyncio + aiohttp
import asyncio
import aiohttp
from aiohttp import ClientSession, TCPConnector


async def fetch_page(session, url):
    """Fetch one page, returning its body text or None on any error."""
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None


async def fetch_all(urls, concurrency=100):
    """Fetch every URL concurrently through one pooled ClientSession."""
    connector = TCPConnector(
        limit=concurrency,   # global connection cap
        limit_per_host=20,   # per-host cap, to stay polite
        ttl_dns_cache=300,   # cache DNS answers for 5 minutes
    )
    timeout = aiohttp.ClientTimeout(total=30)  # whole-request deadline
    async with ClientSession(connector=connector, timeout=timeout) as session:
        tasks = [fetch_page(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    urls = ['https://example.com/page/' + str(i) for i in range(1, 1000)]
    results = await fetch_all(urls, concurrency=100)
    print(f"Fetched {len(results)} pages")


asyncio.run(main())
4.2 限速控制
import asyncio
class RateLimiter:
    """Crude asyncio rate limiter: ~rate_limit acquisitions per period seconds.

    Each acquire() occupies one of ``rate_limit`` semaphore slots for the
    full ``period``, so steady-state throughput is rate_limit / period.

    FIX: the original slept ``period / rate_limit`` while holding a slot,
    which let each of the ``rate_limit`` slots turn over ``rate_limit``
    times per period — i.e. rate_limit**2 requests per period, not
    rate_limit.
    """

    def __init__(self, rate_limit, period=1.0):
        self.rate_limit = rate_limit  # max requests per period
        self.period = period          # window length in seconds
        self.semaphore = asyncio.Semaphore(rate_limit)

    async def acquire(self):
        # Hold a slot for the whole window: with rate_limit concurrent
        # slots each occupied for `period` seconds, the average admission
        # rate is rate_limit / period.
        async with self.semaphore:
            await asyncio.sleep(self.period)
# Using the limiter: every request must acquire() before it may fire.
async def fetch_with_limit(session, url, limiter):
    await limiter.acquire()
    async with session.get(url) as response:
        return await response.text()


async def main():
    limiter = RateLimiter(rate_limit=10)  # budget: 10 requests per second
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, limiter) for url in urls]
        results = await asyncio.gather(*tasks)
五、分布式爬虫架构
5.1 使用 Redis 作为任务队列
import redis
import json
# Redis 任务队列
class TaskQueue:
    """Redis-backed URL queue with a visited-set for de-duplication."""

    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.task_queue = 'spider:tasks'     # pending URLs (Redis list)
        self.failed_queue = 'spider:failed'  # URLs whose fetch raised (Redis list)
        self.visited_set = 'spider:visited'  # every URL ever enqueued (Redis set)

    def push_task(self, url):
        """Enqueue url unless already seen; return True if it was enqueued.

        FIX: SADD's return value (1 for a new member, 0 otherwise) is used
        as an atomic test-and-set. The original SISMEMBER-then-SADD pair
        had a race where two workers could both enqueue the same URL.
        """
        if self.redis.sadd(self.visited_set, url):
            self.redis.lpush(self.task_queue, url)
            return True
        return False

    def pop_task(self):
        """Pop the oldest pending URL (None when the queue is empty)."""
        return self.redis.rpop(self.task_queue)

    def push_failed(self, url):
        """Record a URL whose fetch failed, for later inspection or retry."""
        self.redis.lpush(self.failed_queue, url)
# Multi-process consumers
from multiprocessing import Process, cpu_count


def worker(task_queue):
    """Drain the shared Redis-backed queue until it runs dry."""
    while True:
        url = task_queue.pop_task()
        if not url:  # queue drained -> exit
            break
        try:
            response = requests.get(url)
            print(f"Processed: {url}")
        except Exception as e:
            # Park the URL on the failed queue so it can be retried later.
            task_queue.push_failed(url)
            print(f"Failed: {url}, Error: {e}")


# Launch one consumer process per CPU core.
if __name__ == '__main__':
    task_queue = TaskQueue()
    processes = [Process(target=worker, args=(task_queue,)) for _ in range(cpu_count())]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
5.2 使用 Scrapy 框架
# settings.py
BOT_NAME = 'myspider'
# Concurrency limits
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8
CONCURRENT_REQUESTS_PER_IP = 8
# Download delay (randomized so request timing looks less bot-like)
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True
# Per-request timeout in seconds
DOWNLOAD_TIMEOUT = 30
# Retry policy
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
# HTTP response cache (0 means cached entries never expire, per Scrapy docs)
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0
HTTPCACHE_DIR = 'httpcache'
# Downloader middlewares and their order values
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 400,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 90,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 110,
}
六、数据处理优化
6.1 流式处理大数据
import json
def process_large_json(file_path, output_path):
    """Stream a JSON-lines file record by record, transforming each one.

    Never holds more than one record in memory, so arbitrarily large
    files can be processed with constant memory.
    """
    with open(file_path, 'r', encoding='utf-8') as infile, \
            open(output_path, 'w', encoding='utf-8') as outfile:
        for raw_line in infile:
            record = transform_data(json.loads(raw_line))
            outfile.write(json.dumps(record, ensure_ascii=False) + '\n')


def transform_data(data):
    # Placeholder transformation: replace with real per-record logic.
    return data
6.2 使用生成器节省内存
# Anti-pattern: materialize every result before returning anything.
def bad_method(urls):
    results = []
    for url in urls:
        results.append(fetch_data(url))
    return results  # the whole dataset sits in memory at once


# Preferred: a generator hands results back one at a time.
def good_method(urls):
    for url in urls:
        yield fetch_data(url)  # lazy — only the current item is alive


# Consume lazily: memory use stays flat no matter how many URLs.
for data in good_method(urls):
    process_data(data)
七、监控和日志
import logging
from time import time

# Log to both a file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('spider.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('spider')


def performance_monitor(func):
    """Decorator that logs the wall-clock duration of each call to *func*."""
    from functools import wraps  # local import keeps this snippet standalone

    # FIX: without @wraps the decorated function loses __name__/__doc__,
    # and any outer decorator (or the log line itself, once stacked)
    # reports "wrapper" instead of the real function name. Section 8 of
    # this article already uses @wraps — this makes the two consistent.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time()
        result = func(*args, **kwargs)
        end_time = time()
        logger.info(f"{func.__name__} executed in {end_time - start_time:.2f}s")
        return result
    return wrapper


# Usage example
@performance_monitor
def fetch_page(url):
    response = requests.get(url)
    return response.text
八、异常处理和恢复
import time
from functools import wraps


def retry_on_failure(max_retries=3, backoff_factor=2):
    """Decorator factory: retry the wrapped call with exponential backoff.

    Args:
        max_retries: total number of attempts; the last failure is
            re-raised. FIX: with max_retries <= 0 the original while-loop
            never ran and the wrapper silently returned None without ever
            calling the function — now it falls through to one plain call.
        backoff_factor: wait backoff_factor ** attempt seconds between
            attempts (same schedule as the original).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        raise  # out of attempts: surface the real error
                    wait_time = backoff_factor ** attempt
                    logger.warning(f"Retry {attempt}/{max_retries} after {wait_time}s: {e}")
                    time.sleep(wait_time)
            # max_retries <= 0: still make one (un-retried) call.
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Usage example
@retry_on_failure(max_retries=3)
def fetch_with_retry(url):
    return requests.get(url)
九、优化建议总结
- 选择合适的并发模型:小规模用线程,大规模用异步或分布式
- 合理设置并发数:根据服务器承载能力和网络带宽调整
- 连接池复用:避免频繁创建和销毁连接
- 限速控制:避免触发反爬限制
- 异常处理:完善的重试和恢复机制
- 监控日志:实时监控爬虫状态,及时发现和解决问题
- 数据缓存:缓存已爬取的数据,避免重复请求
- 资源释放:及时关闭文件、连接等资源
温馨提示:使用 EasySpider 在线工具可以快速测试接口,获取正确的请求参数,为高性能爬虫的开发提供准确的参考数据。
总结
爬虫性能优化是一个系统工程,需要从多个维度综合考虑。通过本文的学习,你应该能够:
- 理解性能优化的核心指标
- 掌握基础和高级优化策略
- 构建高效的并发和异步爬虫
- 设计分布式爬虫架构
- 实现完善的监控和恢复机制