使用rest接口调用历史数据
获取历史数据不需要使用api密钥。
环境准备的阶段
# 必需的依赖库
import os
import time
import pandas as pd
from datetime import datetime, timedelta
from binance.client import Client
初始化数据获取器
fetcher = BatchKlineFetcher()
# 可选参数:api_key, api_secret, use_testnet
# 开发测试阶段
client = BinanceAPIClient(
api_key='test_key',
api_secret='test_secret',
use_testnet=True # 使用测试网络
)
# 生产部署阶段
client = BinanceAPIClient(
api_key='real_key',
api_secret='real_secret',
use_testnet=False # 使用主网络
)
Testnet(测试网络)
- 用途: 开发和测试环境
- 资金: 使用虚拟资金,不是真实货币
- 风险: 零风险,可以随意测试
- 数据: 模拟真实市场数据,但不影响真实账户
Mainnet(主网络)
- 用途: 生产环境,真实交易
- 资金: 真实的加密货币资金
- 风险: 真实的资金风险
- 数据: 真实市场数据和交易
对于历史数据的研究没有区别。
获取历史数据的时候
相同点
主要的历史K线数据(价格、成交量等)在两个网络上基本一致
数据结构和格式完全相同
获取速度和稳定性差不多
细微差别
数据完整性: 主网的历史数据更完整,覆盖时间更长
数据精度: 主网数据是真实交易数据,测试网可能有微小的模拟差异
API限制: 速率限制可能略有不同
# 对于纯历史数据分析,可以这样设置
client = BinanceAPIClient(
api_key='', # 可以为空
api_secret='', # 可以为空
use_testnet=False # 建议用主网获取最完整的历史数据
)
# 获取历史K线不需要API密钥权限
klines = client.get_historical_klines('BTCUSDT', '1m', start_time, end_time)
如获取某个交易对1年的数据
# 一年前的时间
end_time = datetime.now()
start_time = end_time - timedelta(days=365)
klines = client.get_historical_klines(
symbol='BTCUSDT',
interval='1m',
start_time=start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
)
对应的类文件和函数
class BinanceAPIClient:
"""
Binance API客户端类
负责与Binance API的所有交互
"""
def __init__(self, api_key='', api_secret='', use_testnet=False):
"""
初始化API客户端
参数:
- api_key: Binance API密钥
- api_secret: Binance API密钥
- use_testnet: 是否使用测试网络
"""
self.api_key = api_key
self.api_secret = api_secret
self.use_testnet = use_testnet
# 初始化Binance客户端
self.client = Client(api_key, api_secret, testnet=use_testnet)
# 配置参数
self.config = API_CONFIG
def get_historical_klines(self, symbol, interval, start_time, end_time, limit=1000):
"""
获取历史K线数据,带有重试机制和时区处理
参数:
- symbol: 交易对符号 (如 'BTCUSDT')
- interval: 时间间隔 (如 '1h', '1d')
- start_time: 开始时间 (字符串格式)
- end_time: 结束时间 (字符串格式)
- limit: 每次请求的最大数量
返回:
- K线数据列表
"""
all_klines = []
current_start_time = start_time
retry_count = 0
while retry_count < self.config["max_retries"]:
try:
# 添加小延迟避免频率限制
time.sleep(self.config["request_delay"])
while True:
# 请求历史K线数据,每次最多1000条
klines = self.client.get_historical_klines(
symbol, interval, current_start_time, end_time, limit=limit
)
if not klines:
logger.warning(f"没有获取到K线数据,尝试调整时间范围")
break
all_klines.extend(klines)
logger.info(f"获取了 {len(klines)} 条K线数据")
# 如果返回的数据少于limit,说明已经获取完所有数据
if len(klines) < limit:
break
# 更新开始时间为最后一根K线的收盘时间 + 1ms
last_close_time = klines[-1][6]
current_start_time = str(int(last_close_time) + 1)
# 如果新的开始时间超过结束时间,跳出循环
if current_start_time >= end_time:
break
# 如果到这里没有异常,跳出重试循环
break
except Exception as e:
retry_count += 1
logger.error(f"获取K线数据时出错 (尝试 {retry_count}/{self.config['max_retries']}): {e}")
if retry_count >= self.config["max_retries"]:
logger.error("达到最大重试次数,放弃获取数据")
break
time.sleep(self.config["retry_delay"]) # 等待后重试
return all_klines
🚀 加密货币合约数据获取与保存完整教程
梳理完整的数据获取流程。
📋 主要流程概览
1️⃣ 环境准备阶段
# 必需的依赖库
import os
import time
import pandas as pd
from datetime import datetime, timedelta
from binance.client import Client
2️⃣ 初始化数据获取器
fetcher = BatchKlineFetcher()
# 可选参数:api_key, api_secret, use_testnet
3️⃣ 核心数据获取流程
步骤A: 参数验证
- 验证交易对符号(如 ‘BTCUSDT’)
- 验证时间间隔(支持:1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 1d)
- 计算时间范围(开始时间 ← 结束时间 - 天数)
步骤B: API数据请求
- 调用 Binance API 获取历史K线数据
- 自动分批处理(每次最多1000条记录)
- 内置重试机制和频率限制保护
步骤C: 数据处理与转换
- 原始数据转换为 pandas DataFrame
- 数据类型转换(价格、成交量等转为数值型)
- 时间戳转换(毫秒 → 标准时间格式)
- 设置时间戳为索引
步骤D: 数据保存
- 自动生成文件名:
{交易对}_{时间间隔}_{天数}days_{时间戳}.csv
- 保存到指定目录:
batch_kline_data/
- CSV格式,包含完整的K线数据
🔧 关键参数说明
必需参数
参数 | 类型 | 说明 | 示例 |
---|---|---|---|
symbol |
string | 交易对符号 | ‘BTCUSDT’, ‘ETHUSDT’ |
interval |
string | 时间间隔 | ‘1m’, ‘5m’, ‘1h’, ‘1d’ |
days_back |
int | 向前追溯天数 | 7, 30, 365 |
可选参数
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
end_date |
string/None | None | 结束日期,默认当前时间 |
save_to_file |
bool | True | 是否保存到文件 |
api_key |
string | ‘’ | Binance API密钥(公开数据不需要) |
api_secret |
string | ‘’ | Binance API密钥 |
📊 数据结构说明
输出的CSV文件包含以下列:
timestamp
: 时间戳(索引)open
: 开盘价high
: 最高价low
: 最低价close
: 收盘价volume
: 成交量close_time
: 收盘时间quote_asset_volume
: 计价资产成交量number_of_trades
: 成交笔数taker_buy_base_asset_volume
: 主动买入成交量taker_buy_quote_asset_volume
: 主动买入计价资产成交量
💡 实际使用示例
示例1: 获取单个合约数据
# 获取BTCUSDT最近7天的1分钟数据
df = fetcher.fetch_kline_data('BTCUSDT', '1m', 7)
print(f"获取了 {len(df)} 条记录")
示例2: 批量获取多个合约
# 批量获取多个交易对的5分钟数据
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
results = fetcher.fetch_multiple_symbols(symbols, '5m', 3)
示例3: 获取历史特定时间段数据
# 获取2024年1月的数据
df = fetcher.fetch_kline_data('BTCUSDT', '1h', 31, end_date='2024-02-01')
⚡ 性能优化建议
- 时间间隔选择:分钟级数据建议不超过30天,小时级可获取更长时间
- 批量处理:使用
fetch_multiple_symbols()
批量获取多个交易对 - 内存管理:大数据集建议分批处理,避免内存溢出
- API限制:内置延迟机制,避免触发API频率限制
📁 文件组织结构
项目目录/
├── batch_kline_fetcher.py # 主要获取器
├── api_client.py # API客户端
├── config.py # 配置文件
└── batch_kline_data/ # 数据保存目录
├── BTCUSDT_1m_7days_20250813_211425.csv
├── ETHUSDT_5m_3days_20250813_211429.csv
└── ...
🎯 博客教程建议结构
- 引言:为什么需要获取加密货币数据
- 环境搭建:Python环境和依赖安装
- 核心概念:K线数据、时间间隔、交易对
- 代码实现:逐步讲解每个函数
- 实战案例:3-5个不同场景的使用示例
- 数据分析:如何使用获取的数据进行分析
- 注意事项:API限制、错误处理、性能优化
- 扩展应用:结合技术指标、可视化等
优化版本的代码
因为原来的版本,我们使用的时候,是单线程,对于较长的时间段获取,时间会很长。使用多线程,自动切片任务。就可以加快获取数据的速度。
# -*- coding: utf-8 -*-
"""
优化版API客户端模块
支持多线程并发获取和智能时间切片,大幅提升长时间段数据获取速度
"""
import os
import time
import logging
import threading
import pandas as pd
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from binance.client import Client
from config import API_CONFIG, TIMEFRAMES, KLINE_COLUMNS
from queue import Queue
import math
logger = logging.getLogger(__name__)
class OptimizedBinanceAPIClient:
"""
优化版Binance API客户端类
支持多线程并发获取和智能时间切片,提升长时间段数据获取效率
"""
def __init__(self, api_key='', api_secret='', use_testnet=False, max_workers=5):
"""
初始化优化版API客户端
参数:
- api_key: Binance API密钥
- api_secret: Binance API密钥
- use_testnet: 是否使用测试网络
- max_workers: 最大并发线程数
"""
self.api_key = api_key
self.api_secret = api_secret
self.use_testnet = use_testnet
self.max_workers = max_workers
# 初始化Binance客户端
self.client = Client(api_key, api_secret, testnet=use_testnet)
# 配置参数
self.config = API_CONFIG.copy()
# 优化并发配置
self.config["concurrent_request_delay"] = 0.2 # 并发请求间隔更短
self.config["chunk_size_hours"] = 24 # 每个时间切片的小时数
# 线程锁
self._lock = threading.Lock()
self._request_times = Queue()
def _rate_limit_check(self):
"""
检查API请求频率限制
"""
current_time = time.time()
# 清理1分钟前的请求记录
while not self._request_times.empty():
if current_time - self._request_times.queue[0] > 60:
self._request_times.get()
else:
break
# 如果1分钟内请求过多,等待
if self._request_times.qsize() >= 1000: # Binance限制
sleep_time = 60 - (current_time - self._request_times.queue[0])
if sleep_time > 0:
logger.warning(f"API频率限制,等待 {sleep_time:.2f} 秒")
time.sleep(sleep_time)
self._request_times.put(current_time)
def _calculate_time_chunks(self, start_time, end_time, interval):
"""
计算时间切片
参数:
- start_time: 开始时间戳(毫秒)
- end_time: 结束时间戳(毫秒)
- interval: 时间间隔
返回:
- list: 时间切片列表 [(start, end), ...]
"""
start_ms = int(start_time)
end_ms = int(end_time)
# 根据时间间隔调整切片大小
interval_minutes = {
'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
'1h': 60, '2h': 120, '4h': 240, '1d': 1440
}
minutes_per_interval = interval_minutes.get(interval, 60)
# 动态调整切片大小:确保每个切片不超过1000条记录
max_records_per_chunk = 1000
chunk_minutes = min(
self.config["chunk_size_hours"] * 60,
max_records_per_chunk * minutes_per_interval
)
chunk_ms = chunk_minutes * 60 * 1000 # 转换为毫秒
chunks = []
current_start = start_ms
while current_start < end_ms:
current_end = min(current_start + chunk_ms, end_ms)
chunks.append((str(current_start), str(current_end)))
current_start = current_end + 1 # 避免重复
logger.info(f"时间范围切分为 {len(chunks)} 个切片,每切片约 {chunk_minutes/60:.1f} 小时")
return chunks
def _fetch_chunk_data(self, symbol, interval, start_time, end_time, chunk_index):
"""
获取单个时间切片的数据
参数:
- symbol: 交易对符号
- interval: 时间间隔
- start_time: 开始时间戳
- end_time: 结束时间戳
- chunk_index: 切片索引
返回:
- tuple: (chunk_index, klines_data)
"""
retry_count = 0
while retry_count < self.config["max_retries"]:
try:
# 频率限制检查
with self._lock:
self._rate_limit_check()
time.sleep(self.config["concurrent_request_delay"])
# 获取数据
klines = self.client.get_historical_klines(
symbol, interval, start_time, end_time, limit=1000
)
logger.info(f"切片 {chunk_index}: 获取了 {len(klines)} 条记录")
return (chunk_index, klines)
except Exception as e:
retry_count += 1
logger.error(f"切片 {chunk_index} 获取失败 (尝试 {retry_count}/{self.config['max_retries']}): {e}")
if retry_count >= self.config["max_retries"]:
logger.error(f"切片 {chunk_index} 达到最大重试次数")
return (chunk_index, [])
time.sleep(self.config["retry_delay"])
return (chunk_index, [])
def get_historical_klines_optimized(self, symbol, interval, start_time, end_time, use_threading=True):
"""
优化版历史K线数据获取,支持多线程并发
参数:
- symbol: 交易对符号 (如 'BTCUSDT')
- interval: 时间间隔 (如 '1h', '1d')
- start_time: 开始时间 (字符串格式)
- end_time: 结束时间 (字符串格式)
- use_threading: 是否使用多线程
返回:
- K线数据列表
"""
logger.info(f"开始优化获取 {symbol} {interval} 数据")
start_fetch_time = time.time()
# 计算时间切片
chunks = self._calculate_time_chunks(start_time, end_time, interval)
if not use_threading or len(chunks) == 1:
# 单线程模式
logger.info("使用单线程模式")
all_klines = []
for i, (chunk_start, chunk_end) in enumerate(chunks):
_, klines = self._fetch_chunk_data(symbol, interval, chunk_start, chunk_end, i)
all_klines.extend(klines)
else:
# 多线程模式
logger.info(f"使用多线程模式,{self.max_workers} 个工作线程")
all_klines = []
chunk_results = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_chunk = {
executor.submit(
self._fetch_chunk_data,
symbol, interval, chunk_start, chunk_end, i
): i
for i, (chunk_start, chunk_end) in enumerate(chunks)
}
# 收集结果
for future in as_completed(future_to_chunk):
chunk_index, klines = future.result()
chunk_results[chunk_index] = klines
logger.info(f"完成切片 {chunk_index}/{len(chunks)-1}")
# 按顺序合并结果
for i in range(len(chunks)):
if i in chunk_results:
all_klines.extend(chunk_results[i])
else:
logger.warning(f"切片 {i} 数据缺失")
fetch_duration = time.time() - start_fetch_time
logger.info(f"优化获取完成,共 {len(all_klines)} 条记录,耗时 {fetch_duration:.2f} 秒")
return all_klines
def get_historical_klines_with_progress(self, symbol, interval, start_time, end_time,
progress_callback=None, save_intermediate=False,
output_file=None):
"""
带进度回调和中间保存的历史K线数据获取
参数:
- symbol: 交易对符号
- interval: 时间间隔
- start_time: 开始时间
- end_time: 结束时间
- progress_callback: 进度回调函数 callback(current, total, chunk_data)
- save_intermediate: 是否保存中间结果
- output_file: 输出文件路径
返回:
- K线数据列表
"""
logger.info(f"开始带进度获取 {symbol} {interval} 数据")
chunks = self._calculate_time_chunks(start_time, end_time, interval)
all_klines = []
# 如果需要保存中间结果,准备文件
if save_intermediate and output_file:
# 创建临时文件头
temp_df = pd.DataFrame(columns=KLINE_COLUMNS)
temp_df.to_csv(output_file, index=False)
logger.info(f"创建输出文件: {output_file}")
for i, (chunk_start, chunk_end) in enumerate(chunks):
logger.info(f"处理切片 {i+1}/{len(chunks)}")
_, klines = self._fetch_chunk_data(symbol, interval, chunk_start, chunk_end, i)
if klines:
all_klines.extend(klines)
# 保存中间结果
if save_intermediate and output_file:
chunk_df = pd.DataFrame(klines, columns=KLINE_COLUMNS)
chunk_df.to_csv(output_file, mode='a', header=False, index=False)
logger.info(f"切片 {i+1} 数据已保存到文件")
# 进度回调
if progress_callback:
progress_callback(i+1, len(chunks), klines)
logger.info(f"带进度获取完成,共 {len(all_klines)} 条记录")
return all_klines
def estimate_fetch_time(self, symbol, interval, days_back):
"""
估算数据获取时间
参数:
- symbol: 交易对符号
- interval: 时间间隔
- days_back: 天数
返回:
- dict: 估算信息
"""
# 计算大概的记录数
interval_minutes = {
'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
'1h': 60, '2h': 120, '4h': 240, '1d': 1440
}
minutes_per_interval = interval_minutes.get(interval, 60)
total_minutes = days_back * 24 * 60
estimated_records = total_minutes // minutes_per_interval
# 估算请求次数
requests_needed = math.ceil(estimated_records / 1000)
# 估算时间(考虑并发)
single_thread_time = requests_needed * (self.config["request_delay"] + 0.5) # 0.5s per request
multi_thread_time = single_thread_time / min(self.max_workers, requests_needed)
return {
'estimated_records': estimated_records,
'requests_needed': requests_needed,
'single_thread_time_seconds': single_thread_time,
'multi_thread_time_seconds': multi_thread_time,
'speedup_ratio': single_thread_time / multi_thread_time if multi_thread_time > 0 else 1
}
def get_symbol_data_optimized(self, symbol, timeframe, days=7, use_threading=True,
save_to_file=True, output_dir='optimized_data'):
"""
优化版交易对数据获取
参数:
- symbol: 交易对符号
- timeframe: 时间框架
- days: 天数
- use_threading: 是否使用多线程
- save_to_file: 是否保存到文件
- output_dir: 输出目录
返回:
- DataFrame: 处理后的数据
"""
# 创建输出目录
if save_to_file and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 计算时间范围
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
start_timestamp = str(int(start_time.timestamp() * 1000))
end_timestamp = str(int(end_time.timestamp() * 1000))
# 估算获取时间
estimate = self.estimate_fetch_time(symbol, timeframe, days)
logger.info(f"估算: {estimate['estimated_records']} 条记录, "
f"单线程 {estimate['single_thread_time_seconds']:.1f}s, "
f"多线程 {estimate['multi_thread_time_seconds']:.1f}s, "
f"加速比 {estimate['speedup_ratio']:.1f}x")
# 获取数据
klines = self.get_historical_klines_optimized(
symbol, TIMEFRAMES[timeframe], start_timestamp, end_timestamp, use_threading
)
if not klines:
logger.warning("未获取到任何数据")
return pd.DataFrame()
# 转换为DataFrame
df = pd.DataFrame(klines, columns=KLINE_COLUMNS)
# 数据类型转换
numeric_columns = ['open', 'high', 'low', 'close', 'volume',
'quote_asset_volume', 'number_of_trades',
'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']
for col in numeric_columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# 时间戳转换
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df['close_time'] = pd.to_datetime(df['close_time'], unit='ms')
# 设置时间戳为索引
df.set_index('timestamp', inplace=True)
# 保存到文件
if save_to_file:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{symbol}_{timeframe}_{days}days_optimized_{timestamp}.csv"
filepath = os.path.join(output_dir, filename)
df.to_csv(filepath)
logger.info(f"优化数据已保存到: {filepath}")
return df
def compare_performance():
"""
性能对比测试
"""
from api_client import BinanceAPIClient
# 创建客户端
original_client = BinanceAPIClient()
optimized_client = OptimizedBinanceAPIClient(max_workers=5)
symbol = 'BTCUSDT'
timeframe = '1m'
days = 7
print(f"\n性能对比测试: {symbol} {timeframe} {days}天数据")
print("=" * 50)
# 估算时间
estimate = optimized_client.estimate_fetch_time(symbol, timeframe, days)
print(f"估算记录数: {estimate['estimated_records']}")
print(f"估算单线程时间: {estimate['single_thread_time_seconds']:.1f}秒")
print(f"估算多线程时间: {estimate['multi_thread_time_seconds']:.1f}秒")
print(f"预期加速比: {estimate['speedup_ratio']:.1f}x")
# 测试优化版(多线程)
print("\n测试优化版(多线程)...")
start_time = time.time()
df_optimized = optimized_client.get_symbol_data_optimized(
symbol, timeframe, days, use_threading=True, save_to_file=False
)
optimized_time = time.time() - start_time
print(f"优化版完成: {len(df_optimized)} 条记录,耗时 {optimized_time:.2f} 秒")
# 测试优化版(单线程)
print("\n测试优化版(单线程)...")
start_time = time.time()
df_single = optimized_client.get_symbol_data_optimized(
symbol, timeframe, days, use_threading=False, save_to_file=False
)
single_time = time.time() - start_time
print(f"单线程版完成: {len(df_single)} 条记录,耗时 {single_time:.2f} 秒")
# 计算实际加速比
if optimized_time > 0:
actual_speedup = single_time / optimized_time
print(f"\n实际加速比: {actual_speedup:.1f}x")
print(f"效率提升: {(1 - optimized_time/single_time)*100:.1f}%")
if __name__ == "__main__":
# 运行性能对比
compare_performance()
调用优化的apiclient,实现任务调度,可以取多个交易对,执行任务。
# -*- coding: utf-8 -*-
"""
批量数据管理器
支持断点续传、数据验证、自动重试和智能调度的高级数据获取工具
"""
import os
import json
import time
import logging
import hashlib
import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from api_client_optimized import OptimizedBinanceAPIClient
from config import KLINE_COLUMNS
logger = logging.getLogger(__name__)
class BatchDataManager:
"""
批量数据管理器
提供高级数据获取功能:
- 断点续传
- 数据验证
- 自动重试
- 进度保存
- 智能调度
"""
def __init__(self, output_dir='batch_data', max_workers=8, enable_resume=True):
"""
初始化批量数据管理器
参数:
- output_dir: 输出目录
- max_workers: 最大工作线程数
- enable_resume: 是否启用断点续传
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.max_workers = max_workers
self.enable_resume = enable_resume
# 创建子目录
self.data_dir = self.output_dir / 'data'
self.progress_dir = self.output_dir / 'progress'
self.logs_dir = self.output_dir / 'logs'
for dir_path in [self.data_dir, self.progress_dir, self.logs_dir]:
dir_path.mkdir(exist_ok=True)
# 初始化API客户端
self.client = OptimizedBinanceAPIClient(max_workers=max_workers)
# 任务状态
self.tasks = {}
self.completed_tasks = set()
self.failed_tasks = set()
# 配置日志
self._setup_logging()
def _setup_logging(self):
"""
设置日志配置
"""
log_file = self.logs_dir / f'batch_manager_{datetime.now().strftime("%Y%m%d")}.log'
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
def _generate_task_id(self, symbol: str, interval: str, start_time: str, end_time: str) -> str:
"""
生成任务ID
"""
task_str = f"{symbol}_{interval}_{start_time}_{end_time}"
return hashlib.md5(task_str.encode()).hexdigest()[:12]
def _get_progress_file(self, task_id: str) -> Path:
"""
获取进度文件路径
"""
return self.progress_dir / f"{task_id}_progress.json"
def _save_progress(self, task_id: str, progress_data: Dict):
"""
保存进度信息
"""
progress_file = self._get_progress_file(task_id)
progress_data['last_update'] = datetime.now().isoformat()
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(progress_data, f, indent=2, ensure_ascii=False)
def _load_progress(self, task_id: str) -> Optional[Dict]:
"""
加载进度信息
"""
progress_file = self._get_progress_file(task_id)
if not progress_file.exists():
return None
try:
with open(progress_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载进度文件失败: {e}")
return None
def _validate_data_integrity(self, file_path: Path, expected_records: int = None) -> bool:
"""
验证数据完整性
参数:
- file_path: 数据文件路径
- expected_records: 预期记录数
返回:
- bool: 数据是否完整
"""
try:
if not file_path.exists():
return False
# 读取数据
df = pd.read_csv(file_path)
# 检查基本结构
if df.empty:
logger.warning(f"数据文件为空: {file_path}")
return False
# 检查必要列
required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
logger.error(f"数据文件缺少必要列 {missing_columns}: {file_path}")
return False
# 检查记录数
if expected_records and len(df) < expected_records * 0.95: # 允许5%的误差
logger.warning(f"数据记录数不足,期望 {expected_records},实际 {len(df)}: {file_path}")
return False
# 检查时间序列连续性
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp')
# 检查是否有重复时间戳
if df['timestamp'].duplicated().any():
logger.warning(f"发现重复时间戳: {file_path}")
return False
logger.info(f"数据验证通过: {file_path} ({len(df)} 条记录)")
return True
except Exception as e:
logger.error(f"数据验证失败: {file_path}, 错误: {e}")
return False
def add_task(self, symbol: str, interval: str, days: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
priority: int = 1) -> str:
"""
添加数据获取任务
参数:
- symbol: 交易对符号
- interval: 时间间隔
- days: 天数(如果未指定start_date和end_date)
- start_date: 开始日期
- end_date: 结束日期
- priority: 优先级(数字越小优先级越高)
返回:
- str: 任务ID
"""
# 计算时间范围
if start_date and end_date:
start_time = start_date
end_time = end_date
else:
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
start_timestamp = str(int(start_time.timestamp() * 1000))
end_timestamp = str(int(end_time.timestamp() * 1000))
# 生成任务ID
task_id = self._generate_task_id(symbol, interval, start_timestamp, end_timestamp)
# 创建任务信息
task_info = {
'task_id': task_id,
'symbol': symbol,
'interval': interval,
'start_time': start_timestamp,
'end_time': end_timestamp,
'start_date': start_time.isoformat(),
'end_date': end_time.isoformat(),
'days': (end_time - start_time).days,
'priority': priority,
'status': 'pending',
'created_at': datetime.now().isoformat(),
'attempts': 0,
'max_attempts': 3
}
self.tasks[task_id] = task_info
logger.info(f"添加任务: {symbol} {interval} {task_info['days']}天 (ID: {task_id})")
return task_id
def _execute_task(self, task_id: str) -> bool:
"""
执行单个任务
参数:
- task_id: 任务ID
返回:
- bool: 是否成功
"""
task = self.tasks[task_id]
try:
# 更新任务状态
task['status'] = 'running'
task['attempts'] += 1
task['start_time_actual'] = datetime.now().isoformat()
logger.info(f"开始执行任务 {task_id}: {task['symbol']} {task['interval']} (尝试 {task['attempts']}/{task['max_attempts']})")
# 检查是否可以断点续传
output_file = self.data_dir / f"{task['symbol']}_{task['interval']}_{task['days']}days_{task_id}.csv"
if self.enable_resume and output_file.exists():
# 验证现有数据
if self._validate_data_integrity(output_file):
logger.info(f"任务 {task_id} 数据已存在且完整,跳过")
task['status'] = 'completed'
task['end_time_actual'] = datetime.now().isoformat()
self.completed_tasks.add(task_id)
return True
else:
logger.warning(f"任务 {task_id} 现有数据不完整,重新获取")
output_file.unlink() # 删除损坏的文件
# 进度回调
def progress_callback(current, total, chunk_data):
progress = (current / total) * 100
progress_data = {
'task_id': task_id,
'current_chunk': current,
'total_chunks': total,
'progress_percent': progress,
'records_in_chunk': len(chunk_data),
'status': 'running'
}
self._save_progress(task_id, progress_data)
logger.info(f"任务 {task_id} 进度: {current}/{total} ({progress:.1f}%)")
# 获取数据
klines = self.client.get_historical_klines_with_progress(
symbol=task['symbol'],
interval=task['interval'],
start_time=task['start_time'],
end_time=task['end_time'],
progress_callback=progress_callback,
save_intermediate=True,
output_file=str(output_file)
)
# 验证结果
if not klines:
raise ValueError("未获取到任何数据")
# 验证文件完整性
if not self._validate_data_integrity(output_file, len(klines)):
raise ValueError("数据完整性验证失败")
# 更新任务状态
task['status'] = 'completed'
task['end_time_actual'] = datetime.now().isoformat()
task['records_count'] = len(klines)
task['output_file'] = str(output_file)
# 保存最终进度
final_progress = {
'task_id': task_id,
'status': 'completed',
'records_count': len(klines),
'output_file': str(output_file)
}
self._save_progress(task_id, final_progress)
self.completed_tasks.add(task_id)
logger.info(f"任务 {task_id} 完成: {len(klines):,} 条记录")
return True
except Exception as e:
# 任务失败
task['status'] = 'failed'
task['error'] = str(e)
task['end_time_actual'] = datetime.now().isoformat()
logger.error(f"任务 {task_id} 失败 (尝试 {task['attempts']}/{task['max_attempts']}): {e}")
# 保存错误进度
error_progress = {
'task_id': task_id,
'status': 'failed',
'error': str(e),
'attempts': task['attempts']
}
self._save_progress(task_id, error_progress)
if task['attempts'] >= task['max_attempts']:
self.failed_tasks.add(task_id)
logger.error(f"任务 {task_id} 达到最大重试次数,标记为失败")
else:
task['status'] = 'pending' # 重新排队
logger.info(f"任务 {task_id} 将重新尝试")
return False
def execute_all_tasks(self, max_concurrent=3):
"""
执行所有任务
参数:
- max_concurrent: 最大并发任务数
"""
logger.info(f"开始执行批量任务,共 {len(self.tasks)} 个任务")
# 按优先级排序任务
sorted_tasks = sorted(
self.tasks.items(),
key=lambda x: (x[1]['priority'], x[1]['created_at'])
)
total_tasks = len(sorted_tasks)
completed_count = 0
failed_count = 0
start_time = time.time()
for task_id, task in sorted_tasks:
if task_id in self.completed_tasks:
completed_count += 1
continue
if task_id in self.failed_tasks:
failed_count += 1
continue
# 执行任务
success = self._execute_task(task_id)
if success:
completed_count += 1
else:
if task['attempts'] >= task['max_attempts']:
failed_count += 1
# 显示总体进度
progress = ((completed_count + failed_count) / total_tasks) * 100
elapsed = time.time() - start_time
logger.info(f"总体进度: {completed_count + failed_count}/{total_tasks} ({progress:.1f}%) - "
f"成功: {completed_count}, 失败: {failed_count}, 耗时: {elapsed:.1f}s")
total_time = time.time() - start_time
logger.info(f"批量任务执行完成!")
logger.info(f"总任务数: {total_tasks}")
logger.info(f"成功: {completed_count}")
logger.info(f"失败: {failed_count}")
logger.info(f"总耗时: {total_time:.1f} 秒 ({total_time/60:.1f} 分钟)")
return {
'total': total_tasks,
'completed': completed_count,
'failed': failed_count,
'duration': total_time
}
def get_task_status(self, task_id: str = None) -> Dict:
"""
获取任务状态
参数:
- task_id: 任务ID,如果为None则返回所有任务状态
返回:
- Dict: 任务状态信息
"""
if task_id:
if task_id in self.tasks:
task = self.tasks[task_id].copy()
progress = self._load_progress(task_id)
if progress:
task['progress'] = progress
return task
else:
return {'error': f'任务 {task_id} 不存在'}
else:
# 返回所有任务的摘要
summary = {
'total_tasks': len(self.tasks),
'completed': len(self.completed_tasks),
'failed': len(self.failed_tasks),
'pending': len(self.tasks) - len(self.completed_tasks) - len(self.failed_tasks),
'tasks': {}
}
for tid, task in self.tasks.items():
summary['tasks'][tid] = {
'symbol': task['symbol'],
'interval': task['interval'],
'days': task['days'],
'status': task['status'],
'attempts': task.get('attempts', 0)
}
return summary
def cleanup_failed_tasks(self):
"""
清理失败任务的临时文件
"""
cleaned_count = 0
for task_id in self.failed_tasks:
task = self.tasks[task_id]
# 删除可能的部分文件
output_file = self.data_dir / f"{task['symbol']}_{task['interval']}_{task['days']}days_{task_id}.csv"
if output_file.exists():
output_file.unlink()
cleaned_count += 1
logger.info(f"清理失败任务文件: {output_file}")
# 删除进度文件
progress_file = self._get_progress_file(task_id)
if progress_file.exists():
progress_file.unlink()
logger.info(f"清理完成,删除了 {cleaned_count} 个失败任务的文件")
def export_summary_report(self) -> str:
"""
导出摘要报告
返回:
- str: 报告文件路径
"""
report_file = self.output_dir / f'batch_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
report = {
'generated_at': datetime.now().isoformat(),
'summary': self.get_task_status(),
'detailed_tasks': {}
}
for task_id, task in self.tasks.items():
detailed_task = task.copy()
progress = self._load_progress(task_id)
if progress:
detailed_task['progress'] = progress
report['detailed_tasks'][task_id] = detailed_task
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
logger.info(f"摘要报告已导出: {report_file}")
return str(report_file)
def create_preset_tasks(manager: BatchDataManager, preset_name: str = 'popular_pairs'):
"""
创建预设任务
参数:
- manager: 批量数据管理器
- preset_name: 预设名称
"""
presets = {
'popular_pairs': [
('BTCUSDT', '1m', 30),
('ETHUSDT', '1m', 30),
('BNBUSDT', '1m', 30),
('ADAUSDT', '5m', 60),
('SOLUSDT', '5m', 60),
('DOTUSDT', '15m', 90),
('LINKUSDT', '15m', 90),
],
'major_pairs_hourly': [
('BTCUSDT', '1h', 365),
('ETHUSDT', '1h', 365),
('BNBUSDT', '1h', 180),
('ADAUSDT', '1h', 180),
],
'quick_test': [
('BTCUSDT', '1h', 7),
('ETHUSDT', '1h', 7),
]
}
if preset_name not in presets:
logger.error(f"未知预设: {preset_name}")
return
tasks = presets[preset_name]
task_ids = []
for symbol, interval, days in tasks:
task_id = manager.add_task(symbol, interval, days)
task_ids.append(task_id)
logger.info(f"创建预设任务 '{preset_name}': {len(task_ids)} 个任务")
return task_ids
if __name__ == "__main__":
# 示例使用
manager = BatchDataManager(output_dir='advanced_batch_data', max_workers=6)
# 创建预设任务
print("创建预设任务...")
create_preset_tasks(manager, 'quick_test')
# 显示任务状态
print("\n任务状态:")
status = manager.get_task_status()
print(f"总任务数: {status['total_tasks']}")
print(f"待执行: {status['pending']}")
# 执行所有任务
print("\n开始执行任务...")
result = manager.execute_all_tasks()
# 导出报告
report_file = manager.export_summary_report()
print(f"\n报告已导出: {report_file}")