News Website Scraping in Practice

Real-time news collection and hot-topic tracking

Introduction: This article explains in detail how to collect real-time news data from a news website, including titles, article bodies, publish times, and categories. We will build an automated news collection system that supports incremental updates and hot-topic tracking.

1. Requirements Analysis

Collection targets (a sample record is sketched after the list below):

  • News lists (title, summary, link, publish time)
  • News details (article body, author, source)
  • News categories (tech, finance, entertainment, sports, etc.)
  • Related recommendations and comment counts
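
For reference, a single collected article can be pictured as one flat record. The field names below are illustrative assumptions that mirror the database schema defined in section 2.2; the values are made up.

# Illustrative shape of one collected record (hypothetical values).
sample_news = {
    'url': '/tech/2024/05/some-article.html',   # made-up relative link
    'title': 'Example headline',
    'summary': 'Teaser text shown on the list page',
    'publish_time': '2024-05-01 08:30',
    'category': 'tech',
    'content': 'Full article body, filled in by the detail crawler',
    'author': 'Reporter name',
    'source': 'Original outlet',
    'view_count': 1024,
    'comment_count': 37,
}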

Functional features:

  • Scheduled automatic collection (hourly updates)
  • Deduplication (avoid collecting the same article twice)
  • Incremental updates (collect only new content)
  • Hot-topic analysis (track trending keywords)

2. Technical Implementation

2.1 Basic Configuration

import requests
from bs4 import BeautifulSoup
import hashlib
import sqlite3
from datetime import datetime, timedelta
import time
import random

# Configuration parameters
CONFIG = {
    'base_url': 'https://news.example.com',
    'categories': ['tech', 'finance', 'entertainment', 'sports'],
    'update_interval': 3600,    # update once per hour
    'request_delay': (1, 3),    # random delay range between requests (seconds)
    'max_retries': 3
}

# Request headers
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
}

# Create a Session so headers and cookies are reused across requests
session = requests.Session()
session.headers.update(HEADERS)

2.2 Database Schema

def init_database(db_file='news.db'):
    """Initialize the database."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    # News table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url_hash TEXT UNIQUE,
            url TEXT,
            title TEXT,
            category TEXT,
            summary TEXT,
            content TEXT,
            author TEXT,
            source TEXT,
            publish_time TEXT,
            view_count INTEGER,
            comment_count INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # URL index (speeds up duplicate checks)
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_url_hash ON news(url_hash)')
    # Publish-time index
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_publish_time ON news(publish_time)')
    # Category index
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON news(category)')

    conn.commit()
    conn.close()
    print("Database initialized")
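
A quick sanity check, as a minimal sketch: assuming init_database from above is in scope, the PRAGMA below lists the columns of the freshly created news table.

# Minimal sanity check: initialize the database and list the columns of the news table.
import sqlite3

init_database('news.db')                      # function defined above
conn = sqlite3.connect('news.db')
for cid, name, col_type, *_ in conn.execute('PRAGMA table_info(news)'):
    print(cid, name, col_type)
conn.close()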

2.3 Collecting the News List

def get_news_list(category, page=1):
    """Fetch one page of the news list for a category."""
    url = f"{CONFIG['base_url']}/{category}/page/{page}"

    for attempt in range(CONFIG['max_retries']):
        try:
            response = session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Locate the news items
            news_items = soup.find_all('div', class_='news-item')
            news_list = []

            for item in news_items:
                try:
                    news = {
                        'url': item.find('a')['href'],
                        'title': item.find('h3', class_='title').text.strip(),
                        'summary': item.find('p', class_='summary').text.strip()
                                   if item.find('p', class_='summary') else '',
                        'publish_time': item.find('span', class_='time').text.strip(),
                        'view_count': int(item.find('span', class_='views').text.strip())
                                      if item.find('span', class_='views') else 0,
                        'comment_count': int(item.find('span', class_='comments').text.strip())
                                         if item.find('span', class_='comments') else 0,
                        'category': category
                    }
                    news_list.append(news)
                except (AttributeError, ValueError, KeyError, TypeError):
                    # Skip items whose markup does not match the expected structure
                    continue

            print(f"Collected {category} page {page}: {len(news_list)} items")
            time.sleep(random.uniform(*CONFIG['request_delay']))
            return news_list

        except Exception as e:
            print(f"Request failed (attempt {attempt + 1}/{CONFIG['max_retries']}): {e}")
            if attempt < CONFIG['max_retries'] - 1:
                time.sleep(5)

    return []
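
Assuming the code above lives in a single module, hypothetically called news_crawler.py, a one-off test of the list crawler might look like this:

# Hypothetical one-off test; news_crawler is an assumed module holding the code above.
from news_crawler import get_news_list

items = get_news_list('tech', page=1)
for item in items[:5]:
    print(item['publish_time'], item['title'])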

2.4 Collecting News Details

def get_news_detail(url):
    """Fetch the detail page of a single news article."""
    full_url = f"{CONFIG['base_url']}{url}" if not url.startswith('http') else url

    # Default values; also returned on failure so callers never see missing keys
    detail = {
        'content': '',
        'author': '',
        'source': ''
    }

    try:
        response = session.get(full_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Article body
        content_div = soup.find('div', class_='article-content')
        if content_div:
            # Remove ads and other irrelevant elements
            for ad in content_div.find_all('div', class_='ad'):
                ad.decompose()
            detail['content'] = content_div.get_text(separator='\n', strip=True)

        # Author
        author_tag = soup.find('span', class_='author')
        if author_tag:
            detail['author'] = author_tag.text.strip()

        # Source
        source_tag = soup.find('span', class_='source')
        if source_tag:
            detail['source'] = source_tag.text.strip()

    except Exception as e:
        print(f"Failed to fetch detail: {e}")

    return detail
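
Chaining the two steps together for a single article, again under the assumption that the code above is importable as news_crawler:

# Fetch the first item of a list page and enrich it with its detail page.
from news_crawler import get_news_list, get_news_detail  # hypothetical module

items = get_news_list('finance', page=1)
if items:
    article = items[0]
    article.update(get_news_detail(article['url']))
    print(article['title'])
    print(article['content'][:200])  # first 200 characters of the body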

2.5 Deduplication and Saving

def save_news(news_item, db_file='news.db'):
    """Save a news item, skipping duplicates."""
    # Use the MD5 of the URL as a stable deduplication key
    url_hash = hashlib.md5(news_item['url'].encode()).hexdigest()

    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    try:
        # Check whether the item already exists
        cursor.execute('SELECT id FROM news WHERE url_hash = ?', (url_hash,))
        if cursor.fetchone():
            print(f"Already saved: {news_item['title'][:30]}...")
            return False

        # Insert the new record
        cursor.execute('''
            INSERT INTO news (
                url_hash, url, title, category, summary, content,
                author, source, publish_time, view_count, comment_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            url_hash,
            news_item['url'],
            news_item['title'],
            news_item['category'],
            news_item['summary'],
            news_item['content'],
            news_item['author'],
            news_item['source'],
            news_item['publish_time'],
            news_item['view_count'],
            news_item['comment_count']
        ))
        conn.commit()
        print(f"Saved: {news_item['title'][:30]}...")
        return True

    except Exception as e:
        print(f"Save failed: {e}")
        return False
    finally:
        conn.close()
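
The deduplication key is nothing more than the MD5 of the URL string, so the same URL always maps to the same digest. A tiny standalone illustration (the URL is made up):

import hashlib

# The same URL always yields the same 32-character hex digest,
# so a re-crawled article is recognized as a duplicate.
url = '/tech/2024/05/some-article.html'
h1 = hashlib.md5(url.encode()).hexdigest()
h2 = hashlib.md5(url.encode()).hexdigest()
print(h1)
print(h1 == h2)  # True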

3. Incremental Collection

3.1 Getting the Last Crawl Time

def get_last_crawl_time(db_file='news.db'):
    """Return the latest publish_time stored in the database."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    cursor.execute('SELECT MAX(publish_time) FROM news')
    result = cursor.fetchone()[0]
    conn.close()

    if result:
        try:
            # publish_time is stored as scraped text; this assumes '%Y-%m-%d %H:%M:%S'
            return datetime.strptime(result, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Unparseable value: fall back to a full crawl
            return None
    return None

3.2 Deciding Whether to Collect

def should_crawl(news_item, last_crawl_time):
    """Decide whether a news item needs to be collected."""
    if last_crawl_time is None:
        return True
    try:
        # Parse the publish time
        publish_time = parse_publish_time(news_item['publish_time'])
        if publish_time and publish_time > last_crawl_time:
            return True
    except Exception as e:
        print(f"Failed to parse time: {e}")
    return False


def parse_publish_time(time_str):
    """Parse a publish-time string into a datetime."""
    now = datetime.now()

    # Handle relative times such as "2小时前" (2 hours ago) or "30分钟前" (30 minutes ago)
    if '小时前' in time_str:
        hours = int(time_str.replace('小时前', ''))
        return now - timedelta(hours=hours)
    elif '分钟前' in time_str:
        minutes = int(time_str.replace('分钟前', ''))
        return now - timedelta(minutes=minutes)
    elif '刚刚' in time_str:  # "just now"
        return now

    # Handle absolute times
    try:
        return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
    except ValueError:
        try:
            return datetime.strptime(time_str, '%Y-%m-%d')
        except ValueError:
            return None
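
A few spot checks of the parser, assuming parse_publish_time from above is in scope. The relative strings stay in Chinese because that is how the list page of the target site renders them; they are converted by subtracting a timedelta from the current time.

# Spot checks for parse_publish_time (defined above).
print(parse_publish_time('刚刚'))               # roughly the current time
print(parse_publish_time('30分钟前'))           # now minus 30 minutes
print(parse_publish_time('2小时前'))            # now minus 2 hours
print(parse_publish_time('2024-05-01 08:30'))   # parsed as an absolute datetime
print(parse_publish_time('unknown format'))     # unparseable, returns None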

4. Hot-Topic Analysis

4.1 Extracting Keywords

import jieba
import jieba.analyse
from collections import Counter


def extract_keywords(text, topk=10):
    """Extract keywords using TF-IDF."""
    keywords = jieba.analyse.extract_tags(
        text,
        topK=topk,
        withWeight=True
    )
    return keywords


def analyze_hot_topics(db_file='news.db', days=1):
    """Analyze hot topics over the last N days."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    # Fetch news from the last N days; days is cast to int to keep the interpolation safe
    cursor.execute(f'''
        SELECT title, content FROM news
        WHERE created_at >= datetime('now', '-{int(days)} days')
    ''')
    rows = cursor.fetchall()
    conn.close()

    # Extract keywords from every article
    all_keywords = []
    for title, content in rows:
        text = title + ' ' + content
        keywords = extract_keywords(text, topk=5)
        all_keywords.extend([kw[0] for kw in keywords])

    # Count keyword frequencies
    keyword_counter = Counter(all_keywords)
    return keyword_counter.most_common(20)
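
A self-contained sketch of the jieba TF-IDF call on a single sentence; the sample headline is made up and only meant to show the shape of the output.

import jieba.analyse

# Made-up sample headline; extract the five highest-weighted TF-IDF keywords.
text = '人工智能芯片公司发布新一代大模型训练加速器'
for word, weight in jieba.analyse.extract_tags(text, topK=5, withWeight=True):
    print(f'{word}: {weight:.3f}')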

4.2 Per-Category Statistics

def get_category_stats(db_file='news.db'):
    """Return the number of news items per category."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    # Count news items per category
    cursor.execute('''
        SELECT category, COUNT(*) as count
        FROM news
        GROUP BY category
        ORDER BY count DESC
    ''')
    stats = cursor.fetchall()
    conn.close()
    return stats
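
A simple usage sketch, assuming get_category_stats from above is in scope, that prints the distribution as a small text table:

# Print the category distribution.
for category, count in get_category_stats():
    print(f'{category:<15} {count:>6}')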

5. Scheduled Tasks

import schedule


def crawl_news():
    """Run one full collection pass."""
    print(f"\n{'='*50}")
    print(f"Starting news collection - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*50}\n")

    # Last crawl time, used for incremental collection
    last_crawl_time = get_last_crawl_time()
    print(f"Last crawl time: {last_crawl_time}\n")

    total_new = 0
    total_processed = 0

    # Iterate over all categories
    for category in CONFIG['categories']:
        print(f"\nCollecting category: {category}")
        print("-" * 30)

        # Walk the list pages (assume at most 10 pages)
        for page in range(1, 11):
            news_list = get_news_list(category, page)
            if not news_list:
                break

            for news_item in news_list:
                # Skip items published before the last crawl
                if last_crawl_time and not should_crawl(news_item, last_crawl_time):
                    print(f"Skipping old news: {news_item['title'][:30]}...")
                    continue

                # Fetch the detail page
                detail = get_news_detail(news_item['url'])
                news_item.update(detail)

                # Save (deduplicated)
                if save_news(news_item):
                    total_new += 1
                total_processed += 1

                # Random delay between requests
                time.sleep(random.uniform(*CONFIG['request_delay']))

    print(f"\n{'='*50}")
    print(f"Collection finished! New: {total_new}, processed: {total_processed}")
    print(f"{'='*50}\n")

    # Analyze hot topics
    print("Analyzing hot topics...")
    hot_topics = analyze_hot_topics(days=1)
    print("Top 10 hot topics:")
    for i, (topic, count) in enumerate(hot_topics[:10], 1):
        print(f"{i}. {topic}: {count} occurrences")


def run_scheduler():
    """Run the crawler on a fixed schedule."""
    # Run once immediately
    crawl_news()

    # Schedule recurring runs
    schedule.every(CONFIG['update_interval']).seconds.do(crawl_news)
    print(f"Scheduler set up: runs every {CONFIG['update_interval']} seconds")

    # Keep running
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == '__main__':
    # Initialize the database
    init_database()
    # Start the scheduler
    run_scheduler()

6. Data Export

6.1 Exporting to CSV

import pandas as pd


def export_to_csv(db_file='news.db', output_file='news_export.csv'):
    """Export all news to a CSV file."""
    conn = sqlite3.connect(db_file)
    df = pd.read_sql_query('SELECT * FROM news ORDER BY publish_time DESC', conn)
    conn.close()

    # utf-8-sig keeps Chinese text readable when the CSV is opened in Excel
    df.to_csv(output_file, index=False, encoding='utf-8-sig')
    print(f"Data exported to {output_file}")
    return df
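
Usage is a one-liner; the returned DataFrame can also be inspected directly (a sketch, assuming export_to_csv from above is in scope and pandas is installed):

# Export and take a quick look at the newest rows.
df = export_to_csv('news.db', 'news_export.csv')
print(df[['category', 'title', 'publish_time']].head(10))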

6.2 Generating a Daily Report

def generate_daily_report(db_file='news.db', output_file='daily_report.txt'):
    """Generate a report of today's collected news."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    today = datetime.now().strftime('%Y-%m-%d')

    # Select explicit columns so the positional unpacking below is unambiguous
    cursor.execute('''
        SELECT category, title, summary FROM news
        WHERE DATE(created_at) = ?
        ORDER BY publish_time DESC
    ''', (today,))
    rows = cursor.fetchall()
    conn.close()

    # Write the report
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("News Collection Daily Report\n")
        f.write(f"{'='*50}\n")
        f.write(f"Date: {today}\n")
        f.write(f"Items collected: {len(rows)}\n\n")

        # Per-category statistics
        f.write("Category statistics:\n")
        f.write(f"{'-'*30}\n")
        from collections import Counter
        cat_stats = Counter(category for category, _, _ in rows)
        for cat, count in cat_stats.most_common():
            f.write(f"{cat}: {count}\n")

        f.write(f"\n{'='*50}\n\n")
        f.write("News list:\n")
        f.write(f"{'-'*50}\n")
        for i, (category, title, summary) in enumerate(rows[:50], 1):  # first 50 only
            f.write(f"{i}. [{category}] {title}\n")
            f.write(f"   {summary}\n\n")

    print(f"Daily report generated: {output_file}")

7. Summary

This article implemented a complete news collection system, including:

  • Multi-category news list collection
  • News detail collection and parsing
  • Deduplication (based on URL hashes)
  • Incremental updates (based on publish-time comparison)
  • Scheduled automatic collection
  • Hot-topic analysis
  • Data export and report generation

Extension ideas: add email notifications that automatically send an alert when major news or watched keywords are detected, or plug the crawler into a message queue to enable distributed collection.

Copyright notice: when collecting news data, comply with copyright law and the site's terms of service, and use the data for learning and research purposes only. Credit the original source when republishing news content.