Edwin Salguero
chore: enterprise-grade project structure, robust .gitignore, and directory cleanup
9289e29
| """ | |
| FinRL Agent for Algorithmic Trading | |
| This module provides a FinRL-based reinforcement learning agent that can be integrated | |
| with the existing algorithmic trading system. It supports various RL algorithms | |
| including PPO, A2C, DDPG, and TD3, and can work with Alpaca broker for real trading. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import gymnasium as gym | |
| from gymnasium import spaces | |
| from stable_baselines3 import PPO, A2C, DDPG, TD3 | |
| from stable_baselines3.common.vec_env import DummyVecEnv | |
| from stable_baselines3.common.callbacks import EvalCallback | |
| import torch | |
| import logging | |
| from typing import Dict, List, Tuple, Optional, Any | |
| from dataclasses import dataclass | |
| import yaml | |
| import inspect | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FinRLConfig:
    """Configuration for the FinRL agent.

    Holds the RL algorithm choice plus hyperparameters. Fields that a given
    stable-baselines3 algorithm does not accept are filtered out by
    ``FinRLAgent._get_valid_kwargs`` via ``__dataclass_fields__`` — which is
    why this class MUST be a dataclass (the decorator was previously missing,
    making that introspection raise ``AttributeError``).
    """

    algorithm: str = "PPO"  # one of: PPO, A2C, DDPG, TD3
    learning_rate: float = 0.0003
    batch_size: int = 64
    buffer_size: int = 1000000  # replay buffer size (off-policy algos only)
    learning_starts: int = 100
    gamma: float = 0.99  # discount factor
    tau: float = 0.005  # soft-update coefficient (DDPG/TD3)
    train_freq: int = 1
    gradient_steps: int = 1
    target_update_interval: int = 1
    exploration_fraction: float = 0.1
    exploration_initial_eps: float = 1.0
    exploration_final_eps: float = 0.05
    max_grad_norm: float = 10.0
    verbose: int = 1
    tensorboard_log: str = "logs/finrl_tensorboard"
class TradingEnvironment(gym.Env):
    """
    Custom trading environment for FinRL.

    Simulates a single-symbol trading scenario where the agent can:
    - Buy, sell, or hold (Discrete(3): 0 = sell, 1 = hold, 2 = buy)
    - Use technical indicator columns for decision making
    - Track portfolio value and total return
    - Optionally route orders through an Alpaca broker for real trading
    """

    def __init__(self, data: pd.DataFrame, config: Dict[str, Any],
                 initial_balance: float = 100000, transaction_fee: float = 0.001,
                 max_position: int = 100, use_real_broker: bool = False):
        """
        Args:
            data: Market data with at least open/high/low/close/volume columns;
                indicator columns (sma_20, sma_50, rsi, bb_upper, bb_lower,
                macd) are used when present, else zero-filled.
            config: System configuration dict; ``config['trading']['symbol']``
                is read when routing real orders.
            initial_balance: Starting cash balance.
            transaction_fee: Proportional fee applied to every fill.
            max_position: Maximum shares held (and max shares per sell).
            use_real_broker: If True, attempt to execute orders via Alpaca;
                falls back to simulation if broker setup fails.
        """
        super().__init__()
        self.data = data
        self.config = config
        self.initial_balance = initial_balance
        self.transaction_fee = transaction_fee
        self.max_position = max_position
        self.use_real_broker = use_real_broker
        # Initialize Alpaca broker if using real trading; any failure downgrades
        # the environment to pure simulation instead of crashing.
        self.alpaca_broker = None
        if use_real_broker:
            try:
                from .alpaca_broker import AlpacaBroker
                self.alpaca_broker = AlpacaBroker(config)
                logger.info("Alpaca broker initialized for FinRL environment")
            except Exception as e:
                logger.error(f"Failed to initialize Alpaca broker: {e}")
                self.use_real_broker = False
        # Initialize balance/position/step state before building spaces,
        # since the observation size probe below reads portfolio state.
        self.reset()
        # Action space: Discrete(3) -> 0 = sell, 1 = hold, 2 = buy.
        # (A previous comment claimed [-1, 0, 1]; Discrete(3) yields {0, 1, 2}.)
        self.action_space = spaces.Discrete(3)
        # Observation space: OHLCV + technical indicators + portfolio state.
        n_features = len(self._get_features(self.data.iloc[0]))
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

    def _get_features(self, row: pd.Series) -> np.ndarray:
        """Build the observation vector from one market-data row plus portfolio state."""
        features = []
        # Price features
        features.extend([
            row['open'], row['high'], row['low'], row['close'], row['volume']
        ])
        # Technical indicators (zero-filled when the column is absent so the
        # observation shape stays constant).
        for indicator in ['sma_20', 'sma_50', 'rsi', 'bb_upper', 'bb_lower', 'macd']:
            if indicator in row.index:
                features.append(row[indicator])
            else:
                features.append(0.0)
        # Portfolio state
        features.extend([
            self.balance,
            self.position,
            self.portfolio_value,
            self.total_return
        ])
        return np.array(features, dtype=np.float32)

    def _calculate_portfolio_value(self) -> float:
        """Cash plus mark-to-market value of the open position at the current bar's close."""
        current_price = self.data.iloc[self.current_step]['close']
        return self.balance + (self.position * current_price)

    def _calculate_reward(self) -> float:
        """Single-step portfolio return based on the values stored by step().

        Uses the cached ``portfolio_value`` / ``previous_portfolio_value`` so
        the reward is consistent with ``info['portfolio_value']`` and does not
        depend on when ``current_step`` is advanced.
        """
        if self.previous_portfolio_value > 0:
            return (self.portfolio_value - self.previous_portfolio_value) / self.previous_portfolio_value
        return 0.0

    def _execute_sell(self, current_price: float) -> None:
        """Sell up to max_position shares, via Alpaca or simulated fill."""
        if self.position <= 0:
            return
        shares_to_sell = min(self.position, self.max_position)
        if self.use_real_broker and self.alpaca_broker:
            # Execute real order with Alpaca; state changes only on success.
            result = self.alpaca_broker.place_market_order(
                symbol=self.config['trading']['symbol'],
                quantity=shares_to_sell,
                side='sell'
            )
            if result['success']:
                sell_value = result['filled_avg_price'] * shares_to_sell * (1 - self.transaction_fee)
                self.balance += sell_value
                self.position -= shares_to_sell
                logger.info(f"Real sell order executed: {result['order_id']}")
            else:
                logger.warning(f"Real sell order failed: {result['error']}")
        else:
            # Simulate order execution at the current close, net of fees.
            sell_value = shares_to_sell * current_price * (1 - self.transaction_fee)
            self.balance += sell_value
            self.position -= shares_to_sell

    def _execute_buy(self, current_price: float) -> None:
        """Buy as many shares as cash and the position cap allow, via Alpaca or simulated fill."""
        if self.balance <= 0:
            return
        max_shares = min(
            int(self.balance / current_price),
            self.max_position - self.position
        )
        if max_shares <= 0:
            return
        if self.use_real_broker and self.alpaca_broker:
            # Execute real order with Alpaca; state changes only on success.
            result = self.alpaca_broker.place_market_order(
                symbol=self.config['trading']['symbol'],
                quantity=max_shares,
                side='buy'
            )
            if result['success']:
                buy_value = result['filled_avg_price'] * max_shares * (1 + self.transaction_fee)
                self.balance -= buy_value
                self.position += max_shares
                logger.info(f"Real buy order executed: {result['order_id']}")
            else:
                logger.warning(f"Real buy order failed: {result['error']}")
        else:
            # Simulate order execution at the current close, plus fees.
            buy_value = max_shares * current_price * (1 + self.transaction_fee)
            self.balance -= buy_value
            self.position += max_shares

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Execute one step: apply the action, revalue the portfolio, advance one bar.

        Returns the gymnasium 5-tuple (obs, reward, terminated, truncated, info).

        FIX: the portfolio is revalued and the reward computed at the CURRENT
        bar's close, before ``current_step`` is advanced. Previously the reward
        was recomputed after the increment, so it priced the position at the
        *next* bar's close (lookahead) and disagreed with info['portfolio_value'].
        """
        current_data = self.data.iloc[self.current_step]
        current_price = current_data['close']
        # Execute action (1 = hold is a no-op)
        if action == 0:  # Sell
            self._execute_sell(current_price)
        elif action == 2:  # Buy
            self._execute_buy(current_price)
        # Update portfolio value at this bar's close and derive the reward.
        self.previous_portfolio_value = self.portfolio_value
        self.portfolio_value = self._calculate_portfolio_value()
        self.total_return = (self.portfolio_value - self.initial_balance) / self.initial_balance
        reward = self._calculate_reward()
        # Move to next step
        self.current_step += 1
        # Episode ends at the last bar
        done = self.current_step >= len(self.data) - 1
        # Observation for the next step (clamped to the final row when done)
        if not done:
            observation = self._get_features(self.data.iloc[self.current_step])
        else:
            observation = self._get_features(self.data.iloc[-1])
        info = {
            'portfolio_value': self.portfolio_value,
            'total_return': self.total_return,
            'position': self.position,
            'balance': self.balance,
            'step': self.current_step
        }
        return observation, reward, done, False, info

    def reset(self, seed: Optional[int] = None,
              options: Optional[Dict[str, Any]] = None) -> Tuple[np.ndarray, Dict]:
        """Reset balances, position, and step counter; return (initial observation, info).

        ``options`` is accepted (and ignored) for compatibility with the
        gymnasium ``Env.reset(seed=..., options=...)`` signature, which
        wrappers may forward.
        """
        super().reset(seed=seed)
        self.current_step = 0
        self.balance = self.initial_balance
        self.position = 0
        self.portfolio_value = self.initial_balance
        self.previous_portfolio_value = self.initial_balance
        self.total_return = 0.0
        # Initial observation from the first row
        observation = self._get_features(self.data.iloc[0])
        return observation, {}
class FinRLAgent:
    """
    FinRL-based reinforcement learning agent for algorithmic trading.

    Wraps stable-baselines3 algorithms around a TradingEnvironment and
    provides train / predict / evaluate / save / load entry points.

    NOTE(review): DDPG and TD3 require continuous action spaces, while
    TradingEnvironment declares Discrete(3) — those algorithms will fail at
    model construction unless the environment is adapted. TODO confirm.
    """

    def __init__(self, config: FinRLConfig):
        """
        Args:
            config: FinRLConfig dataclass with algorithm name and hyperparameters.
        """
        self.config = config
        self.model = None
        self.env = None
        self.eval_env = None
        self.callback = None
        logger.info(f"Initializing FinRL agent with algorithm: {self.config.algorithm}")

    @staticmethod
    def _algorithm_classes() -> Dict[str, Any]:
        """Single source of truth mapping algorithm names to SB3 classes
        (replaces the duplicated if/elif chains in train() and _load_model())."""
        return {"PPO": PPO, "A2C": A2C, "DDPG": DDPG, "TD3": TD3}

    def _get_valid_kwargs(self, algo_class) -> Dict[str, Any]:
        """Return config fields accepted by ``algo_class.__init__``.

        'self', 'policy', and 'tensorboard_log' are excluded because they are
        always passed explicitly at construction time.
        """
        sig = inspect.signature(algo_class.__init__)
        valid_keys = set(sig.parameters.keys())
        valid_keys.discard('self')
        valid_keys.discard('policy')
        valid_keys.discard('tensorboard_log')
        # Build kwargs from the FinRLConfig dataclass fields
        return {k: getattr(self.config, k) for k in self.config.__dataclass_fields__ if k in valid_keys}

    def create_environment(self, data: pd.DataFrame, config: Dict[str, Any],
                           initial_balance: float = 100000, use_real_broker: bool = False) -> TradingEnvironment:
        """Create a TradingEnvironment over the given market data.

        Args:
            data: Prepared market data (see prepare_data).
            config: System configuration dict.
            initial_balance: Starting cash.
            use_real_broker: Route orders through Alpaca when True.
        """
        return TradingEnvironment(
            data=data,
            config=config,
            initial_balance=initial_balance,
            transaction_fee=0.001,
            max_position=100,
            use_real_broker=use_real_broker
        )

    def prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` with the indicator columns the environment
        expects (sma_20, sma_50, rsi, bb_upper, bb_lower, macd), computing any
        that are missing, with NaNs back-filled then zero-filled."""
        df = data.copy()
        if 'sma_20' not in df.columns:
            df['sma_20'] = df['close'].rolling(window=20).mean()
        if 'sma_50' not in df.columns:
            df['sma_50'] = df['close'].rolling(window=50).mean()
        if 'rsi' not in df.columns:
            df['rsi'] = self._calculate_rsi(df['close'])
        if 'bb_upper' not in df.columns or 'bb_lower' not in df.columns:
            bb_upper, bb_lower = self._calculate_bollinger_bands(df['close'])
            df['bb_upper'] = bb_upper
            df['bb_lower'] = bb_lower
        if 'macd' not in df.columns:
            df['macd'] = self._calculate_macd(df['close'])
        # Rolling windows leave leading NaNs: back-fill first, then zero-fill
        # anything still missing so observations are finite.
        df = df.bfill().fillna(0)
        return df

    def _run_episode(self, env: TradingEnvironment) -> Tuple[List, List[float], List[float]]:
        """Run one deterministic episode of ``self.model`` on ``env``.

        Returns:
            (actions, rewards, portfolio_values) collected per step.
        """
        obs, _ = env.reset()
        done = False
        actions = []
        rewards = []
        portfolio_values = []
        while not done:
            action, _ = self.model.predict(obs, deterministic=True)
            obs, reward, done, _, info = env.step(action)
            actions.append(action)
            rewards.append(reward)
            portfolio_values.append(info['portfolio_value'])
        return actions, rewards, portfolio_values

    def train(self, data: pd.DataFrame, config: Dict[str, Any],
              total_timesteps: int = 100000, use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Train the FinRL agent.

        Args:
            data: Market data for training.
            config: Configuration dictionary (reads the 'finrl' section).
            total_timesteps: Number of timesteps for training.
            use_real_broker: Whether to use real Alpaca broker during training.

        Returns:
            Dict with 'success' plus algorithm/model_path on success or 'error'.
        """
        try:
            prepared_data = self.prepare_data(data)
            # Training environment (may route real orders); evaluation always simulates.
            self.env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)
            eval_data = prepared_data.copy()
            self.eval_env = self.create_environment(eval_data, config, use_real_broker=False)
            # Resolve paths/frequencies from config with sensible defaults.
            finrl_config = config.get('finrl', {})
            training_config = finrl_config.get('training', {})
            model_save_path = training_config.get('model_save_path', 'models/finrl')
            tensorboard_log = finrl_config.get('tensorboard_log', self.config.tensorboard_log)
            eval_freq = training_config.get('eval_freq', 1000)
            self.callback = EvalCallback(
                self.eval_env,
                best_model_save_path=model_save_path,
                log_path=tensorboard_log,
                eval_freq=eval_freq,
                deterministic=True,
                render=False
            )
            # Instantiate the model via the shared name->class mapping
            # (previously four copy-pasted if/elif branches).
            algo_class = self._algorithm_classes().get(self.config.algorithm)
            if algo_class is None:
                raise ValueError(f"Unsupported algorithm: {self.config.algorithm}")
            self.model = algo_class(
                "MlpPolicy",
                self.env,
                **self._get_valid_kwargs(algo_class),
                tensorboard_log=self.config.tensorboard_log
            )
            logger.info(f"Starting training with {total_timesteps} timesteps")
            self.model.learn(
                total_timesteps=total_timesteps,
                callback=self.callback,
                progress_bar=True
            )
            # Save the final model alongside the EvalCallback's best model.
            model_path = f"{model_save_path}/final_model"
            self.model.save(model_path)
            logger.info(f"Training completed. Model saved to {model_path}")
            return {
                'success': True,
                'algorithm': self.config.algorithm,
                'total_timesteps': total_timesteps,
                'model_path': model_path
            }
        except Exception as e:
            logger.error(f"Error during training: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def predict(self, data: pd.DataFrame, config: Dict[str, Any],
                use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Make predictions using the trained model (loading it from disk if needed).

        Args:
            data: Market data for prediction.
            config: Configuration dictionary (reads 'finrl.inference').
            use_real_broker: Whether to use real Alpaca broker for execution.

        Returns:
            Dict with 'success' plus per-step actions/rewards/portfolio values
            and summary metrics, or 'error' on failure.
        """
        try:
            if self.model is None:
                # Lazy-load a previously trained model if configured to do so.
                finrl_config = config.get('finrl', {})
                inference_config = finrl_config.get('inference', {})
                model_path = inference_config.get('model_path', 'models/finrl/final_model')
                use_trained_model = inference_config.get('use_trained_model', True)
                if use_trained_model:
                    self.model = self._load_model(model_path, config)
                    if self.model is None:
                        return {'success': False, 'error': 'No trained model available'}
                else:
                    return {'success': False, 'error': 'No model available for prediction'}
            prepared_data = self.prepare_data(data)
            env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)
            actions, rewards, portfolio_values = self._run_episode(env)
            # Summary metrics relative to configured starting capital.
            initial_value = config.get('trading', {}).get('capital', 100000)
            final_value = portfolio_values[-1] if portfolio_values else initial_value
            total_return = (final_value - initial_value) / initial_value
            return {
                'success': True,
                'actions': actions,
                'rewards': rewards,
                'portfolio_values': portfolio_values,
                'initial_value': initial_value,
                'final_value': final_value,
                'total_return': total_return,
                'total_trades': len([a for a in actions if a != 1])  # Count non-hold actions
            }
        except Exception as e:
            logger.error(f"Error during prediction: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def evaluate(self, data: pd.DataFrame, config: Dict[str, Any],
                 use_real_broker: bool = False) -> Dict[str, Any]:
        """
        Evaluate the trained model on test data.

        Args:
            data: Market data for evaluation.
            config: Configuration dictionary.
            use_real_broker: Whether to use real Alpaca broker for execution.

        Returns:
            Dict with 'success' plus return/trade/drawdown metrics, or 'error'.

        Raises nothing: errors are caught and reported in the result dict.
        """
        try:
            if self.model is None:
                raise ValueError("Model not trained")
            prepared_data = self.prepare_data(data)
            env = self.create_environment(prepared_data, config, use_real_broker=use_real_broker)
            actions, rewards, portfolio_values = self._run_episode(env)
            # Summary metrics relative to configured starting capital.
            initial_value = config.get('trading', {}).get('capital', 100000)
            final_value = portfolio_values[-1] if portfolio_values else initial_value
            total_return = (final_value - initial_value) / initial_value
            total_trades = len([a for a in actions if a != 1])  # Count non-hold actions
            avg_reward = np.mean(rewards) if rewards else 0
            max_drawdown = self._calculate_max_drawdown(portfolio_values)
            return {
                'success': True,
                'total_return': total_return,
                'total_trades': total_trades,
                'avg_reward': avg_reward,
                'max_drawdown': max_drawdown,
                'final_portfolio_value': final_value,
                'initial_portfolio_value': initial_value,
                'actions': actions,
                'rewards': rewards,
                'portfolio_values': portfolio_values
            }
        except Exception as e:
            logger.error(f"Error during evaluation: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def save_model(self, model_path: str) -> bool:
        """
        Save the trained model.

        Args:
            model_path: Path to save the model.

        Returns:
            True if successful, False otherwise.
        """
        try:
            if self.model is None:
                raise ValueError("Model not trained")
            self.model.save(model_path)
            logger.info(f"Model saved to {model_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving model: {e}")
            return False

    def load_model(self, model_path: str, config: Dict[str, Any]) -> bool:
        """
        Load a trained model.

        Args:
            model_path: Path to the model.
            config: Configuration dictionary.

        Returns:
            True if successful, False otherwise.
        """
        try:
            self.model = self._load_model(model_path, config)
            if self.model is None:
                return False
            logger.info(f"Model loaded from {model_path}")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return False

    def _calculate_max_drawdown(self, portfolio_values: List[float]) -> float:
        """Maximum peak-to-trough drawdown (as a fraction of the peak); 0.0 for empty input."""
        if not portfolio_values:
            return 0.0
        peak = portfolio_values[0]
        max_drawdown = 0.0
        for value in portfolio_values:
            if value > peak:
                peak = value
            drawdown = (peak - value) / peak
            max_drawdown = max(max_drawdown, drawdown)
        return max_drawdown

    def _load_model(self, model_path: str, config: Dict[str, Any]):
        """Load a trained model of the configured algorithm; return None on failure."""
        try:
            # Algorithm may be overridden by the 'finrl' config section.
            finrl_config = config.get('finrl', {})
            algorithm = finrl_config.get('algorithm', self.config.algorithm)
            algo_class = self._algorithm_classes().get(algorithm)
            if algo_class is None:
                logger.error(f"Unsupported algorithm for model loading: {algorithm}")
                return None
            return algo_class.load(model_path)
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return None

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Relative Strength Index over ``period`` bars (simple rolling means of gains/losses)."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def _calculate_bollinger_bands(self, prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series]:
        """Bollinger Bands: SMA(period) +/- std_dev rolling standard deviations."""
        sma = prices.rolling(window=period).mean()
        std = prices.rolling(window=period).std()
        upper_band = sma + (std * std_dev)
        lower_band = sma - (std * std_dev)
        return upper_band, lower_band

    def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.Series:
        """MACD line: EMA(fast) - EMA(slow). ``signal`` is accepted for interface
        compatibility but the signal line is not computed here."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        return macd
def create_finrl_agent_from_config(config: FinRLConfig) -> FinRLAgent:
    """Factory helper: construct a FinRLAgent from a FinRLConfig.

    Args:
        config: Agent configuration (algorithm choice and hyperparameters).

    Returns:
        A newly constructed FinRLAgent wrapping the given configuration.
    """
    agent = FinRLAgent(config)
    return agent