社交媒体数据采集实战

微博、小红书等平台数据采集

简介:本文将分享社交媒体平台的数据采集经验,以微博和小红书为例,讲解如何采集用户发帖、评论、点赞等数据。社交媒体爬虫具有独特的挑战,如登录验证、动态加载、反爬机制等。

一、项目背景

应用场景:

  • 舆情监控和分析
  • 品牌声誉跟踪
  • 用户行为研究
  • 热点话题发现
  • 竞品分析

采集目标:

  • 用户发帖内容
  • 互动数据(点赞、评论、转发)
  • 用户信息(粉丝数、关注数)
  • 话题标签
  • 发布时间

二、微博爬虫实现

2.1 登录处理

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pickle


def weibo_login(username, password):
    """Log in to Weibo through a headless Chrome session.

    On success the session cookies are pickled to ``weibo_cookies.pkl``
    and the live driver is returned; on any failure the browser is
    closed and None is returned.
    """
    chrome_opts = webdriver.ChromeOptions()
    # Headless flags suitable for running without a display/sandbox.
    for flag in ('--headless', '--disable-gpu', '--no-sandbox'):
        chrome_opts.add_argument(flag)
    driver = webdriver.Chrome(options=chrome_opts)
    try:
        driver.get("https://weibo.com/login.php")
        # Wait for the login form before typing into it.
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.NAME, "username"))
        )
        driver.find_element(By.NAME, "username").send_keys(username)
        driver.find_element(By.NAME, "password").send_keys(password)
        driver.find_element(By.CLASS_NAME, "btn_login").click()
        # A redirect to a URL containing "home" signals a successful login.
        WebDriverWait(driver, 20).until(EC.url_contains("home"))
        print("登录成功")
        # Persist cookies so later runs can skip the password login.
        with open('weibo_cookies.pkl', 'wb') as f:
            pickle.dump(driver.get_cookies(), f)
        return driver
    except Exception as e:
        print(f"登录失败: {e}")
        driver.quit()
        return None

2.2 使用 Cookie 登录

def load_weibo_cookies():
    """Restore a Weibo session from previously pickled cookies.

    Returns a logged-in headless Chrome driver, or None when the cookie
    file is missing (caller should fall back to a password login).
    """
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    driver = webdriver.Chrome(options=options)
    # Cookies can only be attached once the target domain is open.
    driver.get("https://weibo.com")
    try:
        with open('weibo_cookies.pkl', 'rb') as f:
            cookies = pickle.load(f)
        for cookie in cookies:
            driver.add_cookie(cookie)
        # Reload so the freshly attached cookies take effect.
        driver.refresh()
        print("Cookie 加载成功")
        return driver
    except FileNotFoundError:
        print("Cookie 文件不存在,需要重新登录")
        # Bug fix: close the browser we just launched instead of leaking it.
        driver.quit()
        return None

2.3 采集微博信息

from bs4 import BeautifulSoup


def _parse_count(tag):
    """Return the integer in *tag*'s text, or 0 when tag is None or non-numeric."""
    if tag is None:
        return 0
    text = tag.text.strip()
    return int(text) if text.isdigit() else 0


def get_weibo_posts(driver, user_id, max_pages=10):
    """Scrape up to *max_pages* profile pages of posts for one Weibo user.

    *driver* is a logged-in Selenium WebDriver. Returns a list of dicts
    with content, publish time, like/comment/repost counts and image URLs.
    """
    posts = []
    for page in range(1, max_pages + 1):
        driver.get(f"https://weibo.com/u/{user_id}?page={page}")
        time.sleep(3)  # crude wait for dynamically loaded content
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        weibo_cards = soup.find_all('div', class_='WB_cardwrap')
        for card in weibo_cards:
            try:
                post = {
                    'post_id': card.get('mid', ''),
                    'content': '',
                    'publish_time': '',
                    'like_count': 0,
                    'comment_count': 0,
                    'repost_count': 0,
                    'images': []
                }
                content_div = card.find('div', class_='WB_text')
                if content_div:
                    post['content'] = content_div.get_text(strip=True)
                time_tag = card.find('a', class_='S_txt2')
                if time_tag:
                    post['publish_time'] = time_tag.text.strip()
                # Interaction counters share one action row; a missing or
                # non-numeric counter (e.g. a label) is recorded as 0.
                action_data = card.find('div', class_='WB_row_line')
                if action_data:
                    post['like_count'] = _parse_count(
                        action_data.find('span', class_='WB_like'))
                    post['comment_count'] = _parse_count(
                        action_data.find('span', class_='WB_comment'))
                    post['repost_count'] = _parse_count(
                        action_data.find('span', class_='WB_repost'))
                img_list = card.find_all('img', class_='WB_img')
                post['images'] = [img.get('src', '') for img in img_list]
                posts.append(post)
                print(f"采集: {post['content'][:30]}...")
            except Exception as e:
                print(f"解析失败: {e}")
                continue
        print(f"第 {page} 页完成,共 {len(weibo_cards)} 条微博")
    return posts

