后端研发转型爬虫实战:Scrapy 二开爬虫框架的避坑指南

发布于:2025-08-04 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、前言

近期业务爬虫需求激增,现有爬虫团队资源紧张,需要后端研发同学协同支持。本文基于公司内部基于Scrapy二次开发的爬虫框架(Speedy)实战经验,分享爬虫项目中的核心开发模式、常见问题解决思路和实用技巧。

无论你是临时支援的后端研发,还是对爬虫技术感兴趣的同学,这些实战经验都能帮助你快速理解爬虫开发的关键要点,提高开发效率。

二、项目架构说明

爬虫项目架构使用 Speedy,项目名称自定义。

2.1 Speedy 爬虫框架

项目框架 Speedy 是基于 Scrapy 进行二开的爬虫框架。非开源且不对外开放的,没有官方网站。资料可以通过 Scrapy 间接学习。
在这里插入图片描述

2.1.1 Scrapy 网络爬虫框架

Scrapy 中文文档
Scrapy是一个快速、高效率的网络爬虫框架,用于抓取web站点并从页面中提取结构化的数据。 Scrapy被广泛用于数据挖掘、监测和自动化测试

2.1.2 生产者-消费者模式

(1)Speedy 框架采用生产者-消费者模式
生产者发送消息到 redis 中,消费者监听消费。增加失败重试兼容网络波动等场景;
使用 ack 机制避免丢失消息;
重试后仍失败的消息会放到 dlq 队列中,通过监控告警通知到研发负责人,人工处理。

2.1.3 消息消费过程说明

(1)调度平台 rundeck 触发定时任务,调用生产者添加爬虫任务体消息到 redis
在这里插入图片描述
在这里插入图片描述

(2)消费者监听 redis 中的消息,spider 进行消费
(3)spider 爬虫程序执行发出请求 request,接收响应 parse,存储响应结果
在这里插入图片描述

2.2 项目结构

● conf : 包含项目的配置文件,如基础配置、报警机器人配置、数据库会话配置、MySQL配置、Redis配置和任务队列工具配置。
● monitor_client : 监控客户端,包含监控指标、Redis字典和相关值的配置文件。
● scheduler : 调度器模块,包含后端实现、处理逻辑、监控、任务队列管理、令牌桶算法和工具函数。
● sites : 各个站点的具体实现,每个站点包含初始化文件、items定义、jobs脚本、中间件、管道、设置、爬虫、任务队列和工具函数。
● speedy_settings : speedy配置。
● third_party : 第三方库或工具。
● utils : 工具模块。

2.2.1 Redis 配置

● 文件路径:conf/redis.py

CONNECTION = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 2,
    'password': ''
}

2.2.2 MySQL 配置

● 文件路径:conf/mysql.py
定义多数据源 mysql_crawL_db、mysql_service_data

import platform

system = platform.system()


def create_mysql_config(host=None, port=None, user=None, password=None):
    return {
        'type': 'mysql',
        'conf': {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'max_op_fail_retry': 3,
            'timeout': 60
        }
    }


mysql_crawL_db = create_mysql_config(
    host='192.168.8.101', port=3306, user='root', password='root'
)
mysql_service_data = create_mysql_config(
    host='192.168.8.102', port=3306, user='root', password='root'
)

2.3 python 和 nodejs 版本要求

(1)建议安装 Anaconda,可以管理多版本 python 环境

  • python --version
    Python 3.7.1
  • node -v
    v20.19.3

(2)然后拉取项目到本地,执行命令拉取所有依赖

pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

三、爬虫实现

3.1 需求表结构设计

(1)此处以抓取罗盘达人今日直播间信息为例。
今日直播卡片数据请求 base_url 为https://compass.jinritemai.com/compass_api/author/live/live_detail/today_live_room?
在这里插入图片描述

(2)响应 json 结果如下

