量化之路酝酿良久,终于迈步。


导言

这个项目的主要目的是

  1. 熟悉搭建量化交易系统的主要流程;
  2. 验证一个简单的套利策略的可行性。

交易的标的是BTC现货和合约。

架构设计

1
2
3
4
graph LR
A[数据模块] --> B[策略模块]
B --> C[回测模块] --> E[可视化模块]
B --> D[交易模块] --> E

模块搭建

数据模块

  • 数据源
    • Binance API
    • 第三方库:ccxt
  • 实现功能:
    1. 下载数据;
    2. 缓存数据。
  • 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    import ccxt
    import pandas as pd
    import pickle
    import time
    import logging
    from tqdm import tqdm
    from datetime import datetime, timezone

    ENABLE_DEBUG = True

    logging.basicConfig(level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s')


    class DataFetcher:
    def __init__(self, exchange_name='binance'):
    # 初始化时不再强制使用默认交易所逻辑,fetch_data中会根据参数创建交易所实例
    self.default_exchange_name = exchange_name
    self.proxy_url = 'http://127.0.0.1:7890'
    self.timezone = 'Asia/Shanghai'

    def fetch_data(self, symbol='BTC/USDT',start_time = "2024-01-01 00:00:00", range = '30d', timeframe='5m', contract_type='spot', data_source='binance'):
    """获取指定数据来源的现货或合约数据"""
    # 根据CONFIG计算时间范围
    start_time_ms = int(datetime.strptime(start_time,
    # 起始时间戳
    "%Y-%m-%d %H:%M:%S").timestamp())*1000
    range_ms = self._parse_range(range)
    end_time = start_time_ms + range_ms
    file_year = datetime.fromtimestamp(
    start_time_ms/1000, tz=timezone.utc).strftime('%Y-%m-%d-%H:%M:%S')
    file_base = f'{data_source}_{contract_type}_{symbol.replace("/", "")}_{timeframe}_{file_year}_{range}'
    cache_pkl = f'database/{file_base}.pkl'
    cache_csv = f'database/{file_base}.csv'

    # 尝试加载缓存数据
    try:
    with open(cache_pkl, 'rb') as f:
    all_ohlcv = pickle.load(f)
    df = self._process_data(all_ohlcv)
    logging.info("加载缓存数据成功")
    return df
    except FileNotFoundError:
    logging.info("未找到缓存数据,开始从交易所获取数据...")

    # 根据 data_source 创建交易所实例
    exchange = getattr(ccxt, data_source)({
    'enableRateLimit': True,
    'options': {'adjustForTimeDifference': True},
    'proxies': {'http': self.proxy_url, 'https': self.proxy_url},
    'timeout': 30000
    })

    # 设置市场类型
    exchange.options['defaultType'] = contract_type
    if contract_type == 'future':
    exchange.options['defaultSettle'] = 'usdt'

    all_ohlcv = []
    since = start_time_ms
    total_range = end_time - start_time_ms
    progress_bar = tqdm(total=total_range, desc="下载进度", unit="ms")
    while since < end_time:
    batch_limit = 1000
    try:
    batch = exchange.fetch_ohlcv(
    symbol=symbol,
    timeframe=timeframe,
    since=since,
    limit=batch_limit
    )
    if not batch:
    logging.info("无更多数据可获取,退出循环")
    break
    all_ohlcv.extend(batch)
    new_since = batch[-1][0] + \
    self._timeframe_to_ms(timeframe)
    if new_since <= since:
    logging.warning("时间戳未更新,退出循环以避免无限循环")
    break
    update_amount = min(new_since, end_time) - since
    progress_bar.update(update_amount)
    since = new_since
    time.sleep(exchange.rateLimit / 1000)
    except ccxt.RequestTimeout as e:
    logging.error(f"请求超时: {e}")
    time.sleep(30)
    continue
    except ccxt.ExchangeError as e:
    logging.error(f"交易所错误: {e}")
    break
    except Exception as e:
    logging.exception("未知错误:")
    break
    progress_bar.close()

    # 保存下载数据到缓存,并在文件名中包含数据来源信息
    try:
    with open(cache_pkl, 'wb') as f:
    pickle.dump(all_ohlcv, f)
    df = self._process_data(all_ohlcv)
    with open(cache_csv, 'w') as f:
    f.write(df.to_csv())
    logging.info(f"数据保存成功,保存文件: {cache_csv}")
    except Exception as e:
    logging.error(f"缓存数据保存失败: {e}")
    return df

    def _process_data(self, data):
    df = pd.DataFrame(
    data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

    def _timeframe_to_ms(self, timeframe):
    """将 timeframe 字符串转换为毫秒值,仅支持 '5m',可扩展支持更多"""
    if timeframe == '5m':
    return 5 * 60 * 1000
    else:
    raise ValueError(f"不支持的时间周期: {timeframe}")

    def _parse_range(self, range_str):
    """将 range 字符串转换为毫秒值,支持格式 '1d'、'1M'、'1y'"""
    unit = range_str[-1]
    try:
    num = int(range_str[:-1])
    except ValueError:
    raise ValueError(f"range 格式错误: {range_str}")
    if unit.lower() == 'd':
    return num * 24 * 60 * 60 * 1000
    elif unit.lower() == 'm':
    return num * 30 * 24 * 60 * 60 * 1000 # 近似1个月按30天计算
    elif unit.lower() == 'y':
    return num * 365 * 24 * 60 * 60 * 1000 # 按365天计算
    else:
    raise ValueError(f"不支持的 range 单位: {unit}")


    if __name__ == '__main__':
    fetcher = DataFetcher()
    logging.info('正在获取数据...')
    spot = fetcher.fetch_data(symbol='ETH/USDT', start_time='2025-01-01 00:00:00', range='30d', timeframe='5m', data_source='binance')
    logging.info('数据获取完成')
    logging.info(f'数据量: {len(spot)}')
    logging.info('数据预览:')
    logging.info(spot.head())

策略模块

这里选择的策略是一个简单的统计套利(Pairs Trading)。标的是BTC现货和U本位永续合约。基本逻辑:假设价差满足均值回归,在价差扩大时做空价差、在价差缩小时做多价差。

这里偷个懒,直接把信号放进市场数据那个dataframe里,后续可以再模块化,这样程序逻辑会更清晰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import pandas as pd
import numpy as np
from datafetcher import DataFetcher
from datetime import datetime

ENABLE_DEBUG = True

class MeanReversionStrategy:
"""均值回归策略"""
def __init__(self):

self.SYMBOL = 'BTC/USDT'
self.TIMEFRAME = '15m'
self.RANGE = '180d'
self.START_TIME = '2024-01-01 00:00:00'

self.zscore_window = 12 # 窗口
self.zscore_threshold = 1.8 # 阈值
SPOT_FUTURE_PORTION = 0.5 # 现货和合约仓位比例

def load_data(self):
"""导入数据"""

fetcher = DataFetcher()
spot = fetcher.fetch_data(symbol=self.SYMBOL, start_time=self.START_TIME, range=self.RANGE, timeframe=self.TIMEFRAME, contract_type='spot')[['close']].rename(
columns={'close': 'spot'})
future = fetcher.fetch_data(symbol=self.SYMBOL, start_time=self.START_TIME, range=self.RANGE, timeframe=self.TIMEFRAME, contract_type='future')[['close']].rename(
columns={'close': 'future'})

# 合并数据
self.market_data = pd.merge(spot, future, left_index=True,
right_index=True, how='inner')
# 调试
if ENABLE_DEBUG:
spot.to_csv('debug/spot.csv')
future.to_csv('debug/future.csv')

def generate_signals(self):
"""生成交易信号"""

zscore_window = self.zscore_window
zscore_threshold = self.zscore_threshold

# 计算溢价
self.market_data['premium'] = self.market_data['spot'] - self.market_data['future']

# 计算溢价率
self.market_data['premium_pct'] = (
# (现货价格 - 合约价格) / 现货价格
self.market_data['spot'] - self.market_data['future']) / self.market_data['spot'] * 100

# 计算统计指标
self.market_data['mean_premium_pct'] = self.market_data['premium_pct'].rolling(
zscore_window).mean()
self.market_data['std'] = self.market_data['premium_pct'].rolling(
zscore_window).std()
self.market_data['zscore'] = (self.market_data['premium_pct'] -
self.market_data['mean_premium_pct']) / self.market_data['std']

# 生成原始信号
self.market_data['raw_signal'] = 0
self.market_data.loc[self.market_data['zscore'] > zscore_threshold,
'raw_signal'] = -1 # 溢价率高于阈值,做空溢价, 同时做空现货、做多合约
self.market_data.loc[self.market_data['zscore'] < -zscore_threshold,
'raw_signal'] = 1 # 溢价率低于阈值,做多溢价, 同时做多现货、做空合约

self.market_data['signal'] = self.market_data['raw_signal']

# 计算 20 SMA
self.market_data['SMA'] = self.market_data['premium_pct'].rolling(
window=2*zscore_window).mean()

# 过滤低波动时段
self.market_data.loc[self.market_data['volatility'] <
0.01, 'signal'] = 0

# 调试
if ENABLE_DEBUG:
self.market_data[['signal']].to_csv('debug/signals.csv')

return self.market_data


if __name__ == '__main__':
engine = MeanReversionStrategy()
engine.load_data()
engine.generate_signals()

from visualizer import Visualizer
visualizer = Visualizer()
visualizer.link_strategy(engine)
visualizer.load_data(engine.market_data)
visualizer.plot_signals(engine.market_data)

# from backtest import run_backtest
# trade_history, metrics = run_backtest(engine.market_data) # 调用回测模块
# print(f'[{datetime.now()}] 回测完成。')
# if not trade_history.empty:
# print(f'[{datetime.now()}] 交易记录:')
# print(trade_history.head())
# if ENABLE_DEBUG:
# trade_history.to_csv('debug/trade_history.csv')
# if metrics:
# print(f'[{datetime.now()}] 交易指标:')
# for k, v in metrics.items():
# print(f'{k}: {v}')

回测模块

有很多现成的回测框架,如backtrader之类。这里自己写一个run_backtest函数。

实现功能:

  1. 根据信号进行交易;
    • 做多信号:做多现货,做空合约;
    • 做空信号:做空现货,做多合约;
  2. 仓位管理,可以根据价差的zscore调整杠杆率;
  3. 记录仓位;
  4. 计算输出指标:
    • PnL
    • 最大回撤
    • 修复时间
    • 夏普比率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import pandas as pd
import numpy as np
import math

INITIAL_CAPITAL = 10000 # Example initial capital
FEE_RATE = {'spot': 0.001, 'future': 0.0001} # Example fee rates
SLIPPAGE = 0.0005 # Example slippage value
ENABLE_FEE = False
ENABLE_SLIPPAGE = False
LEVERAGE = 20 # Example leverage
TAKE_PROFIT = 0.02 # Example take profit threshold
STOP_LOSS = 0.03 # Example stop loss threshold
POSITION_RATIO = 0.1 # Example position ratio
ENABLE_DEBUG = True


def run_backtest(df, initial_capital=INITIAL_CAPITAL) -> tuple:
# Initialize parameters and state variables
use_fee = ENABLE_FEE
use_slippage = ENABLE_SLIPPAGE
fee_spot = FEE_RATE['spot'] if use_fee else 0
fee_future = FEE_RATE['future'] if use_fee else 0
slippage = SLIPPAGE if use_slippage else 0
fraction = {'spot': fee_spot + slippage, 'future': fee_future + slippage}
position_history = []
current_capital = initial_capital
position_size = 0
entry_spot_price = None
entry_future_price = None
entry_time = None
position_direction = 0 # 0: no position, 1: long, -1: short
current_position = {}

for time, row in df.iterrows():
# 持仓则计算盈亏
if position_direction != 0:
exit_spot_price = row['spot']
exit_future_price = row['future']
if position_direction == 1:
spot_pnl = (exit_spot_price / entry_spot_price) * \
(1 - fraction['spot'])**2 - 1
future_pnl = (1 - fraction['future']) - (exit_future_price /
entry_future_price) / (1 - fraction['future'])
pnl = spot_pnl * 0.5 + future_pnl * 0.5 * leverage
else:
spot_pnl = (1 - fraction['spot']) - (exit_spot_price /
entry_spot_price) / (1 - fraction['spot'])
future_pnl = (exit_future_price / entry_future_price) * \
(1 - fraction['future'])**2 - 1
pnl = spot_pnl * 0.5 + future_pnl * 0.5 * leverage

# 平仓
if pnl < -1:
current_capital += (1 + pnl) * position_size
current_position['pnl'] = pnl * 100
current_position['exit_time'] = time
current_position['duration'] = (
time - entry_time).total_seconds() / 3600
current_position['close_type'] = '爆仓'
current_position['final_capital'] = current_capital
position_history.append(current_position)

position_direction = 0
elif pnl >= TAKE_PROFIT or pnl <= -STOP_LOSS: # 止盈止损
current_capital += (1 + pnl) * position_size
current_position['pnl'] = pnl * 100
current_position['exit_time'] = time
current_position['duration'] = (
time - entry_time).total_seconds() / 3600
current_position['close_type'] = '止盈' if pnl >= TAKE_PROFIT else '止损'
current_position['final_capital'] = current_capital
position_history.append(current_position)

position_direction = 0
elif (row['signal'] == -position_direction) and (pnl > 0): # 反向
current_capital += (1 + pnl) * position_size
current_position['pnl'] = pnl * 100
current_position['exit_time'] = time
current_position['duration'] = (
time - entry_time).total_seconds() / 3600
current_position['close_type'] = '反向'
current_position['final_capital'] = current_capital
position_history.append(current_position)

position_direction = 0
# 开仓
elif row['signal'] != 0: # 允许多次同向开仓
entry_spot_price = row['spot']
entry_future_price = row['future']
entry_time = time
position_direction = row['signal']
leverage = LEVERAGE
position_size = current_capital * POSITION_RATIO
current_position = {
'initial_capital': current_capital,
'type': 'long' if position_direction == 1 else 'short',
'entry_time': entry_time,
'leverage': leverage
}
current_capital -= position_size

position_history = pd.DataFrame(position_history)
if ENABLE_DEBUG:
position_history.to_csv('debug/position_history.csv')

# 交易结束,计算指标

if position_history.empty:
print("没有交易执行。")
metrics = {}
else:
initial_capital_value = initial_capital
final_capital_value = position_history['final_capital'].iloc[-1]
total_profit = final_capital_value - initial_capital_value
total_trades = len(position_history)

profitable_trades = position_history[position_history['pnl'] > 0]
win_rate = len(profitable_trades) / total_trades * 100

avg_duration = position_history['duration'].mean(
) if 'duration' in position_history.columns else 0

start_time = df.index[0]
end_time = df.index[-1]
time_in_years = (end_time - start_time).total_seconds() / \
(365.25 * 24 * 3600)
annualized_return = ((final_capital_value / initial_capital_value)
** (1 / time_in_years) - 1) * 100 if time_in_years > 0 else np.nan

max_drawdown = (position_history['final_capital'] /
position_history['final_capital'].cummax() - 1).min() * 100

cumpnl = position_history['pnl'].cumsum()
drawdown_series = cumpnl - cumpnl.cummax()
if drawdown_series.empty:
max_drawdown_recovery_time = 0
else:
trough_idx = drawdown_series.idxmin()
peak_value = cumpnl.cummax().iloc[trough_idx]
trough_time = position_history['exit_time'].iloc[trough_idx]
recovery_time = None
for j in range(trough_idx, len(cumpnl)):
if cumpnl.iloc[j] >= peak_value:
recovery_time = (
position_history['exit_time'].iloc[j] - trough_time).total_seconds() / 3600
break
max_drawdown_recovery_time = recovery_time if recovery_time is not None else np.nan

roi = (final_capital_value - initial_capital_value) / \
initial_capital_value * 100
avg_pnl = position_history['pnl'].mean() * 100
pnl_std = position_history['pnl'].std()
sharpe_ratio = avg_pnl / pnl_std if pnl_std != 0 else np.nan

metrics = {
"初始资金": f'{INITIAL_CAPITAL}',
"最终资金": f'{final_capital_value}',
"总收益": f'{total_profit}',
"年化收益率": f'{annualized_return} %',
"胜率": f'{win_rate} %',
"交易次数": f'{total_trades}',
"平均持仓时间": f'{avg_duration} h',
"最大回撤": f'{max_drawdown} %',
"最大回撤修复时间": f'{max_drawdown_recovery_time} h',
"夏普比率": sharpe_ratio,
}

return position_history, metrics

回测结果:

1
2
3
4
5
6
7
8
9
10
初始资金: 10000
最终资金: 10651.176791099295
总收益: 651.1767910992949
年化收益率: 815.442016678132 %
胜率: 67.85714285714286 %
交易次数: 28
平均持仓时间: 0.7589285714285714 h
最大回撤: -1.9089572651408093 %
最大回撤修复时间: 125.5 h
夏普比率: 23.28055581414463

可视化模块

使用matplotlib。模块化绘图函数。加入绘图函数注册机制,方便以后拓展。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pandas as pd
import numpy as np
from datetime import datetime
from matplotlib import pyplot as plt

ENABLE_DEBUG = True

class Visualizer:
"""图表绘制类"""

def __init__(self):
# 图表注册字典,默认注册 premium/signals 图表和资金曲线图表
self.charts = {}
self.register_chart("premium_signals", self._plot_premium_signals)
self.register_chart("capital", self._plot_capital)

def link_strategy(self, strategy):
"""关联策略类,方便获取配置参数"""
self.strategy = strategy

def load_data(self, *data):
"""导入数据"""
self.data = data

def register_chart(self, key: str, func):
"""注册新的图表绘制方法,便于后续扩展图表"""
self.charts[key] = func

def _configure_plot(self):
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']

def _plot_premium(self, ax, market_data: pd.DataFrame, premium_col: str):
"""绘制 premium 和移动平均"""
ax.plot(
market_data[premium_col],
label='溢价',
color='#2ca02c',
linewidth=1
)
ax.plot(
market_data['mean_premium_pct'],
'--',
label='移动平均',
color='#d62728',
linewidth=1
)
# premium_range = self.strategy.zscore_threshold * market_data['std']
# ax.fill_between(
# market_data.index,
# market_data['mean_premium_pct'] + premium_range,
# market_data['mean_premium_pct'] - premium_range,
# color='gray', alpha=0.2
# )
ax.set_ylabel(f'{self.strategy.SYMBOL}' + '溢价(USDT)' if premium_col == 'premium' else '溢价率(%)')

def _plot_signals(self, ax, market_data: pd.DataFrame, premium_col: str):
"""绘制交易信号"""
long_signals = market_data[market_data['signal'] == 1]
short_signals = market_data[market_data['signal'] == -1]
ax.scatter(long_signals.index, long_signals[premium_col],
marker='^', color='g', s=30, label='做多信号')
ax.scatter(short_signals.index, short_signals[premium_col],
marker='v', color='r', s=30, label='做空信号')

def _plot_premium_signals(self, ax, market_data: pd.DataFrame, premium_col: str):
"""组合绘制 premium 和信号"""
self._plot_premium(ax, market_data, premium_col)
self._plot_signals(ax, market_data, premium_col)

def _plot_capital(self, ax, market_data, position_history: pd.DataFrame):
"""绘制资金曲线"""
ax.plot(
position_history.set_index('exit_time')['final_capital'].copy().reindex(market_data.index, method='ffill'),
label='资金曲线',
color='#1f77b4',
linewidth=1
)
ax.set_ylabel('资金')
ax.set_xlabel('时间')
ax.legend()

def _save_if_debug(self):
if ENABLE_DEBUG:
plt.savefig(
f"debug/plot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")

def plot_signals(self, market_data: pd.DataFrame, premium_col: str = 'premium_pct'):
"""绘制 premium 和信号"""
self._configure_plot()
fig, ax = plt.subplots(figsize=(15, 8))
self.charts["premium_signals"](
ax, market_data, premium_col=premium_col)
plt.tight_layout()
self._save_if_debug()
plt.show()

def plot(self, market_data: pd.DataFrame, position_history: pd.DataFrame, layout: str = 'stacked'):
"""
统一绘图接口,根据 layout 参数选择布局,同时利用注册的图表方法便于后续扩展。

参数:
market_data: 市场数据 DataFrame
position_history: 交易记录 DataFrame
layout:
'stacked' - 上下两块面板(上面绘制 premium/signals,下面绘制资金曲线)
'twin' - 单图双 y 轴(左侧绘制 premium/signals,右侧绘制资金曲线)
"""
self._configure_plot()

if layout == 'stacked':
fig, (ax_top, ax_bottom) = plt.subplots(2, 1, figsize=(15, 8))
# premium/signals 面板使用 'premium_pct' 字段
self.charts["premium_signals"](
ax_top, market_data, premium_col='premium_pct')
self.charts["capital"](ax_bottom, market_data, position_history)
elif layout == 'twin':
fig, ax_left = plt.subplots(figsize=(15, 8))
ax_right = ax_left.twinx()
# premium/signals 面板使用 'premium' 字段
self.charts["premium_signals"](
ax_left, market_data, premium_col='premium')
self.charts["capital"](ax_right, market_data, position_history)
ax_right.set_xlabel('时间')
else:
raise ValueError("无效的 layout 参数,请使用 'stacked' 或 'twin'。")

plt.tight_layout()
self._save_if_debug()
plt.show()

可视化结果:
回测结果

进阶方向

不一而足,方向很多:

  1. 多因子模型开发;
  2. 结合期权定价与波动率交易;
  3. 基于强化学习的超参数优化。