Kafka与Flink打造流式数据采集方案:以二手房信息为例

发布于:2025-07-16 ⋅ 阅读:(20) ⋅ 点赞:(0)

爬虫代理

一、项目背景:为何房产类数据亟需“边采边处理”

近年来,国内多个城市的存量房市场呈现出波动频繁、挂牌量上升但成交周期拉长的结构性特征。特别是在一线与强二线城市中,房源更新节奏加快,用户浏览行为活跃,价格异动更加频繁。与此同时,政策层面也在不断优化限制措施,鼓励“以旧换新”“首付降低”等手段,进一步提升了市场活跃度。

在这一背景下,关注二手房信息变得尤为重要。不仅是购房者希望第一时间获取“优质房源”,房产平台、数据研究者也希望及时了解某区域、小区或价格段的变动趋势。但传统的数据采集流程,多为定时抓取+离线分析,存在明显延迟——某些房源变动可能已在几小时内完成,事后分析失去参考意义。

本项目尝试搭建一套基于 Kafka 与 Flink 的流式数据处理管道,从数据采集到实时计算再到存储分析,覆盖“从网页到洞察”的全过程,目标是打造一个面向高频变动场景的数据基础架构。


二、采集目标设定

本项目围绕贝壳平台的二手房频道(ke.com/ershoufang),采集北京地区最新房源信息,重点字段包括:

  • 小区名称
  • 总价
  • 面积
  • 单价
  • 地理位置
  • 更新时间

每轮采集抓取前五页搜索结果,确保前一百条热门房源能被完整纳入分析范围,并通过消息队列中转和实时窗口计算,对房价走势、小区热度等进行动态更新。


三、核心技术组件与设计动因

模块 技术工具 功能概述
数据采集 Python + 代理 + Headers设定 实现用户行为模拟与高成功率抓取
消息缓冲 Kafka 解耦采集与处理,提升稳定性
实时计算 Flink 多维窗口聚合与价格趋势计算
数据入库 MySQL 结构化存储分析结果
可视化 Grafana / Python绘图工具 展示挂牌热度、价格变化等指标

与传统“拉取-存储-分析”的方案不同,本项目强调从“数据进入系统开始即处理”,更符合动态市场对数据时效性的要求。


四、模块实现细节

4.1 爬虫脚本设计(Python)

采用 requests + XPath 进行页面解析,配合代理IP池、用户模拟,有效避开平台频控策略。

import requests
from lxml import etree
import json
import random
from kafka import KafkaProducer

# 代理配置(参考亿牛云爬虫代理 www.16yun.cn)
PROXIES = {
    "http": "http://16YUN:16IP@http://proxy.16yun.cn:3100",
    "https": "http://16YUN:16IP@http://proxy.16yun.cn:3100"
}

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)...",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)..."
]

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda m: json.dumps(m).encode('utf-8')
)

def fetch_listing(url):
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Cookie': 'your_cookie_here'
    }
    response = requests.get(url, headers=headers, proxies=PROXIES, timeout=10)
    html = etree.HTML(response.text)

    listings = html.xpath('//div[@class="info clear"]')
    for li in listings:
        try:
            title = li.xpath('.//div[@class="title"]/a/text()')[0]
            price = li.xpath('.//div[@class="totalPrice"]/span/text()')[0]
            unit_price = li.xpath('.//div[@class="unitPrice"]/span/text()')[0]
            house_info = li.xpath('.//div[@class="houseInfo"]/text()')[0]
            position = li.xpath('.//div[@class="positionInfo"]/a[1]/text()')[0]

            area = house_info.split('|')[1].strip().replace('平米', '')

            result = {
                'community': title,
                'total_price': float(price),
                'unit_price': unit_price,
                'area': float(area),
                'location': position
            }

            producer.send('ershoufang_topic', value=result)
        except Exception as e:
            print(f"解析失败:{e}")

for page in range(1, 6):
    url = f'https://bj.ke.com/ershoufang/pg{page}/'
    fetch_listing(url)

4.2 Flink实时计算逻辑(Java)

使用 Kafka 作为输入流,Flink 执行滑动窗口内的房价聚合操作,并将结果写入数据库。

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>(
    "ershoufang_topic", new SimpleStringSchema(), kafkaProps));

DataStream<Tuple4<String, Double, Double, Integer>> result = stream
    .map(value -> {
        JSONObject obj = new JSONObject(value);
        return Tuple4.of(
            obj.getString("community"),
            obj.getDouble("total_price"),
            obj.getDouble("area"),
            1
        );
    })
    .keyBy(t -> t.f0)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(60), Time.minutes(10)))
    .reduce((v1, v2) -> Tuple4.of(
        v1.f0, v1.f1 + v2.f1, v1.f2 + v2.f2, v1.f3 + v2.f3));

result.addSink(new MySQLSink());

4.3 数据存储与Sink配置

将窗口聚合结果存入结构化数据库中,便于后续使用脚本或可视化平台调用。

public class MySQLSink extends RichSinkFunction<Tuple4<String, Double, Double, Integer>> {
    private Connection conn;
    private PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/real_estate", "user", "pass");
        stmt = conn.prepareStatement("REPLACE INTO stat (community, avg_price, avg_area, count) VALUES (?, ?, ?, ?)");
    }

    @Override
    public void invoke(Tuple4<String, Double, Double, Integer> value, Context context) {
        stmt.setString(1, value.f0);
        stmt.setDouble(2, value.f1 / value.f3);
        stmt.setDouble(3, value.f2 / value.f3);
        stmt.setInt(4, value.f3);
        stmt.executeUpdate();
    }
}

五、数据展示与分析方向

在获取到数据之后,可通过以下方式进行可视化:

  • 基于时间窗口的价格波动折线图
  • 不同区域房源数量排名变化柱状图
  • 面积段分布饼图分析用户偏好

展示方式可以是连接 MySQL 的仪表盘工具,也可以使用 Python 中如 matplotlib/seaborn 等绘图库生成图像。


六、结语:让“流”替代“批”,抓住数据变化瞬间

房产市场的变化,是实时的;用户的需求,是即时的。只有构建起边采集、边处理、边输出的架构,才能真正支撑起精准的推荐算法、动态的市场分析和有意义的购房参考。

本项目以实际数据场景出发,借助 Kafka 与 Flink 实现了可扩展、可监控、可复用的流式采集方案,也为后续在其他高变动领域(如电商、财经、招聘等)提供了可迁移的架构参考。

如果你也在为“如何抓住变化的那一刻”而苦恼,不妨从这个方案开始。


网站公告

今日签到

点亮在社区的每一天
去签到