{
    "data": {
        "card_list": [
            {
                "incr_fans_cnt": {
                    "unit": "number",
                    "value": 161
                },
                "live_duration": "8小时51分钟",
                "live_id": "753xxx35",
                "live_status": true,
                "live_title": "欢迎来到xxx直播间",
                "operation": {
                    "live_app_id": 2079,
                    "live_id": "753xxx35",
                    "show_big_screen": true,
                    "show_detail": true
                },
                "pay_order_cnt": {
                    "unit": "number",
                    "value": 404
                },
                "pay_order_gmv": {
                    "unit": "price",
                    "value": 100000
                },
                "start_time": "2025/07/31 05:58",
                "watch_ucnt": {
                    "unit": "number",
                    "value": 16364
                }
            },
            {
                "incr_fans_cnt": {
                    "unit": "number",
                    "value": 4655
                },
                "live_duration": "6天20小时",
                "live_id": "753xxx40",
                "live_status": false,
                "live_title": "欢迎来到xxx直播间",
                "operation": {
                    "live_app_id": 2079,
                    "live_id": "753xxx40",
                    "show_big_screen": true,
                    "show_detail": true
                },
                "pay_order_cnt": {
                    "unit": "number",
                    "value": 8290
                },
                "pay_order_gmv": {
                    "unit": "price",
                    "value": 1000000
                },
                "start_time": "2025/07/24 05:04",
                "watch_ucnt": {
                    "unit": "number",
                    "value": 244988
                }
            }
        ],
        "has_more": false,
        "show": false
    },
    "msg": "",
    "st": 0
}

(3)今日直播表结构设计 dy_live_room_info_test
只记录直播间唯一标识、直播状态基本信息,不记录粉丝数量、成交金额信息。

CREATE TABLE `dy_live_room_info_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `nick_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '抖音号名称',
  `aweme_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '抖音号',
  `in_living` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '是否直播,1:直播,0: 未直播',
  `live_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播间id',
  `live_title` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播间名称',
  `live_room_start_time` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播开始时间',
  `live_duration` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '持续时长',
  `bd_create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `bd_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_live_start_date` (`live_id`,`live_room_start_time`) USING BTREE COMMENT 'live_id 和 live_room_start_time 的唯一组合',
  KEY `bd_create_time` (`bd_create_time`) USING BTREE,
  KEY `bd_update_time` (`bd_update_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

3.2 创建project

一般建议一个服务平台,可以对应创建一个 project。
同一个平台内部的 token、cookie 是互通的,便于封装方法,抽取公共实现;和其他服务平台做好业务隔离。
(1)创建project,名称为 dy_luopan。
python -m speedy startproject --project-name dy_luopan
在这里插入图片描述

(2)在 project 下创建具体的 spider,名称为 live_room
python -m speedy genspider --project-name dy_luopan --spider-name live_room
在这里插入图片描述

3.3 写爬虫程序

3.3.1 数据库表 items 维护

● 文件路径
sites/dy_luopan/items.py
● 基于 BaseItem 写 DyLiveRoomInfoDataItem
类似于Java中的mode、entity 层。
需要指定数据源 session,数据表名称、字段、类型及校验。
BaseItem 是自动生成的,大部分不用修改。可以按需改下插入模式,比如这里改为 speedy.ItemMode.REPLACE

import speedy
from speedy.config import config
from conf.dbsession import mysql_crawL_db_session

db_session = config.get_db_session('tidb')


class BaseItem(speedy.Item):
    # 数据库名称
    DATABASE = 'dy_luopan'

    # 分组,用于文档生成是归类分组
    GROUP = None
    # 是否可导出用于生成sql
    EXPORT_DB = True
    # 是否可导出用于文档
    EXPORT_DOC = True
    # 表名称
    TABLE = None
    # 插入模式: ItemMode
    MODE: speedy.ItemMode = speedy.ItemMode.REPLACE
    # 压缩字段, 只支持tidb
    COMPRESS_FIELDS = []

    # 主键字段,如果没有在fields里会默认其为自增主键
    PRIMARY_KEY = []
    # 主键是否自增
    PRIMARY_KEY_AUTO_INC = False
    # 联合主键
    UNIQUE_KEY = []
    # 主键
    KEY = []
    # 数据库类型
    DB_TYPE = 'tidb'

    # curd.Session对象
    session = db_session

    # 快照时间
    snapshot_time = speedy.TimestampField(required=True, comment='快照时间')


class DyLiveRoomInfoDataItem(BaseItem):
    session = mysql_crawL_db_session

    TABLE = 'dy_live_room_info_test'

    nick_name = speedy.StringField(max_length=255, comment='抖音号名称')
    aweme_id = speedy.StringField(max_length=50, comment='抖音号')
    in_living = speedy.StringField(max_length=10, comment='是否直播,1:直播,0: 未直播')
    live_id = speedy.StringField(max_length=50, comment='直播间id')
    live_title = speedy.StringField(max_length=255, comment='直播间名称')
    live_room_start_time = speedy.StringField(max_length=100, comment='直播开始时间')
    live_duration = speedy.StringField(max_length=255, comment='持续时长')
    pass

