| """ |
| FinRL Agent for Algorithmic Trading |
| |
| This module provides a FinRL-based reinforcement learning agent that can be integrated |
| with the existing algorithmic trading system. It supports various RL algorithms |
| including PPO, A2C, DDPG, and TD3, and can work with Alpaca broker for real trading. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| import gymnasium as gym |
| from gymnasium import spaces |
| from stable_baselines3 import PPO, A2C, DDPG, TD3 |
| from stable_baselines3.common.vec_env import DummyVecEnv |
| from stable_baselines3.common.callbacks import EvalCallback |
| import torch |
| import logging |
| from typing import Dict, List, Tuple, Optional, Any |
| from dataclasses import dataclass |
| import yaml |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class FinRLConfig:
    """Configuration for FinRL agent"""
    # Which stable-baselines3 algorithm to build: "PPO", "A2C", "DDPG", or "TD3".
    algorithm: str = "PPO"
    # Optimizer learning rate passed straight to the SB3 constructor.
    learning_rate: float = 0.0003
    # Minibatch size (used by PPO).
    batch_size: int = 64
    # Replay-buffer capacity (off-policy algorithms: DDPG/TD3).
    buffer_size: int = 1000000
    # Number of environment steps collected before learning starts (off-policy).
    learning_starts: int = 100
    # Discount factor for future rewards.
    gamma: float = 0.99
    # Polyak soft-update coefficient for target networks (DDPG/TD3).
    tau: float = 0.005
    # Update the model every `train_freq` environment steps (off-policy).
    train_freq: int = 1
    # Gradient steps performed per rollout (off-policy).
    gradient_steps: int = 1
    # How often (in steps) the target network is updated.
    target_update_interval: int = 1
    # Fraction of training over which exploration epsilon is annealed.
    exploration_fraction: float = 0.1
    # Initial / final epsilon for epsilon-greedy exploration.
    exploration_initial_eps: float = 1.0
    exploration_final_eps: float = 0.05
    # Gradient clipping threshold.
    max_grad_norm: float = 10.0
    # SB3 verbosity level (0 = silent, 1 = info).
    verbose: int = 1
    # Directory for TensorBoard event files.
    tensorboard_log: str = "logs/finrl_tensorboard"
|
|
|
|
class TradingEnvironment(gym.Env):
    """
    Custom trading environment for FinRL.

    Simulates a single-asset trading scenario where the agent can:
    - Buy, sell, or hold positions (action space Discrete(3): 0=sell, 1=hold, 2=buy)
    - Use technical indicators for decision making
    - Manage portfolio value and risk
    - Optionally route orders through an Alpaca broker for real trading
    """

    def __init__(self, data: pd.DataFrame, config: Dict[str, Any],
                 initial_balance: float = 100000, transaction_fee: float = 0.001,
                 max_position: int = 100, use_real_broker: bool = False):
        """
        Args:
            data: OHLCV frame (columns open/high/low/close/volume, plus optional
                indicator columns), one row per environment step.
            config: Application config; only config['trading']['symbol'] is read
                here, and only when a real broker is in use.
            initial_balance: Starting cash balance.
            transaction_fee: Proportional fee applied to every fill.
            max_position: Maximum number of shares held at once.
            use_real_broker: Route orders through Alpaca instead of simulating.
        """
        super().__init__()

        self.data = data
        self.config = config
        self.initial_balance = initial_balance
        self.transaction_fee = transaction_fee
        self.max_position = max_position
        self.use_real_broker = use_real_broker

        # Optional live-trading backend; fall back to simulation on failure.
        self.alpaca_broker = None
        if use_real_broker:
            try:
                from .alpaca_broker import AlpacaBroker
                self.alpaca_broker = AlpacaBroker(config)
                logger.info("Alpaca broker initialized for FinRL environment")
            except Exception as e:
                logger.error(f"Failed to initialize Alpaca broker: {e}")
                self.use_real_broker = False

        # Initialize episode state first: _get_features (used below to size
        # the observation space) reads balance/position/portfolio attributes.
        self.reset()

        # 0 = sell, 1 = hold, 2 = buy
        self.action_space = spaces.Discrete(3)

        n_features = len(self._get_features(self.data.iloc[0]))
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

    def _get_features(self, row: pd.Series) -> np.ndarray:
        """Build the observation vector: OHLCV, indicators, account state."""
        features = []

        # Raw market data.
        features.extend([
            row['open'], row['high'], row['low'], row['close'], row['volume']
        ])

        # Technical indicators; missing ones are zero-filled so the
        # observation shape stays constant regardless of the input frame.
        for indicator in ['sma_20', 'sma_50', 'rsi', 'bb_upper', 'bb_lower', 'macd']:
            features.append(row[indicator] if indicator in row.index else 0.0)

        # Account state.
        features.extend([
            self.balance,
            self.position,
            self.portfolio_value,
            self.total_return
        ])

        return np.array(features, dtype=np.float32)

    def _calculate_portfolio_value(self) -> float:
        """Cash plus position marked to the current step's close price."""
        current_price = self.data.iloc[self.current_step]['close']
        return self.balance + (self.position * current_price)

    def _calculate_reward(self) -> float:
        """
        One-step fractional change in portfolio value.

        Uses the values already stored by step() (both marked at the executed
        step's price) instead of re-pricing the portfolio; re-pricing after
        current_step is advanced would leak the next step's price into the
        reward.
        """
        if self.previous_portfolio_value > 0:
            return ((self.portfolio_value - self.previous_portfolio_value)
                    / self.previous_portfolio_value)
        return 0.0

    def _execute_sell(self, current_price: float) -> None:
        """Sell up to max_position shares at current_price (or via broker)."""
        if self.position <= 0:
            return
        shares_to_sell = min(self.position, self.max_position)

        if self.use_real_broker and self.alpaca_broker:
            # Route the order through Alpaca; only mutate state on success.
            result = self.alpaca_broker.place_market_order(
                symbol=self.config['trading']['symbol'],
                quantity=shares_to_sell,
                side='sell'
            )
            if result['success']:
                sell_value = result['filled_avg_price'] * shares_to_sell * (1 - self.transaction_fee)
                self.balance += sell_value
                self.position -= shares_to_sell
                logger.info(f"Real sell order executed: {result['order_id']}")
            else:
                logger.warning(f"Real sell order failed: {result['error']}")
        else:
            # Simulated fill at the current close, net of fees.
            sell_value = shares_to_sell * current_price * (1 - self.transaction_fee)
            self.balance += sell_value
            self.position -= shares_to_sell

    def _execute_buy(self, current_price: float) -> None:
        """Buy as many shares as cash and the position cap allow."""
        if self.balance <= 0:
            return
        # Size the order so the fee-inclusive cost never exceeds the cash
        # balance; the naive int(balance / price) can overdraw by the fee.
        affordable = int(self.balance / (current_price * (1 + self.transaction_fee)))
        max_shares = min(affordable, self.max_position - self.position)
        if max_shares <= 0:
            return

        if self.use_real_broker and self.alpaca_broker:
            # Route the order through Alpaca; only mutate state on success.
            result = self.alpaca_broker.place_market_order(
                symbol=self.config['trading']['symbol'],
                quantity=max_shares,
                side='buy'
            )
            if result['success']:
                buy_value = result['filled_avg_price'] * max_shares * (1 + self.transaction_fee)
                self.balance -= buy_value
                self.position += max_shares
                logger.info(f"Real buy order executed: {result['order_id']}")
            else:
                logger.warning(f"Real buy order failed: {result['error']}")
        else:
            # Simulated fill at the current close, fees added to cost.
            buy_value = max_shares * current_price * (1 + self.transaction_fee)
            self.balance -= buy_value
            self.position += max_shares

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Execute one step: apply the action, mark to market, advance."""
        current_price = self.data.iloc[self.current_step]['close']

        if action == 0:      # sell
            self._execute_sell(current_price)
        elif action == 2:    # buy
            self._execute_buy(current_price)
        # action == 1 is hold: no trade.

        # Mark to market at the price the action was executed against.
        self.previous_portfolio_value = self.portfolio_value
        self.portfolio_value = self._calculate_portfolio_value()
        self.total_return = (self.portfolio_value - self.initial_balance) / self.initial_balance

        # Compute the reward BEFORE advancing the step pointer so it cannot
        # see the next step's price (the original recomputed portfolio value
        # after the increment, introducing one step of lookahead).
        reward = self._calculate_reward()

        self.current_step += 1
        done = self.current_step >= len(self.data) - 1

        if not done:
            observation = self._get_features(self.data.iloc[self.current_step])
        else:
            observation = self._get_features(self.data.iloc[-1])

        info = {
            'portfolio_value': self.portfolio_value,
            'total_return': self.total_return,
            'position': self.position,
            'balance': self.balance,
            'step': self.current_step
        }

        # Gymnasium API: (obs, reward, terminated, truncated, info).
        return observation, reward, done, False, info

    def reset(self, seed: Optional[int] = None) -> Tuple[np.ndarray, Dict]:
        """Reset episode state and return the initial observation."""
        super().reset(seed=seed)

        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        self.portfolio_value = self.initial_balance
        self.previous_portfolio_value = self.initial_balance
        self.total_return = 0.0

        observation = self._get_features(self.data.iloc[0])

        return observation, {}
|
|
|
|
class FinRLAgent:
    """
    FinRL-based reinforcement learning agent for algorithmic trading.

    Wraps stable-baselines3 algorithms (PPO, A2C, DDPG, TD3) around the
    TradingEnvironment: prepares indicator features, trains, saves/loads
    models, and runs inference episodes.
    """

    def __init__(self, config: "FinRLConfig"):
        """
        Args:
            config: Hyperparameter bundle (algorithm name, learning rate, ...).
        """
        self.config = config
        self.model = None        # trained SB3 model, set by train()/predict()
        self.env = None          # training environment
        self.eval_env = None     # evaluation environment for EvalCallback
        self.callback = None

        logger.info(f"Initializing FinRL agent with algorithm: {config.algorithm}")

    def create_environment(self, data: pd.DataFrame, config: Dict[str, Any],
                           initial_balance: float = 100000, use_real_broker: bool = False) -> "TradingEnvironment":
        """Create a TradingEnvironment over `data` with default fee/position caps."""
        return TradingEnvironment(
            data=data,
            config=config,
            initial_balance=initial_balance,
            transaction_fee=0.001,
            max_position=100,
            use_real_broker=use_real_broker
        )

    def prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of `data` with the indicator columns the environment
        expects (sma_20, sma_50, rsi, bb_upper, bb_lower, macd), computing
        any that are missing from the 'close' column.
        """
        df = data.copy()

        if 'sma_20' not in df.columns:
            df['sma_20'] = df['close'].rolling(window=20).mean()
        if 'sma_50' not in df.columns:
            df['sma_50'] = df['close'].rolling(window=50).mean()
        if 'rsi' not in df.columns:
            df['rsi'] = self._calculate_rsi(df['close'])
        if 'bb_upper' not in df.columns or 'bb_lower' not in df.columns:
            bb_upper, bb_lower = self._calculate_bollinger_bands(df['close'])
            df['bb_upper'] = bb_upper
            df['bb_lower'] = bb_lower
        if 'macd' not in df.columns:
            df['macd'] = self._calculate_macd(df['close'])

        # Rolling windows leave leading NaNs; backfill them, then zero any
        # column that is still entirely NaN.
        df = df.bfill().fillna(0)

        return df

    def _build_model(self):
        """
        Instantiate the SB3 model named in self.config.algorithm on self.env.

        Raises:
            ValueError: If the algorithm name is not one of PPO/A2C/DDPG/TD3.
        """
        # Kwargs accepted by every supported constructor.
        common = dict(
            learning_rate=self.config.learning_rate,
            gamma=self.config.gamma,
            verbose=self.config.verbose,
            tensorboard_log=self.config.tensorboard_log,
        )
        # Replay-buffer kwargs valid only for off-policy algorithms; passing
        # them to PPO/A2C raises TypeError in stable-baselines3 (the original
        # code handed buffer_size/learning_starts/train_freq/gradient_steps
        # to PPO, which crashed training for the default algorithm).
        off_policy = dict(
            buffer_size=self.config.buffer_size,
            learning_starts=self.config.learning_starts,
            tau=self.config.tau,
            train_freq=self.config.train_freq,
            gradient_steps=self.config.gradient_steps,
        )

        if self.config.algorithm == "PPO":
            return PPO("MlpPolicy", self.env,
                       batch_size=self.config.batch_size, **common)
        if self.config.algorithm == "A2C":
            return A2C("MlpPolicy", self.env, **common)
        if self.config.algorithm == "DDPG":
            return DDPG("MlpPolicy", self.env, **common, **off_policy)
        if self.config.algorithm == "TD3":
            return TD3("MlpPolicy", self.env, **common, **off_policy)
        raise ValueError(f"Unsupported algorithm: {self.config.algorithm}")

    def train(self, data: pd.DataFrame, config: Dict[str, Any],
              total_timesteps: int = 100000, use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Train the FinRL agent

        Args:
            data: Market data for training
            config: Configuration dictionary
            total_timesteps: Number of timesteps for training
            use_real_broker: Whether to use real Alpaca broker during training

        Returns:
            Training results dictionary
        """
        try:
            prepared_data = self.prepare_data(data)

            # Separate train / eval environments; eval never trades live.
            self.env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)
            eval_data = prepared_data.copy()
            self.eval_env = self.create_environment(eval_data, config, use_real_broker=False)

            # Periodically evaluate and checkpoint the best model.
            self.callback = EvalCallback(
                self.eval_env,
                best_model_save_path=config['finrl']['training']['model_save_path'],
                log_path=config['finrl']['tensorboard_log'],
                eval_freq=config['finrl']['training']['eval_freq'],
                deterministic=True,
                render=False
            )

            self.model = self._build_model()

            logger.info(f"Starting training with {total_timesteps} timesteps")
            self.model.learn(
                total_timesteps=total_timesteps,
                callback=self.callback,
                progress_bar=True
            )

            model_path = f"{config['finrl']['training']['model_save_path']}/final_model"
            self.model.save(model_path)
            logger.info(f"Training completed. Model saved to {model_path}")

            return {
                'success': True,
                'algorithm': self.config.algorithm,
                'total_timesteps': total_timesteps,
                'model_path': model_path
            }

        except Exception as e:
            logger.error(f"Error during training: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def predict(self, data: pd.DataFrame, config: Dict[str, Any],
                use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Make predictions using the trained model

        Args:
            data: Market data for prediction
            config: Configuration dictionary
            use_real_broker: Whether to use real Alpaca broker for execution

        Returns:
            Prediction results dictionary
        """
        try:
            if self.model is None:
                # No in-memory model: try loading one from disk if configured.
                model_path = config['finrl']['inference']['model_path']
                if config['finrl']['inference']['use_trained_model']:
                    self.model = self._load_model(model_path, config)
                    if self.model is None:
                        return {'success': False, 'error': 'No trained model available'}
                else:
                    return {'success': False, 'error': 'No model available for prediction'}

            prepared_data = self.prepare_data(data)
            env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)

            # Roll one deterministic episode over the whole data frame.
            obs, _ = env.reset()
            done = False
            actions = []
            rewards = []
            portfolio_values = []

            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                action = int(action)  # plain int: serializable, comparable
                obs, reward, done, _, info = env.step(action)

                actions.append(action)
                rewards.append(reward)
                portfolio_values.append(info['portfolio_value'])

            initial_value = config['trading']['capital']
            final_value = portfolio_values[-1] if portfolio_values else initial_value
            total_return = (final_value - initial_value) / initial_value

            return {
                'success': True,
                'actions': actions,
                'rewards': rewards,
                'portfolio_values': portfolio_values,
                'initial_value': initial_value,
                'final_value': final_value,
                'total_return': total_return,
                # Action 1 is "hold"; everything else counts as a trade.
                'total_trades': len([a for a in actions if a != 1])
            }

        except Exception as e:
            logger.error(f"Error during prediction: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _load_model(self, model_path: str, config: Dict[str, Any]):
        """Load a trained model for the algorithm named in config; None on failure."""
        algorithms = {"PPO": PPO, "A2C": A2C, "DDPG": DDPG, "TD3": TD3}
        try:
            algo_name = config['finrl']['algorithm']
            algo_cls = algorithms.get(algo_name)
            if algo_cls is None:
                logger.error(f"Unsupported algorithm for model loading: {algo_name}")
                return None
            return algo_cls.load(model_path)
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return None

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calculate RSI over `period` (simple rolling means of gains/losses)."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss  # zero average loss yields inf -> RSI saturates at 100
        return 100 - (100 / (1 + rs))

    def _calculate_bollinger_bands(self, prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series]:
        """Calculate Bollinger Bands: SMA +/- std_dev rolling standard deviations."""
        sma = prices.rolling(window=period).mean()
        std = prices.rolling(window=period).std()
        upper_band = sma + (std * std_dev)
        lower_band = sma - (std * std_dev)
        return upper_band, lower_band

    def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.Series:
        """Calculate the MACD line (fast EMA minus slow EMA; signal line unused)."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        return macd