一、前言
近期业务爬虫需求激增,现有爬虫团队资源紧张,需要后端研发同学协同支持。本文基于公司内部基于Scrapy二次开发的爬虫框架(Speedy)实战经验,分享爬虫项目中的核心开发模式、常见问题解决思路和实用技巧。
无论你是临时支援的后端研发,还是对爬虫技术感兴趣的同学,这些实战经验都能帮助你快速理解爬虫开发的关键要点,提高开发效率。
二、项目架构说明
爬虫项目架构使用 Speedy,项目名称自定义。
2.1 Speedy 爬虫框架
项目框架 Speedy 是基于 Scrapy 进行二开的爬虫框架。非开源且不对外开放的,没有官方网站。资料可以通过 Scrapy 间接学习。
2.1.1 Scrapy 网络爬虫框架
Scrapy 中文文档
Scrapy是一个快速、高效率的网络爬虫框架,用于抓取web站点并从页面中提取结构化的数据。 Scrapy被广泛用于数据挖掘、监测和自动化测试
2.1.2 生产者-消费者模式
(1)Speedy 框架采用生产者-消费者模式
生产者发送消息到 redis 中,消费者监听消费。增加失败重试兼容网络波动等场景;
使用 ack 机制避免丢失消息;
重试后仍失败的消息会放到 dlq 队列中,通过监控告警通知到研发负责人,人工处理。
2.1.3 消息消费过程说明
(1)调度平台 rundeck 触发定时任务,调用生产者添加爬虫任务体消息到 redis
(2)消费者监听 redis 中的消息,spider 进行消费
(3)spider 爬虫程序执行发出请求 request,接收响应 parse,存储响应结果
2.2 项目结构
● conf : 包含项目的配置文件,如基础配置、报警机器人配置、数据库会话配置、MySQL配置、Redis配置和任务队列工具配置。
● monitor_client : 监控客户端,包含监控指标、Redis字典和相关值的配置文件。
● scheduler : 调度器模块,包含后端实现、处理逻辑、监控、任务队列管理、令牌桶算法和工具函数。
● sites : 各个站点的具体实现,每个站点包含初始化文件、items定义、jobs脚本、中间件、管道、设置、爬虫、任务队列和工具函数。
● speedy_settings : speedy配置。
● third_party : 第三方库或工具。
● utils : 工具模块。
2.2.1 Redis 配置
● 文件路径:conf/redis.py
CONNECTION = {
'host': '127.0.0.1',
'port': 6379,
'db': 2,
'password': ''
}
2.2.2 MySQL 配置
● 文件路径:conf/mysql.py
定义多数据源 mysql_crawL_db、mysql_service_data
import platform
system = platform.system()
def create_mysql_config(host=None, port=None, user=None, password=None):
return {
'type': 'mysql',
'conf': {
'host': host,
'port': port,
'user': user,
'password': password,
'max_op_fail_retry': 3,
'timeout': 60
}
}
mysql_crawL_db = create_mysql_config(
host='192.168.8.101', port=3306, user='root', password='root'
)
mysql_service_data = create_mysql_config(
host='192.168.8.102', port=3306, user='root', password='root'
)
2.3 python 和 nodejs 版本要求
(1)建议安装 Anaconda,可以管理多版本 python 环境
- python --version
Python 3.7.1 - node -v
v20.19.3
(2)然后拉取项目到本地,执行命令拉取所有依赖
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
三、爬虫实现
3.1 需求表结构设计
(1)此处以抓取罗盘达人今日直播间信息为例。
今日直播卡片数据请求 base_url 为https://compass.jinritemai.com/compass_api/author/live/live_detail/today_live_room?
(2)响应 json 结果如下
{
"data": {
"card_list": [
{
"incr_fans_cnt": {
"unit": "number",
"value": 161
},
"live_duration": "8小时51分钟",
"live_id": "753xxx35",
"live_status": true,
"live_title": "欢迎来到xxx直播间",
"operation": {
"live_app_id": 2079,
"live_id": "753xxx35",
"show_big_screen": true,
"show_detail": true
},
"pay_order_cnt": {
"unit": "number",
"value": 404
},
"pay_order_gmv": {
"unit": "price",
"value": 100000
},
"start_time": "2025/07/31 05:58",
"watch_ucnt": {
"unit": "number",
"value": 16364
}
},
{
"incr_fans_cnt": {
"unit": "number",
"value": 4655
},
"live_duration": "6天20小时",
"live_id": "753xxx40",
"live_status": false,
"live_title": "欢迎来到xxx直播间",
"operation": {
"live_app_id": 2079,
"live_id": "753xxx40",
"show_big_screen": true,
"show_detail": true
},
"pay_order_cnt": {
"unit": "number",
"value": 8290
},
"pay_order_gmv": {
"unit": "price",
"value": 1000000
},
"start_time": "2025/07/24 05:04",
"watch_ucnt": {
"unit": "number",
"value": 244988
}
}
],
"has_more": false,
"show": false
},
"msg": "",
"st": 0
}
(3)今日直播表结构设计 dy_live_room_info_test
只记录直播间唯一标识、直播状态基本信息,不记录粉丝数量、成交金额信息。
CREATE TABLE `dy_live_room_info_test` (
`id` bigint NOT NULL AUTO_INCREMENT,
`nick_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '抖音号名称',
`aweme_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '抖音号',
`in_living` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '是否直播,1:直播,0: 未直播',
`live_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播间id',
`live_title` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播间名称',
`live_room_start_time` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '直播开始时间',
`live_duration` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '持续时长',
`bd_create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`bd_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `unique_live_start_date` (`live_id`,`live_room_start_time`) USING BTREE COMMENT 'live_id 和 live_room_start_time 的唯一组合',
KEY `bd_create_time` (`bd_create_time`) USING BTREE,
KEY `bd_update_time` (`bd_update_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
3.2 创建project
一般建议一个服务平台,可以对应创建一个 project。
同一个平台内部的 token、cookie 是互通的,便于封装方法,抽取公共实现;和其他服务平台做好业务隔离。
(1)创建project,名称为 dy_luopan。
python -m speedy startproject --project-name dy_luopan
(2)在 project 下创建具体的 spider,名称为 live_room
python -m speedy genspider --project-name dy_luopan --spider-name live_room
3.3 写爬虫程序
3.3.1 数据库表 items 维护
● 文件路径
sites/dy_luopan/items.py
● 基于 BaseItem 写 DyLiveRoomInfoDataItem
类似于Java中的mode、entity 层。
需要指定数据源 session,数据表名称、字段、类型及校验。
BaseItem 是自动生成的,大部分不用修改。可以按需改下插入模式,比如这里改为 speedy.ItemMode.REPLACE
import speedy
from speedy.config import config
from conf.dbsession import mysql_crawL_db_session
db_session = config.get_db_session('tidb')
class BaseItem(speedy.Item):
# 数据库名称
DATABASE = 'dy_luopan'
# 分组,用于文档生成是归类分组
GROUP = None
# 是否可导出用于生成sql
EXPORT_DB = True
# 是否可导出用于文档
EXPORT_DOC = True
# 表名称
TABLE = None
# 插入模式: ItemMode
MODE: speedy.ItemMode = speedy.ItemMode.REPLACE
# 压缩字段, 只支持tidb
COMPRESS_FIELDS = []
# 主键字段,如果没有在fields里会默认其为自增主键
PRIMARY_KEY = []
# 主键是否自增
PRIMARY_KEY_AUTO_INC = False
# 联合主键
UNIQUE_KEY = []
# 主键
KEY = []
# 数据库类型
DB_TYPE = 'tidb'
# curd.Session对象
session = db_session
# 快照时间
snapshot_time = speedy.TimestampField(required=True, comment='快照时间')
class DyLiveRoomInfoDataItem(BaseItem):
session = mysql_crawL_db_session
TABLE = 'dy_live_room_info_test'
nick_name = speedy.StringField(max_length=255, comment='抖音号名称')
aweme_id = speedy.StringField(max_length=50, comment='抖音号')
in_living = speedy.StringField(max_length=10, comment='是否直播,1:直播,0: 未直播')
live_id = speedy.StringField(max_length=50, comment='直播间id')
live_title = speedy.StringField(max_length=255, comment='直播间名称')
live_room_start_time = speedy.StringField(max_length=100, comment='直播开始时间')
live_duration = speedy.StringField(max_length=255, comment='持续时长')
pass
3.3.2 任务队列 task_queues 维护
(1)当前 project 的 redis db 配置
● 文件路径
sites/dy_luopan/task_queues/init.py
● 使用不同的db,做好业务隔离
CONNECTION.update({
'db': 7
})
redis_client = Connection(**CONNECTION).client()
(2)具体爬虫任务的队列配置
● 文件路径
sites/dy_luopan/task_queues/live_room.py
● 指定spider监听消费的队列配置
创建 spider 时已通过模板自动创建,按需修改配置。
GROUPS_SETTINGS = {
'name': 'live_room',
# 基础队列ID
# 'base_queue_id': 'live_room',
# 优先级列表
# 'priority_list': [0, 1, 2],
# 每秒并发数
# 'rate': 100,
# ack超时(单位ms)
# 'ack_timeout': 180000,
# 最大重试次数
# 'max_retry': 5,
# 设置过滤器
# 'filter': 'live_room_filter'
}
# FILTERS_SETTINGS = {
# 'id': 'live_room_filter',
# 'type': 'set', # 'set' or 'bloom'
# 'size': 10000000, # bloom过滤器需要设置最大过滤数
# 'expires': '0 12 * * *', # 过滤器重置时间, crontab
# }
3.3.3 编写 spider 爬虫实现
● 文件路径
sites/dy_luopan/spiders/live_room.py
● 编写爬虫程序
分析爬取地址 url,分析请求参数,封装 url 的 param。
分析 response 结果,解析 response.text,维护目标记录入库。
3.3.3.1 debug_task 与 Debug 模式
只写必要的信息,比如账号信息、数据筛选时间范围、业务类型标识等。
避免 body 过大,body 体在实际运行中,会作为消息内容,发送到 redis 中。body 过大不便观测数据排查问题。
def debug_task(self):
body = {
'douyin_id': 'lxxx17',
'douyin_name': '抖音号名称'
}
return {'body': body}
(1)debug模式
debug_task()方法里写debug模式调试下的请求体
当指定 BASE_SETTINGS[‘DEBUG’] = True 时,启用 debug 模式。
● debug_task
debug_task() 的请求体直接发送到 start_task() 下。
● start_task
start_task() 基于请求体的账号查询业务表获取到 cookie、token 信息,封装请求参数访问 api。
● parse
parse() 接收响应的结果。判断业务响应码是否为0,不为0抛出异常。全局拦截后触发告警通知到开发负责人。
业务码正常响应符合预期时,读取响应体中的数据,然后封装 item 的字典数据,通过 yield 批量入库。
(2)非debug模式
当指定 BASE_SETTINGS[‘DEBUG’] = False 时,不启用 debug 模式。
生产者生产消息发送到 redis,spider监听队列消息,进行消费。
spider通过任务队列组获取的task都会调用 start_task() 方法开始爬虫处理。
3.3.3.2 start_task
task 会通过 meta 默认传递,在 task 中包含请求体的所有信息
通过索引获取到请求体中账号,然后查询业务表获取账号的 cookie,维护到 headers,发出请求
self.Request()
def start_task(self, task):
task_body = task['body']
douyin_id = task_body['douyin_id']
lp_list = mysql_service_data_session.filter(
tb_dy_baiying_luopan_table,
filters=[('=', 'douyin_id', douyin_id)],
fields=['douyin_id', 'douyin_name', 'luopan_dt']
)
headers = core_headers.copy()
headers['referer'] = 'https://compass.jinritemai.com/shop/live-detail'
headers['Cookie'] = lp_list[0]['luopan_dt']
yield self.Request(url=self.today_live_url, headers=headers)
3.3.3.3 parse
parse 接收 Request 的响应
针对普通的 json 响应结果,使用 response.text 取出即可
解析结果中的目标数据,封装为字典 update_info:{}
通过 yield XxxItem(update_info) 批量入库
def parse(self, response):
task = response.meta['task']
task_body = task['body']
douyin_id = task_body['douyin_id']
douyin_name = task_body['douyin_name']
data = json.loads(response.text)
# response code码为0正常处理数据
st = data.get('st', 0)
if st != 0:
raise Exception(f"[罗盘达人]今日直播数据获取失败,Failed to get data: {data}")
card_list = data.get('data', {}).get('card_list', [])
for item in card_list:
live_id = item.get("live_id")
live_title = item.get("live_title")
in_living = '0' if not item.get("live_status") else '1'
update_info = {
'nick_name': douyin_name,
'aweme_id': douyin_id,
'in_living': in_living,
'live_id': live_id,
'live_title': live_title,
'live_room_start_time': item.get("start_time"),
'live_duration': item.get("live_duration")
}
yield DyLiveRoomInfoDataItem(update_info)
pass
3.3.3.4 Debug 模式运行截图
Debug 模式运行日志
Debug 模式不会真正的入库,只是打印日志。
3.3.4 生产者调度任务 job 维护
● 文件路径
sites/dy_luopan/jobs/add_task.py
● 编写添加任务的方法
from conf.dbsession import mysql_service_data_session
from sites.dy_luopan.task_queues import task_queue_manager
from sites.dy_luopan.utils.common import tb_dy_baiying_luopan_table
from speedy import Job
class DyLuopanJob(Job):
name = 'dy_luopan'
@staticmethod
def add_live_room_task():
my_group = task_queue_manager.get('live_room')
lp_list = mysql_service_data_session.filter(
tb_dy_baiying_luopan_table,
fields=['douyin_id', 'douyin_name']
)
for lp_info in lp_list:
body = {
'douyin_id': lp_info['douyin_id'],
'douyin_name': lp_info['douyin_name']
}
my_group.publish(0, body)
print(f'添加任务数量:{len(lp_list)}')
if __name__ == "__main__":
DyLuopanJob().run()
3.3.5 任务部署配置
● 文件路径
sites/dy_luopan/deployments.yml
● 编写任务调度配置文件
维护消费者和生产者及调度频率
group: sites/dy_luopan
services:
- name: live_room
replicas: 1
spider: sites.dy_luopan.spiders.live_room
jobs:
- name: add_live_room_task
timeout: 5m
retry: 0
cron: '0 * * * *'
script: sites.dy_luopan.jobs.add_task add_live_room_task
job_emails: ['xxx@qq.com']
3.4 本地测试爬虫程序
job:sites/dy_luopan/jobs/add_task.py
spider:sites/dy_luopan/spiders/live_room.py
(1)运行 spider 程序,非Debug模式,监听消息进行消费
(2)将 job中的 DyLuopanJob().run() 改为具体的方法 DyLuopanJob().add_live_room_task()
(3)运行 add_task.py,消息发送到 redis。spider 进行消费,进入 start_task() 方法
3.5 失败队列处理
dlq 中的失败消息,复制出 body 内容,单独走 Debug 模式断点调试,看失败原因是啥。
然后来完善爬虫程序规避正常的业务场景,或者丢弃无效数据。
四、发布任务到线上服务器
(1)添加自己的 ssh 公钥(id_rsa.pub)到线上服务器 authorized_keys 里面
(2)提交代码到 git 仓库线上分支。
(3)执行同步线上代码
python -m speedy deploy sync-codes
(4)发布任务到服务器并运行
python -m speedy deploy publish -f sites/dy_luopan/deployments.yml
取消任务
python -m speedy deploy unpublish -f sites/dy_luopan/deployments.yml
发布任务后在 rundeck 平台可以看到,这里可以点击按钮主动触发一次任务,及时观察日志,程序运行是否正常。
(5)查看任务运行状态
python -m speedy deploy list-services -f sites/dy_luopan/deployments.yml
确认服务在运行。
(6)登录服务器,在任何路径下可以执行下面的命令,查看爬虫程序的日志
supervisorctl tail -f speedy-sites_dy_luopan-live_room
日志信息如下:
(base) [root@izbp153j8cx9nm5c4a03kzz ~]# supervisorctl tail -f speedy-sites_dy_luopan-live_room
==> Press Ctrl-C to exit <==
] DEBUG: Crawled (200) <POST https://xxx.com/ad/api/data/v1/common/statQuery?reqFrom=xxx&aavid=181xxx62&a_bogus=Dy8hQ5hkdDIivDuf5UKLfY3qV4a3YQol0SVkMDhedd3tpL39HMTz9exow7zvMIubZsQmIebjy4haOpKhrQAy8r6UHuXiWdQ2myuZKl5Q5xSSs1fee6mBnsJx-J44FerM5id3EckMovKGzYuZ09OH-hevPjoja3LkFk6FOoQs> (referer: https://xxx.com/dataV2/roi2-live-analysis)
2025-07-31 18:46:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 15 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:47:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:48:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2025-07-31 18:49:44 [scrapy.extensions.logstats] INFO: Crawled 114519 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
(7)如有异常,及时关停程序,减小对账号的影响
停止supervisor
python -m speedy deploy stop-services -f sites/dy_luopan/deployments.yml
重新运行supervisor
python -m speedy deploy restart-services -f sites/dy_luopan/deployments.yml
五、规范
● python 非必要注释不需要写;文件名称,抽取的方法,变量等要见名知意。
● spider 的 class 上面空 2 行。
● spider 导入依赖或者其他文件,三方包要写在前面和业务文件空 1 行
● spider 的常量与 task_queue_group 空 1 行,与下面的 def 方法空 1 行
● debug_task 的 body 写法遵循规范
def debug_task(self):
body = {
'name1': 'value1',
'name2': 'value2'
}
return {'body': body}
● start_task 只允许发出一个 Request
● parse 的 yield XxxItem遵循规范
update_info = {
'nick_name': douyin_name,
'aweme_id': douyin_id,
'in_living': in_living,
'live_id': live_id,
'live_title': live_title,
'live_room_start_time': item.get("start_time"),
'live_duration': item.get("live_duration")
}
yield DyLiveRoomInfoDataItem(update_info)
● spider 的最后一个方法结束与下面空 2 行
● utils/common 中的 def 方法之间空 2 行
● spider 中的Request属性=符号前后不要空格 yield self.Request(url=url, headers=headers, callback=self.parse_core)
● spider 的start_task中的请求参数param(比如抓取30+数据指标)长的话也不用抽取到 common 中
六、技巧
● 排序查询
此处根据 live_room_start_time 倒序排序
live_list = mysql_crawL_db_session.filter(
dy_live_room_info_table,
filters=[('>', 'live_room_start_time', start_time),
('<', 'live_room_start_time', end_time),
('=', 'in_living', '0')],
fields=['aweme_id', 'nick_name', 'live_id', 'live_title'],
order_by='-live_room_start_time'
)
order_by源码
if self.order_by and not self.count:
segs = []
for o in self.order_by:
if o.startswith('-'):
segs.append(six.text_type(o[1:]) + ' DESC')
else:
segs.append(six.text_type(o))
qs += ['ORDER BY {0}'.format(', '.join(segs))]
● 业务流程长的任务可以发送多个 Task
任务会进入队列池,等待消费。
core_body = {
**live_base_body,
'api_type': 'core_gmv',
'date_time': datetime.now().strftime('%Y/%m/%d %H:00'),
}
yield self.Task(1, core_body, group='dy_lp_live_core')
time_type = '1'
if in_living:
time_type = '0'
crowd_body = {
**live_base_body,
'time_type': time_type
}
yield self.Task(1, crowd_body, group='dy_lp_live_crowd')
product_body = {
**live_base_body,
'in_living': in_living
}
yield self.Task(1, product_body, group='dy_lp_live_product')
● Task 优先级,业务流程长的场景,深度优先的任务,优先级应该设置的比上层高。以便优先处理任务。
yield self.Task(2, product_body, group='dy_lp_live_product')
yield self.Task(1, product_body, group='dy_lp_live_product')
● ItemMode 三种插入模式IGNORE、INSERT、REPLACE
● 普通 json text 结果使用 response.text 接收,下载接口使用 response.body 接收
res_dic = json.loads(response.text)
excel_data = BytesIO(response.body)
● 推荐 yield XxxItem,框架实现批量入库。不推荐自己写 sql
● 自定义 callback parse
start_task 只允许发出一个 Request,多个业务场景可以自定义 callback parse
def start_task(self, task):
if not next_spider:
yield self.Request(url=self.get_live_room_url, headers=headers)
else:
yield self.Request(self.get_device_id_url, headers=headers, callback=self.parse_device_id)
def parse(self, response):
return None
def parse_device_id(self, response):
return None
● Request 发送 POST 请求
params = {
"live_id": live_id
}
yield self.Request(self.url, method='POST', headers=headers, body=json.dumps(params))
Powered By niaonao