2.4 搜索话题

def search_weibo_topic(driver, keyword, pages=5):
    """Search Weibo for *keyword* and scrape *pages* pages of results.

    *driver* is a logged-in Selenium WebDriver. Returns a list of dicts
    with author name/URL, content, publish time, like and comment counts.
    """

    def _count(span):
        # Counters may show a text label instead of a number; treat as 0.
        text = span.text.strip()
        return int(text) if text.isdigit() else 0

    results = []
    for page in range(1, pages + 1):
        driver.get(f"https://s.weibo.com/weibo?q={keyword}&page={page}")
        time.sleep(3)  # crude wait for dynamically loaded content
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        for item in soup.find_all('div', class_='card-wrap'):
            try:
                result = {
                    'user_name': '',
                    'user_url': '',
                    'content': '',
                    'publish_time': '',
                    'like_count': 0,
                    'comment_count': 0
                }
                user_link = item.find('a', class_='name')
                if user_link:
                    result['user_name'] = user_link.text.strip()
                    result['user_url'] = user_link['href']
                content_div = item.find('p', class_='txt')
                if content_div:
                    result['content'] = content_div.get_text(strip=True)
                time_tag = item.find('a', class_='date')
                if time_tag:
                    result['publish_time'] = time_tag.text.strip()
                action_list = item.find('div', class_='card-act')
                if action_list:
                    spans = action_list.find_all('span')
                    if len(spans) >= 2:
                        result['like_count'] = _count(spans[0])
                        result['comment_count'] = _count(spans[1])
                results.append(result)
            except Exception as e:
                # Bug fix: the original swallowed parse errors silently,
                # hiding page-structure changes from the operator.
                print(f"解析失败: {e}")
                continue
        print(f"搜索第 {page} 页完成")
    return results

三、小红书爬虫实现

3.1 采集笔记列表

import requests
import json


def generate_search_id():
    """Produce a random search ID for the notes-search endpoint."""
    import uuid
    return str(uuid.uuid4())


def get_xiaohongshu_notes(keyword, pages=5):
    """Search Xiaohongshu notes for *keyword* across *pages* result pages.

    Returns a list of dicts summarising each note: title, description,
    author, like/collect/comment counts, cover image URL and note type.
    """
    notes = []
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://www.xiaohongshu.com/'
    }
    url = "https://www.xiaohongshu.com/web_api/sns/v1/search/notes"
    for page in range(1, pages + 1):
        params = {
            'keyword': keyword,
            'page': page,
            'page_size': 20,
            'search_id': generate_search_id()
        }
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if 'data' in data and 'items' in data['data']:
                for item in data['data']['items']:
                    user = item.get('user', {})
                    cover = item.get('cover', {})
                    note = {
                        'note_id': item.get('id', ''),
                        'title': item.get('display_title', ''),
                        'desc': item.get('desc', ''),
                        'user_name': user.get('nickname', ''),
                        'like_count': item.get('liked_count', 0),
                        'collect_count': item.get('collected_count', 0),
                        'comment_count': item.get('comment_count', 0),
                        'cover_url': cover.get('url_default', ''),
                        'type': item.get('type', '')
                    }
                    notes.append(note)
                    print(f"采集: {note['title'][:30]}...")
            print(f"第 {page} 页完成")
        except Exception as e:
            print(f"请求失败: {e}")
    return notes