3.3.2 任务队列 task_queues 维护

(1)当前 project 的 redis db 配置
● 文件路径
sites/dy_luopan/task_queues/init.py
● 使用不同的db,做好业务隔离

CONNECTION.update({
    'db': 7
})
redis_client = Connection(**CONNECTION).client()

(2)具体爬虫任务的队列配置
● 文件路径
sites/dy_luopan/task_queues/live_room.py
● 指定spider监听消费的队列配置
创建 spider 时已通过模板自动创建,按需修改配置。

GROUPS_SETTINGS = {
    'name': 'live_room',
    # 基础队列ID
    # 'base_queue_id': 'live_room',
    # 优先级列表
    # 'priority_list': [0, 1, 2],
    # 每秒并发数
    # 'rate': 100,
    # ack超时(单位ms)
    # 'ack_timeout': 180000,
    # 最大重试次数
    # 'max_retry': 5,
    # 设置过滤器
    # 'filter': 'live_room_filter'
}

# FILTERS_SETTINGS = {
    # 'id': 'live_room_filter',
    # 'type': 'set',  # 'set' or 'bloom'
    # 'size': 10000000,  # bloom过滤器需要设置最大过滤数
    # 'expires': '0 12 * * *',  # 过滤器重置时间, crontab
# }

3.3.3 编写 spider 爬虫实现

● 文件路径
sites/dy_luopan/spiders/live_room.py
● 编写爬虫程序
分析爬取地址 url,分析请求参数,封装 url 的 param。
分析 response 结果,解析 response.text,维护目标记录入库。

3.3.3.1 debug_task 与 Debug 模式

只写必要的信息,比如账号信息、数据筛选时间范围、业务类型标识等。
避免 body 过大,body 体在实际运行中,会作为消息内容,发送到 redis 中。body 过大不便观测数据排查问题。

    def debug_task(self):
        body = {
            'douyin_id': 'lxxx17',
            'douyin_name': '抖音号名称'
        }
        return {'body': body}

(1)debug模式
debug_task()方法里写debug模式调试下的请求体
当指定 BASE_SETTINGS[‘DEBUG’] = True 时,启用 debug 模式。
● debug_task
debug_task() 的请求体直接发送到 start_task() 下。
● start_task
start_task() 基于请求体的账号查询业务表获取到 cookie、token 信息,封装请求参数访问 api。
● parse
parse() 接收响应的结果。判断业务响应码是否为0,不为0抛出异常。全局拦截后触发告警通知到开发负责人。
业务码正常响应符合预期时,读取响应体中的数据,然后封装 item 的字典数据,通过 yield 批量入库。
(2)非debug模式
当指定 BASE_SETTINGS[‘DEBUG’] = False 时,不启用 debug 模式。
生产者生产消息发送到 redis,spider监听队列消息,进行消费。
spider通过任务队列组获取的task都会调用 start_task() 方法开始爬虫处理。

3.3.3.2 start_task

task 会通过 meta 默认传递,在 task 中包含请求体的所有信息
通过索引获取到请求体中账号,然后查询业务表获取账号的 cookie,维护到 headers,发出请求

self.Request()
    def start_task(self, task):
        task_body = task['body']
        douyin_id = task_body['douyin_id']
        lp_list = mysql_service_data_session.filter(
            tb_dy_baiying_luopan_table,
            filters=[('=', 'douyin_id', douyin_id)],
            fields=['douyin_id', 'douyin_name', 'luopan_dt']
        )
        headers = core_headers.copy()
        headers['referer'] = 'https://compass.jinritemai.com/shop/live-detail'
        headers['Cookie'] = lp_list[0]['luopan_dt']
        yield self.Request(url=self.today_live_url, headers=headers)
3.3.3.3 parse

parse 接收 Request 的响应
针对普通的 json 响应结果,使用 response.text 取出即可
解析结果中的目标数据,封装为字典 update_info:{}
通过 yield XxxItem(update_info) 批量入库

    def parse(self, response):
        task = response.meta['task']
        task_body = task['body']
        douyin_id = task_body['douyin_id']
        douyin_name = task_body['douyin_name']
        data = json.loads(response.text)
        # response code码为0正常处理数据
        st = data.get('st', 0)
        if st != 0:
            raise Exception(f"[罗盘达人]今日直播数据获取失败,Failed to get data: {data}")

        card_list = data.get('data', {}).get('card_list', [])
        for item in card_list:
            live_id = item.get("live_id")
            live_title = item.get("live_title")
            in_living = '0' if not item.get("live_status") else '1'
            update_info = {
                'nick_name': douyin_name,
                'aweme_id': douyin_id,
                'in_living': in_living,
                'live_id': live_id,
                'live_title': live_title,
                'live_room_start_time': item.get("start_time"),
                'live_duration': item.get("live_duration")
            }
            yield DyLiveRoomInfoDataItem(update_info)
        pass
