简介:本文将分享社交媒体平台的数据采集经验,以微博和小红书为例,讲解如何采集用户发帖、评论、点赞等数据。社交媒体爬虫具有独特的挑战,如登录验证、动态加载、反爬机制等。
一、项目背景
应用场景:
- 舆情监控和分析
- 品牌声誉跟踪
- 用户行为研究
- 热点话题发现
- 竞品分析
采集目标:
- 用户发帖内容
- 互动数据(点赞、评论、转发)
- 用户信息(粉丝数、关注数)
- 话题标签
- 发布时间
二、微博爬虫实现
2.1 登录处理
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pickle
def weibo_login(username, password):
    """Log in to Weibo with Selenium and persist the session cookies.

    Opens a headless Chrome instance, submits the credentials on the
    login page, waits for the post-login redirect, then dumps the
    session cookies to ``weibo_cookies.pkl`` for later reuse.

    Args:
        username: Weibo account name.
        password: Weibo account password.

    Returns:
        The live WebDriver on success, or ``None`` when any step fails
        (the browser is closed before returning in that case).
    """
    opts = webdriver.ChromeOptions()
    # Run without a visible window; the extra flags keep Chrome stable
    # in containerized / sandboxed environments.
    for flag in ('--headless', '--disable-gpu', '--no-sandbox'):
        opts.add_argument(flag)
    driver = webdriver.Chrome(options=opts)
    try:
        driver.get("https://weibo.com/login.php")
        # Block until the credential form is present (up to 10 s).
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.NAME, "username"))
        )
        driver.find_element(By.NAME, "username").send_keys(username)
        driver.find_element(By.NAME, "password").send_keys(password)
        driver.find_element(By.CLASS_NAME, "btn_login").click()
        # A redirect whose URL contains "home" signals success (up to 20 s).
        WebDriverWait(driver, 20).until(EC.url_contains("home"))
        print("登录成功")
        # Persist the cookies so later runs can skip the login form.
        with open('weibo_cookies.pkl', 'wb') as f:
            pickle.dump(driver.get_cookies(), f)
        return driver
    except Exception as e:
        print(f"登录失败: {e}")
        driver.quit()
        return None
2.2 使用 Cookie 登录
def load_weibo_cookies():
    """Start a headless Chrome session authenticated via saved cookies.

    Reads ``weibo_cookies.pkl`` (written by ``weibo_login``), injects
    each cookie into a fresh browser, and refreshes so they take effect.

    Returns:
        An authenticated WebDriver, or ``None`` when the cookie file is
        missing (the browser is closed before returning in that case).
    """
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    driver = webdriver.Chrome(options=options)
    # Cookies can only be added for the domain that is currently loaded.
    driver.get("https://weibo.com")
    try:
        with open('weibo_cookies.pkl', 'rb') as f:
            cookies = pickle.load(f)
        for cookie in cookies:
            driver.add_cookie(cookie)
        # Reload the page so the injected cookies are actually applied.
        driver.refresh()
        print("Cookie 加载成功")
        return driver
    except FileNotFoundError:
        print("Cookie 文件不存在,需要重新登录")
        # BUG FIX: the original returned without quitting, leaking an
        # orphan Chrome process every time the cookie file was absent.
        driver.quit()
        return None
2.3 采集微博信息
from bs4 import BeautifulSoup
def _parse_count(tag):
    """Return the integer text of a BeautifulSoup tag, or 0 when the tag
    is missing or its text is not a plain number (e.g. the label "赞")."""
    if tag is None:
        return 0
    text = tag.text.strip()
    return int(text) if text.isdigit() else 0


def get_weibo_posts(driver, user_id, max_pages=10):
    """Scrape a user's Weibo timeline page by page.

    Args:
        driver: Logged-in Selenium WebDriver.
        user_id: Numeric Weibo user id.
        max_pages: Number of timeline pages to visit (default 10).

    Returns:
        A list of dicts with keys ``post_id``, ``content``,
        ``publish_time``, ``like_count``, ``comment_count``,
        ``repost_count`` and ``images``.
    """
    posts = []
    for page in range(1, max_pages + 1):
        driver.get(f"https://weibo.com/u/{user_id}?page={page}")
        time.sleep(3)  # crude wait for the JS-rendered cards to load
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        weibo_cards = soup.find_all('div', class_='WB_cardwrap')
        for card in weibo_cards:
            try:
                content_div = card.find('div', class_='WB_text')
                time_tag = card.find('a', class_='S_txt2')
                post = {
                    'post_id': card.get('mid', ''),
                    'content': content_div.get_text(strip=True) if content_div else '',
                    'publish_time': time_tag.text.strip() if time_tag else '',
                    'like_count': 0,
                    'comment_count': 0,
                    'repost_count': 0,
                    'images': [img.get('src', '')
                               for img in card.find_all('img', class_='WB_img')],
                }
                # Interaction counters live in a shared action row; the
                # previously triplicated parsing is now in _parse_count.
                action_data = card.find('div', class_='WB_row_line')
                if action_data:
                    post['like_count'] = _parse_count(
                        action_data.find('span', class_='WB_like'))
                    post['comment_count'] = _parse_count(
                        action_data.find('span', class_='WB_comment'))
                    post['repost_count'] = _parse_count(
                        action_data.find('span', class_='WB_repost'))
                posts.append(post)
                print(f"采集: {post['content'][:30]}...")
            except Exception as e:
                # One malformed card must not abort the whole page.
                print(f"解析失败: {e}")
                continue
        print(f"第 {page} 页完成,共 {len(weibo_cards)} 条微博")
    return posts
