| """ |
| FinRL Agent for Algorithmic Trading |
| |
| This module provides a FinRL-based reinforcement learning agent that can be integrated |
| with the existing algorithmic trading system. It supports various RL algorithms |
| including PPO, A2C, DDPG, and TD3. |
| """ |
|
|
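# Typical usage (a minimal sketch; assumes `train_df` and `test_df` are OHLCV
# DataFrames with 'open', 'high', 'low', 'close', and 'volume' columns):
#
#     agent = FinRLAgent(FinRLConfig(algorithm="PPO"))
#     agent.train(train_df, total_timesteps=100_000)
#     actions = agent.predict(test_df)
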
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import gymnasium as gym
import numpy as np
import pandas as pd
import yaml
from gymnasium import spaces
from stable_baselines3 import A2C, DDPG, PPO, TD3
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv

logger = logging.getLogger(__name__)

@dataclass
class FinRLConfig:
    """Configuration for the FinRL agent.

    The off-policy fields (buffer_size, learning_starts, tau, train_freq,
    gradient_steps) are only forwarded to DDPG/TD3. The exploration_* and
    max_grad_norm fields are retained for DQN-style setups and are not
    forwarded by FinRLAgent.train() below; target_update_interval is likewise
    not forwarded (Stable-Baselines3's TD3 uses `policy_delay` instead).
    """

    algorithm: str = "PPO"
    learning_rate: float = 0.0003
    batch_size: int = 64
    buffer_size: int = 1000000
    learning_starts: int = 100
    gamma: float = 0.99
    tau: float = 0.005
    train_freq: int = 1
    gradient_steps: int = 1
    target_update_interval: int = 1
    exploration_fraction: float = 0.1
    exploration_initial_eps: float = 1.0
    exploration_final_eps: float = 0.05
    max_grad_norm: float = 10.0
    verbose: int = 1
    tensorboard_log: str = "logs/finrl_tensorboard"

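# Example (a sketch): override only the fields you need, e.g.
#     config = FinRLConfig(algorithm="A2C", learning_rate=1e-4)
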
class TradingEnvironment(gym.Env):
    """
    Custom trading environment for FinRL.

    This environment simulates a single-asset trading scenario where the agent can:
    - Sell, hold, or buy (discrete actions 0, 1, and 2)
    - Use technical indicators for decision making
    - Track portfolio value and risk
    """

    def __init__(self, data: pd.DataFrame, initial_balance: float = 100000,
                 transaction_fee: float = 0.001, max_position: int = 100):
        super().__init__()

        self.data = data
        self.initial_balance = initial_balance
        self.transaction_fee = transaction_fee
        self.max_position = max_position

        # Initialize balance, position, and step counters before the feature
        # probe below, which reads them.
        self.reset()

        # Discrete actions: 0 = sell, 1 = hold, 2 = buy.
        self.action_space = spaces.Discrete(3)

        # Observations: raw OHLCV values, technical indicators, and portfolio state.
        n_features = len(self._get_features(self.data.iloc[0]))
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

    def _get_features(self, row: pd.Series) -> np.ndarray:
        """Extract the feature vector for one market-data row."""
        features = []

        # Raw OHLCV data.
        features.extend([
            row['open'], row['high'], row['low'], row['close'], row['volume']
        ])

        # Technical indicators (zero-filled when a column is missing).
        for indicator in ['sma_20', 'sma_50', 'rsi', 'bb_upper', 'bb_lower', 'macd']:
            if indicator in row.index:
                features.append(row[indicator])
            else:
                features.append(0.0)

        # Portfolio state.
        features.extend([
            self.balance,
            self.position,
            self.portfolio_value,
            self.total_return
        ])

        return np.array(features, dtype=np.float32)

    def _calculate_portfolio_value(self) -> float:
        """Calculate current portfolio value (cash plus mark-to-market position)."""
        current_price = self.data.iloc[self.current_step]['close']
        return self.balance + (self.position * current_price)

    def _calculate_reward(self) -> float:
        """Reward: the one-step fractional change in portfolio value."""
        current_value = self._calculate_portfolio_value()
        previous_value = self.previous_portfolio_value

        if previous_value > 0:
            return (current_value - previous_value) / previous_value
        else:
            return 0.0

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Execute one step in the environment."""
        current_data = self.data.iloc[self.current_step]
        current_price = current_data['close']

        if action == 0:  # Sell
            if self.position > 0:
                shares_to_sell = min(self.position, self.max_position)
                sell_value = shares_to_sell * current_price * (1 - self.transaction_fee)
                self.balance += sell_value
                self.position -= shares_to_sell
        elif action == 2:  # Buy (action 1 is hold)
            if self.balance > 0:
                # Size the order so that price plus fee never exceeds the cash balance.
                max_shares = min(
                    int(self.balance / (current_price * (1 + self.transaction_fee))),
                    self.max_position - self.position
                )
                if max_shares > 0:
                    buy_value = max_shares * current_price * (1 + self.transaction_fee)
                    self.balance -= buy_value
                    self.position += max_shares

        # Update portfolio accounting.
        self.previous_portfolio_value = self.portfolio_value
        self.portfolio_value = self._calculate_portfolio_value()
        self.total_return = (self.portfolio_value - self.initial_balance) / self.initial_balance

        reward = self._calculate_reward()

        self.current_step += 1
        done = self.current_step >= len(self.data) - 1

        if not done:
            observation = self._get_features(self.data.iloc[self.current_step])
        else:
            # Episode over: return the features for the final row.
            observation = self._get_features(self.data.iloc[-1])

        info = {
            'balance': self.balance,
            'position': self.position,
            'portfolio_value': self.portfolio_value,
            'total_return': self.total_return,
            'current_price': current_price
        }

        return observation, reward, done, False, info

    def reset(self, seed: Optional[int] = None,
              options: Optional[Dict] = None) -> Tuple[np.ndarray, Dict]:
        """Reset the environment to its initial state."""
        super().reset(seed=seed)

        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        self.portfolio_value = self.initial_balance
        self.previous_portfolio_value = self.initial_balance
        self.total_return = 0.0

        observation = self._get_features(self.data.iloc[self.current_step])
        info = {
            'balance': self.balance,
            'position': self.position,
            'portfolio_value': self.portfolio_value,
            'total_return': self.total_return
        }

        return observation, info

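# Quick sanity check for the environment (an illustrative sketch; assumes `df`
# is an OHLCV DataFrame):
#
#     env = TradingEnvironment(df)
#     obs, info = env.reset()
#     obs, reward, done, truncated, info = env.step(env.action_space.sample())
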
class FinRLAgent:
    """
    FinRL-based reinforcement learning agent for algorithmic trading.
    """

    def __init__(self, config: FinRLConfig):
        self.config = config
        self.model = None
        self.env = None
        self.eval_env = None
        self.callback = None

        logger.info(f"Initializing FinRL agent with algorithm: {config.algorithm}")

    def create_environment(self, data: pd.DataFrame, initial_balance: float = 100000) -> TradingEnvironment:
        """Create a trading environment from market data."""
        return TradingEnvironment(
            data=data,
            initial_balance=initial_balance,
            transaction_fee=0.001,
            max_position=100
        )

    def prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Prepare data with technical indicators for FinRL."""
        df = data.copy()

        # Add any missing technical indicators.
        if 'sma_20' not in df.columns:
            df['sma_20'] = df['close'].rolling(window=20).mean()
        if 'sma_50' not in df.columns:
            df['sma_50'] = df['close'].rolling(window=50).mean()
        if 'rsi' not in df.columns:
            df['rsi'] = self._calculate_rsi(df['close'])
        if 'bb_upper' not in df.columns or 'bb_lower' not in df.columns:
            bb_upper, bb_lower = self._calculate_bollinger_bands(df['close'])
            df['bb_upper'] = bb_upper
            df['bb_lower'] = bb_lower
        if 'macd' not in df.columns:
            df['macd'] = self._calculate_macd(df['close'])

        # Backfill indicator warm-up NaNs, then zero-fill anything left.
        df = df.bfill().fillna(0)

        return df

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calculate the RSI indicator."""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _calculate_bollinger_bands(self, prices: pd.Series, period: int = 20,
                                   std_dev: int = 2) -> Tuple[pd.Series, pd.Series]:
        """Calculate Bollinger Bands."""
        sma = prices.rolling(window=period).mean()
        std = prices.rolling(window=period).std()
        upper_band = sma + (std * std_dev)
        lower_band = sma - (std * std_dev)
        return upper_band, lower_band

    def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series:
        """Calculate the MACD line (fast EMA minus slow EMA; no signal line)."""
        ema_fast = prices.ewm(span=fast, adjust=False).mean()
        ema_slow = prices.ewm(span=slow, adjust=False).mean()
        macd_line = ema_fast - ema_slow
        return macd_line

    def train(self, data: pd.DataFrame, total_timesteps: int = 100000,
              eval_freq: int = 10000, eval_data: Optional[pd.DataFrame] = None) -> Dict[str, Any]:
        """Train the FinRL agent."""
        logger.info("Starting FinRL agent training")

        train_data = self.prepare_data(data)

        self.env = DummyVecEnv([lambda: self.create_environment(train_data)])

        # Optional periodic evaluation on held-out data.
        if eval_data is not None:
            eval_data = self.prepare_data(eval_data)
            self.eval_env = DummyVecEnv([lambda: self.create_environment(eval_data)])
            self.callback = EvalCallback(
                self.eval_env,
                best_model_save_path="models/finrl_best/",
                log_path="logs/finrl_eval/",
                eval_freq=eval_freq,
                deterministic=True,
                render=False
            )

        if self.config.algorithm == "PPO":
            self.model = PPO(
                "MlpPolicy",
                self.env,
                learning_rate=self.config.learning_rate,
                batch_size=self.config.batch_size,
                gamma=self.config.gamma,
                verbose=self.config.verbose,
                tensorboard_log=self.config.tensorboard_log
            )
        elif self.config.algorithm == "A2C":
            self.model = A2C(
                "MlpPolicy",
                self.env,
                learning_rate=self.config.learning_rate,
                gamma=self.config.gamma,
                verbose=self.config.verbose,
                tensorboard_log=self.config.tensorboard_log
            )
        elif self.config.algorithm == "DDPG":
            # Note: DDPG only supports continuous (Box) action spaces, so this
            # branch needs an environment variant with continuous actions
            # rather than the Discrete(3) TradingEnvironment above.
            self.model = DDPG(
                "MlpPolicy",
                self.env,
                learning_rate=self.config.learning_rate,
                buffer_size=self.config.buffer_size,
                learning_starts=self.config.learning_starts,
                gamma=self.config.gamma,
                tau=self.config.tau,
                train_freq=self.config.train_freq,
                gradient_steps=self.config.gradient_steps,
                verbose=self.config.verbose,
                tensorboard_log=self.config.tensorboard_log
            )
        elif self.config.algorithm == "TD3":
            # Note: like DDPG, TD3 requires continuous (Box) actions. SB3's TD3
            # has no `target_update_interval` argument (delayed updates are
            # controlled by `policy_delay`), so that config field is not
            # forwarded here.
            self.model = TD3(
                "MlpPolicy",
                self.env,
                learning_rate=self.config.learning_rate,
                buffer_size=self.config.buffer_size,
                learning_starts=self.config.learning_starts,
                gamma=self.config.gamma,
                tau=self.config.tau,
                train_freq=self.config.train_freq,
                gradient_steps=self.config.gradient_steps,
                verbose=self.config.verbose,
                tensorboard_log=self.config.tensorboard_log
            )
        else:
            raise ValueError(f"Unsupported algorithm: {self.config.algorithm}")

        callbacks = [self.callback] if self.callback else None
        self.model.learn(
            total_timesteps=total_timesteps,
            callback=callbacks
        )

        logger.info("FinRL agent training completed")

        return {
            'algorithm': self.config.algorithm,
            'total_timesteps': total_timesteps,
            # Suggested save location; call save_model() to persist the model.
            'model_path': f"models/finrl_{self.config.algorithm.lower()}"
        }

    def predict(self, data: pd.DataFrame) -> List[int]:
        """Generate trading actions (0=sell, 1=hold, 2=buy) using the trained model."""
        if self.model is None:
            raise ValueError("Model not trained. Call train() first.")

        test_data = self.prepare_data(data)
        test_env = self.create_environment(test_data)

        predictions = []
        obs, _ = test_env.reset()

        done = False
        while not done:
            action, _ = self.model.predict(obs, deterministic=True)
            predictions.append(int(action))
            obs, _, done, _, _ = test_env.step(int(action))

        return predictions

    def evaluate(self, data: pd.DataFrame) -> Dict[str, float]:
        """Evaluate the trained model on test data."""
        if self.model is None:
            raise ValueError("Model not trained. Call train() first.")

        test_data = self.prepare_data(data)
        test_env = self.create_environment(test_data)

        obs, info = test_env.reset()
        done = False
        rewards = []

        while not done:
            action, _ = self.model.predict(obs, deterministic=True)
            obs, reward, done, _, info = test_env.step(int(action))
            rewards.append(reward)

        final_portfolio_value = info['portfolio_value']
        initial_balance = test_env.initial_balance
        total_return = (final_portfolio_value - initial_balance) / initial_balance

        # Per-step Sharpe ratio: mean step return over its standard deviation
        # (unannualized; 0.0 when the reward series is empty or constant).
        reward_std = float(np.std(rewards)) if rewards else 0.0
        sharpe_ratio = float(np.mean(rewards)) / reward_std if reward_std > 0 else 0.0

        return {
            'total_reward': float(np.sum(rewards)),
            'total_return': total_return,
            'final_portfolio_value': final_portfolio_value,
            'steps': len(rewards),
            'sharpe_ratio': sharpe_ratio
        }

    def save_model(self, path: str):
        """Save the trained model."""
        if self.model is None:
            raise ValueError("No model to save. Train the model first.")

        self.model.save(path)
        logger.info(f"Model saved to {path}")

    def load_model(self, path: str):
        """Load a trained model."""
        if self.config.algorithm == "PPO":
            self.model = PPO.load(path)
        elif self.config.algorithm == "A2C":
            self.model = A2C.load(path)
        elif self.config.algorithm == "DDPG":
            self.model = DDPG.load(path)
        elif self.config.algorithm == "TD3":
            self.model = TD3.load(path)
        else:
            raise ValueError(f"Unsupported algorithm: {self.config.algorithm}")

        logger.info(f"Model loaded from {path}")


def create_finrl_agent_from_config(config_path: str) -> FinRLAgent:
    """Create a FinRL agent from a YAML configuration file whose top-level
    'finrl' mapping holds FinRLConfig fields."""
    with open(config_path, 'r') as file:
        config_data = yaml.safe_load(file)

    finrl_config = FinRLConfig(**config_data.get('finrl', {}))
    return FinRLAgent(finrl_config)
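
# Example usage: a minimal smoke test on synthetic random-walk prices (an
# illustrative sketch only; real integrations would pass genuine OHLCV market
# data, and a YAML config would carry a top-level `finrl:` mapping of
# FinRLConfig fields).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(0)
    n = 300
    close = 100 * np.exp(np.cumsum(rng.normal(0, 0.01, n)))
    df = pd.DataFrame({
        'open': close * (1 + rng.normal(0, 0.001, n)),
        'high': close * 1.01,
        'low': close * 0.99,
        'close': close,
        'volume': rng.integers(1_000, 10_000, n).astype(float),
    })

    # tensorboard_log=None keeps the demo runnable without TensorBoard installed.
    config = FinRLConfig(algorithm="PPO", verbose=0, tensorboard_log=None)
    agent = FinRLAgent(config)
    agent.train(df, total_timesteps=2_048)
    print(agent.evaluate(df))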