3.3.3.4 Debug 模式运行截图

在这里插入图片描述

Debug 模式运行日志
Debug 模式不会真正的入库,只是打印日志。
在这里插入图片描述

3.3.4 生产者调度任务 job 维护

● 文件路径
sites/dy_luopan/jobs/add_task.py
● 编写添加任务的方法

from conf.dbsession import mysql_service_data_session
from sites.dy_luopan.task_queues import task_queue_manager
from sites.dy_luopan.utils.common import tb_dy_baiying_luopan_table
from speedy import Job


class DyLuopanJob(Job):
    name = 'dy_luopan'

    @staticmethod
    def add_live_room_task():
        my_group = task_queue_manager.get('live_room')
        lp_list = mysql_service_data_session.filter(
            tb_dy_baiying_luopan_table,
            fields=['douyin_id', 'douyin_name']
        )
        for lp_info in lp_list:
            body = {
                'douyin_id': lp_info['douyin_id'],
                'douyin_name': lp_info['douyin_name']
            }
            my_group.publish(0, body)
        print(f'添加任务数量:{len(lp_list)}')


if __name__ == "__main__":
    DyLuopanJob().run()

3.3.5 任务部署配置

● 文件路径
sites/dy_luopan/deployments.yml
● 编写任务调度配置文件
维护消费者和生产者及调度频率

group: sites/dy_luopan

services:
- name: live_room
  replicas: 1
  spider: sites.dy_luopan.spiders.live_room

jobs:
- name: add_live_room_task
  timeout: 5m
  retry: 0
  cron: '0 * * * *'
  script: sites.dy_luopan.jobs.add_task add_live_room_task

job_emails: ['xxx@qq.com']

3.4 本地测试爬虫程序

job:sites/dy_luopan/jobs/add_task.py
spider:sites/dy_luopan/spiders/live_room.py

(1)运行 spider 程序,非Debug模式,监听消息进行消费
(2)将 job中的 DyLuopanJob().run() 改为具体的方法 DyLuopanJob().add_live_room_task()
(3)运行 add_task.py,消息发送到 redis。spider 进行消费,进入 start_task() 方法

在这里插入图片描述

3.5 失败队列处理

在这里插入图片描述

dlq 中的失败消息,复制出 body 内容,单独走 Debug 模式断点调试,看失败原因是啥。
然后来完善爬虫程序规避正常的业务场景,或者丢弃无效数据。

四、发布任务到线上服务器

(1)添加自己的 ssh 公钥(id_rsa.pub)到线上服务器 authorized_keys 里面

(2)提交代码到 git 仓库线上分支。
(3)执行同步线上代码
python -m speedy deploy sync-codes
(4)发布任务到服务器并运行
python -m speedy deploy publish -f sites/dy_luopan/deployments.yml
取消任务
python -m speedy deploy unpublish -f sites/dy_luopan/deployments.yml
发布任务后在 rundeck 平台可以看到,这里可以点击按钮主动触发一次任务,及时观察日志,程序运行是否正常。

(5)查看任务运行状态
python -m speedy deploy list-services -f sites/dy_luopan/deployments.yml
确认服务在运行。
(6)登录服务器,在任何路径下可以执行下面的命令,查看爬虫程序的日志
supervisorctl tail -f speedy-sites_dy_luopan-live_room

日志信息如下:

(base) [root@izbp153j8cx9nm5c4a03kzz ~]# supervisorctl tail -f speedy-sites_dy_luopan-live_room
==> Press Ctrl-C to exit <==
] DEBUG: Crawled (200) <POST https://xxx.com/ad/api/data/v1/common/statQuery?reqFrom=xxx&aavid=181xxx62&a_bogus=Dy8hQ5hkdDIivDuf5UKLfY3qV4a3YQol0SVkMDhedd3tpL39HMTz9exow7zvMIubZsQmIebjy4haOpKhrQAy8r6UHuXiWdQ2myuZKl5Q5xSSs1fee6mBnsJx-J44FerM5id3EckMovKGzYuZ09OH-hevPjoja3LkFk6FOoQs> (referer: https://xxx.com/dataV2/roi2-live-analysis)
2025-07-31 18:46:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 15 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:47:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:48:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:49:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)

