stock-advisor / finrl_agent.py
Dung Pham Anh
update finr
b2fcf54
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
class StockTradingEnv(gym.Env):
"""Custom Environment for stock trading"""
def __init__(self, df, initial_balance=1000000):
super(StockTradingEnv, self).__init__()
self.df = df
self.initial_balance = initial_balance
self.current_step = 0
self.shares_held = 0
self.balance = initial_balance
self.cost_basis = 0
# Định nghĩa không gian hành động: 0 (Bán), 1 (Giữ), 2 (Mua)
self.action_space = spaces.Discrete(3)
# Định nghĩa không gian trạng thái (features)
self.observation_space = spaces.Box(
low=-np.inf,
high=np.inf,
shape=(7,), # price, volume, RSI, MACD, Signal, EMA20, EMA50
dtype=np.float32
)
def reset(self, seed=None):
super().reset(seed=seed)
self.current_step = 0
self.balance = self.initial_balance
self.shares_held = 0
self.cost_basis = 0
return self._next_observation(), {}
def _next_observation(self):
# Chuyển đổi các giá trị sang float64 trước khi tạo array
obs = np.array([
float(self.df['close'].iloc[self.current_step]),
float(self.df['volume'].iloc[self.current_step]),
float(self.df['RSI'].iloc[self.current_step]),
float(self.df['MACD'].iloc[self.current_step]),
float(self.df['Signal'].iloc[self.current_step]),
float(self.df['EMA20'].iloc[self.current_step]),
float(self.df['EMA50'].iloc[self.current_step])
], dtype=np.float32)
return obs
def step(self, action):
current_price = self.df['close'].iloc[self.current_step]
# Thực hiện hành động giao dịch
if action == 0: # Bán
if self.shares_held > 0:
self.balance += self.shares_held * current_price * 0.999 # Phí giao dịch 0.1%
self.shares_held = 0
self.cost_basis = 0
elif action == 2: # Mua
if self.balance > current_price:
shares_to_buy = self.balance // current_price
actual_cost = shares_to_buy * current_price * 1.001 # Phí giao dịch 0.1%
if actual_cost <= self.balance:
self.balance -= actual_cost
self.shares_held += shares_to_buy
self.cost_basis = current_price
# Di chuyển đến bước tiếp theo
self.current_step += 1
# Tính toán giá trị danh mục đầu tư
portfolio_value = self.balance + self.shares_held * current_price
# Kiểm tra kết thúc episode
done = self.current_step >= len(self.df) - 1
# Tính reward dựa trên thay đổi giá trị danh mục
if not done:
next_price = self.df['close'].iloc[self.current_step]
price_change = (next_price - current_price) / current_price
reward = price_change * self.shares_held * current_price / self.initial_balance
else:
reward = (portfolio_value - self.initial_balance) / self.initial_balance
return self._next_observation(), reward, done, False, {}
class RLTrader:
def __init__(self):
self.model = None
def train(self, df):
"""Huấn luyện mô hình RL"""
# Kiểm tra dữ liệu đầu vào
if not all(col in df.columns for col in ['close', 'volume', 'RSI', 'MACD', 'Signal', 'EMA20', 'EMA50']):
raise ValueError("Thiếu một số cột dữ liệu cần thiết cho mô hình RL")
# Kiểm tra giá trị NaN
if df[['close', 'volume', 'RSI', 'MACD', 'Signal', 'EMA20', 'EMA50']].isna().any().any():
raise ValueError("Dữ liệu chứa giá trị NaN")
# Tạo môi trường
env = DummyVecEnv([lambda: StockTradingEnv(df.copy())])
# Khởi tạo và huấn luyện mô hình
self.model = PPO("MlpPolicy", env, verbose=0,
learning_rate=1e-4,
batch_size=64,
ent_coef=0.01)
self.model.learn(total_timesteps=50000)
return self.model
def predict(self, df):
"""Dự đoán hành động giao dịch"""
if self.model is None:
raise Exception("Model chưa được huấn luyện")
env = StockTradingEnv(df)
obs = env.reset()[0]
actions = []
states = []
current_step = 0
while current_step < len(df):
action, _ = self.model.predict(obs, deterministic=True)
actions.append(action)
states.append(obs)
obs, _, done, _, _ = env.step(action)
current_step += 1
if done:
# Nếu đã kết thúc nhưng chưa đủ độ dài, thêm hành động giữ
while current_step < len(df):
actions.append(1) # 1 = Giữ
current_step += 1
break
# Đảm bảo độ dài của actions bằng với độ dài của DataFrame
actions = actions[:len(df)]
# Chuyển đổi actions thành tín hiệu
signals = pd.DataFrame({
'RL_Signal': ['Mua' if a == 2 else 'Bán' if a == 0 else 'Giữ' for a in actions],
'RL_Action': actions
}, index=df.index)
# Đánh giá hiệu suất
performance = self.evaluate_performance(df, signals)
return signals, performance
def evaluate_performance(self, df, signals):
"""Đánh giá hiệu suất của mô hình"""
df_copy = df.copy()
df_copy['RL_Signal'] = signals['RL_Signal']
# Tính toán lợi nhuận
df_copy['Returns'] = df_copy['close'].pct_change()
df_copy['Strategy_Returns'] = df_copy['Returns'] * (df_copy['RL_Signal'].shift(1) == 'Mua')
# Tính các chỉ số hiệu suất
total_returns = df_copy['Strategy_Returns'].sum() * 100 # Chuyển sang phần trăm
sharpe_ratio = df_copy['Strategy_Returns'].mean() / df_copy['Strategy_Returns'].std() * np.sqrt(252)
max_drawdown = (df_copy['close'] / df_copy['close'].cummax() - 1).min() * 100 # Chuyển sang phần trăm
return {
'total_returns': total_returns,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown
}