基于Python的新闻爬虫:实时追踪行业动态

发布于:2025-07-24 ⋅ 阅读:(19) ⋅ 点赞:(0)

引言

在信息时代,行业动态瞬息万变。金融从业者需要实时了解政策变化,科技公司需要跟踪技术趋势,市场营销人员需要掌握竞品动向。传统的人工信息收集方式效率低下,难以满足实时性需求。Python爬虫技术为解决这一问题提供了高效方案。

本文将详细介绍如何使用Python构建新闻爬虫系统,实现行业动态的实时追踪。我们将从技术选型、爬虫实现、数据存储到可视化分析进行完整讲解,并提供可运行的代码示例。

1. 技术方案设计

1.1 系统架构

完整的新闻追踪系统包含以下组件:

  • 爬虫模块:负责网页抓取和数据提取
  • 存储模块:结构化存储采集的数据
  • 分析模块:数据处理和特征提取
  • 可视化模块:数据展示和趋势分析
  • 通知模块:重要新闻实时提醒

1.2 技术选型

组件 技术方案 优势
网页抓取 Requests/Scrapy 高效稳定
HTML解析 BeautifulSoup/lxml 解析精准
数据存储 MySQL/MongoDB 结构化存储
数据分析 Pandas/Numpy 处理便捷
可视化 Matplotlib/PyEcharts 直观展示
定时任务 APScheduler 自动化运行

2. 爬虫实现

2.1 基础爬虫实现

我们以36氪快讯(https://36kr.com/newsflashes)为例,抓取实时行业快讯。

import requests
from bs4 import BeautifulSoup
import pandas as pd

def fetch_36kr_news():
    url = "https://36kr.com/newsflashes"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.text, 'html.parser')
    
    news_items = []
    for item in soup.select('.newsflash-item'):
        title = item.select_one('.item-title').text.strip()
        time = item.select_one('.time').text.strip()
        abstract = item.select_one('.item-desc').text.strip()
        link = "https://36kr.com" + item.select_one('a')['href']
        
        news_items.append({
            "title": title,
            "time": time,
            "abstract": abstract,
            "link": link
        })
    
    return news_items

# 测试抓取
news_data = fetch_36kr_news()
df = pd.DataFrame(news_data)
print(df.head())

2.2 反反爬策略

为防止被网站封禁,需要采取以下措施:

  1. 设置随机User-Agent
  2. 使用代理IP池
  3. 控制请求频率
  4. 处理验证码
from fake_useragent import UserAgent
import random
import time
import requests

# 代理信息
proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"

def get_random_headers():
    ua = UserAgent()
    return {
        "User-Agent": ua.random,
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.google.com/"
    }

def fetch_with_retry(url, max_retries=3):
    # 设置代理
    proxyMeta = f"http://{proxyUser}:{proxyPass}@{proxyHost}:{proxyPort}"
    proxies = {
        "http": proxyMeta,
        "https": proxyMeta,
    }
    
    for i in range(max_retries):
        try:
            response = requests.get(
                url, 
                headers=get_random_headers(),
                proxies=proxies,
                timeout=10
            )
            if response.status_code == 200:
                return response
            time.sleep(random.uniform(1, 3))
        except requests.exceptions.RequestException as e:
            print(f"Attempt {i+1} failed: {str(e)}")
            time.sleep(5)
    return None

3. 数据存储与管理

3.1 MySQL存储方案

import pymysql
from datetime import datetime

def setup_mysql_db():
    connection = pymysql.connect(
        host='localhost',
        user='root',
        password='yourpassword',
        database='news_monitor'
    )
    
    with connection.cursor() as cursor:
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS industry_news (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255) NOT NULL,
            content TEXT,
            publish_time DATETIME,
            source VARCHAR(100),
            url VARCHAR(255),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """)
    connection.commit()
    return connection

def save_to_mysql(news_items):
    conn = setup_mysql_db()
    with conn.cursor() as cursor:
        for item in news_items:
            cursor.execute("""
            INSERT INTO industry_news (title, content, publish_time, source, url)
            VALUES (%s, %s, %s, %s, %s)
            """, (item['title'], item['abstract'], item['time'], '36kr', item['link']))
    conn.commit()
    conn.close()

3.2 数据去重方案

def check_duplicate(title):
    conn = setup_mysql_db()
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM industry_news WHERE title = %s", (title,))
        count = cursor.fetchone()[0]
    conn.close()
    return count > 0

4. 数据分析与可视化

4.1 关键词提取

import jieba.analyse
from collections import Counter

def extract_keywords(texts, top_n=20):
    all_text = " ".join(texts)
    keywords = jieba.analyse.extract_tags(all_text, topK=top_n)
    return keywords

# 从数据库读取新闻内容
def get_news_contents():
    conn = setup_mysql_db()
    with conn.cursor() as cursor:
        cursor.execute("SELECT content FROM industry_news")
        contents = [row[0] for row in cursor.fetchall()]
    conn.close()
    return contents

contents = get_news_contents()
keywords = extract_keywords(contents)
print("Top Keywords:", keywords)

4.2 可视化展示

import matplotlib.pyplot as plt
from wordcloud import WordCloud

def generate_wordcloud(keywords):
    wordcloud = WordCloud(
        font_path='simhei.ttf',
        background_color='white',
        width=800,
        height=600
    ).generate(" ".join(keywords))
    
    plt.figure(figsize=(12, 8))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.show()

generate_wordcloud(keywords)

5. 总结

本文介绍了基于Python的新闻爬虫系统实现方案,从数据采集、存储到分析可视化的完整流程。这套系统可以:

  • 实时监控多个新闻源
  • 自动识别重要行业动态
  • 提供数据分析和趋势预测
  • 支持多种通知方式


网站公告

今日签到

点亮在社区的每一天
去签到