(7)如有异常,及时关停程序,减小对账号的影响
停止supervisor

python -m speedy deploy stop-services -f sites/dy_luopan/deployments.yml

重新运行supervisor

python -m speedy deploy restart-services -f sites/dy_luopan/deployments.yml

五、规范

● python 非必要注释不需要写;文件名称,抽取的方法,变量等要见名知意。
● spider 的 class 上面空 2 行。
● spider 导入依赖或者其他文件,三方包要写在前面和业务文件空 1 行
● spider 的常量与 task_queue_group 空 1 行,与下面的 def 方法空 1 行
● debug_task 的 body 写法遵循规范

def debug_task(self):
    body = {
        'name1': 'value1',
        'name2': 'value2'
    }
    return {'body': body}
● start_task 只允许发出一个 Request
● parse 的 yield XxxItem遵循规范
    update_info = {
        'nick_name': douyin_name,
        'aweme_id': douyin_id,
        'in_living': in_living,
        'live_id': live_id,
        'live_title': live_title,
        'live_room_start_time': item.get("start_time"),
        'live_duration': item.get("live_duration")
    }
    yield DyLiveRoomInfoDataItem(update_info)

● spider 的最后一个方法结束与下面空 2 行
● utils/common 中的 def 方法之间空 2 行
● spider 中的Request属性=符号前后不要空格 yield self.Request(url=url, headers=headers, callback=self.parse_core)
● spider 的start_task中的请求参数param(比如抓取30+数据指标)长的话也不用抽取到 common 中

六、技巧

● 排序查询
此处根据 live_room_start_time 倒序排序

live_list = mysql_crawL_db_session.filter(
    dy_live_room_info_table,
    filters=[('>', 'live_room_start_time', start_time),
             ('<', 'live_room_start_time', end_time),
             ('=', 'in_living', '0')],
    fields=['aweme_id', 'nick_name', 'live_id', 'live_title'],
    order_by='-live_room_start_time'
)

order_by源码

if self.order_by and not self.count:
    segs = []
    for o in self.order_by:
        if o.startswith('-'):
            segs.append(six.text_type(o[1:]) + ' DESC')
        else:
            segs.append(six.text_type(o))
    
    qs += ['ORDER BY {0}'.format(', '.join(segs))]

● 业务流程长的任务可以发送多个 Task
任务会进入队列池,等待消费。

core_body = {
    **live_base_body,
    'api_type': 'core_gmv',
    'date_time': datetime.now().strftime('%Y/%m/%d %H:00'),
}
yield self.Task(1, core_body, group='dy_lp_live_core')

time_type = '1'
if in_living:
    time_type = '0'
crowd_body = {
    **live_base_body,
    'time_type': time_type
}
yield self.Task(1, crowd_body, group='dy_lp_live_crowd')

product_body = {
    **live_base_body,
    'in_living': in_living
}
yield self.Task(1, product_body, group='dy_lp_live_product')

● Task 优先级,业务流程长的场景,深度优先的任务,优先级应该设置的比上层高。以便优先处理任务。

yield self.Task(2, product_body, group='dy_lp_live_product')
yield self.Task(1, product_body, group='dy_lp_live_product')

● ItemMode 三种插入模式IGNORE、INSERT、REPLACE
● 普通 json text 结果使用 response.text 接收,下载接口使用 response.body 接收

res_dic = json.loads(response.text)
excel_data = BytesIO(response.body)

● 推荐 yield XxxItem,框架实现批量入库。不推荐自己写 sql
● 自定义 callback parse
start_task 只允许发出一个 Request,多个业务场景可以自定义 callback parse

def start_task(self, task):
    if not next_spider:
        yield self.Request(url=self.get_live_room_url, headers=headers)
    else:
        yield self.Request(self.get_device_id_url, headers=headers, callback=self.parse_device_id)

def parse(self, response):
    return None
    
def parse_device_id(self, response):
    return None

● Request 发送 POST 请求

    params = {
        "live_id": live_id
    }
    yield self.Request(self.url, method='POST', headers=headers, body=json.dumps(params))

Powered By niaonao


网站公告

今日签到

点亮在社区的每一天
去签到