Fastapi中怎么一次性运行多个Scrapy爬虫

发布于:2024-05-07 ⋅ 阅读:(24) ⋅ 点赞:(0)

运行Scrapy爬虫很简单,直接"Scrapy crawl 爬虫名称"即可。但是我们如果想在Fastapi中通过接口的方式一次性运行多个爬虫。那该怎么实现?

假如在scrapy下面的spiders里面写了许多爬虫文件,你可以在spiders的__init__.py文件中,将写好的爬虫类导入到__init__文件里面,然后将类保存到一个序列里面,用于后续启动。例如

from .apnews import ApnewsSpider
from .cnnnews import CnnnewsSpider
...

SPIDERS = [
    ApnewsSpider,
    CnnnewsSpider,
    ...
]

当然,如果你觉得麻烦,完全可以动态添加,比如使用装饰器等。

废话少说,接着直接看代码:

import asyncio
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
PROCESS = CrawlerProcess(get_project_settings())
_CRAWL_LOCK = asyncio.Lock()

async def run_spiders():
    for spider_class in SPIDERS:
        _ = PROCESS.crawl(spider_class)

@router.post('/crawl/news', description="新闻爬取")
async def crawl_news(background_tasks: BackgroundTasks):
    async with _CRAWL_LOCK:
        try:
            if not PROCESS.crawlers or not any(crawler.crawling for crawler in PROCESS.crawlers):
                background_tasks.add_task(run_spiders)
            else:
                return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content="爬虫正在运行")
        except Exception as error:
            error_msg = f'Exception in start crawler, reason: {str(error)}'
            log_it(error_msg, level=logging.ERROR)
            return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content="爬虫启动失败")
        return JSONResponse(status_code=status.HTTP_200_OK, content="OK")

PROCESS = CrawlerProcess(get_project_settings()): 这一行创建了一个 Scrapy 的 CrawlerProcess 对象,该对象用于管理和运行爬虫。_CRAWL_LOCK = asyncio.Lock()这一行创建了一个 asyncio 锁,用于确保在同一时刻只有一个异步任务可以运行。这是为了防止多个请求同时触发爬虫启动时可能出现的问题。run_spiders是运行爬虫的主要函数,其中SPIDERS就是上面所有爬虫类的列表,依次遍历这个爬虫类列表,使用PROCESS.crawl(spider_class)调用了 CrawlerProcess 对象的 crawl 方法来启动指定的爬虫。

我如果不想在fastapi里面运行多个爬虫,只需在普通的main函数里面即可,这时该怎么修改(这里有个坑

也直接看代码:

PROCESS = CrawlerProcess(get_project_settings())
_CRAWL_LOCK = threading.Lock()

def run_spiders():
    for spider_class in SPIDERS:
        _ = PROCESS.crawl(spider_class)
    PROCESS.start()
    PROCESS.join()


def crawl_spider():
    with _CRAWL_LOCK:
        try:
            if not PROCESS.crawlers or not any(crawler.crawling for crawler in PROCESS.crawlers):
                run_spiders()
            else:
                logging.error("爬虫正在运行。。。")
        except Exception as error:
            error_msg = f'Exception in start crawler, reason: {str(error)}'
            logging.error(error_msg)

if __name__ == '__main__':
    crawl_spider()

这里如果在普通函数里面遍历运行爬虫类,需要PROCESS.start(),PROCESS.join()用于等待所有爬虫执行完成正常退出。否则遍历完爬虫类程序就会结束了。

而在fastapi异步模式下,通常不需要使用 join() 方法来等待爬虫执行完毕。这是因为异步框架通常会在任务完成时通过回调或者异步等待的方式来处理结果,而不是像同步编程那样需要显式地等待任务的完成。