一个量化交易系统的诞生记
量化之路酝酿良久,终于迈步。
导言
这个项目的主要目的有两个:
- 熟悉搭建量化交易系统的主要流程;
- 验证一个简单的套利策略的可行性。
交易标的是 BTC 现货和 U 本位永续合约。
架构设计
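```mermaid
graph LR
    A[数据模块 DataFetcher] --> B[策略模块 MeanReversionStrategy]
    B --> C[回测模块 run_backtest]
    B --> D[可视化模块 Visualizer]
```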
模块搭建
数据模块
- 数据源:
- Binance API
- 第三方库:ccxt
- 实现功能:
- 下载数据;
- 缓存数据。
- 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import os
import pickle
import time
from datetime import datetime, timezone

import ccxt
import pandas as pd
from tqdm import tqdm
# Module-level debug switch (DataFetcher below does not read it).
ENABLE_DEBUG = True

# Root logger: INFO and above, formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class DataFetcher:
    """Download OHLCV candles from a ccxt exchange and cache them on disk.

    Each request is cached under ``self.cache_dir`` in two forms:
    - a pickle of the raw ccxt candle lists, which later calls reload;
    - a CSV copy of the processed DataFrame, for inspection.
    """

    # Milliseconds per ccxt timeframe unit. Units are case-sensitive as in
    # ccxt: 'm' is minutes, 'M' is months (approximated as 30 days).
    _TIMEFRAME_UNIT_MS = {
        's': 1000,
        'm': 60 * 1000,
        'h': 60 * 60 * 1000,
        'd': 24 * 60 * 60 * 1000,
        'w': 7 * 24 * 60 * 60 * 1000,
        'M': 30 * 24 * 60 * 60 * 1000,
        'y': 365 * 24 * 60 * 60 * 1000,
    }

    def __init__(self, exchange_name='binance'):
        # The exchange instance is created per request from fetch_data's
        # ``data_source`` argument; this name is only kept as a default label.
        self.default_exchange_name = exchange_name
        self.proxy_url = 'http://127.0.0.1:7890'
        # NOTE(review): not read anywhere; start_time is parsed in the host
        # machine's local timezone (see fetch_data).
        self.timezone = 'Asia/Shanghai'
        # Directory holding the .pkl/.csv caches (created on first save).
        self.cache_dir = 'database'
        # Consecutive request timeouts tolerated before the download gives up,
        # so a dead network/proxy cannot spin the download loop forever.
        self.max_timeout_retries = 5

    def fetch_data(self, symbol='BTC/USDT', start_time="2024-01-01 00:00:00",
                   range='30d', timeframe='5m', contract_type='spot',
                   data_source='binance'):
        """Return spot or futures OHLCV data, from the cache when available.

        Args:
            symbol: ccxt market symbol, e.g. ``'BTC/USDT'``.
            start_time: ``'%Y-%m-%d %H:%M:%S'`` string. It is parsed as a
                naive datetime, i.e. in the host machine's local timezone.
            range: window length such as ``'30d'``, ``'1M'`` or ``'1y'``
                (see ``_parse_range``). Shadows the builtin; kept for API
                compatibility.
            timeframe: candle size such as ``'5m'``, ``'15m'``, ``'1h'``.
            contract_type: value for ccxt's ``defaultType`` option, e.g.
                ``'spot'`` or ``'future'``.
            data_source: name of a ccxt exchange class, e.g. ``'binance'``.

        Returns:
            DataFrame indexed by candle open time with columns
            open/high/low/close/volume. Rows are limited to
            ``[start_time, start_time + range)``.

        Raises:
            ValueError: if ``range`` or ``timeframe`` cannot be parsed.
        """
        start_time_ms = int(
            datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S").timestamp()) * 1000
        end_time = start_time_ms + self._parse_range(range)
        cache_pkl, cache_csv = self._cache_paths(
            symbol, start_time_ms, range, timeframe, contract_type, data_source)

        # Try the cache first.
        try:
            with open(cache_pkl, 'rb') as f:
                # Local cache written by this class. Never point it at
                # untrusted files: unpickling can execute arbitrary code.
                cached = pickle.load(f)
        except FileNotFoundError:
            logging.info("未找到缓存数据,开始从交易所获取数据...")
        else:
            df = self._process_data(self._clip(cached, start_time_ms, end_time))
            logging.info("加载缓存数据成功")
            return df

        all_ohlcv, complete = self._download(
            symbol, timeframe, contract_type, data_source, start_time_ms, end_time)
        # The last page usually runs past end_time; keep only the requested window.
        all_ohlcv = self._clip(all_ohlcv, start_time_ms, end_time)
        # Build the frame before touching the cache, so a failed cache write
        # cannot leave df unassigned.
        df = self._process_data(all_ohlcv)
        if not all_ohlcv:
            logging.warning("未获取到任何数据,不写入缓存")
        elif not complete:
            # A truncated download cached under this key would be served forever.
            logging.warning("数据下载未完成,不写入缓存")
        else:
            self._save_cache(all_ohlcv, df, cache_pkl, cache_csv)
        return df

    def _cache_paths(self, symbol, start_time_ms, range_str, timeframe,
                     contract_type, data_source):
        """Return the ``(pickle_path, csv_path)`` that identify one request."""
        file_year = datetime.fromtimestamp(
            start_time_ms / 1000, tz=timezone.utc).strftime('%Y-%m-%d-%H:%M:%S')
        # NOTE(review): ':' in file names is invalid on Windows; kept so that
        # existing caches keep their names.
        file_base = (f'{data_source}_{contract_type}_{symbol.replace("/", "")}'
                     f'_{timeframe}_{file_year}_{range_str}')
        return (f'{self.cache_dir}/{file_base}.pkl',
                f'{self.cache_dir}/{file_base}.csv')

    def _download(self, symbol, timeframe, contract_type, data_source,
                  start_ms, end_ms):
        """Page through ``fetch_ohlcv`` from ``start_ms`` towards ``end_ms``.

        Returns:
            ``(candles, complete)``. ``complete`` is False when the loop
            stopped because of an error rather than reaching ``end_ms`` or
            running out of exchange data.
        """
        exchange = getattr(ccxt, data_source)({
            'enableRateLimit': True,
            'options': {'adjustForTimeDifference': True},
            'proxies': {'http': self.proxy_url, 'https': self.proxy_url},
            'timeout': 30000,
        })
        # Select the market type (spot / USDT-settled futures).
        exchange.options['defaultType'] = contract_type
        if contract_type == 'future':
            exchange.options['defaultSettle'] = 'usdt'

        # Resolve the step once, before any request. An unsupported timeframe
        # then fails loudly instead of being swallowed after the first batch.
        step_ms = self._timeframe_to_ms(timeframe)
        batch_limit = 1000
        all_ohlcv = []
        since = start_ms
        timeouts = 0
        complete = False
        progress_bar = tqdm(total=end_ms - start_ms, desc="下载进度", unit="ms")
        try:
            while since < end_ms:
                try:
                    batch = exchange.fetch_ohlcv(
                        symbol=symbol,
                        timeframe=timeframe,
                        since=since,
                        limit=batch_limit,
                    )
                except ccxt.RequestTimeout as e:
                    timeouts += 1
                    logging.error(f"请求超时: {e}")
                    if timeouts > self.max_timeout_retries:
                        break
                    time.sleep(30)
                    continue
                except ccxt.ExchangeError as e:
                    logging.error(f"交易所错误: {e}")
                    break
                except Exception:
                    logging.exception("未知错误:")
                    break
                timeouts = 0
                if not batch:
                    logging.info("无更多数据可获取,退出循环")
                    complete = True
                    break
                all_ohlcv.extend(batch)
                new_since = batch[-1][0] + step_ms
                if new_since <= since:
                    logging.warning("时间戳未更新,退出循环以避免无限循环")
                    break
                progress_bar.update(min(new_since, end_ms) - since)
                since = new_since
                time.sleep(exchange.rateLimit / 1000)
            else:
                # Loop condition became false: the whole window was covered.
                complete = True
        finally:
            progress_bar.close()
        return all_ohlcv, complete

    @staticmethod
    def _clip(ohlcv, start_ms, end_ms):
        """Keep only candles whose open time lies in ``[start_ms, end_ms)``."""
        return [row for row in ohlcv if start_ms <= row[0] < end_ms]

    def _save_cache(self, all_ohlcv, df, cache_pkl, cache_csv):
        """Write the raw candles (pickle) and processed frame (CSV), best effort."""
        try:
            os.makedirs(os.path.dirname(cache_pkl) or '.', exist_ok=True)
            with open(cache_pkl, 'wb') as f:
                pickle.dump(all_ohlcv, f)
            df.to_csv(cache_csv)
            logging.info(f"数据保存成功,保存文件: {cache_csv}")
        except (OSError, pickle.PicklingError) as e:
            # Caching is optional: log and let the caller still return df.
            logging.error(f"缓存数据保存失败: {e}")

    def _process_data(self, data):
        """Turn ccxt OHLCV rows into a DataFrame indexed by open time.

        ``data`` rows are ``[timestamp_ms, open, high, low, close, volume]``.
        """
        df = pd.DataFrame(
            data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df

    def _timeframe_to_ms(self, timeframe):
        """Convert a ccxt timeframe such as '5m', '15m', '1h', '1d' to milliseconds.

        Raises:
            ValueError: for an unknown unit or a non-positive/non-integer amount.
        """
        amount, unit = timeframe[:-1], timeframe[-1:]
        if (unit not in self._TIMEFRAME_UNIT_MS or not amount.isdigit()
                or int(amount) <= 0):
            raise ValueError(f"不支持的时间周期: {timeframe}")
        return int(amount) * self._TIMEFRAME_UNIT_MS[unit]

    def _parse_range(self, range_str):
        """Convert a range string such as '30d', '1M' or '1y' to milliseconds.

        The unit is case-insensitive: 'd' is days, 'm'/'M' is months
        (approximated as 30 days), 'y' is years (365 days).

        Raises:
            ValueError: for an empty string, a non-integer amount or an
                unknown unit.
        """
        if not range_str:
            raise ValueError(f"range 格式错误: {range_str}")
        unit = range_str[-1]
        try:
            num = int(range_str[:-1])
        except ValueError:
            raise ValueError(f"range 格式错误: {range_str}") from None
        days_per_unit = {'d': 1, 'm': 30, 'y': 365}
        if unit.lower() not in days_per_unit:
            raise ValueError(f"不支持的 range 单位: {unit}")
        return num * days_per_unit[unit.lower()] * 24 * 60 * 60 * 1000
if __name__ == '__main__':
    # Smoke run: 30 days of 5m ETH/USDT candles from Binance (cache or download).
    data_fetcher = DataFetcher()
    logging.info('正在获取数据...')
    request = {
        'symbol': 'ETH/USDT',
        'start_time': '2025-01-01 00:00:00',
        'range': '30d',
        'timeframe': '5m',
        'data_source': 'binance',
    }
    eth_spot = data_fetcher.fetch_data(**request)
    logging.info('数据获取完成')
    row_count = len(eth_spot)
    logging.info(f'数据量: {row_count}')
    logging.info('数据预览:')
    logging.info(eth_spot.head())
策略模块
这里选择的策略是一个简单的统计套利(Pairs Trading),标的是 BTC 现货和 U 本位永续合约。基本逻辑:假设现货与合约之间的价差(溢价率)满足均值回归。当溢价率的 z-score 高于阈值(价差异常偏高)时做空价差,即做空现货、做多合约;当 z-score 低于负阈值(价差异常偏低)时做多价差,即做多现货、做空合约。
这里偷个懒,直接把信号写进市场数据的 DataFrame 里;后续可以再拆分成独立模块,让程序逻辑更清晰。
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from datetime import datetime

import numpy as np
import pandas as pd

from datafetcher import DataFetcher

# Module-wide switch: when True, intermediate data is dumped as CSV under debug/.
ENABLE_DEBUG = True
class MeanReversionStrategy:
    """Mean-reversion (pairs) strategy on the spot/futures premium.

    ``load_data`` builds ``self.market_data`` with ``spot`` and ``future``
    close prices. ``generate_signals`` then adds the premium statistics and
    the ``raw_signal``/``signal`` columns to that same DataFrame.
    """

    # Spot/futures position split (not read by this class itself).
    SPOT_FUTURE_PORTION = 0.5

    def __init__(self, debug=None):
        self.SYMBOL = 'BTC/USDT'
        self.TIMEFRAME = '15m'
        self.RANGE = '180d'
        self.START_TIME = '2024-01-01 00:00:00'
        self.zscore_window = 12  # rolling window length, in candles
        self.zscore_threshold = 1.8  # |zscore| needed to emit a signal
        # Signals are suppressed while the rolling std of premium_pct
        # (percentage points) is below this value.
        self.volatility_threshold = 0.01
        # Dump intermediate CSVs under debug/; defaults to the module switch.
        self.debug = ENABLE_DEBUG if debug is None else debug

    def load_data(self):
        """Fetch spot and futures closes and inner-join them on timestamp.

        Sets ``self.market_data`` with columns ``spot`` and ``future``.
        """
        fetcher = DataFetcher()
        request = dict(symbol=self.SYMBOL, start_time=self.START_TIME,
                       range=self.RANGE, timeframe=self.TIMEFRAME)
        spot = fetcher.fetch_data(contract_type='spot', **request)[['close']].rename(
            columns={'close': 'spot'})
        future = fetcher.fetch_data(contract_type='future', **request)[['close']].rename(
            columns={'close': 'future'})
        # Keep only timestamps present in both markets.
        self.market_data = pd.merge(spot, future, left_index=True,
                                    right_index=True, how='inner')
        if self.debug:
            self._ensure_debug_dir()
            spot.to_csv('debug/spot.csv')
            future.to_csv('debug/future.csv')

    def generate_signals(self):
        """Compute premium statistics and trading signals on ``self.market_data``.

        Adds these columns and returns the DataFrame:
        premium, premium_pct, mean_premium_pct, std, zscore, raw_signal,
        signal, SMA, volatility.

        Signal convention:
        - ``-1`` when zscore > threshold: short the premium (short spot, long futures);
        - ``+1`` when zscore < -threshold: long the premium (long spot, short futures);
        - ``0`` otherwise.
        ``signal`` equals ``raw_signal`` except in low-volatility periods,
        where it is forced to 0.
        """
        md = self.market_data
        window = self.zscore_window
        threshold = self.zscore_threshold

        md['premium'] = md['spot'] - md['future']
        # Premium rate in percent: (spot - future) / spot * 100.
        md['premium_pct'] = md['premium'] / md['spot'] * 100

        rolling = md['premium_pct'].rolling(window)
        md['mean_premium_pct'] = rolling.mean()
        md['std'] = rolling.std()
        md['zscore'] = (md['premium_pct'] - md['mean_premium_pct']) / md['std']

        md['raw_signal'] = 0
        md.loc[md['zscore'] > threshold, 'raw_signal'] = -1
        md.loc[md['zscore'] < -threshold, 'raw_signal'] = 1
        md['signal'] = md['raw_signal']

        # SMA of premium_pct over twice the zscore window (24 candles by default).
        md['SMA'] = md['premium_pct'].rolling(window=2 * window).mean()

        # The low-volatility filter needs a 'volatility' column, which nothing
        # else creates. NOTE(review): it is taken as the rolling std of
        # premium_pct; confirm this is the intended measure.
        md['volatility'] = md['std']
        md.loc[md['volatility'] < self.volatility_threshold, 'signal'] = 0

        if self.debug:
            self._ensure_debug_dir()
            md[['signal']].to_csv('debug/signals.csv')
        return md

    @staticmethod
    def _ensure_debug_dir():
        """Create the ``debug/`` output directory if it does not exist yet."""
        import os
        os.makedirs('debug', exist_ok=True)
if __name__ == '__main__':
    # Pipeline: market data -> signals -> plots.
    strategy = MeanReversionStrategy()
    strategy.load_data()
    signals = strategy.generate_signals()

    # Imported here, only when the module is run as a script.
    from visualizer import Visualizer
    plotter = Visualizer()
    plotter.link_strategy(strategy)
    plotter.load_data(signals)
    plotter.plot_signals(signals)

    # Backtest step, currently disabled:
    # from backtest import run_backtest
    # trade_history, metrics = run_backtest(strategy.market_data)  # 调用回测模块
    # print(f'[{datetime.now()}] 回测完成。')
    # if not trade_history.empty:
    #     print(f'[{datetime.now()}] 交易记录:')
    #     print(trade_history.head())
    #     if ENABLE_DEBUG:
    #         trade_history.to_csv('debug/trade_history.csv')
    # if metrics:
    #     print(f'[{datetime.now()}] 交易指标:')
    #     for k, v in metrics.items():
    #         print(f'{k}: {v}')
回测模块
现成的回测框架有很多,如 backtrader 等。这里自己写一个 `run_backtest` 函数。
实现功能:
- 根据信号进行交易;
- 做多信号:做多现货,做空合约;
- 做空信号:做空现货,做多合约;
- 仓位管理,可以根据价差的zscore调整杠杆率;
- 记录仓位;
- 计算输出指标:
- PnL
- 最大回撤
- 最大回撤修复时间
- 夏普比率
1 | import pandas as pd |
回测结果:
2
3
4
5
6
7
8
9
初始资金: 10000
最终资金: 10651.176791099295
总收益: 651.1767910992949
年化收益率: 815.442016678132 %
胜率: 67.85714285714286 %
交易次数: 28
平均持仓时间: 0.7589285714285714 h
最大回撤: -1.9089572651408093 %
最大回撤修复时间: 125.5 h
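夏普比率: 23.28055581414463

注:按复利年化公式反推,$(10651.18/10000)^{365/T} \approx 9.154$,得出回测区间 $T$ 仅约 10.4 天。这恰好约等于 1000 根 15 分钟 K 线,而不是策略配置的 180 天。生成这组结果时,数据模块的 `_timeframe_to_ms` 很可能只支持 `'5m'`:以 `'15m'` 下载时,第一批数据之后就抛出异常,并被通用异常处理吞掉,从而提前退出下载循环。样本过短,上述年化收益率和夏普比率不具统计意义。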
可视化模块
使用 matplotlib 绘图,并把绘图函数模块化;同时加入绘图函数注册机制,方便以后扩展。
1 | import pandas as pd |
可视化结果:
进阶方向
不一而足,方向很多:
- 多因子模型开发;
- 结合期权定价与波动率交易;
- 基于强化学习的超参数优化。