2.4 搜索话题
def search_weibo_topic(driver, keyword, pages=5):
    """Search Weibo for *keyword* and scrape the result cards.

    Args:
        driver: Logged-in Selenium WebDriver.
        keyword: Search query text.
        pages: Number of result pages to visit (default 5).

    Returns:
        A list of dicts with user name/url, content, publish time and
        like/comment counts for each result card.
    """
    def to_count(text):
        # Counter spans may hold labels like "赞" instead of digits.
        text = text.strip()
        return int(text) if text.isdigit() else 0

    results = []
    for page in range(1, pages + 1):
        driver.get(f"https://s.weibo.com/weibo?q={keyword}&page={page}")
        time.sleep(3)  # crude wait for the results to render
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        for item in soup.find_all('div', class_='card-wrap'):
            try:
                result = {
                    'user_name': '',
                    'user_url': '',
                    'content': '',
                    'publish_time': '',
                    'like_count': 0,
                    'comment_count': 0
                }
                user_link = item.find('a', class_='name')
                if user_link:
                    result['user_name'] = user_link.text.strip()
                    # BUG FIX: use .get() — an anchor without an href
                    # used to raise KeyError, which the broad except
                    # swallowed, silently dropping the whole item.
                    result['user_url'] = user_link.get('href', '')
                content_div = item.find('p', class_='txt')
                if content_div:
                    result['content'] = content_div.get_text(strip=True)
                time_tag = item.find('a', class_='date')
                if time_tag:
                    result['publish_time'] = time_tag.text.strip()
                action_list = item.find('div', class_='card-act')
                if action_list:
                    spans = action_list.find_all('span')
                    if len(spans) >= 2:
                        result['like_count'] = to_count(spans[0].text)
                        result['comment_count'] = to_count(spans[1].text)
                results.append(result)
            except Exception:
                # Skip cards whose markup doesn't match expectations.
                continue
        print(f"搜索第 {page} 页完成")
    return results
三、小红书爬虫实现
3.1 采集笔记列表
import requests
import json
def get_xiaohongshu_notes(keyword, pages=5):
    """Query Xiaohongshu's search API and collect note summaries.

    Args:
        keyword: Search term.
        pages: Number of result pages to request (20 notes per page).

    Returns:
        A list of dicts describing each note (id, title, description,
        author nickname, interaction counts, cover image URL, type).
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://www.xiaohongshu.com/'
    }
    notes = []
    for page in range(1, pages + 1):
        params = {
            'keyword': keyword,
            'page': page,
            'page_size': 20,
            # Every request needs a fresh search-session identifier.
            'search_id': generate_search_id()
        }
        try:
            resp = requests.get(
                "https://www.xiaohongshu.com/web_api/sns/v1/search/notes",
                headers=headers, params=params, timeout=10
            )
            resp.raise_for_status()
            payload = resp.json()
            if 'data' in payload and 'items' in payload['data']:
                for item in payload['data']['items']:
                    author = item.get('user', {})
                    cover = item.get('cover', {})
                    note = {
                        'note_id': item.get('id', ''),
                        'title': item.get('display_title', ''),
                        'desc': item.get('desc', ''),
                        'user_name': author.get('nickname', ''),
                        'like_count': item.get('liked_count', 0),
                        'collect_count': item.get('collected_count', 0),
                        'comment_count': item.get('comment_count', 0),
                        'cover_url': cover.get('url_default', ''),
                        'type': item.get('type', '')
                    }
                    notes.append(note)
                    print(f"采集: {note['title'][:30]}...")
            print(f"第 {page} 页完成")
        except Exception as e:
            print(f"请求失败: {e}")
    return notes
def generate_search_id():
    """Return a fresh random UUID4 string used as the search session id."""
    import uuid  # one-off dependency, kept local as in the original
    token = uuid.uuid4()
    return str(token)
3.2 采集笔记详情
def get_xiaohongshu_note_detail(note_id):
    """Fetch full metadata for a single Xiaohongshu note.

    Args:
        note_id: The note's identifier.

    Returns:
        A dict with the note's fields, image URLs and tag names, or
        ``None`` when the request fails or the response carries no
        ``data`` section.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://www.xiaohongshu.com/'
    }
    try:
        resp = requests.get(
            f"https://www.xiaohongshu.com/web_api/sns/v1/note/{note_id}",
            headers=headers, timeout=10
        )
        resp.raise_for_status()
        payload = resp.json()
        if 'data' in payload:
            note = payload['data']
            author = note.get('user', {})
            return {
                'note_id': note.get('id', ''),
                'title': note.get('title', ''),
                'desc': note.get('desc', ''),
                'type': note.get('type', ''),
                'user_id': author.get('user_id', ''),
                'user_name': author.get('nickname', ''),
                'like_count': note.get('liked_count', 0),
                'collect_count': note.get('collected_count', 0),
                'comment_count': note.get('comment_count', 0),
                'share_count': note.get('share_count', 0),
                'view_count': note.get('view_count', 0),
                'publish_time': note.get('time', ''),
                # Images and topic tags live in optional list fields.
                'images': [img.get('url_default', '')
                           for img in note.get('image_list', [])],
                'tags': [tag.get('name', '')
                         for tag in note.get('tag_list', [])],
            }
    except Exception as e:
        print(f"获取详情失败: {e}")
    return None