3.2 采集笔记详情

def get_xiaohongshu_note_detail(note_id):
    """Fetch the full detail record for one Xiaohongshu note.

    Returns a dict with metadata, engagement counts, image URLs and tag
    names, or None when the request fails or the payload has no data.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://www.xiaohongshu.com/'
    }
    try:
        response = requests.get(
            f"https://www.xiaohongshu.com/web_api/sns/v1/note/{note_id}",
            headers=headers,
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        if 'data' not in data:
            return None
        note = data['data']
        user = note.get('user', {})
        return {
            'note_id': note.get('id', ''),
            'title': note.get('title', ''),
            'desc': note.get('desc', ''),
            'type': note.get('type', ''),
            'user_id': user.get('user_id', ''),
            'user_name': user.get('nickname', ''),
            'like_count': note.get('liked_count', 0),
            'collect_count': note.get('collected_count', 0),
            'comment_count': note.get('comment_count', 0),
            'share_count': note.get('share_count', 0),
            'view_count': note.get('view_count', 0),
            'publish_time': note.get('time', ''),
            'images': [img.get('url_default', '') for img in note.get('image_list', [])],
            'tags': [tag.get('name', '') for tag in note.get('tag_list', [])],
        }
    except Exception as e:
        print(f"获取详情失败: {e}")
        return None

四、数据分析和可视化

4.1 情感分析

from snownlp import SnowNLP


def analyze_sentiment(text):
    """Run SnowNLP sentiment scoring and keyword extraction on *text*.

    Returns a dict with 'sentiment' (0-1; closer to 1 is more positive)
    and 'keywords' (the top 10 extracted keywords).
    """
    analyzer = SnowNLP(text)
    return {
        'sentiment': analyzer.sentiments,
        'keywords': analyzer.keywords(10),
    }


# Example usage.
post_content = "今天天气真好,心情很棒!"
result = analyze_sentiment(post_content)
print(f"情感值: {result['sentiment']}")
print(f"关键词: {result['keywords']}")

4.2 热门话题分析

from collections import Counter
import re

# Compiled once: text between two '#' marks, e.g. "#话题#".
_HASHTAG_RE = re.compile(r'#([^#]+)#')


def extract_hashtags(text):
    """Return every #tag# topic embedded in *text*, without the '#' marks."""
    return _HASHTAG_RE.findall(text)


def analyze_trends(posts):
    """Count hashtag frequencies across *posts*.

    Each post is a dict; posts without a 'content' key are treated as
    empty (robustness fix: the original raised KeyError). Returns the
    20 most common (tag, count) pairs, most frequent first.
    """
    tag_counter = Counter()
    for post in posts:
        tag_counter.update(extract_hashtags(post.get('content', '')))
    return tag_counter.most_common(20)

4.3 用户影响力分析

# Interaction weights: comments and reposts signal stronger engagement
# than likes, so they are weighted higher.
_LIKE_WEIGHT = 1
_COMMENT_WEIGHT = 5
_REPOST_WEIGHT = 10


def calculate_influence_score(post):
    """Return the weighted engagement score of one post dict.

    *post* must provide 'like_count', 'comment_count' and 'repost_count'.
    """
    return (
        post['like_count'] * _LIKE_WEIGHT
        + post['comment_count'] * _COMMENT_WEIGHT
        + post['repost_count'] * _REPOST_WEIGHT
    )


def analyze_user_influence(posts):
    """Rank users by average per-post engagement.

    Returns up to 20 dicts with 'user_id', 'posts_count' and
    'avg_influence', sorted by descending average influence.
    """
    per_user = {}
    for post in posts:
        user_id = post.get('user_id', '')
        stats = per_user.setdefault(user_id, {'posts': 0, 'score': 0})
        stats['posts'] += 1
        # Consistency fix: reuse the single scoring function instead of
        # duplicating the weights inline (the original repeated them).
        stats['score'] += calculate_influence_score(post)
    results = [
        {
            'user_id': user_id,
            'posts_count': stats['posts'],
            'avg_influence': stats['score'] / stats['posts'],
        }
        for user_id, stats in per_user.items()
    ]
    results.sort(key=lambda x: x['avg_influence'], reverse=True)
    return results[:20]  # Top 20

