Introduction: This post walks through how to collect real-time news data from a news portal, including titles, article bodies, publish times, and categories. We will build an automated news collection system that supports incremental updates and hot-topic tracking.
1. Requirements Analysis
Collection targets:
- News list (title, summary, link, publish time)
- News detail (article body, author, source)
- News categories (tech, finance, entertainment, sports, etc.)
- Related recommendations and comment counts
Features:
- Scheduled automatic crawling (hourly updates)
- Deduplication (avoid collecting the same article twice)
- Incremental updates (only fetch new content)
- Hot-topic analysis (track trending keywords)
2. Technical Implementation
2.1 Basic Configuration
import requests
from bs4 import BeautifulSoup
import hashlib
import sqlite3
from datetime import datetime, timedelta
import time
import random
# Configuration
CONFIG = {
    'base_url': 'https://news.example.com',
    'categories': ['tech', 'finance', 'entertainment', 'sports'],
    'update_interval': 3600,   # crawl once per hour
    'request_delay': (1, 3),   # random delay range between requests (seconds)
    'max_retries': 3
}
# Request headers
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
}
# Create a shared session
session = requests.Session()
session.headers.update(HEADERS)
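The retry and polite-delay logic used repeatedly in the crawl functions below can also be pulled into one small helper. A minimal sketch, assuming the CONFIG and session objects defined above (the fetch_html name is my own illustration, not part of the original code):

def fetch_html(url, retries=None, timeout=10):
    """Fetch a page with retries and a random polite delay; return HTML text or None."""
    retries = retries if retries is not None else CONFIG['max_retries']
    for attempt in range(retries):
        try:
            response = session.get(url, timeout=timeout)
            response.raise_for_status()
            # Be polite: wait a random interval before the next request
            time.sleep(random.uniform(*CONFIG['request_delay']))
            return response.text
        except requests.RequestException as e:
            print(f"Request failed ({attempt + 1}/{retries}): {e}")
            if attempt < retries - 1:
                time.sleep(5)
    return None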
2.2 Database Design
def init_database(db_file='news.db'):
    """Initialize the SQLite database."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    # News table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url_hash TEXT UNIQUE,
            url TEXT,
            title TEXT,
            category TEXT,
            summary TEXT,
            content TEXT,
            author TEXT,
            source TEXT,
            publish_time TEXT,
            view_count INTEGER,
            comment_count INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Index on the URL hash (used for deduplication)
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_url_hash ON news(url_hash)')
    # Index on publish time
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_publish_time ON news(publish_time)')
    # Index on category
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON news(category)')
    conn.commit()
    conn.close()
    print("Database initialized")
2.3 Crawling the News List
def get_news_list(category, page=1):
    """Fetch one page of the news list for a category."""
    url = f"{CONFIG['base_url']}/{category}/page/{page}"
    for attempt in range(CONFIG['max_retries']):
        try:
            response = session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Locate the news items
            news_items = soup.find_all('div', class_='news-item')
            news_list = []
            for item in news_items:
                try:
                    news = {
                        'url': item.find('a')['href'],
                        'title': item.find('h3', class_='title').text.strip(),
                        'summary': item.find('p', class_='summary').text.strip() if item.find('p', class_='summary') else '',
                        'publish_time': item.find('span', class_='time').text.strip(),
                        'view_count': int(item.find('span', class_='views').text.strip()) if item.find('span', class_='views') else 0,
                        'comment_count': int(item.find('span', class_='comments').text.strip()) if item.find('span', class_='comments') else 0,
                        'category': category
                    }
                    news_list.append(news)
                except (AttributeError, ValueError, TypeError, KeyError):
                    # Skip items whose markup does not match the expected structure
                    continue
            print(f"Crawled {category} page {page}: {len(news_list)} items")
            time.sleep(random.uniform(*CONFIG['request_delay']))
            return news_list
        except Exception as e:
            print(f"Request failed (attempt {attempt + 1}/{CONFIG['max_retries']}): {e}")
            if attempt < CONFIG['max_retries'] - 1:
                time.sleep(5)
    return []
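A quick usage example (the output depends entirely on the target site's actual markup):

tech_news = get_news_list('tech', page=1)
for item in tech_news[:5]:
    print(item['publish_time'], item['title'])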
2.4 Crawling News Details
def get_news_detail(url):
    """Fetch the detail page for a single news item."""
    full_url = f"{CONFIG['base_url']}{url}" if not url.startswith('http') else url
    # Default values; also returned on failure so callers always get the same keys
    detail = {
        'content': '',
        'author': '',
        'source': ''
    }
    try:
        response = session.get(full_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Article body
        content_div = soup.find('div', class_='article-content')
        if content_div:
            # Remove ads and other irrelevant elements
            for ad in content_div.find_all('div', class_='ad'):
                ad.decompose()
            detail['content'] = content_div.get_text(separator='\n', strip=True)
        # Author
        author_tag = soup.find('span', class_='author')
        if author_tag:
            detail['author'] = author_tag.text.strip()
        # Source
        source_tag = soup.find('span', class_='source')
        if source_tag:
            detail['source'] = source_tag.text.strip()
        return detail
    except Exception as e:
        print(f"Failed to fetch detail: {e}")
        return detail
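List and detail crawling are usually chained together, roughly like this:

news_list = get_news_list('tech', page=1)
if news_list:
    detail = get_news_detail(news_list[0]['url'])
    print(detail['author'], detail['source'])
    print(detail['content'][:200])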
2.5 Deduplication and Saving
def save_news(news_item, db_file='news.db'):
    """Save a news item, skipping duplicates."""
    # Hash the URL for deduplication
    url_hash = hashlib.md5(news_item['url'].encode()).hexdigest()
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    try:
        # Check whether the item already exists
        cursor.execute('SELECT id FROM news WHERE url_hash = ?', (url_hash,))
        if cursor.fetchone():
            print(f"Already stored: {news_item['title'][:30]}...")
            return False
        # Insert the new row
        cursor.execute('''
            INSERT INTO news (
                url_hash, url, title, category, summary, content,
                author, source, publish_time, view_count, comment_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            url_hash,
            news_item['url'],
            news_item['title'],
            news_item['category'],
            news_item['summary'],
            news_item.get('content', ''),
            news_item.get('author', ''),
            news_item.get('source', ''),
            news_item['publish_time'],
            news_item['view_count'],
            news_item['comment_count']
        ))
        conn.commit()
        print(f"Saved: {news_item['title'][:30]}...")
        return True
    except Exception as e:
        print(f"Save failed: {e}")
        return False
    finally:
        conn.close()
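Checking the database one row at a time works, but when many list items repeat between runs it can be cheaper to load the existing hashes once and skip known URLs before fetching the detail pages. A sketch under that assumption (the load_known_hashes helper and the sample URL are illustrative, not part of the original code):

def load_known_hashes(db_file='news.db'):
    """Load all stored URL hashes into a set for fast in-memory dedup checks."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    cursor.execute('SELECT url_hash FROM news')
    hashes = {row[0] for row in cursor.fetchall()}
    conn.close()
    return hashes

known = load_known_hashes()
url = '/tech/article-123'  # hypothetical URL taken from a list page
if hashlib.md5(url.encode()).hexdigest() not in known:
    pass  # only unseen URLs need a detail fetch and save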
3. Incremental Crawling
3.1 Getting the Last Crawl Time
def get_last_crawl_time(db_file='news.db'):
    """Return the newest publish_time stored so far, or None."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    cursor.execute('SELECT MAX(publish_time) FROM news')
    result = cursor.fetchone()[0]
    conn.close()
    if result:
        # publish_time is stored as text; try the common formats
        for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y-%m-%d'):
            try:
                return datetime.strptime(result, fmt)
            except ValueError:
                continue
    return None
3.2 Deciding Whether to Crawl an Item
def should_crawl(news_item, last_crawl_time):
    """Decide whether a list item is new enough to be crawled."""
    if last_crawl_time is None:
        return True
    try:
        # Parse the publish time shown on the list page
        publish_time = parse_publish_time(news_item['publish_time'])
        if publish_time and publish_time > last_crawl_time:
            return True
    except Exception as e:
        print(f"Failed to parse time: {e}")
    return False
def parse_publish_time(time_str):
    """Parse a publish-time string into a datetime."""
    now = datetime.now()
    # Relative times such as "2小时前" (2 hours ago) or "30分钟前" (30 minutes ago)
    if '小时前' in time_str:
        hours = int(time_str.replace('小时前', ''))
        return now - timedelta(hours=hours)
    elif '分钟前' in time_str:
        minutes = int(time_str.replace('分钟前', ''))
        return now - timedelta(minutes=minutes)
    elif '刚刚' in time_str:
        # "刚刚" means "just now"
        return now
    # Absolute times
    try:
        return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
    except ValueError:
        try:
            return datetime.strptime(time_str, '%Y-%m-%d')
        except ValueError:
            return None
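For example, with the current time as the reference point:

print(parse_publish_time('2小时前'))           # roughly two hours before now
print(parse_publish_time('刚刚'))              # the current time
print(parse_publish_time('2024-05-01 08:30'))  # datetime(2024, 5, 1, 8, 30)
print(parse_publish_time('not a time'))        # None for unrecognized formats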
4. Hot Topic Analysis
4.1 Extracting Keywords
import jieba
import jieba.analyse
from collections import Counter
def extract_keywords(text, topk=10):
    """Extract keywords from a piece of text."""
    # Use jieba's TF-IDF based keyword extraction
    keywords = jieba.analyse.extract_tags(
        text,
        topK=topk,
        withWeight=True
    )
    return keywords
def analyze_hot_topics(db_file='news.db', days=1):
    """Analyze trending topics over the last N days."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    # Fetch news collected within the last N days
    cursor.execute(
        "SELECT title, content FROM news WHERE created_at >= datetime('now', ?)",
        (f'-{days} days',)
    )
    rows = cursor.fetchall()
    conn.close()
    # Extract keywords from every article
    all_keywords = []
    for title, content in rows:
        text = (title or '') + ' ' + (content or '')
        keywords = extract_keywords(text, topk=5)
        all_keywords.extend([kw[0] for kw in keywords])
    # Count keyword frequencies
    keyword_counter = Counter(all_keywords)
    return keyword_counter.most_common(20)
4.2 Category Statistics
def get_category_stats(db_file='news.db'):
    """Return the number of stored articles per category."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    # Article count per category
    cursor.execute('''
        SELECT category, COUNT(*) as count
        FROM news
        GROUP BY category
        ORDER BY count DESC
    ''')
    stats = cursor.fetchall()
    conn.close()
    return stats
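get_category_stats is not called anywhere else in this post; typical usage is simply:

for category, count in get_category_stats():
    print(f"{category}: {count} articles")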
5. Scheduled Tasks
import schedule
import time
def crawl_news():
    """Run one full crawl over all categories."""
    print(f"\n{'='*50}")
    print(f"Starting news crawl - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*50}\n")
    # Time of the previous crawl (newest stored publish_time)
    last_crawl_time = get_last_crawl_time()
    print(f"Last crawl time: {last_crawl_time}\n")
    total_new = 0
    total_processed = 0
    # Iterate over all categories
    for category in CONFIG['categories']:
        print(f"\nCrawling category: {category}")
        print("-" * 30)
        # Walk the list pages (at most 10 pages assumed)
        for page in range(1, 11):
            news_list = get_news_list(category, page)
            if not news_list:
                break
            for news_item in news_list:
                # Skip items published before the last crawl
                if last_crawl_time and not should_crawl(news_item, last_crawl_time):
                    print(f"Skipping old item: {news_item['title'][:30]}...")
                    continue
                # Fetch the detail page
                detail = get_news_detail(news_item['url'])
                news_item.update(detail)
                # Save (with deduplication)
                if save_news(news_item):
                    total_new += 1
                total_processed += 1
                # Random delay between detail requests
                time.sleep(random.uniform(*CONFIG['request_delay']))
    print(f"\n{'='*50}")
    print(f"Crawl finished! New: {total_new} items, processed: {total_processed} items")
    print(f"{'='*50}\n")
    # Hot topic analysis
    print("Analyzing hot topics...")
    hot_topics = analyze_hot_topics(days=1)
    print("Top 10 hot topics:")
    for i, (topic, count) in enumerate(hot_topics[:10], 1):
        print(f"{i}. {topic}: {count} occurrences")
def run_scheduler():
    """Run the crawl on a fixed schedule."""
    # Run once immediately
    crawl_news()
    # Schedule the recurring job
    schedule.every(CONFIG['update_interval']).seconds.do(crawl_news)
    print(f"Scheduler set up, running every {CONFIG['update_interval']} seconds")
    # Keep running
    while True:
        schedule.run_pending()
        time.sleep(60)
if __name__ == '__main__':
    # Initialize the database
    init_database()
    # Start the scheduler
    run_scheduler()
6. Data Export
6.1 Exporting to CSV
import pandas as pd
def export_to_csv(db_file='news.db', output_file='news_export.csv'):
    """Export all news to a CSV file."""
    conn = sqlite3.connect(db_file)
    df = pd.read_sql_query('SELECT * FROM news ORDER BY publish_time DESC', conn)
    conn.close()
    df.to_csv(output_file, index=False, encoding='utf-8-sig')
    print(f"Data exported to {output_file}")
    return df
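Usage is a single call; pandas takes the column names straight from the table schema, so a quick preview looks like this:

df = export_to_csv()
print(df[['category', 'title', 'publish_time']].head())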
6.2 Generating a Daily Report
def generate_daily_report(db_file='news.db', output_file='daily_report.txt'):
    """Generate a plain-text daily report."""
    from collections import Counter
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    today = datetime.now().strftime('%Y-%m-%d')
    # Fetch today's rows
    cursor.execute('''
        SELECT * FROM news
        WHERE DATE(created_at) = ?
        ORDER BY publish_time DESC
    ''', (today,))
    rows = cursor.fetchall()
    conn.close()
    # Column order follows the schema: id, url_hash, url, title, category, summary, ...
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("News Crawl Daily Report\n")
        f.write(f"{'='*50}\n")
        f.write(f"Date: {today}\n")
        f.write(f"Items collected: {len(rows)}\n\n")
        # Per-category statistics
        f.write("Category statistics:\n")
        f.write(f"{'-'*30}\n")
        categories = [row[4] for row in rows]
        cat_stats = Counter(categories)
        for cat, count in cat_stats.most_common():
            f.write(f"{cat}: {count}\n")
        f.write(f"\n{'='*50}\n\n")
        f.write("News list:\n")
        f.write(f"{'-'*50}\n")
        for i, row in enumerate(rows[:50], 1):  # show at most 50 items
            f.write(f"{i}. [{row[4]}] {row[3]}\n")
            f.write(f"   {row[5]}\n\n")
    print(f"Daily report written to {output_file}")
7. Summary
This post built a complete news collection system, covering:
- Crawling news lists across multiple categories
- Crawling and parsing news detail pages
- Deduplication (based on URL hashes)
- Incremental updates (based on publish-time comparison)
- Scheduled automatic crawling
- Hot topic analysis
- Data export and report generation
Extension ideas: add e-mail notifications so that an alert goes out when major news or watched keywords are detected (see the sketch below), or plug in a message queue to distribute the crawl across multiple workers.
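For the e-mail extension mentioned above, a minimal sketch with smtplib might look like the following; the SMTP host, addresses, and keyword list are placeholders of my own, not tested values from the original post:

import smtplib
from email.mime.text import MIMEText

ALERT_KEYWORDS = {'突发', '地震'}  # hypothetical watch list

def send_alert(title, url, smtp_host='smtp.example.com',
               sender='bot@example.com', recipient='me@example.com'):
    """Send a plain-text alert e-mail for one matching news item."""
    msg = MIMEText(f"{title}\n{url}", 'plain', 'utf-8')
    msg['Subject'] = f"News alert: {title[:30]}"
    msg['From'] = sender
    msg['To'] = recipient
    with smtplib.SMTP(smtp_host) as server:
        server.send_message(msg)

def check_alerts(news_item):
    """Call after save_news(); sends a mail when the title hits a watched keyword."""
    if any(kw in news_item['title'] for kw in ALERT_KEYWORDS):
        send_alert(news_item['title'], news_item['url'])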
Copyright note: when collecting news data, comply with copyright law and the site's terms of service, and use the data for learning and research only. Credit the original source when republishing news content.