四、数据分析和可视化
4.1 情感分析
from snownlp import SnowNLP
def analyze_sentiment(text):
    """Run SnowNLP sentiment and keyword extraction over *text*.

    Returns:
        A dict with ``sentiment`` (float in [0, 1]; closer to 1 is more
        positive) and ``keywords`` (the top-10 extracted keywords).
    """
    nlp = SnowNLP(text)
    return {
        'sentiment': nlp.sentiments,
        'keywords': nlp.keywords(10),
    }
# Usage example: score a single post.
sample_text = "今天天气真好,心情很棒!"
analysis = analyze_sentiment(sample_text)
print(f"情感值: {analysis['sentiment']}")
print(f"关键词: {analysis['keywords']}")
4.2 热门话题分析
from collections import Counter
import re
def extract_hashtags(text):
    """Return every topic name written in Weibo's #话题# form inside *text*."""
    return re.findall(r'#([^#]+)#', text)
def analyze_trends(posts):
    """Count hashtag usage across *posts* and return the hottest topics.

    Each element of *posts* must carry a ``content`` string.

    Returns:
        Up to 20 ``(tag, count)`` pairs, most frequent first.
    """
    tag_counter = Counter()
    for post in posts:
        tag_counter.update(extract_hashtags(post['content']))
    return tag_counter.most_common(20)
4.3 用户影响力分析
def calculate_influence_score(post, *, like_weight=1, comment_weight=5,
                              repost_weight=10):
    """Compute a weighted engagement score for a single post.

    Comments and especially reposts weigh more than likes because they
    signal deeper engagement.

    Args:
        post: Dict with ``like_count``, ``comment_count`` and
            ``repost_count``; missing keys now count as 0 instead of
            raising KeyError.
        like_weight / comment_weight / repost_weight: Keyword-only
            weights; the defaults reproduce the original 1/5/10 scheme.

    Returns:
        The weighted score.
    """
    return (
        post.get('like_count', 0) * like_weight
        + post.get('comment_count', 0) * comment_weight
        + post.get('repost_count', 0) * repost_weight
    )
def analyze_user_influence(posts, top_n=20):
    """Rank users by average per-post engagement.

    Aggregates like/comment/repost totals per user, computes a weighted
    average score per post (weights 1/5/10, consistent with
    ``calculate_influence_score``), and returns the strongest accounts.

    Args:
        posts: Dicts carrying ``user_id`` plus interaction counts;
            missing counts are treated as 0 (the original raised
            KeyError on incomplete posts).
        top_n: How many top users to return (default 20, as before).

    Returns:
        A list of ``{'user_id', 'posts_count', 'avg_influence'}`` dicts
        sorted by descending average influence; empty for empty input.
    """
    user_scores = {}
    for post in posts:
        user_id = post.get('user_id', '')
        stats = user_scores.setdefault(user_id, {
            'posts': 0,
            'total_likes': 0,
            'total_comments': 0,
            'total_reposts': 0
        })
        stats['posts'] += 1
        stats['total_likes'] += post.get('like_count', 0)
        stats['total_comments'] += post.get('comment_count', 0)
        stats['total_reposts'] += post.get('repost_count', 0)
    results = []
    for user_id, data in user_scores.items():
        # data['posts'] >= 1 by construction, so no division by zero.
        avg_influence = (
            data['total_likes']
            + data['total_comments'] * 5
            + data['total_reposts'] * 10
        ) / data['posts']
        results.append({
            'user_id': user_id,
            'posts_count': data['posts'],
            'avg_influence': avg_influence
        })
    results.sort(key=lambda r: r['avg_influence'], reverse=True)
    return results[:top_n]
