Web 爬虫开发最佳实践

从入门到精通的爬虫开发指南

简介:Web 爬虫是获取互联网数据的重要工具。本文将总结 Web 爬虫开发中的最佳实践,包括反爬虫应对策略、数据存储优化、并发处理等高级技巧,帮助开发者构建高效、稳定的爬虫系统。

一、爬虫开发基础原则

  • 遵守 robots.txt:尊重网站的爬虫协议
  • 控制请求频率:避免对目标网站造成压力
  • 设置合理的 User-Agent:模拟真实浏览器访问
  • 处理异常情况:网络错误、超时、404 等
  • 数据验证:确保获取的数据格式正确

二、请求优化

2.1 使用 Session 保持连接

import requests # 使用 Session 保持连接 session = requests.Session() # 设置 Session 参数 session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) # 复用连接 response1 = session.get('https://example.com/page1') response2 = session.get('https://example.com/page2') # 关闭 Session session.close()

2.2 设置超时和重试

from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # 创建 Session session = requests.Session() # 配置重试策略 retry_strategy = Retry( total=3, # 总重试次数 backoff_factor=1, # 重试间隔因子 status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码 allowed_methods=["HEAD", "GET", "OPTIONS"] # 允许重试的方法 ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) # 发送请求(设置超时) try: response = session.get('https://example.com', timeout=10) print(response.text) except requests.exceptions.Timeout: print("请求超时") except requests.exceptions.RequestException as e: print(f"请求失败: {e}")

2.3 使用连接池

from requests.adapters import HTTPAdapter # 配置连接池 session = requests.Session() # 设置连接池大小 adapter = HTTPAdapter( pool_connections=10, # 连接池大小 pool_maxsize=100, # 最大连接数 max_retries=3 ) session.mount('http://', adapter) session.mount('https://', adapter)

三、反爬虫应对策略

3.1 User-Agent 轮换

import random # User-Agent 列表 USER_AGENTS = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15' ] def get_random_user_agent(): """获取随机 User-Agent""" return random.choice(USER_AGENTS) # 使用示例 headers = { 'User-Agent': get_random_user_agent() } response = requests.get('https://example.com', headers=headers)

3.2 IP 代理池

import requests # 代理池 PROXIES = [ {'http': 'http://proxy1.example.com:8080'}, {'http': 'http://proxy2.example.com:8080'}, {'http': 'http://proxy3.example.com:8080'} ] def get_random_proxy(): """获取随机代理""" return random.choice(PROXIES) # 使用代理 proxy = get_random_proxy() response = requests.get('https://example.com', proxies=proxy, timeout=10)

3.3 请求延迟

import time import random def random_delay(min_delay=1, max_delay=3): """随机延迟""" delay = random.uniform(min_delay, max_delay) time.sleep(delay) # 使用示例 for url in urls: response = requests.get(url) # 处理响应 random_delay() # 随机延迟 1-3 秒

3.4 Cookie 处理

import requests # 使用 Session 自动处理 Cookie session = requests.Session() # 首次请求获取 Cookie login_data = { 'username': 'your_username', 'password': 'your_password' } session.post('https://example.com/login', data=login_data) # 后续请求自动携带 Cookie response = session.get('https://example.com/protected_page') # 手动设置 Cookie cookies = { 'session_id': 'abc123', 'user_token': 'xyz789' } response = requests.get('https://example.com', cookies=cookies)

四、数据解析

4.1 使用 BeautifulSoup

from bs4 import BeautifulSoup import requests # 获取页面内容 response = requests.get('https://example.com') soup = BeautifulSoup(response.text, 'html.parser') # 查找元素 title = soup.find('h1').text links = soup.find_all('a', class_='link') # 提取数据 for link in links: print(link.get('href'), link.text)

4.2 使用 XPath

from lxml import html import requests # 获取页面内容 response = requests.get('https://example.com') tree = html.fromstring(response.text) # 使用 XPath 提取数据 title = tree.xpath('//h1/text()')[0] links = tree.xpath('//a[@class="link"]/@href') for link in links: print(link)

4.3 使用正则表达式

import re # 提取邮箱 email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' emails = re.findall(email_pattern, text) # 提取 URL url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+' urls = re.findall(url_pattern, text) # 提取手机号 phone_pattern = r'1[3-9]\d{9}' phones = re.findall(phone_pattern, text)

