仅支持无需认证的普通公网 URL;如果无法下载,请提供测试数据以便调试。
# !/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC000abc@gmail.com
@file: test.py
@time: 2025/6/30 15:56
@desc:
"""
import copy
import os
import requests
import uuid
import json
from sdk.utils.util_class import URLParser
from sdk.utils.util_jsonl import JSONL
class MotrixDownload:
    """Submit batches of download tasks to a Motrix client over JSON-RPC.

    Connection settings are loaded from a JSON configuration file when the
    instance is created. Each URL is then submitted to the client through
    the ``aria2.addUri`` RPC method.
    """

    def __init__(self):
        """Create the JSON reader and load the default configuration file."""
        self.jsonl = JSONL()
        self.load_conf()

    def load_conf(self, file="conf.json"):
        """Load the RPC settings from a JSON configuration file.

        :param file: path of the configuration file. The loaded data must
            provide the keys ``RPC_URL``, ``RPC_SECRET``, ``HEADERS`` and
            ``DIR``.
        :return: None
        """
        # NOTE(review): read_json is a project helper; presumably it returns
        # the parsed JSON object as a dict -- confirm against JSONL.
        data = self.jsonl.read_json(file)
        self.rpc_url = data["RPC_URL"]
        self.rpc_secret = data["RPC_SECRET"]
        self.rpc_headers = data["HEADERS"]
        self.rpc_save_path = data["DIR"]

    def call_motrix_rpc(self, method, params=None):
        """Call a Motrix JSON-RPC method and return its result.

        :param method: RPC method name, e.g. ``aria2.addUri``.
        :param params: optional sequence (list or tuple) of positional RPC
            parameters. The ``token:<secret>`` entry is always sent first.
        :return: the ``result`` field of the response, or ``None`` when the
            request fails or the server reports an error.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
            # Unpacking accepts any sequence, not only lists.
            "params": [f"token:{self.rpc_secret}", *(params or [])],
        }
        try:
            response = requests.post(
                self.rpc_url,
                headers=self.rpc_headers,
                data=json.dumps(payload),
                timeout=10,
            )
            result = response.json()
        except Exception as e:
            # Deliberately broad: this is a best-effort submit helper, so
            # transport and decoding failures are reported, not raised.
            print(f"Request failed: {str(e)}")
            return None

        if not isinstance(result, dict):
            print(f"Request failed: unexpected response {result!r}")
            return None
        if "error" in result:
            error = result["error"]
            # Read the fields defensively so a malformed error object is
            # still reported as an RPC error rather than a request failure.
            if isinstance(error, dict):
                print(f"RPC Error [{error.get('code')}]: {error.get('message')}")
            else:
                print(f"RPC Error: {error}")
            return None
        return result.get("result")

    @staticmethod
    def _relative_out_path(path_lis, name, tail, folder_num):
        """Join the last *folder_num* directories with ``name.tail``.

        A non-positive *folder_num* selects no directories. A plain
        ``path_lis[-0:]`` slice would select all of them instead.
        """
        dirs = list(path_lis or [])[-folder_num:] if folder_num > 0 else []
        # Joining one list avoids a leading separator when dirs is empty.
        return os.sep.join([*dirs, f"{name}.{tail}"])

    def parse_download_conf(self, urls, folder_num=1):
        """Yield ``(url, relative_output_path)`` for every URL.

        :param urls: iterable of download URLs.
        :param folder_num: how many trailing entries of the parsed URL's
            ``path_lis`` to keep as sub-folders in the output path; zero or
            a negative value keeps none.
        :return: generator of ``(url, output_path)`` tuples.
        """
        for url in urls:
            # NOTE(review): URLParser is a project helper; presumably
            # path_lis holds the directory segments and name/tail the file
            # stem and extension -- confirm against its implementation.
            obj_url = URLParser(url)
            yield (
                url,
                self._relative_out_path(
                    obj_url.path_lis, obj_url.name, obj_url.tail, folder_num
                ),
            )

    def download(self, urls, folder_num=1):
        """Submit every URL to Motrix as a separate download task.

        :param urls: iterable of download URLs.
        :param folder_num: forwarded to :meth:`parse_download_conf`.
        :return: list with one entry per URL, holding the value returned by
            :meth:`call_motrix_rpc` for that task (``None`` on failure).
        """
        results = []
        for url, filename in self.parse_download_conf(urls, folder_num):
            # A fresh dict per task; no deep copy is needed for flat options.
            options = {"dir": self.rpc_save_path, "out": filename}
            download_task = self.call_motrix_rpc(
                "aria2.addUri", params=[[url], options]
            )
            print(download_task)
            results.append(download_task)
        return results
# Usage example
if __name__ == "__main__":
    urls = [
        "https://bj.bcebos.com/petite-mark/public_read/vipshop/1102/Windows.iso",
        "https://bj.bcebos.com/petite-mark/public_read/vipshop/1102/output_True_path_2025-06-21-20-44-17.xlsx",
    ]
    downloader = MotrixDownload()
    # folder_num=4 is passed through to the output-path builder.
    downloader.download(urls, 4)
from sdk.utils.util_class import URLParser
from sdk.utils.util_jsonl import JSONL
以上两个导入(URLParser、JSONL)的实现请参照其他博客中的代码:
URLParser 用于解析 URL,可以使用 yarl 库代替;
JSONL 仅用于读取 JSON 文件。
请先安装 Motrix 客户端。本工具只负责批量提交下载任务,具体下载参数请在客户端中自行配置。
配置文件:conf.json
配置内容参照下图。
前两个参数可在 Motrix 客户端软件中获取。