""" Trading Strategy Return Analysis Tool Usage: 1. Modify the configuration parameter lists (assets, models, agents) in the main() function 2. Run directly: python get_return.py Supports batch analysis: - assets: Asset list, e.g., ["BTC", "TSLA", "AAPL"] - models: Model list, e.g., ["gpt_4o", "gpt_4.1"] - agents: Agent list, e.g., ["HedgeFundAgent", "FinAgent", "TradeAgent"] Will automatically calculate all combinations and output results in order. File naming format: action/{agent}_{asset}_{model}_trading_decisions.json Example: action/HedgeFundAgent_BTC_gpt_4o_trading_decisions.json """ import json import os import pickle import numpy as np import pandas as pd from scipy.stats import ttest_rel from datetime import datetime, timedelta # Import price fetching functions from get_daily_news import get_asset_price, is_crypto, is_stock # Global price cache to avoid repeated API calls _price_cache = {} CACHE_FILE = "cache/price_cache.pkl" def load_price_cache(): """Load price cache from local pkl file""" global _price_cache try: if os.path.exists(CACHE_FILE): with open(CACHE_FILE, 'rb') as f: _price_cache = pickle.load(f) # Count loaded cache information total_entries = sum(len(dates) for dates in _price_cache.values()) symbols = list(_price_cache.keys()) else: _price_cache = {} except Exception as e: _price_cache = {} def save_price_cache(): """Save price cache to local pkl file""" global _price_cache try: # Ensure cache directory exists os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True) with open(CACHE_FILE, 'wb') as f: pickle.dump(_price_cache, f) # Count saved cache information total_entries = sum(len(dates) for dates in _price_cache.values()) symbols = list(_price_cache.keys()) except Exception as e: pass def preload_prices(symbol, start_date, end_date): """Preload all price data within specified time range to cache""" global _price_cache # On first call, load cache from local file if not _price_cache: load_price_cache() # Preload price data # Generate date range dates = pd.date_range(start=start_date, end=end_date, freq='D') cache_key = symbol if cache_key not in _price_cache: _price_cache[cache_key] = {} # Count API calls api_calls = 0 cached_hits = 0 # Batch fetch price data for current_date in dates: date_str = current_date.strftime('%Y-%m-%d') if date_str not in _price_cache[cache_key]: price = get_asset_price(symbol, date_str) # Directly call API to fill cache _price_cache[cache_key][date_str] = price api_calls += 1 else: cached_hits += 1 # Complete price data preloading # Save cache if there were new API calls if api_calls > 0: save_price_cache() def get_cached_price(symbol, date_str): """Get price from cache, call API directly if not in cache""" global _price_cache # On first call, load cache from local file if not _price_cache: load_price_cache() cache_key = symbol if cache_key in _price_cache and date_str in _price_cache[cache_key]: # Get from cache return _price_cache[cache_key][date_str] else: # If not in cache, call API directly (fallback solution) price = get_asset_price(symbol, date_str) # Cache the API result as well if cache_key not in _price_cache: _price_cache[cache_key] = {} _price_cache[cache_key][date_str] = price # Immediately save newly fetched price save_price_cache() return price def clear_price_cache(): """Save price cache but don't clear memory (for compatibility with existing code)""" global _price_cache # Count cache information total_entries = sum(len(dates) for dates in _price_cache.values()) symbols = list(_price_cache.keys()) # Save to file instead of clearing save_price_cache() def force_clear_cache(): """Force clear memory cache (actual clearing function)""" global _price_cache # Count cache information total_entries = sum(len(dates) for dates in _price_cache.values()) symbols = list(_price_cache.keys()) # Save first then clear save_price_cache() _price_cache.clear() def run_compounding_simulation(recommendations, initial_capital=100000, trade_fee=0.0005, strategy='long_short', trading_mode='normal', asset_type='stock', symbol=None): """ Runs a realistic trading simulation with compounding capital and returns a daily capital series. trading_mode: - 'normal': Original strategy - HOLD: keep current position - BUY: open long if flat, ignore if in position - SELL: open short if flat, close if long - 'aggressive': New strategy - HOLD: force close to flat - BUY: close short (if short) then open long - SELL: close long (if long) then open short """ capital = float(initial_capital) position = 'FLAT' entry_price = 0 capital_series = [] rec_map = {rec['date']: rec for rec in recommendations} start_date = datetime.fromisoformat(recommendations[0]['date']) end_date = datetime.fromisoformat(recommendations[-1]['date']) # symbol must be provided, no default values if symbol is None: raise ValueError("Symbol must be provided for run_compounding_simulation, cannot use default values") # Use all calendar days (let price fetching function decide if valid) dates = pd.date_range(start=start_date, end=end_date, freq='D') # Record previous trading day's capital for filling non-trading days last_capital = capital for current_date in dates: date_str = current_date.strftime('%Y-%m-%d') # Actually get current day's price (based on asset type) current_price = get_cached_price(symbol, date_str) if current_price is None: # If price is null (market closed), skip this day capital_series.append(last_capital) continue daily_capital = capital if position == 'LONG': daily_capital = capital * (current_price / entry_price) if entry_price != 0 else capital elif position == 'SHORT': daily_capital = capital * (1 + (entry_price - current_price) / entry_price) if entry_price != 0 else capital # Execute trades for the current day BEFORE recording capital # Check if the date exists in recommendations, default to HOLD if not if date_str in rec_map: action = rec_map[date_str].get('recommended_action', 'HOLD') else: action = 'HOLD' # Default action for missing dates if trading_mode == 'normal': # Original strategy: HOLD keeps position if action == 'HOLD': # Keep current position, do nothing pass elif action == 'BUY': if position == 'FLAT': position, entry_price = 'LONG', current_price capital *= (1 - trade_fee) daily_capital = capital # Update daily capital after trade elif position == 'SHORT': # Close short position first return_pct = (entry_price - current_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) # Then open long position position, entry_price = 'LONG', current_price capital *= (1 - trade_fee) daily_capital = capital elif action == 'SELL': if position == 'LONG': return_pct = (current_price - entry_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 daily_capital = capital # Update daily capital after trade elif position == 'FLAT' and strategy == 'long_short': position, entry_price = 'SHORT', current_price capital *= (1 - trade_fee) daily_capital = capital # Update daily capital after trade else: # New strategy: HOLD closes position, BUY/SELL switches position directly if action == 'HOLD': # Force close position if position == 'LONG': return_pct = (current_price - entry_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 daily_capital = capital elif position == 'SHORT': return_pct = (entry_price - current_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 daily_capital = capital elif action == 'BUY': if position == 'SHORT': # First close short position return_pct = (entry_price - current_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 daily_capital = capital # Update daily_capital if position == 'FLAT': # Then open long position position, entry_price = 'LONG', current_price capital *= (1 - trade_fee) daily_capital = capital elif action == 'SELL': if position == 'LONG': # First close long position return_pct = (current_price - entry_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 daily_capital = capital # Update daily_capital if position == 'FLAT' and strategy == 'long_short': # Then open short position position, entry_price = 'SHORT', current_price capital *= (1 - trade_fee) daily_capital = capital # Record capital after all trades are executed capital_series.append(daily_capital) last_capital = daily_capital # Force close position on the last day if current_date == dates[-1] and position != 'FLAT': if position == 'LONG': return_pct = (current_price - entry_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) elif position == 'SHORT': return_pct = (entry_price - current_price) / entry_price if entry_price != 0 else 0 capital *= (1 + return_pct) * (1 - trade_fee) position, entry_price = 'FLAT', 0 capital_series[-1] = capital # Update the last capital value return capital_series def calculate_buy_and_hold_series(recommendations, initial_capital=100000, trade_fee=0.0005, asset_type='stock', symbol=None): """Calculate buy and hold strategy performance""" capital_series = [] rec_map = {rec['date']: rec for rec in recommendations} start_date = datetime.fromisoformat(recommendations[0]['date']) end_date = datetime.fromisoformat(recommendations[-1]['date']) # symbol must be provided, no default values if symbol is None: raise ValueError("Symbol must be provided for calculate_buy_and_hold_series, cannot use default values") # Get first valid price as buy price buy_price = None first_date_str = start_date.strftime('%Y-%m-%d') buy_price = get_cached_price(symbol, first_date_str) # Use cache if buy_price is None: # If no price on first day, find first valid price current_date = start_date while current_date <= end_date and buy_price is None: date_str = current_date.strftime('%Y-%m-%d') buy_price = get_cached_price(symbol, date_str) current_date += timedelta(days=1) if buy_price is None or buy_price <= 0: # If no valid price throughout the period, return empty sequence print(f"Warning: No valid buy price found for {symbol} in period {start_date} to {end_date}") return [] # Buy on first day, charge opening fee capital = initial_capital * (1 - trade_fee) # Use all calendar days (let price fetching function decide if valid) dates = pd.date_range(start=start_date, end=end_date, freq='D') last_price = buy_price for i, current_date in enumerate(dates): date_str = current_date.strftime('%Y-%m-%d') # Actually get current day's price (based on asset type) current_price = get_cached_price(symbol, date_str) # If price is null, skip this day and use last valid price if current_price is None: daily_capital = capital * (last_price / buy_price) if buy_price != 0 else capital capital_series.append(daily_capital) continue # Calculate current market value daily_capital = capital * (current_price / buy_price) if buy_price != 0 else capital # Sell on last day, charge closing fee if i == len(dates) - 1: # Use index to determine last day daily_capital *= (1 - trade_fee) capital_series.append(daily_capital) last_price = current_price return capital_series def get_daily_returns(capital_series): """Calculate daily returns from capital series""" series = pd.Series(capital_series) return series.pct_change().fillna(0) def calculate_metrics(capital_series, recommendations, asset_type='stock'): """ Calculate performance metrics for different asset types Parameters: - capital_series: list of daily capital values - recommendations: list of trading recommendations - asset_type: 'stock' or 'crypto' """ if len(capital_series) == 0: return { 'total_return': 0, 'ann_return': 0, 'ann_vol': 0, 'sharpe_ratio': 0, 'max_drawdown': 0 } daily_returns = get_daily_returns(capital_series) # Total Return total_return = (capital_series[-1] - capital_series[0]) / capital_series[0] * 100 # Choose annualization parameters based on asset type if asset_type == 'stock': annual_days = 252 # Stock trading days per year # For stocks, the capital series includes calendar days; weekends/holidays # create zero returns that artificially depress volatility. # Filter out zero-return days to approximate trading days only. trading_returns = daily_returns[daily_returns != 0] effective_returns = trading_returns if len(trading_returns) > 0 else daily_returns n_days_effective = len(effective_returns) if len(effective_returns) > 0 else len(daily_returns) ann_vol = (effective_returns.std() * np.sqrt(annual_days) * 100) if len(effective_returns) > 1 else 0 # Annualized return uses effective trading day count if n_days_effective > 1: ann_return = (((capital_series[-1] / capital_series[0]) ** (annual_days / n_days_effective)) - 1) * 100 else: ann_return = total_return else: # crypto annual_days = 365 # Cryptocurrency trades year-round n_days_effective = len(daily_returns) ann_vol = daily_returns.std() * np.sqrt(annual_days) * 100 if len(daily_returns) > 1 else 0 if n_days_effective > 1: ann_return = (((capital_series[-1] / capital_series[0]) ** (annual_days / n_days_effective)) - 1) * 100 else: ann_return = total_return # Sharpe Ratio (assuming risk-free rate = 0) # Use standard daily mean/std approach with consistent day count per asset type if asset_type == 'stock': sharpe_base_returns = effective_returns else: sharpe_base_returns = daily_returns mean_daily = sharpe_base_returns.mean() if len(sharpe_base_returns) > 0 else 0 std_daily = sharpe_base_returns.std() if len(sharpe_base_returns) > 1 else 0 if std_daily and std_daily > 0: sharpe_ratio = (mean_daily / std_daily) * np.sqrt(annual_days) else: sharpe_ratio = 0 # Maximum Drawdown capital_series_pd = pd.Series(capital_series) rolling_max = capital_series_pd.expanding().max() drawdowns = (capital_series_pd - rolling_max) / rolling_max max_drawdown = drawdowns.min() * 100 if len(drawdowns) > 0 else 0 return { 'total_return': total_return, 'ann_return': ann_return, 'ann_vol': ann_vol, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown } def print_metrics_table(strategies_data, headers): """Print formatted metrics table""" metrics = ['total_return', 'ann_return', 'ann_vol', 'sharpe_ratio', 'max_drawdown'] metric_headers = { 'total_return': 'Total Return % (↑)', 'ann_return': 'Ann. Return % (↑)', 'ann_vol': 'Ann. Vol % (↓)', 'sharpe_ratio': 'Sharpe Ratio (↑)', 'max_drawdown': 'Max DD % (↓)' } # Calculate column widths col_widths = {m: max(12, len(metric_headers[m]) + 1) for m in metrics} # Print header header_line = f"{'Strategy':<20} | " + " | ".join(f"{metric_headers[m]:>{col_widths[m]}}" for m in metrics) print(header_line) print("-" * len(header_line)) # Print strategy data for name, data in strategies_data: line = f"{name:<20} | " + " | ".join(f"{data[metric]:>{col_widths[metric]}.2f}" for metric in metrics) print(line) def discover_available_files(): """ Automatically discover all trading decision files in action directory and return available combinations """ action_dir = 'action' if not os.path.exists(action_dir): print(f"Error: {action_dir} directory not found") return [], [], [] available_agents = set() available_assets = set() available_models = set() found_files = [] # Scan all json files for filename in os.listdir(action_dir): if filename.endswith('_trading_decisions.json'): # Parse filename format: {agent}_{asset}_{model}_trading_decisions.json parts = filename.replace('_trading_decisions.json', '').split('_') if len(parts) >= 3: # Parse based on known model name patterns base_name = '_'.join(parts) if 'claude_sonnet_4_20250514' in base_name: # claude_sonnet_4_20250514 format model = 'claude_sonnet_4_20250514' remaining = base_name.replace('_claude_sonnet_4_20250514', '') elif 'claude_3_5_haiku_20241022' in base_name: # claude_3_5_haiku_20241022 format model = 'claude_3_5_haiku_20241022' remaining = base_name.replace('_claude_3_5_haiku_20241022', '') elif 'gemini_2.0_flash' in base_name: # gemini_2.0_flash format model = 'gemini_2.0_flash' remaining = base_name.replace('_gemini_2.0_flash', '') elif 'gpt_4o' in base_name: # gpt_4o format model = 'gpt_4o' remaining = base_name.replace('_gpt_4o', '') elif 'gpt_4.1' in base_name: # gpt_4.1 format model = 'gpt_4.1' remaining = base_name.replace('_gpt_4.1', '') elif 'vote' in base_name: # vote format model = 'vote' remaining = base_name.replace('_vote', '') else: # Default handling: last two parts are model model = '_'.join(parts[-2:]) remaining = '_'.join(parts[:-2]) # Extract asset and agent from remaining parts remaining_parts = remaining.split('_') if len(remaining_parts) >= 2: asset = remaining_parts[-1] # Last part is asset agent = '_'.join(remaining_parts[:-1]) # Previous parts are agent available_agents.add(agent) available_assets.add(asset) available_models.add(model) found_files.append((agent, asset, model, filename)) # Silently discover files, no detailed output return sorted(available_agents), sorted(available_assets), sorted(available_models) def analyze_and_print(title, recommendations, asset_type='stock', symbol=None): """Analyze and print strategy performance comparison""" print(f"\n{'='*60}") print(f"{title:^60}") print(f"{'='*60}") if not recommendations: print("No recommendations to analyze.") return # Preload price data (get all needed prices at once) start_date = recommendations[0]['date'] end_date = recommendations[-1]['date'] preload_prices(symbol, start_date, end_date) # Calculate Buy & Hold strategy (calculate only once) bh_series = calculate_buy_and_hold_series(recommendations, asset_type=asset_type, symbol=symbol) bh_metrics = calculate_metrics(bh_series, recommendations, asset_type=asset_type) # Strategy 1: HOLD KEEP current (keep position) ls_keep_current = run_compounding_simulation(recommendations, strategy='long_short', trading_mode='normal', asset_type=asset_type, symbol=symbol) lo_keep_current = run_compounding_simulation(recommendations, strategy='long_only', trading_mode='normal', asset_type=asset_type, symbol=symbol) # Calculate metrics for Strategy 1 ls_metrics = calculate_metrics(ls_keep_current, recommendations, asset_type=asset_type) lo_metrics = calculate_metrics(lo_keep_current, recommendations, asset_type=asset_type) # Print Strategy 1 metrics print("\nStrategy 1 (HOLD keeps position):") strategies_data = [ ('Long/Short', ls_metrics), ('Long-Only', lo_metrics), ('Buy & Hold', bh_metrics) ] print_metrics_table(strategies_data, None) # Strategy 2: HOLD KEEP FLAT (force close position) ls_keep_flat = run_compounding_simulation(recommendations, strategy='long_short', trading_mode='aggressive', asset_type=asset_type, symbol=symbol) lo_keep_flat = run_compounding_simulation(recommendations, strategy='long_only', trading_mode='aggressive', asset_type=asset_type, symbol=symbol) # Calculate metrics for Strategy 2 ls_flat_metrics = calculate_metrics(ls_keep_flat, recommendations, asset_type=asset_type) lo_flat_metrics = calculate_metrics(lo_keep_flat, recommendations, asset_type=asset_type) # Print Strategy 2 metrics print("\nStrategy 2 (HOLD forces flat):") strategies_data = [ ('Long/Short', ls_flat_metrics), ('Long-Only', lo_flat_metrics), ('Buy & Hold', bh_metrics) ] print_metrics_table(strategies_data, None) print(f"{asset_type.upper()} {symbol} | {recommendations[0]['date']} to {recommendations[-1]['date']} | {len(ls_keep_current)} days") def main(): """Main function to run the analysis""" # =========================================== # Configuration Parameters - Modify here # =========================================== # Whether to auto-discover available files (True: auto-discover, False: use manual configuration below) auto_discover = False # Manual configuration parameters (only used when auto_discover = False) # Asset symbol list (e.g.: BTC, TSLA, AAPL, etc.) assets = ['TSLA']#["BTC", 'TSLA'] # Only analyze BTC # Model name list (e.g.: gpt_4o, gpt_4.1) models = ["gpt_4o", "gpt_4.1", "gemini_2.0_flash","claude_3_5_haiku_20241022", "claude_sonnet_4_20250514", "vote"] # models = ['vote'] # Agent name list (e.g.: HedgeFundAgent, FinAgent, TradeAgent) agents = ['InvestorAgent', "TradeAgent"]# "InvestorAgent", "HedgeFundAgent", "DeepFundAgent"] # Multiple agents to analyze # =========================================== # Analysis Logic - No need to modify # =========================================== # If auto-discovery is enabled, scan existing files if auto_discover: print("🔍 Auto-discovering available files...") discovered_agents, discovered_assets, discovered_models = discover_available_files() print(f"Discovered files: Agents={discovered_agents}, Assets={discovered_assets}, Models={discovered_models}") if discovered_agents and discovered_assets and discovered_models: agents, assets, models = discovered_agents, discovered_assets, discovered_models print(f"✅ Using auto-discovered parameters: Agents={agents}, Assets={assets}, Models={models}") else: print("⚠️ Auto-discovery failed, using manual configuration parameters") # Iterate through all combinations for agent in agents: for asset in assets: for model in models: # Construct file path: action/{agent}_{asset}_{model}_trading_decisions.json file_path = f'action/{agent}_{asset}_{model}_trading_decisions.json' # Determine asset type symbol = asset if asset in ['BTC', 'ETH', 'ADA', 'SOL', 'DOT', 'LINK', 'UNI', 'MATIC', 'AVAX', 'ATOM']: asset_type = 'crypto' elif asset in ['TSLA', 'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'META', 'NFLX', 'AMD', 'INTC']: asset_type = 'stock' else: asset_type = 'stock' try: if not os.path.exists(file_path): print(f"File not found: {file_path}") continue with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) recs = data.get('recommendations', []) if not recs: print(f"No recommendations found in {file_path}") continue # Validate recommendation format valid_format = True for rec in recs: if 'date' not in rec or 'price' not in rec: print(f"Invalid recommendation format in {file_path}") valid_format = False break if not valid_format: continue recs.sort(key=lambda x: datetime.fromisoformat(x['date'])) title = f"{agent}_{asset}_{model} ({data.get('start_date', 'Unknown')} to {data.get('end_date', 'Unknown')})" analyze_and_print(title, recs, asset_type=asset_type, symbol=symbol) except Exception as e: print(f"Error processing {file_path}: {e}") continue # Clear price cache to free memory clear_price_cache() if __name__ == "__main__": main()