五、数据存储

5.1 存储到 CSV

import csv # 写入 CSV with open('data.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['标题', '链接', '日期']) for item in data: writer.writerow([item['title'], item['url'], item['date']])

5.2 存储到 JSON

import json # 写入 JSON with open('data.json', 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) # 读取 JSON with open('data.json', 'r', encoding='utf-8') as f: data = json.load(f)

5.3 存储到数据库

import sqlite3 # 创建数据库连接 conn = sqlite3.connect('data.db') cursor = conn.cursor() # 创建表 cursor.execute(''' CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, url TEXT UNIQUE, content TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 插入数据 for item in data: try: cursor.execute(''' INSERT INTO articles (title, url, content) VALUES (?, ?, ?) ''', (item['title'], item['url'], item['content'])) except sqlite3.IntegrityError: print(f"URL 已存在: {item['url']}") # 提交事务 conn.commit() conn.close()

六、并发处理

6.1 使用多线程

import threading import queue # 创建任务队列 task_queue = queue.Queue() results = [] # 添加任务 for url in urls: task_queue.put(url) # 工作线程函数 def worker(): while not task_queue.empty(): url = task_queue.get() try: response = requests.get(url) results.append(response.text) except Exception as e: print(f"Error fetching {url}: {e}") finally: task_queue.task_done() # 创建并启动线程 threads = [] for i in range(5): # 5 个线程 t = threading.Thread(target=worker) t.start() threads.append(t) # 等待所有线程完成 for t in threads: t.join()

6.2 使用异步请求

import asyncio import aiohttp async def fetch(session, url): """异步获取页面""" try: async with session.get(url) as response: return await response.text() except Exception as e: print(f"Error fetching {url}: {e}") return None async def fetch_all(urls): """并发获取多个页面""" async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] return await asyncio.gather(*tasks) # 使用示例 urls = ['https://example.com/page1', 'https://example.com/page2'] results = asyncio.run(fetch_all(urls))

七、使用 EasySpider 工具

EasySpider 提供的工具可以辅助爬虫开发:

  • Curl 转 Python:快速将浏览器请求转换为 Python 代码
  • JSON 格式化:解析和格式化 API 返回的 JSON 数据
  • URL 提取:从页面中提取和解析 URL 参数
  • 在线加解密:处理加密的请求参数
  • 文本对比:对比页面变化
  • IP 查询:获取 IP 地理位置信息

开发流程建议:

  • 使用浏览器开发者工具分析请求
  • 复制 Curl 命令
  • 使用 EasySpider 转换为 Python 代码
  • 根据需要调整代码
  • 测试和优化

八、错误处理和日志

import logging # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename='spider.log' ) logger = logging.getLogger(__name__) def fetch_with_retry(url, max_retries=3): """带重试的请求函数""" for attempt in range(max_retries): try: logger.info(f"Fetching {url} (attempt {attempt + 1})") response = requests.get(url, timeout=10) response.raise_for_status() logger.info(f"Successfully fetched {url}") return response except requests.exceptions.HTTPError as e: logger.error(f"HTTP error: {e}") if e.response.status_code == 404: logger.warning(f"Page not found: {url}") return None except requests.exceptions.Timeout: logger.error(f"Timeout: {url}") except requests.exceptions.RequestException as e: logger.error(f"Request error: {e}") if attempt < max_retries - 1: time.sleep(2 ** attempt) # 指数退避 logger.error(f"Failed to fetch {url} after {max_retries} attempts") return None

九、监控和维护

  • 监控爬虫运行状态
  • 记录成功和失败的请求
  • 定期检查目标网站结构变化
  • 更新反爬虫策略
  • 优化性能和资源使用

十、法律和道德

重要提醒:

  • 遵守网站的 robots.txt 协议
  • 尊重版权和知识产权
  • 不要爬取个人隐私信息
  • 控制请求频率,避免对服务器造成压力
  • 遵守相关法律法规

总结

Web 爬虫开发需要综合考虑技术、性能、法律等多个方面。通过本文的学习,你应该能够:

  • 遵循爬虫开发的基本原则
  • 优化请求性能
  • 应对常见的反爬虫策略
  • 高效解析和存储数据
  • 实现并发处理
  • 使用 EasySpider 工具提高开发效率