五、数据存储
5.1 存储到 MongoDB
from pymongo import MongoClient
from datetime import datetime
def save_to_mongodb(data, collection_name='social_posts'):
    """Insert one document into the local ``social_media`` database.

    A ``crawled_at`` timestamp is added to *data* (the dict is mutated)
    before insertion.

    Args:
        data: The document to store.
        collection_name: Target collection (default ``social_posts``).

    Returns:
        The ObjectId of the inserted document.
    """
    client = MongoClient('mongodb://localhost:27017/')
    try:
        collection = client['social_media'][collection_name]
        data['crawled_at'] = datetime.now()
        result = collection.insert_one(data)
        print(f"数据已保存,ID: {result.inserted_id}")
        return result.inserted_id
    finally:
        # BUG FIX: the original never closed the client, leaking a
        # connection pool on every call.
        client.close()
# Batch variant of save_to_mongodb.
def save_batch_to_mongodb(posts, collection_name='social_posts'):
    """Insert many documents at once into the ``social_media`` database.

    Adds a ``crawled_at`` timestamp to every post (mutating the dicts),
    then performs a single ``insert_many``.

    Args:
        posts: List of documents to store; an empty list is a no-op.
        collection_name: Target collection (default ``social_posts``).

    Returns:
        The list of inserted ObjectIds (``[]`` for empty input).
    """
    # BUG FIX: pymongo's insert_many() raises InvalidOperation when
    # given an empty list; short-circuit instead of crashing.
    if not posts:
        return []
    client = MongoClient('mongodb://localhost:27017/')
    try:
        collection = client['social_media'][collection_name]
        # One timestamp for the whole batch keeps it queryable as a unit.
        now = datetime.now()
        for post in posts:
            post['crawled_at'] = now
        result = collection.insert_many(posts)
        print(f"批量保存完成,共 {len(result.inserted_ids)} 条")
        return result.inserted_ids
    finally:
        # Release the connection pool (the original leaked it).
        client.close()
六、反爬虫应对
6.1 请求频率控制
import time
import random
class RateController:
    """Paces outgoing requests with randomized delays to look less bot-like."""

    def __init__(self, min_delay=2, max_delay=5):
        # Bounds (in seconds) for the uniform random wait.
        self.min_delay = min_delay
        self.max_delay = max_delay

    def wait(self):
        """Sleep a uniformly random time between min_delay and max_delay."""
        time.sleep(random.uniform(self.min_delay, self.max_delay))

    def wait_with_jitter(self, base_delay):
        """Sleep base_delay seconds plus/minus up to 0.5 s of jitter,
        but never less than 1 second."""
        offset = random.uniform(-0.5, 0.5)
        time.sleep(max(1, base_delay + offset))
# Usage: pause between successive requests.
rate_controller = RateController(min_delay=3, max_delay=8)
for url in urls:  # NOTE(review): `urls` must be defined by the surrounding script
    response = requests.get(url)
    rate_controller.wait()  # random 3-8 s pause before the next request
6.2 IP 代理池
class ProxyPool:
    """Round-robin rotation over a fixed list of proxies."""

    def __init__(self, proxies):
        # Proxies are handed out in list order, wrapping at the end.
        self.proxies = proxies
        self.current_index = 0

    def get_proxy(self):
        """Return the next proxy in round-robin order."""
        chosen = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return chosen
# Usage: rotate through a static proxy list.
proxies = [
    {'http': 'http://proxy1.example.com:8080'},
    {'http': 'http://proxy2.example.com:8080'},
    {'http': 'http://proxy3.example.com:8080'}
]
proxy_pool = ProxyPool(proxies)
proxy = proxy_pool.get_proxy()  # picks proxies in round-robin order
response = requests.get(url, proxies=proxy)  # NOTE(review): `url` must be defined by the surrounding script
七、总结
本文介绍了社交媒体数据采集的完整流程,包括:
- 微博和小红书的登录和数据采集方法
- 用户发帖、评论、互动数据的采集
- 情感分析和话题挖掘
- 用户影响力评估
- 反爬虫应对策略
使用 EasySpider 工具:可以使用 EasySpider 的 JSON 格式化工具分析社交媒体 API 返回的数据结构,快速定位需要提取的字段。
重要提醒:社交媒体数据采集涉及用户隐私和平台规则,请严格遵守相关法律法规和平台服务条款。本文仅供学习参考,切勿用于商业用途或侵犯他人权益。