批量向Motrix 提交下载任务

发布于:2025-07-01 ⋅ 阅读:(17) ⋅ 点赞:(0)

只支持普通公网无认证的url 如果无法下载请提供测试数据调试

# !/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC000abc@gmail.com
@file: test.py
@time: 2025/6/30 15:56 
@desc: 

"""
import copy
import os
import requests
import uuid
import json
from sdk.utils.util_class import URLParser
from sdk.utils.util_jsonl import JSONL


class MotrixDownload:
    """

    """

    # 配置参数
    def __init__(self):
        self.jsonl = JSONL()
        self.load_conf()

    def load_conf(self, file="conf.json"):
        """

        :param file:
        :return:
        """
        data = self.jsonl.read_json(file)
        self.rpc_url = data["RPC_URL"]
        self.rpc_secret = data["RPC_SECRET"]
        self.rpc_headers = data["HEADERS"]
        self.rpc_save_path = data["DIR"]

    def call_motrix_rpc(self, method, params=None):
        """调用Motrix RPC接口的通用函数"""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
            "params": [f"token:{self.rpc_secret}"] + (params if params else [])
        }

        try:
            response = requests.post(
                self.rpc_url,
                headers=self.rpc_headers,
                data=json.dumps(payload),
                timeout=10
            )
            result = response.json()
            if "error" in result:
                print(f"RPC Error [{result['error']['code']}]: {result['error']['message']}")
                return None
            return result.get("result")

        except Exception as e:
            print(f"Request failed: {str(e)}")
            return None

    def parse_download_conf(self, urls, folder_num=1):
        """

        :param urls:
        :param folder_num:
        :return:
        """
        for url in urls:
            obj_url = URLParser(url)
            yield (url, f"{os.sep.join(obj_url.path_lis[-folder_num:])}{os.sep}{obj_url.name}.{obj_url.tail}")

    def download(self, urls, folder_num=1):
        """

        :param urls:
        :param folder_num:
        :return:
        """
        conf = {"dir": self.rpc_save_path}
        for url, filename in self.parse_download_conf(urls, folder_num):
            conf_copy = copy.deepcopy(conf)
            conf_copy.update({
                "out": filename
            })
            download_task = self.call_motrix_rpc("aria2.addUri", params=[[url], conf_copy])
            print(download_task)


# 使用示例
if __name__ == "__main__":
    mw = MotrixDownload()
    urls = ["https://bj.bcebos.com/petite-mark/public_read/vipshop/1102/Windows.iso",
            "https://bj.bcebos.com/petite-mark/public_read/vipshop/1102/output_True_path_2025-06-21-20-44-17.xlsx"]

    mw.download(urls, 4)


from sdk.utils.util_class import URLParser
from sdk.utils.util_jsonl import JSONL
参照其他博客中的
URLParser 解析url 可以使用Yarl 库代替
JSONL 就是读json文件

先安装Motrix 客户端 ,本工具只负责批量提交任务 具体参数 客户端自行配置

配置文件:conf.json
参照下图
在这里插入图片描述

前两个参数可前往Motrix软件客户端获取
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到