五、数据存储

5.1 存储到 MongoDB

from pymongo import MongoClient
from datetime import datetime

# Connection settings for the local MongoDB instance.
_MONGO_URI = 'mongodb://localhost:27017/'
_DB_NAME = 'social_media'


def save_to_mongodb(data, collection_name='social_posts'):
    """Insert one document into MongoDB, stamped with the crawl time.

    Mutates *data* by adding a 'crawled_at' field; returns the inserted
    document's ObjectId.
    """
    client = MongoClient(_MONGO_URI)
    try:
        collection = client[_DB_NAME][collection_name]
        data['crawled_at'] = datetime.now()
        result = collection.insert_one(data)
        print(f"数据已保存,ID: {result.inserted_id}")
        return result.inserted_id
    finally:
        # Bug fix: the original leaked one MongoClient (and its
        # connection pool) per call.
        client.close()


def save_batch_to_mongodb(posts, collection_name='social_posts'):
    """Insert many documents in one round trip.

    Mutates each post by adding 'crawled_at'; returns the list of
    inserted ObjectIds.
    """
    client = MongoClient(_MONGO_URI)
    try:
        collection = client[_DB_NAME][collection_name]
        # One shared timestamp marks the whole batch consistently.
        crawled_at = datetime.now()
        for post in posts:
            post['crawled_at'] = crawled_at
        result = collection.insert_many(posts)
        print(f"批量保存完成,共 {len(result.inserted_ids)} 条")
        return result.inserted_ids
    finally:
        client.close()

六、反爬虫应对

6.1 请求频率控制

import time
import random


class RateController:
    """Randomized inter-request delays to avoid rate-limit detection."""

    def __init__(self, min_delay=2, max_delay=5):
        # Delay bounds, in seconds.
        self.min_delay = min_delay
        self.max_delay = max_delay

    def wait(self):
        """Sleep a uniformly random time in [min_delay, max_delay] seconds."""
        time.sleep(random.uniform(self.min_delay, self.max_delay))

    def wait_with_jitter(self, base_delay):
        """Sleep base_delay +/- 0.5s of jitter, never less than 1 second."""
        jitter = random.uniform(-0.5, 0.5)
        time.sleep(max(1, base_delay + jitter))


# Usage example (bug fix: the original ran this at import time against an
# undefined `urls` list and an un-imported `requests` module):
#
#     rate_controller = RateController(min_delay=3, max_delay=8)
#     for url in urls:
#         response = requests.get(url)
#         rate_controller.wait()

6.2 IP 代理池

class ProxyPool:
    """Round-robin rotation over a fixed list of proxy configurations."""

    def __init__(self, proxies):
        # Robustness fix: an empty pool would otherwise raise
        # ZeroDivisionError on the first get_proxy() call.
        if not proxies:
            raise ValueError("proxy list must not be empty")
        self.proxies = proxies
        self.current_index = 0

    def get_proxy(self):
        """Return the next proxy, cycling back to the first after the last."""
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy


# Usage example (bug fix: the original executed this at import time with an
# undefined `url` variable and an un-imported `requests` module):
#
#     proxies = [
#         {'http': 'http://proxy1.example.com:8080'},
#         {'http': 'http://proxy2.example.com:8080'},
#         {'http': 'http://proxy3.example.com:8080'},
#     ]
#     proxy_pool = ProxyPool(proxies)
#     proxy = proxy_pool.get_proxy()
#     response = requests.get(url, proxies=proxy)

七、总结

本文介绍了社交媒体数据采集的完整流程,包括:

  • 微博和小红书的登录和数据采集方法
  • 用户发帖、评论、互动数据的采集
  • 情感分析和话题挖掘
  • 用户影响力评估
  • 反爬虫应对策略

使用 EasySpider 工具:可以使用 EasySpider 的 JSON 格式化工具分析社交媒体 API 返回的数据结构,快速定位需要提取的字段。

重要提醒:社交媒体数据采集涉及用户隐私和平台规则,请严格遵守相关法律法规和平台服务条款。本文仅供学习参考,切勿用于商业用途或侵犯他人权益。