Spaces:
Running
Running
| import gradio as gr | |
| import psutil | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| import yfinance as yf | |
| import torch | |
| from chronos import BaseChronosPipeline | |
| from chronos import ChronosPipeline | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from sklearn.preprocessing import MinMaxScaler, StandardScaler | |
| import plotly.express as px | |
| from typing import Dict, List, Tuple, Optional, Union | |
| import json | |
| import os | |
| import spaces | |
| import gc | |
| import pytz | |
| import time | |
| import random | |
| import platform | |
| from scipy import stats | |
| from scipy.optimize import minimize | |
| import warnings | |
| import threading | |
| from dataclasses import dataclass | |
| from transformers import GenerationConfig | |
| warnings.filterwarnings('ignore') | |
| # Additional imports for advanced features | |
| try: | |
| from hmmlearn import hmm | |
| HMM_AVAILABLE = True | |
| except ImportError: | |
| HMM_AVAILABLE = False | |
| print("Warning: hmmlearn not available. Regime detection will use simplified methods.") | |
| try: | |
| from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor | |
| from sklearn.linear_model import LinearRegression, Ridge, Lasso | |
| from sklearn.svm import SVR | |
| from sklearn.neural_network import MLPRegressor | |
| from sklearn.model_selection import TimeSeriesSplit, cross_val_score | |
| from sklearn.metrics import mean_squared_error, mean_absolute_error | |
| ENSEMBLE_AVAILABLE = True | |
| except ImportError: | |
| ENSEMBLE_AVAILABLE = False | |
| print("Warning: scikit-learn not available. Ensemble methods will be simplified.") | |
| # Additional imports for enhanced features | |
| try: | |
| import requests | |
| import re | |
| from textblob import TextBlob | |
| SENTIMENT_AVAILABLE = True | |
| except ImportError: | |
| SENTIMENT_AVAILABLE = False | |
| print("Warning: sentiment analysis not available.") | |
| try: | |
| from arch import arch_model | |
| GARCH_AVAILABLE = True | |
| except ImportError: | |
| GARCH_AVAILABLE = False | |
| print("Warning: arch not available. GARCH modeling will be simplified.") | |
# Market status management
@dataclass
class MarketStatus:
    """Snapshot of one market's trading status.

    Instances are created with keyword arguments, so this must be a dataclass
    (a plain class with only annotations has no such constructor).
    """
    is_open: bool  # whether the market is considered open at snapshot time
    status_text: str  # human-readable status sentence
    next_trading_day: str  # 'YYYY-MM-DD'
    last_updated: str  # market-local timestamp of the snapshot
    time_until_open: str  # formatted countdown, or an 'N/A ...' marker
    time_until_close: str  # formatted countdown, or an 'N/A ...' / closed marker
    current_time_et: str  # market-local 'HH:MM:SS TZ' (despite the name, not always ET)
    market_name: str  # display name from the market config
    market_type: str  # 'stocks', 'futures', 'forex', 'crypto' or 'commodities'
    market_symbol: str  # representative ticker for the market
# Global configuration for market status updates
MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10  # Refresh every 10 minutes

# Market configurations for different exchanges.
# Keys per entry: display 'name', representative 'symbol', market 'type',
# IANA/pytz 'timezone', local 'open_time'/'close_time' as 'HH:MM', trading
# 'days' as weekday numbers (0 = Monday), and a short 'description'.
MARKET_CONFIGS = {
    'US_STOCKS': {
        'name': 'US Stock Market',
        'symbol': '^GSPC',  # S&P 500
        'type': 'stocks',
        'timezone': 'US/Eastern',
        'open_time': '09:30',
        'close_time': '16:00',
        'days': [0, 1, 2, 3, 4],  # Monday to Friday
        'description': 'NYSE, NASDAQ, AMEX'
    },
    'US_FUTURES': {
        'name': 'US Futures Market',
        'symbol': 'ES=F',  # E-mini S&P 500
        'type': 'futures',
        'timezone': 'US/Eastern',
        'open_time': '18:00',  # Session opens in the evening of the previous calendar day
        'close_time': '17:00',  # ...and ends the following afternoon (not a 24-hour session)
        'days': [0, 1, 2, 3, 4, 5, 6],  # Every weekday listed; the status logic treats 'futures' as always open
        'description': 'CME, ICE, CBOT'
    },
    'FOREX': {
        'name': 'Forex Market',
        'symbol': 'EURUSD=X',
        'type': 'forex',
        'timezone': 'UTC',
        'open_time': '00:00',
        'close_time': '23:59',
        'days': [0, 1, 2, 3, 4, 5, 6],  # Every weekday listed; NOTE(review): real FX trading pauses over the weekend
        'description': 'Global Currency Exchange'
    },
    'CRYPTO': {
        'name': 'Cryptocurrency Market',
        'symbol': 'BTC-USD',
        'type': 'crypto',
        'timezone': 'UTC',
        'open_time': '00:00',
        'close_time': '23:59',
        'days': [0, 1, 2, 3, 4, 5, 6],  # Trades around the clock, every day
        'description': 'Bitcoin, Ethereum, Altcoins'
    },
    'COMMODITIES': {
        'name': 'Commodities Market',
        'symbol': 'GC=F',  # Gold Futures
        'type': 'commodities',
        'timezone': 'US/Eastern',
        'open_time': '18:00',  # Session opens in the evening of the previous calendar day
        'close_time': '17:00',  # ...and ends the following afternoon (not a 24-hour session)
        'days': [0, 1, 2, 3, 4, 5, 6],  # Every weekday listed; the status logic treats 'commodities' as always open
        'description': 'Gold, Silver, Oil, Natural Gas'
    },
    'EUROPE': {
        'name': 'European Markets',
        'symbol': '^STOXX50E',  # EURO STOXX 50
        'type': 'stocks',
        'timezone': 'Europe/London',
        'open_time': '08:00',
        'close_time': '16:30',
        'days': [0, 1, 2, 3, 4],  # Monday to Friday
        'description': 'London, Frankfurt, Paris'
    },
    'ASIA': {
        'name': 'Asian Markets',
        'symbol': '^N225',  # Nikkei 225
        'type': 'stocks',
        'timezone': 'Asia/Tokyo',
        'open_time': '09:00',
        'close_time': '15:30',
        'days': [0, 1, 2, 3, 4],  # Monday to Friday
        'description': 'Tokyo, Hong Kong, Shanghai'
    }
}
class MarketStatusManager:
    """Manages market status with periodic updates for multiple markets.

    ``__init__`` starts a daemon thread that recomputes the status of every
    market in ``MARKET_CONFIGS`` every ``update_interval`` seconds. It swaps the
    result into ``self._statuses`` under ``self._lock``. Between refreshes the
    thread waits on ``self._stop_event``, so ``stop()`` wakes it immediately
    instead of waiting for a full sleep interval to elapse.
    """

    # Market types the status logic treats as permanently open.
    _ALWAYS_OPEN_TYPES = ('futures', 'forex', 'crypto', 'commodities')

    def __init__(self):
        self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60  # Convert to seconds
        self._statuses = {}
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._update_thread = None
        self._start_update_thread()

    @staticmethod
    def _session_time(market_tz, day: datetime, hhmm: str) -> datetime:
        """Return wall-clock time ``hhmm`` ('HH:MM') on ``day``'s date in ``market_tz``.

        The naive local time is re-localized, so the UTC offset is the one in
        force on that date. Calling ``replace()`` or adding a ``timedelta`` to a
        pytz-aware value keeps the offset of the original instant instead. That
        result is an hour off across a DST change.
        """
        hour, minute = map(int, hhmm.split(':'))
        naive = day.replace(tzinfo=None, hour=hour, minute=minute, second=0, microsecond=0)
        localize = getattr(market_tz, 'localize', None)
        if localize is not None:
            return localize(naive)  # pytz zones must be attached via localize()
        return naive.replace(tzinfo=market_tz)  # stdlib tzinfo objects

    def _get_current_market_status(self, market_key: str = 'US_STOCKS') -> 'MarketStatus':
        """Compute a fresh MarketStatus for ``market_key`` (a key of MARKET_CONFIGS)."""
        config = MARKET_CONFIGS[market_key]
        market_tz = pytz.timezone(config['timezone'])
        # Start from an explicitly aware UTC instant, then convert to market-local time.
        market_time = datetime.now(pytz.utc).astimezone(market_tz)
        is_trading_day = market_time.weekday() in config['days']
        market_open_time = self._session_time(market_tz, market_time, config['open_time'])
        market_close_time = self._session_time(market_tz, market_time, config['close_time'])

        if config['type'] in self._ALWAYS_OPEN_TYPES:
            is_open = True
            status_text = f"{config['name']} is currently open (24/7)"
        elif not is_trading_day:
            is_open = False
            status_text = f"{config['name']} is closed (Weekend)"
        elif market_open_time <= market_time <= market_close_time:
            is_open = True
            status_text = f"{config['name']} is currently open"
        else:
            is_open = False
            if market_time < market_open_time:
                status_text = f"{config['name']} is closed (Before opening)"
            else:
                status_text = f"{config['name']} is closed (After closing)"

        next_trading_day = self._get_next_trading_day(market_time, config['days'])
        time_until_open = self._get_time_until_open(market_time, market_open_time, config)
        time_until_close = self._get_time_until_close(market_time, market_close_time, config)
        return MarketStatus(
            is_open=is_open,
            status_text=status_text,
            next_trading_day=next_trading_day.strftime('%Y-%m-%d'),
            last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
            time_until_open=time_until_open,
            time_until_close=time_until_close,
            current_time_et=market_time.strftime('%H:%M:%S %Z'),
            market_name=config['name'],
            market_type=config['type'],
            market_symbol=config['symbol']
        )

    def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime:
        """Return ``current_time`` advanced by whole days to the next day in ``trading_days``.

        Raises:
            ValueError: if ``trading_days`` contains no weekday number 0-6
                (an unbounded search would never terminate).
        """
        for days_ahead in range(1, 8):
            candidate = current_time + timedelta(days=days_ahead)
            if candidate.weekday() in trading_days:
                return candidate
        raise ValueError(f"No valid trading days in {trading_days!r}")

    def _get_time_until_open(self, current_time: datetime, market_open_time: datetime, config: dict) -> str:
        """Return a formatted countdown to the market's next opening."""
        if config['type'] in self._ALWAYS_OPEN_TYPES:
            return "N/A (24/7 Market)"
        if current_time.weekday() in config['days'] and current_time < market_open_time:
            # Today's session has not started yet.
            time_diff = market_open_time - current_time
        else:
            # Non-trading day, or today's open has passed: use the next trading day's open.
            next_day = self._get_next_trading_day(current_time, config['days'])
            market_tz = pytz.timezone(config['timezone'])
            next_open = self._session_time(market_tz, next_day, config['open_time'])
            time_diff = next_open - current_time
        return self._format_time_delta(time_diff)

    def _get_time_until_close(self, current_time: datetime, market_close_time: datetime, config: dict) -> str:
        """Return a formatted countdown to today's close, or a marker string."""
        if config['type'] in self._ALWAYS_OPEN_TYPES:
            return "N/A (24/7 Market)"
        if current_time.weekday() not in config['days']:
            return "N/A (Weekend)"
        if current_time < market_close_time:
            return self._format_time_delta(market_close_time - current_time)
        return "Market closed for today"

    def _format_time_delta(self, time_diff: timedelta) -> str:
        """Format a timedelta as 'Xd Yh Zm', 'Yh Zm' or 'Zm' ('N/A' if negative)."""
        total_seconds = int(time_diff.total_seconds())
        if total_seconds < 0:
            return "N/A"
        days = total_seconds // 86400
        hours = (total_seconds % 86400) // 3600
        minutes = (total_seconds % 3600) // 60
        if days > 0:
            return f"{days}d {hours}h {minutes}m"
        elif hours > 0:
            return f"{hours}h {minutes}m"
        else:
            return f"{minutes}m"

    def _update_loop(self):
        """Background thread loop: refresh all market statuses until stopped."""
        while not self._stop_event.is_set():
            try:
                new_statuses = {key: self._get_current_market_status(key) for key in MARKET_CONFIGS}
            except Exception as e:
                print(f"Error updating market status: {str(e)}")
                wait_seconds = 60  # Retry sooner after a failure
            else:
                with self._lock:
                    self._statuses = new_statuses
                wait_seconds = self.update_interval
            # Event.wait returns as soon as stop() sets the event; time.sleep would not.
            self._stop_event.wait(wait_seconds)

    def _start_update_thread(self):
        """Start the background update thread if it is not already running."""
        if self._update_thread is None or not self._update_thread.is_alive():
            self._stop_event.clear()
            self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
            self._update_thread.start()

    def get_status(self, market_key: str = 'US_STOCKS') -> 'MarketStatus':
        """Return the cached status for ``market_key``, computing it on first request (thread-safe)."""
        with self._lock:
            if market_key not in self._statuses:
                self._statuses[market_key] = self._get_current_market_status(market_key)
            return self._statuses[market_key]

    def get_all_markets_status(self) -> Dict[str, 'MarketStatus']:
        """Return a shallow copy of all cached statuses (empty until the first refresh)."""
        with self._lock:
            return self._statuses.copy()

    def stop(self):
        """Signal the update thread to exit and wait up to 5 seconds for it."""
        self._stop_event.set()
        if self._update_thread and self._update_thread.is_alive():
            self._update_thread.join(timeout=5)
# Lazily-loaded Chronos pipeline; load_pipeline() fills this in on first use.
pipeline = None

# In-memory market data cache plus its expiry bookkeeping (two independent dicts).
market_data_cache, cache_expiry = {}, {}
CACHE_DURATION = 60 * 60  # seconds (1 hour)

# Shared market status manager; constructing it starts its background refresh thread.
market_status_manager = MarketStatusManager()
# Covariate tickers grouped by category (yfinance symbols).
COVARIATE_SOURCES = dict(
    market_indices=['^GSPC', '^DJI', '^IXIC', '^VIX', '^TNX', '^TYX'],
    sectors=['XLF', 'XLK', 'XLE', 'XLV', 'XLI', 'XLP', 'XLU', 'XLB', 'XLY'],
    commodities=['GC=F', 'SI=F', 'CL=F', 'NG=F', 'ZC=F', 'ZS=F'],
    currencies=['EURUSD=X', 'GBPUSD=X', 'JPYUSD=X', 'CHFUSD=X', 'CADUSD=X'],
)

# Economic indicator proxies, also expressed as yfinance symbols.
ECONOMIC_INDICATORS = dict(
    inflation='^TNX',   # 10-year Treasury yield used as a proxy
    volatility='^VIX',  # VIX volatility index
    dollar='UUP',       # US Dollar Index (via ETF)
    gold='GLD',         # Gold ETF
    oil='USO',          # Oil ETF
)
def retry_yfinance_request(func, max_retries=3, initial_delay=1, max_delay=8.0):
    """
    Retry mechanism for yfinance requests with exponential backoff capped at ``max_delay``.

    ``func`` is called up to ``max_retries`` times. A ``None`` result, which is
    common with yfinance for unavailable data, is retried like an error. After
    an exception, the wait is ``initial_delay * 2**attempt`` plus a random
    jitter chosen by error class (auth / rate limit / server / network /
    generic). Every wait, including the one after a ``None`` result, is capped
    at ``max_delay``.

    Args:
        func: Zero-argument callable to retry
        max_retries: Maximum number of attempts
        initial_delay: Base delay in seconds before the first retry
        max_delay: Upper bound on any single wait, in seconds (default 8)

    Returns:
        Result of the function call if successful, or None if every attempt
        failed or returned None (errors are printed, never raised).
    """
    # (substrings searched in the lowercased error text, log label, jitter range in seconds)
    error_classes = (
        (("401", "unauthorized"), "Authentication error", (0, 2)),
        (("429", "rate limit", "too many requests"), "Rate limit error", (1, 3)),
        (("500", "502", "503", "504"), "Server error", (0.5, 1.5)),
        (("timeout", "connection"), "Network error", (0, 1)),
    )
    for attempt in range(max_retries):
        is_last = attempt == max_retries - 1
        backoff = initial_delay * (2 ** attempt)
        try:
            result = func()
        except Exception as e:
            if is_last:
                print(f"Final attempt failed after {max_retries} retries: {str(e)}")
                return None  # Return None instead of raising to avoid crashes
            error_str = str(e).lower()
            label, jitter = "Generic error", (0, 1)
            for markers, name, jitter_range in error_classes:
                if any(marker in error_str for marker in markers):
                    label, jitter = name, jitter_range
                    break
            delay = min(max_delay, backoff + random.uniform(*jitter))
            print(f"{label} (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
            time.sleep(delay)
            continue
        if result is not None:
            return result
        if is_last:
            print(f"Function returned None after {max_retries} attempts")
            return None
        print(f"Function returned None (attempt {attempt + 1}/{max_retries}), retrying...")
        time.sleep(min(max_delay, backoff))
    return None
def clear_gpu_memory():
    """Release unreferenced Python objects.

    Despite the name, this is the CPU-only variant: no accelerator cache is
    touched. It only forces a full garbage-collection pass.
    """
    _ = gc.collect()  # number of collected objects is not needed
    return None
def load_pipeline():
    """Load the Chronos model with CPU configuration (once) and return it.

    The pipeline is stored in the module-level ``pipeline`` global only after
    it has been fully configured (eval mode, gradients disabled). A failure
    part-way through therefore never leaves a half-configured object cached
    for later calls.

    Returns:
        The cached pipeline object.

    Raises:
        RuntimeError: if loading or configuring the model fails; the original
            exception is chained as ``__cause__``.
    """
    global pipeline
    if pipeline is not None:
        return pipeline
    try:
        clear_gpu_memory()
        print("Loading Chronos model...")
        # The fine-tuned model performs well for intermittent demand forecasts
        # (e.g., Intermittent and Lumpy Time Series demand forecasts for seasonal products).
        loaded = BaseChronosPipeline.from_pretrained(
            "nieche/chronos-bolt-base-fine-tuned-v2",
            device_map="cpu",  # Use CPU device mapping
            torch_dtype=torch.float32,  # Use float32 for CPU
            low_cpu_mem_usage=False,
            trust_remote_code=True,
            use_safetensors=True
        )
        # Inference only: evaluation mode and no gradient tracking.
        loaded.model = loaded.model.eval()
        for param in loaded.model.parameters():
            param.requires_grad = False
    except Exception as e:
        print(f"Error loading pipeline: {str(e)}")
        print(f"Error type: {type(e)}")
        raise RuntimeError(f"Failed to load model: {str(e)}") from e
    pipeline = loaded
    print("Chronos model loaded successfully")
    return pipeline
def _ensure_pipeline_cpu(pipe):
    """Best-effort: put a Chronos pipeline's model (and tokenizer, if movable) on CPU.

    The model is also switched to eval mode. Failures are reported with a
    printed warning rather than raised, so callers can invoke this
    unconditionally before inference.
    """
    cpu = torch.device("cpu")
    try:
        pipe.model = pipe.model.to(cpu)
        pipe.model.eval()
    except Exception as e:
        print(f"Warning: could not move pipeline model to CPU: {e}")
    try:
        tok = getattr(pipe, 'tokenizer', None)
        # Only objects exposing .to() can be moved; plain tokenizers are left as-is.
        if tok is not None and hasattr(tok, 'to'):
            pipe.tokenizer = tok.to(cpu)
    except Exception as e:
        print(f"Warning: could not move pipeline tokenizer to CPU: {e}")
def is_market_open() -> bool:
    """Return whether the US stock market is currently open.

    Legacy wrapper kept for backward compatibility; it reads the 'US_STOCKS'
    entry of the shared market status manager.
    """
    us_status = market_status_manager.get_status('US_STOCKS')
    return us_status.is_open
def get_next_trading_day() -> datetime:
    """Return the next US stock trading day as a naive midnight datetime.

    Legacy wrapper kept for backward compatibility; it parses the
    'YYYY-MM-DD' string stored on the 'US_STOCKS' status.
    """
    us_status = market_status_manager.get_status('US_STOCKS')
    parsed = datetime.strptime(us_status.next_trading_day, '%Y-%m-%d')
    return parsed
def get_market_status_display() -> str:
    """Render the status of all cached markets as a Markdown report.

    Markets are grouped into a stocks section and a 24/7 section, followed by
    an open-market count and a local last-updated timestamp. If no statuses
    are cached yet, only the US stock market status is rendered.
    """
    all_statuses = market_status_manager.get_all_markets_status()
    if not all_statuses:
        # Nothing cached yet: fall back to the US stock market alone.
        return _format_single_market_status(market_status_manager.get_status('US_STOCKS'))

    grouped_keys = (
        ('stocks', ('US_STOCKS', 'EUROPE', 'ASIA')),
        ('24/7', ('FOREX', 'CRYPTO', 'US_FUTURES', 'COMMODITIES')),
    )
    sections = ["## 🌍 Global Market Status\n\n"]
    for group_name, keys in grouped_keys:
        sections.append(f"### {group_name.upper()} MARKETS\n\n")
        sections.extend(
            _format_market_status_line(all_statuses[key]) for key in keys if key in all_statuses
        )
        sections.append("\n")

    open_markets = sum(1 for entry in all_statuses.values() if entry.is_open)
    total_markets = len(all_statuses)
    sections.append(f"**Summary:** {open_markets}/{total_markets} markets currently open\n\n")
    sections.append(f"*Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
    return "".join(sections)
def _format_single_market_status(status: 'MarketStatus') -> str:
    """Render one market's status as a Markdown snippet.

    The snippet has these lines, in order:

    - a leading newline;
    - a '###' heading with an open/closed icon;
    - the current time, next trading day and last-updated lines;
    - the time until close for an open market, or until open for a closed one.

    Lines are joined explicitly, so the output does not depend on how this
    source is indented. An empty ``current_time_et`` no longer raises
    IndexError.
    """
    status_icon = "🟢" if status.is_open else "🔴"
    # current_time_et is expected to look like 'HH:MM:SS TZ'; its last token is the zone label.
    time_parts = status.current_time_et.split()
    tz_label = time_parts[-1] if time_parts else "N/A"
    if status.is_open:
        countdown = f"**Time Until Close:** {status.time_until_close}"
    else:
        countdown = f"**Time Until Open:** {status.time_until_open}"
    return "\n".join([
        "",
        f"### {status_icon} {status.market_name}: {status.status_text}",
        f"**Current Time ({tz_label}):** {status.current_time_et}",
        f"**Next Trading Day:** {status.next_trading_day}",
        f"**Last Updated:** {status.last_updated}",
        countdown,
    ])
def _format_market_status_line(status: MarketStatus) -> str:
    """Render one market as a single Markdown bullet-style line.

    The line shows an open/closed icon, a market-type icon, the bold market
    name and its symbol. It then appends a countdown (close time when open,
    open time when closed) unless that countdown is the 24/7 marker.
    """
    type_icons = {
        'stocks': '📈',
        'futures': '📊',
        'forex': '💱',
        'crypto': '₿',
        'commodities': '🏭',
    }
    state_icon = "🟢" if status.is_open else "🔴"
    kind_icon = type_icons.get(status.market_type, '📊')

    if status.is_open:
        countdown, label = status.time_until_close, "Closes in"
    else:
        countdown, label = status.time_until_open, "Opens in"
    suffix = "" if countdown == "N/A (24/7 Market)" else f" | {label}: {countdown}"

    return f"{state_icon} {kind_icon} **{status.market_name}** ({status.market_symbol}){suffix}\n"
def update_market_status() -> str:
    """Return a freshly rendered market status Markdown string.

    Intended as the periodic refresh callback for the Gradio UI; it simply
    delegates to get_market_status_display().
    """
    rendered = get_market_status_display()
    return rendered
def cleanup_on_exit():
    """Stop the shared market status manager's background thread.

    Any error is printed rather than raised, so this is safe to call during
    application shutdown.
    """
    try:
        market_status_manager.stop()
    except Exception as exc:
        print(f"Error stopping market status manager: {str(exc)}")
    else:
        print("Market status manager stopped successfully")
def get_historical_data(symbol: str, timeframe: str = "1d", lookback_days: int = 365) -> pd.DataFrame:
    """
    Fetch historical OHLCV data via yfinance and add returns, volatility and technical indicators.

    Args:
        symbol (str): The stock symbol (e.g., 'AAPL')
        timeframe (str): Bar size: '1d', '1h' or '15m'
        lookback_days (int): Number of calendar days of history to request.
            Intraday requests are capped to what Yahoo serves: 729 days for
            '1h' bars and 60 days for '15m' bars.

    Returns:
        pd.DataFrame: Historical data with OHLCV, 'Returns', 'Volatility' and
        the columns added by calculate_technical_indicators.

    Raises:
        ValueError: for an unsupported timeframe or lookback_days < 1.
        Exception: if the US market is closed for an intraday request, or if
            no or too little data is returned.
    """
    try:
        # Intraday data is only requested while the US market is open.
        if timeframe in ["1h", "15m"] and not is_market_open():
            next_trading_day = get_next_trading_day()
            raise Exception(f"Market is currently closed. Next trading day is {next_trading_day.strftime('%Y-%m-%d')}")
        if lookback_days < 1:
            raise ValueError(f"lookback_days must be at least 1, got {lookback_days}")

        # All periods are expressed in days. Yahoo keeps roughly 730 days of
        # hourly bars (stay one day inside that edge) and 60 days of sub-hourly bars.
        tf_map = {
            "1d": {"interval": "1d", "period": f"{lookback_days}d"},
            "1h": {"interval": "1h", "period": f"{min(lookback_days, 729)}d"},
            "15m": {"interval": "15m", "period": f"{min(lookback_days, 60)}d"},
        }
        if timeframe not in tf_map:
            raise ValueError(f"Unsupported timeframe: {timeframe}. Supported: {list(tf_map.keys())}")
        interval_config = tf_map[timeframe]

        ticker = yf.Ticker(symbol)

        def fetch_history():
            return ticker.history(
                period=interval_config["period"],
                interval=interval_config["interval"],
                prepost=True,
                actions=True,
                auto_adjust=True,
                back_adjust=True,
                repair=True
            )

        df = retry_yfinance_request(fetch_history)
        if df is None or df.empty:
            raise Exception(f"No data returned for {symbol}")

        # Validate data quality
        if len(df) < 10:
            raise Exception(f"Insufficient data for {symbol}: only {len(df)} data points")
        critical_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        missing_data = df[critical_columns].isnull().sum()
        if missing_data.sum() > len(df) * 0.1:  # More than 10% missing data
            print(f"Warning: Significant missing data for {symbol}: {missing_data.to_dict()}")

        # Forward fill then backward fill remaining gaps (fillna(method=...) is deprecated).
        df = df.ffill().bfill()

        df['Returns'] = df['Close'].pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()
        df = calculate_technical_indicators(df)

        print(f"Successfully fetched {len(df)} data points for {symbol} ({timeframe})")
        return df
    except Exception as e:
        print(f"Error fetching historical data for {symbol}: {str(e)}")
        raise
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
    """Relative Strength Index using simple rolling means of gains and losses.

    Missing prices are forward- then backward-filled first. The first
    element's undefined change counts as zero, and the result is NaN until a
    full ``period`` window is available.
    """
    filled = prices.ffill().bfill()
    change = filled.diff()
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)
    avg_up = up_moves.rolling(window=period).mean()
    avg_down = down_moves.rolling(window=period).mean()
    relative_strength = avg_up / avg_down
    return 100 - 100 / (relative_strength + 1)
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series]:
    """Return (MACD line, signal line) from non-adjusted exponential moving averages.

    Missing prices are forward- then backward-filled first. The MACD line is
    EMA(fast) - EMA(slow), and the signal line is EMA(signal) of the MACD line.
    """
    def ema(series: pd.Series, span: int) -> pd.Series:
        return series.ewm(span=span, adjust=False).mean()

    filled = prices.ffill().bfill()
    macd_line = ema(filled, fast) - ema(filled, slow)
    return macd_line, ema(macd_line, signal)
def calculate_bollinger_bands(prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Return (upper, middle, lower) Bollinger Bands.

    Missing prices are forward- then backward-filled first. The middle band
    is the rolling mean over ``period``. The outer bands sit ``std_dev``
    rolling (sample) standard deviations above and below it.
    """
    filled = prices.ffill().bfill()
    window = filled.rolling(window=period)
    middle = window.mean()
    spread = window.std() * std_dev
    return middle + spread, middle, middle - spread
def _trailing_window(history: np.ndarray, forecast: np.ndarray, step: int, size: int) -> np.ndarray:
    """Return the last ``size`` prices as seen at forecast index ``step``.

    The window ends at ``forecast[step]``. When fewer than ``size`` forecast
    values exist up to that point, it is topped up with the most recent
    ``history`` values (or all of history, if that is shorter).
    """
    forecast_part = forecast[max(0, step + 1 - size):step + 1]
    missing = size - len(forecast_part)
    if missing <= 0:
        return forecast_part
    return np.concatenate([history[-missing:], forecast_part])


def predict_technical_indicators(df: pd.DataFrame, price_prediction: np.ndarray,
                                 timeframe: str = "1d") -> Dict[str, np.ndarray]:
    """
    Predict technical indicators based on price predictions.

    - RSI: extrapolated with a mean-reversion rule. Gaussian noise is added
      in the neutral 30-70 zone, so results are not deterministic.
    - MACD and its signal line: nudged by each step's percentage price change.
    - Bollinger Bands (20-period, 2 std) and SMAs: recomputed over trailing
      windows of exactly the right length. These windows blend the tail of
      ``df['Close']`` with the forecast.

    Args:
        df (pd.DataFrame): Historical data; must contain 'RSI', 'MACD' and
            'MACD_Signal'. 'BB_Upper'/'BB_Middle'/'BB_Lower' and
            'SMA_20'/'SMA_50'/'SMA_200' are predicted only when present, and
            then 'Close' is required too.
        price_prediction (np.ndarray): Predicted prices, one per forecast step.
        timeframe (str): Data timeframe; currently not used by the calculation.

    Returns:
        Dict[str, np.ndarray]: Indicator name -> one predicted value per step.
        Empty if the forecast is empty or an error occurs.
    """
    try:
        forecast = np.asarray(price_prediction, dtype=float)
        if forecast.size == 0:
            return {}
        horizon = len(forecast)
        predictions = {}

        # RSI: drift back out of overbought/oversold zones, small noise otherwise.
        last_rsi = df['RSI'].iloc[-1]
        rsi_predictions = []
        for step in range(horizon):
            if last_rsi > 70:  # Overbought
                rsi_change = -0.5 * (step + 1)
            elif last_rsi < 30:  # Oversold
                rsi_change = 0.5 * (step + 1)
            else:  # Neutral zone
                rsi_change = 0.1 * np.random.normal(0, 1)
            # np.clip keeps a NaN RSI as NaN; builtin min/max silently turned it into 100.
            rsi_predictions.append(float(np.clip(last_rsi + rsi_change, 0, 100)))
        predictions['RSI'] = np.array(rsi_predictions)

        # MACD follows price momentum: dampened percentage change per step.
        last_macd = df['MACD'].iloc[-1]
        last_macd_signal = df['MACD_Signal'].iloc[-1]
        price_changes = np.diff(forecast, prepend=forecast[0])
        with np.errstate(divide='ignore', invalid='ignore'):
            pct_changes = np.where(forecast != 0, price_changes / forecast * 100, 0.0)
        predictions['MACD'] = last_macd + pct_changes * 0.1
        predictions['MACD_Signal'] = last_macd_signal + pct_changes * 0.05  # Slower line

        sma_periods = [p for p in (20, 50, 200) if f'SMA_{p}' in df.columns]
        has_bollinger = all(col in df.columns for col in ('BB_Upper', 'BB_Middle', 'BB_Lower'))
        if has_bollinger or sma_periods:
            history = df['Close'].to_numpy(dtype=float)

        if has_bollinger:
            window_size = 20
            windows = [_trailing_window(history, forecast, step, window_size) for step in range(horizon)]
            means = np.array([np.mean(w) for w in windows])
            stds = np.array([np.std(w) for w in windows])
            predictions['BB_Upper'] = means + stds * 2
            predictions['BB_Middle'] = means
            predictions['BB_Lower'] = means - stds * 2

        for period in sma_periods:
            predictions[f'SMA_{period}'] = np.array([
                np.mean(_trailing_window(history, forecast, step, period)) for step in range(horizon)
            ])

        return predictions
    except Exception as e:
        print(f"Error predicting technical indicators: {str(e)}")
        return {}
def calculate_technical_uncertainty(df: pd.DataFrame, indicator_predictions: Dict[str, np.ndarray],
                                    price_prediction: np.ndarray) -> Dict[str, np.ndarray]:
    """
    Estimate per-step uncertainty for each predicted technical indicator.

    The rule depends on how much history the indicator has:

    - Column in ``df`` with more than 10 non-NaN values: historical std
      scaled by sqrt(step), for steps 1..N.
    - Column in ``df`` with 10 or fewer non-NaN values: a flat 10% of the
      first prediction's magnitude.
    - Column not in ``df``: 5% of each predicted value's magnitude.

    Args:
        df (pd.DataFrame): Historical data
        indicator_predictions (Dict[str, np.ndarray]): Predicted indicators
        price_prediction (np.ndarray): Predicted prices (not used by the calculation)

    Returns:
        Dict[str, np.ndarray]: Uncertainty estimates per indicator; empty on error.
    """
    try:
        result = {}
        for name, preds in indicator_predictions.items():
            if name not in df.columns:
                result[name] = np.array([0.05 * abs(value) for value in preds])
                continue
            history = df[name].dropna()
            if len(history) <= 10:
                result[name] = np.array([0.1 * abs(preds[0])] * len(preds))
                continue
            sigma = history.std()
            # Uncertainty widens with the square root of the forecast horizon.
            result[name] = np.array([sigma * np.sqrt(step) for step in range(1, len(preds) + 1)])
        return result
    except Exception as e:
        print(f"Error calculating technical uncertainty: {str(e)}")
        return {}
| def make_prediction_enhanced(symbol: str, timeframe: str = "1d", prediction_days: int = 5, strategy: str = "chronos", | |
| use_ensemble: bool = True, use_regime_detection: bool = True, use_stress_testing: bool = True, | |
| risk_free_rate: float = 0.02, ensemble_weights: Dict = None, | |
| market_index: str = "^GSPC", use_covariates: bool = True, use_sentiment: bool = True, | |
| random_real_points: int = 4, use_smoothing: bool = True, | |
| smoothing_type: str = "exponential", smoothing_window: int = 5, | |
| smoothing_alpha: float = 0.3, | |
| high_accuracy: bool = True, accuracy_boost_level: int = 3) -> Tuple[Dict, go.Figure]: | |
| """ | |
| Enhanced prediction using Chronos with covariate data, advanced uncertainty calculations, and improved algorithms. | |
| Args: | |
| symbol (str): Stock symbol | |
| timeframe (str): Data timeframe ('1d', '1h', '15m') | |
| prediction_days (int): Number of days to predict | |
| strategy (str): Prediction strategy to use | |
| use_ensemble (bool): Whether to use ensemble methods | |
| use_regime_detection (bool): Whether to use regime detection | |
| use_stress_testing (bool): Whether to perform stress testing | |
| risk_free_rate (float): Risk-free rate for calculations | |
| ensemble_weights (Dict): Weights for ensemble models | |
| market_index (str): Market index for correlation analysis | |
| use_covariates (bool): Whether to use covariate data | |
| use_sentiment (bool): Whether to use sentiment analysis | |
| random_real_points (int): Number of random real points to include in long-horizon context | |
| use_smoothing (bool): Whether to apply smoothing to predictions | |
| smoothing_type (str): Type of smoothing to apply ('exponential', 'moving_average', 'kalman', 'savitzky_golay', 'none') | |
| Returns: | |
| Tuple[Dict, go.Figure]: Trading signals and visualization plot | |
| """ | |
| try: | |
| # Get historical data | |
| df = get_historical_data(symbol, timeframe) | |
| # Initialize variables that might not be set in all strategy paths | |
| advanced_uncertainties = {} | |
| volume_pred = None | |
| volume_uncertainty = None | |
| technical_predictions = {} | |
| technical_uncertainties = {} | |
| # Collect enhanced covariate data | |
| covariate_data = {} | |
| market_conditions = {} | |
| if use_covariates: | |
| print("Collecting enhanced covariate data...") | |
| covariate_data = get_enhanced_covariate_data(symbol, timeframe, 365) | |
| # Extract market conditions from covariate data | |
| if 'economic_indicators' in covariate_data: | |
| vix_data = covariate_data['economic_indicators'].get('volatility', None) | |
| if vix_data is not None and len(vix_data) > 0: | |
| market_conditions['vix'] = vix_data.iloc[-1] | |
| # Calculate market sentiment | |
| sentiment_data = {} | |
| if use_sentiment: | |
| print("Calculating market sentiment...") | |
| sentiment_data = calculate_market_sentiment(symbol, 30) | |
| # Detect market regime | |
| regime_info = {} | |
| if use_regime_detection: | |
| print("Detecting market regime...") | |
| returns = df['Close'].pct_change().dropna() | |
| regime_info = detect_market_regime(returns) | |
| if strategy == "chronos": | |
| try: | |
| # Prepare data for Chronos | |
| prices = df['Close'].values | |
| chronos_context_size = 64 # Chronos model's context window size (fixed at 64) | |
| input_context_size = len(prices) # Available input data can be much larger | |
| # Use a larger range for scaler fitting to get better normalization | |
| scaler_range = min(input_context_size, chronos_context_size * 2) # Use up to 128 points for scaler | |
| # Select the most recent chronos_context_size points for the model input | |
| context_window = prices[-chronos_context_size:] | |
| scaler = MinMaxScaler(feature_range=(-1, 1)) | |
| # Fit scaler on a larger range for better normalization | |
| scaler.fit(prices[-scaler_range:].reshape(-1, 1)) | |
| normalized_prices = scaler.transform(context_window.reshape(-1, 1)).flatten() | |
| # Ensure we have enough data points for Chronos | |
| min_data_points = chronos_context_size | |
| if len(normalized_prices) < min_data_points: | |
| padding = np.full(min_data_points - len(normalized_prices), normalized_prices[-1]) | |
| normalized_prices = np.concatenate([padding, normalized_prices]) | |
| elif len(normalized_prices) > min_data_points: | |
| normalized_prices = normalized_prices[-min_data_points:] | |
| # Load pipeline and move to CPU | |
| pipe = load_pipeline() | |
| # Get the model's device and dtype | |
| device = torch.device("cpu") # Use CPU device | |
| dtype = torch.float32 # Use float32 for CPU | |
| print(f"Model device: {device}") | |
| print(f"Model dtype: {dtype}") | |
| # Convert to tensor and ensure proper shape and device | |
| context = torch.tensor(normalized_prices, dtype=dtype, device=device) | |
| # Validate context data | |
| if torch.isnan(context).any() or torch.isinf(context).any(): | |
| print("Warning: Context contains NaN or Inf values, replacing with zeros") | |
| context = torch.nan_to_num(context, nan=0.0, posinf=0.0, neginf=0.0) | |
| # Ensure context is finite and reasonable | |
| if torch.abs(context).max() > 1000: | |
| print("Warning: Context values are very large, normalizing") | |
| context = torch.clamp(context, -1000, 1000) | |
| print(f"Context validation - Shape: {context.shape}, Min: {context.min():.4f}, Max: {context.max():.4f}, Mean: {context.mean():.4f}") | |
| # Adjust prediction length based on timeframe | |
| if timeframe == "1d": | |
| max_prediction_length = chronos_context_size # 64 days | |
| actual_prediction_length = min(prediction_days, max_prediction_length) | |
| trim_length = prediction_days | |
| elif timeframe == "1h": | |
| max_prediction_length = chronos_context_size # 64 hours | |
| actual_prediction_length = min(prediction_days * 24, max_prediction_length) | |
| trim_length = prediction_days * 24 | |
| else: # 15m | |
| max_prediction_length = chronos_context_size # 64 intervals | |
| actual_prediction_length = min(prediction_days * 96, max_prediction_length) | |
| trim_length = prediction_days * 96 | |
| # Ensure prediction length is valid (must be positive and reasonable) | |
| actual_prediction_length = max(1, min(actual_prediction_length, 64)) | |
| print(f"Prediction length: {actual_prediction_length}") | |
| print(f"Context length: {len(context)}") | |
| # Use predict_quantiles with proper formatting | |
| with torch.amp.autocast('cpu'): | |
| # Ensure inputs and pipeline are on CPU | |
| if len(context.shape) == 1: | |
| context = context.unsqueeze(0) | |
| context = context.to(device) | |
| _ensure_pipeline_cpu(pipe) | |
| # Fix generation configuration to prevent min_length errors | |
| if hasattr(pipe.model, 'config'): | |
| gen_cfg = getattr(pipe.model.config, 'generation_config', None) | |
| if gen_cfg is None: | |
| pipe.model.config.generation_config = GenerationConfig( | |
| min_length=0, max_length=512, do_sample=False, num_beams=1 | |
| ) | |
| else: | |
| gen_cfg.min_length = 0 | |
| gen_cfg.max_length = 512 | |
| gen_cfg.do_sample = False | |
| gen_cfg.num_beams = 1 | |
| # Make prediction with proper parameters | |
| try: | |
| quantiles, mean = pipe.predict_quantiles( | |
| context=context, | |
| prediction_length=actual_prediction_length, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| except Exception as prediction_error: | |
| print(f"Chronos prediction failed: {str(prediction_error)}") | |
| print(f"Context shape: {context.shape}") | |
| print(f"Context dtype: {context.dtype}") | |
| print(f"Context device: {context.device}") | |
| print(f"Prediction length: {actual_prediction_length}") | |
| print(f"Model device: {next(pipe.model.parameters()).device}") | |
| print(f"Model dtype: {next(pipe.model.parameters()).dtype}") | |
| # Try with a smaller prediction length as fallback | |
| if actual_prediction_length > 1: | |
| print(f"Retrying with prediction length 1...") | |
| actual_prediction_length = 1 | |
| quantiles, mean = pipe.predict_quantiles( | |
| context=context, | |
| prediction_length=actual_prediction_length, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| else: | |
| raise prediction_error | |
| if quantiles is None or mean is None or len(quantiles) == 0 or len(mean) == 0: | |
| raise ValueError("Chronos returned empty prediction") | |
| print(f"Quantiles shape: {quantiles.shape}, Mean shape: {mean.shape}") | |
| # Convert to numpy arrays | |
| quantiles = quantiles.detach().cpu().numpy() | |
| mean = mean.detach().cpu().numpy() | |
| # Denormalize predictions using the same scaler as context | |
| mean_pred = scaler.inverse_transform(mean.reshape(-1, 1)).flatten() | |
| lower_bound = scaler.inverse_transform(quantiles[0, :, 0].reshape(-1, 1)).flatten() | |
| upper_bound = scaler.inverse_transform(quantiles[0, :, 2].reshape(-1, 1)).flatten() | |
| # Calculate uncertainty using advanced methods | |
| historical_volatility = df['Volatility'].iloc[-1] | |
| advanced_uncertainties = calculate_advanced_uncertainty( | |
| quantiles, historical_volatility, market_conditions | |
| ) | |
| std_pred = advanced_uncertainties.get('ensemble', | |
| (upper_bound - lower_bound) / (2 * 1.645)) | |
| # High-accuracy bagging: run multiple jittered contexts and average | |
| if high_accuracy and accuracy_boost_level and accuracy_boost_level > 0: | |
| try: | |
| runs_map = {1: 3, 2: 6, 3: 10} | |
| num_runs = runs_map.get(int(accuracy_boost_level), 3) | |
| bagged_means = [mean_pred] | |
| bagged_stds = [std_pred] | |
| for _ in range(num_runs - 1): | |
| # Create a slightly jittered context to diversify predictions | |
| jitter = torch.randn_like(context) * 0.01 | |
| context_j = (context + jitter).to(device) | |
| if len(context_j.shape) == 1: | |
| context_j = context_j.unsqueeze(0) | |
| q_j, m_j = pipe.predict_quantiles( | |
| context=context_j, | |
| prediction_length=actual_prediction_length, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| q_j = q_j.detach().cpu().numpy() | |
| m_j = m_j.detach().cpu().numpy() | |
| mean_pred_j = scaler.inverse_transform(m_j.reshape(-1, 1)).flatten() | |
| lb_j = scaler.inverse_transform(q_j[0, :, 0].reshape(-1, 1)).flatten() | |
| ub_j = scaler.inverse_transform(q_j[0, :, 2].reshape(-1, 1)).flatten() | |
| std_pred_j = (ub_j - lb_j) / (2 * 1.645) | |
| bagged_means.append(mean_pred_j) | |
| bagged_stds.append(std_pred_j) | |
| # Average across runs | |
| mean_pred = np.mean(np.stack(bagged_means, axis=0), axis=0) | |
| std_pred = np.mean(np.stack(bagged_stds, axis=0), axis=0) | |
| print(f"High-accuracy bagging applied with {num_runs} runs") | |
| except Exception as bag_err: | |
| print(f"High-accuracy bagging failed: {str(bag_err)} - proceeding with base prediction") | |
| # Apply continuity correction | |
| last_actual = df['Close'].iloc[-1] | |
| first_pred = mean_pred[0] | |
| discontinuity_threshold = max(1e-6, 0.02 * abs(last_actual)) # 2% threshold | |
| if abs(first_pred - last_actual) > discontinuity_threshold: | |
| print(f"Warning: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})") | |
| print(f"Discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)") | |
| # Apply improved continuity correction | |
| if len(mean_pred) > 1: | |
| # Calculate the overall trend from the original predictions | |
| original_trend = mean_pred[-1] - first_pred | |
| total_steps = len(mean_pred) - 1 | |
| # Calculate the desired trend per step to reach the final prediction | |
| if total_steps > 0: | |
| trend_per_step = original_trend / total_steps | |
| else: | |
| trend_per_step = 0 | |
| # Apply smooth transition starting from last actual | |
| # Use a gradual transition that preserves the overall trend | |
| transition_length = min(5, len(mean_pred)) # Longer transition for smoother curve | |
| for i in range(transition_length): | |
| if i == 0: | |
| # First prediction should be very close to last actual | |
| mean_pred[i] = last_actual + trend_per_step * 0.1 | |
| else: | |
| # Gradually increase the trend contribution | |
| transition_factor = min(1.0, i / transition_length) | |
| trend_contribution = trend_per_step * i * transition_factor | |
| mean_pred[i] = last_actual + trend_contribution | |
| # For remaining predictions, maintain the original relative differences | |
| if len(mean_pred) > transition_length: | |
| # Calculate the scale factor to maintain relative relationships | |
| original_diff = mean_pred[transition_length] - mean_pred[transition_length-1] | |
| if original_diff != 0: | |
| # Scale the remaining predictions to maintain continuity | |
| for i in range(transition_length, len(mean_pred)): | |
| if i == transition_length: | |
| # Ensure smooth transition at the boundary | |
| mean_pred[i] = mean_pred[i-1] + original_diff * 0.5 | |
| else: | |
| # Maintain the original relative differences | |
| original_diff_i = mean_pred[i] - mean_pred[i-1] | |
| mean_pred[i] = mean_pred[i-1] + original_diff_i | |
| print(f"Applied continuity correction: First prediction adjusted from {first_pred:.4f} to {mean_pred[0]:.4f}") | |
| # Apply financial smoothing if enabled | |
| if use_smoothing: | |
| mean_pred = apply_financial_smoothing(mean_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing) | |
| else: | |
| # Single prediction case - set to last actual | |
| mean_pred[0] = last_actual | |
| print(f"Single prediction case: Set to last actual value {last_actual:.4f}") | |
| # If we had to limit the prediction length, extend the prediction recursively | |
| if actual_prediction_length < trim_length: | |
| extended_mean_pred = mean_pred.copy() | |
| extended_std_pred = std_pred.copy() | |
| # Store the original scaler for consistency | |
| original_scaler = scaler | |
| # Calculate the number of extension steps needed | |
| remaining_steps = trim_length - actual_prediction_length | |
| steps_needed = (remaining_steps + actual_prediction_length - 1) // actual_prediction_length | |
| for step in range(steps_needed): | |
| # Use all available datapoints for context, including predictions | |
| # This allows the model to build upon its own predictions for better long-horizon forecasting | |
| all_available_data = np.concatenate([prices, extended_mean_pred]) | |
| # If we have more data than chronos_context_size, use the most recent chronos_context_size points | |
| # Otherwise, use all available data (this allows for longer context when available) | |
| if len(all_available_data) > chronos_context_size: | |
| context_window = all_available_data[-chronos_context_size:] | |
| else: | |
| context_window = all_available_data | |
| # Use the original scaler to maintain consistency - fit on historical data only | |
| # but transform the combined context window | |
| normalized_context = original_scaler.transform(context_window.reshape(-1, 1)).flatten() | |
| context = torch.tensor(normalized_context, dtype=dtype, device=device) | |
| if len(context.shape) == 1: | |
| context = context.unsqueeze(0) | |
| next_length = min(max_prediction_length, remaining_steps) | |
| # Ensure next_length is valid (must be positive and reasonable) | |
| next_length = max(1, min(next_length, 64)) | |
| with torch.amp.autocast('cpu'): | |
| try: | |
| next_quantiles, next_mean = pipe.predict_quantiles( | |
| context=context, | |
| prediction_length=next_length, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| except Exception as extension_error: | |
| print(f"Chronos extension prediction failed: {str(extension_error)}") | |
| print(f"Extension context shape: {context.shape}") | |
| print(f"Extension prediction length: {next_length}") | |
| # Try with a smaller prediction length as fallback | |
| if next_length > 1: | |
| print(f"Retrying extension with prediction length 1...") | |
| next_length = 1 | |
| next_quantiles, next_mean = pipe.predict_quantiles( | |
| context=context, | |
| prediction_length=next_length, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| else: | |
| raise extension_error | |
| # Convert predictions to numpy and denormalize using original scaler | |
| next_mean = next_mean.detach().cpu().numpy() | |
| next_quantiles = next_quantiles.detach().cpu().numpy() | |
| # Denormalize predictions using the original scaler | |
| next_mean_pred = original_scaler.inverse_transform(next_mean.reshape(-1, 1)).flatten() | |
| # Calculate uncertainty for extended predictions | |
| next_uncertainties = calculate_advanced_uncertainty( | |
| next_quantiles, historical_volatility, market_conditions | |
| ) | |
| next_std_pred = next_uncertainties.get('ensemble', | |
| (next_quantiles[0, :, 2] - next_quantiles[0, :, 0]) / (2 * 1.645)) | |
| # Check for discontinuity and apply continuity correction | |
| if abs(next_mean_pred[0] - extended_mean_pred[-1]) > max(1e-6, 0.02 * abs(extended_mean_pred[-1])): | |
| print(f"Warning: Discontinuity detected between last prediction ({extended_mean_pred[-1]:.4f}) and next prediction ({next_mean_pred[0]:.4f})") | |
| print(f"Extension discontinuity magnitude: {abs(next_mean_pred[0] - extended_mean_pred[-1]):.4f}") | |
| # Apply improved continuity correction for extensions | |
| if len(next_mean_pred) > 1: | |
| # Calculate the overall trend from the original predictions | |
| original_trend = next_mean_pred[-1] - next_mean_pred[0] | |
| total_steps = len(next_mean_pred) - 1 | |
| # Calculate the desired trend per step | |
| if total_steps > 0: | |
| trend_per_step = original_trend / total_steps | |
| else: | |
| trend_per_step = 0 | |
| # Apply smooth transition starting from last extended prediction | |
| transition_length = min(5, len(next_mean_pred)) # Longer transition for smoother curve | |
| for i in range(transition_length): | |
| if i == 0: | |
| # First prediction should be very close to last extended prediction | |
| next_mean_pred[i] = extended_mean_pred[-1] + trend_per_step * 0.1 | |
| else: | |
| # Gradually increase the trend contribution | |
| transition_factor = min(1.0, i / transition_length) | |
| trend_contribution = trend_per_step * i * transition_factor | |
| next_mean_pred[i] = extended_mean_pred[-1] + trend_contribution | |
| # For remaining predictions, maintain the original relative differences | |
| if len(next_mean_pred) > transition_length: | |
| # Calculate the scale factor to maintain relative relationships | |
| original_diff = next_mean_pred[transition_length] - next_mean_pred[transition_length-1] | |
| if original_diff != 0: | |
| # Scale the remaining predictions to maintain continuity | |
| for i in range(transition_length, len(next_mean_pred)): | |
| if i == transition_length: | |
| # Ensure smooth transition at the boundary | |
| next_mean_pred[i] = next_mean_pred[i-1] + original_diff * 0.5 | |
| else: | |
| # Maintain the original relative differences | |
| original_diff_i = next_mean_pred[i] - next_mean_pred[i-1] | |
| next_mean_pred[i] = next_mean_pred[i-1] + original_diff_i | |
| print(f"Applied extension continuity correction: First extension prediction adjusted from {next_mean_pred[0]:.4f} to {next_mean_pred[0]:.4f}") | |
| else: | |
| # Single prediction case - set to last extended prediction | |
| next_mean_pred[0] = extended_mean_pred[-1] | |
| print(f"Single extension prediction case: Set to last extended prediction value {extended_mean_pred[-1]:.4f}") | |
| # Apply financial smoothing if enabled | |
| if use_smoothing and len(next_mean_pred) > 1: | |
| next_mean_pred = apply_financial_smoothing(next_mean_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing) | |
| # Append predictions | |
| extended_mean_pred = np.concatenate([extended_mean_pred, next_mean_pred]) | |
| extended_std_pred = np.concatenate([extended_std_pred, next_std_pred]) | |
| remaining_steps -= len(next_mean_pred) | |
| if remaining_steps <= 0: | |
| break | |
| # Trim to exact prediction length if needed | |
| mean_pred = extended_mean_pred[:trim_length] | |
| std_pred = extended_std_pred[:trim_length] | |
| # Enhanced volume prediction | |
| volume_pred, volume_uncertainty = calculate_volume_prediction_enhanced( | |
| df, mean_pred, covariate_data | |
| ) | |
| # Ensure volume prediction is properly handled | |
| if volume_pred is None or len(volume_pred) == 0: | |
| print("Warning: Volume prediction failed, using fallback") | |
| volume_pred = np.full(len(mean_pred), df['Volume'].iloc[-1]) | |
| volume_uncertainty = np.full(len(mean_pred), df['Volume'].iloc[-1] * 0.2) | |
| elif len(volume_pred) != len(mean_pred): | |
| print(f"Warning: Volume prediction length mismatch. Expected {len(mean_pred)}, got {len(volume_pred)}") | |
| # Pad or truncate to match | |
| if len(volume_pred) < len(mean_pred): | |
| last_vol = volume_pred[-1] if len(volume_pred) > 0 else df['Volume'].iloc[-1] | |
| volume_pred = np.pad(volume_pred, (0, len(mean_pred) - len(volume_pred)), | |
| mode='constant', constant_values=last_vol) | |
| volume_uncertainty = np.pad(volume_uncertainty, (0, len(mean_pred) - len(volume_uncertainty)), | |
| mode='constant', constant_values=volume_uncertainty[-1] if len(volume_uncertainty) > 0 else df['Volume'].iloc[-1] * 0.2) | |
| else: | |
| volume_pred = volume_pred[:len(mean_pred)] | |
| volume_uncertainty = volume_uncertainty[:len(mean_pred)] | |
| # Predict technical indicators | |
| print("Predicting technical indicators...") | |
| technical_predictions = predict_technical_indicators(df, mean_pred, timeframe) | |
| technical_uncertainties = calculate_technical_uncertainty(df, technical_predictions, mean_pred) | |
| # Create ensemble prediction if enabled | |
| ensemble_pred = np.array([]) | |
| ensemble_uncertainty = np.array([]) | |
| if use_ensemble and covariate_data: | |
| print("Creating enhanced ensemble model...") | |
| ensemble_pred, ensemble_uncertainty = create_enhanced_ensemble_model( | |
| df, covariate_data, trim_length | |
| ) | |
| # Combine Chronos and ensemble predictions | |
| if len(ensemble_pred) > 0: | |
| # Weighted combination | |
| chronos_weight = 0.7 | |
| ensemble_weight = 0.3 | |
| final_pred = chronos_weight * mean_pred + ensemble_weight * ensemble_pred | |
| final_uncertainty = np.sqrt( | |
| (chronos_weight * std_pred)**2 + (ensemble_weight * ensemble_uncertainty)**2 | |
| ) | |
| else: | |
| final_pred = mean_pred | |
| final_uncertainty = std_pred | |
| # Final continuity validation and correction | |
| print("Performing final continuity validation...") | |
| last_actual = df['Close'].iloc[-1] | |
| first_pred = final_pred[0] | |
| discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check | |
| if abs(first_pred - last_actual) > discontinuity_threshold: | |
| print(f"Final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})") | |
| print(f"Final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)") | |
| # Apply final continuity correction | |
| if len(final_pred) > 1: | |
| # Calculate the overall trend | |
| overall_trend = final_pred[-1] - first_pred | |
| total_steps = len(final_pred) - 1 | |
| if total_steps > 0: | |
| trend_per_step = overall_trend / total_steps | |
| else: | |
| trend_per_step = 0 | |
| # Apply very smooth transition | |
| transition_length = min(8, len(final_pred)) # Longer transition for final correction | |
| for i in range(transition_length): | |
| if i == 0: | |
| # First prediction should be extremely close to last actual | |
| final_pred[i] = last_actual + trend_per_step * 0.05 | |
| else: | |
| # Very gradual transition | |
| transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing | |
| trend_contribution = trend_per_step * i * transition_factor | |
| final_pred[i] = last_actual + trend_contribution | |
| # Maintain relative relationships for remaining predictions | |
| if len(final_pred) > transition_length: | |
| for i in range(transition_length, len(final_pred)): | |
| if i == transition_length: | |
| # Smooth transition at boundary | |
| final_pred[i] = final_pred[i-1] + trend_per_step * 0.8 | |
| else: | |
| # Maintain original relative differences | |
| original_diff = final_pred[i] - final_pred[i-1] | |
| final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened | |
| print(f"Final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}") | |
| # Apply additional smoothing for final correction | |
| if use_smoothing: | |
| final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha * 0.5, 3, use_smoothing) | |
| else: | |
| # Single prediction case | |
| final_pred[0] = last_actual | |
| print(f"Final single prediction correction: Set to last actual value {last_actual:.4f}") | |
| # Verify final continuity | |
| final_first_pred = final_pred[0] | |
| final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100 | |
| print(f"Final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)") | |
| if final_discontinuity <= 1.0: | |
| print("✓ Continuity validation passed - predictions are smooth") | |
| else: | |
| print(f"⚠ Continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains") | |
| except Exception as e: | |
| print(f"Chronos prediction error: {str(e)}") | |
| raise | |
| elif strategy == "technical": | |
| # Technical analysis fallback strategy | |
| print("Using technical analysis strategy...") | |
| try: | |
| # Use the same MinMaxScaler for consistency | |
| prices = df['Close'].values | |
| scaler = MinMaxScaler(feature_range=(-1, 1)) | |
| scaler.fit(prices.reshape(-1, 1)) | |
| # Calculate technical indicators for prediction | |
| last_price = df['Close'].iloc[-1] | |
| last_rsi = df['RSI'].iloc[-1] | |
| last_macd = df['MACD'].iloc[-1] | |
| last_macd_signal = df['MACD_Signal'].iloc[-1] | |
| last_volatility = df['Volatility'].iloc[-1] | |
| # Get trend direction from technical indicators | |
| rsi_trend = 1 if last_rsi > 50 else -1 | |
| macd_trend = 1 if last_macd > last_macd_signal else -1 | |
| # Calculate momentum and mean reversion factors | |
| sma_20 = df['SMA_20'].iloc[-1] if 'SMA_20' in df.columns else last_price | |
| sma_50 = df['SMA_50'].iloc[-1] if 'SMA_50' in df.columns else last_price | |
| sma_200 = df['SMA_200'].iloc[-1] if 'SMA_200' in df.columns else last_price | |
| # Mean reversion factor (distance from long-term average) | |
| mean_reversion = (sma_200 - last_price) / last_price | |
| # Momentum factor (short vs long-term trend) | |
| momentum = (sma_20 - sma_50) / sma_50 | |
| # Combine technical signals | |
| technical_score = (rsi_trend * 0.3 + macd_trend * 0.3 + | |
| np.sign(momentum) * 0.2 + np.sign(mean_reversion) * 0.2) | |
| # Generate price predictions | |
| final_pred = [] | |
| final_uncertainty = [] | |
| for i in range(1, prediction_days + 1): | |
| # Base prediction with trend and mean reversion | |
| trend_factor = technical_score * last_volatility * 0.5 | |
| mean_reversion_factor = mean_reversion * 0.1 * i | |
| momentum_factor = momentum * 0.05 * i | |
| # Combine factors with decay | |
| prediction = last_price * (1 + trend_factor + mean_reversion_factor + momentum_factor) | |
| final_pred.append(prediction) | |
| # Uncertainty based on volatility and prediction horizon | |
| uncertainty = last_volatility * last_price * np.sqrt(i) | |
| final_uncertainty.append(uncertainty) | |
| final_pred = np.array(final_pred) | |
| final_uncertainty = np.array(final_uncertainty) | |
| # Apply smoothing if enabled | |
| if use_smoothing: | |
| final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing) | |
| # Enhanced volume prediction | |
| volume_pred, volume_uncertainty = calculate_volume_prediction_enhanced( | |
| df, final_pred, covariate_data | |
| ) | |
| # Ensure volume prediction is properly handled | |
| if volume_pred is None or len(volume_pred) == 0: | |
| print("Warning: Volume prediction failed, using fallback") | |
| volume_pred = np.full(len(final_pred), df['Volume'].iloc[-1]) | |
| volume_uncertainty = np.full(len(final_pred), df['Volume'].iloc[-1] * 0.2) | |
| elif len(volume_pred) != len(final_pred): | |
| print(f"Warning: Volume prediction length mismatch. Expected {len(final_pred)}, got {len(volume_pred)}") | |
| # Pad or truncate to match | |
| if len(volume_pred) < len(final_pred): | |
| last_vol = volume_pred[-1] if len(volume_pred) > 0 else df['Volume'].iloc[-1] | |
| volume_pred = np.pad(volume_pred, (0, len(final_pred) - len(volume_pred)), | |
| mode='constant', constant_values=last_vol) | |
| volume_uncertainty = np.pad(volume_uncertainty, (0, len(final_pred) - len(volume_uncertainty)), | |
| mode='constant', constant_values=volume_uncertainty[-1] if len(volume_uncertainty) > 0 else df['Volume'].iloc[-1] * 0.2) | |
| else: | |
| volume_pred = volume_pred[:len(final_pred)] | |
| volume_uncertainty = volume_uncertainty[:len(final_pred)] | |
| # Predict technical indicators | |
| print("Predicting technical indicators for technical strategy...") | |
| technical_predictions = predict_technical_indicators(df, final_pred, timeframe) | |
| technical_uncertainties = calculate_technical_uncertainty(df, technical_predictions, final_pred) | |
| # Create ensemble prediction if enabled | |
| ensemble_pred = np.array([]) | |
| ensemble_uncertainty = np.array([]) | |
| if use_ensemble and covariate_data: | |
| print("Creating enhanced ensemble model for technical strategy...") | |
| ensemble_pred, ensemble_uncertainty = create_enhanced_ensemble_model( | |
| df, covariate_data, prediction_days | |
| ) | |
| # Combine technical and ensemble predictions | |
| if len(ensemble_pred) > 0: | |
| technical_weight = 0.7 | |
| ensemble_weight = 0.3 | |
| final_pred = technical_weight * final_pred + ensemble_weight * ensemble_pred | |
| final_uncertainty = np.sqrt( | |
| (technical_weight * final_uncertainty)**2 + (ensemble_weight * ensemble_uncertainty)**2 | |
| ) | |
| # Calculate advanced uncertainties for technical strategy | |
| historical_volatility = df['Volatility'].iloc[-1] | |
| # Create dummy quantiles for technical strategy (since we don't have quantiles from Chronos) | |
| dummy_quantiles = np.array([[ | |
| [final_pred[i] - 2 * final_uncertainty[i], final_pred[i], final_pred[i] + 2 * final_uncertainty[i]] | |
| for i in range(len(final_pred)) | |
| ]]) | |
| advanced_uncertainties = calculate_advanced_uncertainty( | |
| dummy_quantiles, historical_volatility, market_conditions | |
| ) | |
| print(f"Technical strategy completed: {len(final_pred)} predictions generated") | |
| # Final continuity validation for technical strategy | |
| print("Performing final continuity validation for technical strategy...") | |
| last_actual = df['Close'].iloc[-1] | |
| first_pred = final_pred[0] | |
| discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check | |
| if abs(first_pred - last_actual) > discontinuity_threshold: | |
| print(f"Technical strategy final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})") | |
| print(f"Technical strategy final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)") | |
| # Apply final continuity correction for technical strategy | |
| if len(final_pred) > 1: | |
| # Calculate the overall trend | |
| overall_trend = final_pred[-1] - first_pred | |
| total_steps = len(final_pred) - 1 | |
| if total_steps > 0: | |
| trend_per_step = overall_trend / total_steps | |
| else: | |
| trend_per_step = 0 | |
| # Apply very smooth transition | |
| transition_length = min(8, len(final_pred)) # Longer transition for final correction | |
| for i in range(transition_length): | |
| if i == 0: | |
| # First prediction should be extremely close to last actual | |
| final_pred[i] = last_actual + trend_per_step * 0.05 | |
| else: | |
| # Very gradual transition | |
| transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing | |
| trend_contribution = trend_per_step * i * transition_factor | |
| final_pred[i] = last_actual + trend_contribution | |
| # Maintain relative relationships for remaining predictions | |
| if len(final_pred) > transition_length: | |
| for i in range(transition_length, len(final_pred)): | |
| if i == transition_length: | |
| # Smooth transition at boundary | |
| final_pred[i] = final_pred[i-1] + trend_per_step * 0.8 | |
| else: | |
| # Maintain original relative differences | |
| original_diff = final_pred[i] - final_pred[i-1] | |
| final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened | |
| print(f"Technical strategy final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}") | |
| # Apply additional smoothing for final correction | |
| if use_smoothing: | |
| final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha * 0.5, 3, use_smoothing) | |
| else: | |
| # Single prediction case | |
| final_pred[0] = last_actual | |
| print(f"Technical strategy final single prediction correction: Set to last actual value {last_actual:.4f}") | |
| # Verify final continuity for technical strategy | |
| final_first_pred = final_pred[0] | |
| final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100 | |
| print(f"Technical strategy final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)") | |
| if final_discontinuity <= 1.0: | |
| print("✓ Technical strategy continuity validation passed - predictions are smooth") | |
| else: | |
| print(f"⚠ Technical strategy continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains") | |
| except Exception as e: | |
| print(f"Technical strategy error: {str(e)}") | |
| # Fallback to simple moving average prediction | |
| print("Falling back to simple moving average prediction...") | |
| try: | |
| last_price = df['Close'].iloc[-1] | |
| volatility = df['Volatility'].iloc[-1] | |
| final_pred = np.array([last_price * (1 + 0.001 * i) for i in range(1, prediction_days + 1)]) | |
| final_uncertainty = np.array([volatility * last_price * np.sqrt(i) for i in range(1, prediction_days + 1)]) | |
| volume_pred = None | |
| volume_uncertainty = None | |
| ensemble_pred = np.array([]) | |
| ensemble_uncertainty = np.array([]) | |
| # Calculate advanced uncertainties for fallback case | |
| dummy_quantiles = np.array([[ | |
| [final_pred[i] - 2 * final_uncertainty[i], final_pred[i], final_pred[i] + 2 * final_uncertainty[i]] | |
| for i in range(len(final_pred)) | |
| ]]) | |
| advanced_uncertainties = calculate_advanced_uncertainty( | |
| dummy_quantiles, volatility, market_conditions | |
| ) | |
| # Final continuity validation for fallback case | |
| print("Performing final continuity validation for fallback prediction...") | |
| last_actual = df['Close'].iloc[-1] | |
| first_pred = final_pred[0] | |
| discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check | |
| if abs(first_pred - last_actual) > discontinuity_threshold: | |
| print(f"Fallback final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})") | |
| print(f"Fallback final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)") | |
| # Apply final continuity correction for fallback case | |
| if len(final_pred) > 1: | |
| # Calculate the overall trend | |
| overall_trend = final_pred[-1] - first_pred | |
| total_steps = len(final_pred) - 1 | |
| if total_steps > 0: | |
| trend_per_step = overall_trend / total_steps | |
| else: | |
| trend_per_step = 0 | |
| # Apply very smooth transition | |
| transition_length = min(8, len(final_pred)) # Longer transition for final correction | |
| for i in range(transition_length): | |
| if i == 0: | |
| # First prediction should be extremely close to last actual | |
| final_pred[i] = last_actual + trend_per_step * 0.05 | |
| else: | |
| # Very gradual transition | |
| transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing | |
| trend_contribution = trend_per_step * i * transition_factor | |
| final_pred[i] = last_actual + trend_contribution | |
| # Maintain relative relationships for remaining predictions | |
| if len(final_pred) > transition_length: | |
| for i in range(transition_length, len(final_pred)): | |
| if i == transition_length: | |
| # Smooth transition at boundary | |
| final_pred[i] = final_pred[i-1] + trend_per_step * 0.8 | |
| else: | |
| # Maintain original relative differences | |
| original_diff = final_pred[i] - final_pred[i-1] | |
| final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened | |
| print(f"Fallback final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}") | |
| else: | |
| # Single prediction case | |
| final_pred[0] = last_actual | |
| print(f"Fallback final single prediction correction: Set to last actual value {last_actual:.4f}") | |
| # Verify final continuity for fallback case | |
| final_first_pred = final_pred[0] | |
| final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100 | |
| print(f"Fallback final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)") | |
| if final_discontinuity <= 1.0: | |
| print("✓ Fallback continuity validation passed - predictions are smooth") | |
| else: | |
| print(f"⚠ Fallback continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains") | |
| except Exception as fallback_error: | |
| print(f"Fallback prediction error: {str(fallback_error)}") | |
| raise | |
| else: | |
| raise ValueError(f"Unknown strategy: {strategy}. Supported strategies: 'chronos', 'technical'") | |
| # Create prediction dates aligned exactly with prediction length | |
| last_date = df.index[-1] | |
| pred_len = int(len(final_pred)) if 'final_pred' in locals() and final_pred is not None else int(prediction_days) | |
| pred_len = max(1, pred_len) | |
| if timeframe == "1d": | |
| pred_dates = pd.date_range(start=last_date + timedelta(days=1), periods=pred_len) | |
| elif timeframe == "1h": | |
| pred_dates = pd.date_range(start=last_date + timedelta(hours=1), periods=pred_len, freq='H') | |
| else: # 15m | |
| pred_dates = pd.date_range(start=last_date + timedelta(minutes=15), periods=pred_len, freq='15min') | |
| # Create enhanced visualization with uncertainties integrated | |
| fig = make_subplots(rows=3, cols=1, | |
| shared_xaxes=True, | |
| vertical_spacing=0.12, | |
| subplot_titles=('Price Prediction with Uncertainty', 'Technical Indicators with Uncertainty', 'Volume with Uncertainty')) | |
| # Add historical price | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['Close'], name='Historical Price', | |
| line=dict(color='blue', width=2)), | |
| row=1, col=1 | |
| ) | |
| # Ensure uncertainty length matches prediction length to avoid plotting issues | |
| if final_uncertainty is None: | |
| final_uncertainty = np.zeros(len(final_pred), dtype=float) | |
| else: | |
| final_uncertainty = np.array(final_uncertainty, dtype=float) | |
| if len(final_uncertainty) < len(final_pred): | |
| last_u = final_uncertainty[-1] if len(final_uncertainty) > 0 else 0.0 | |
| final_uncertainty = np.pad(final_uncertainty, (0, len(final_pred) - len(final_uncertainty)), mode='constant', constant_values=last_u) | |
| elif len(final_uncertainty) > len(final_pred): | |
| final_uncertainty = final_uncertainty[:len(final_pred)] | |
| # Clean NaN/Inf and negative values | |
| final_uncertainty = np.where(np.isfinite(final_uncertainty), final_uncertainty, 0.0) | |
| final_uncertainty = np.maximum(final_uncertainty, 0.0) | |
| # Add predicted price with integrated uncertainty bands | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=final_pred, name='Predicted Price', | |
| line=dict(color='red', width=3)), | |
| row=1, col=1 | |
| ) | |
| # Add price uncertainty bands on the same subplot | |
| if final_uncertainty is not None and len(final_uncertainty) > 0: | |
| uncertainty_clean = np.array(final_uncertainty) | |
| uncertainty_clean = np.where(np.isnan(uncertainty_clean), 0, uncertainty_clean) | |
| # Create confidence bands | |
| confidence_68 = uncertainty_clean # 1 standard deviation (68% confidence) | |
| confidence_95 = 2 * uncertainty_clean # 2 standard deviations (95% confidence) | |
| # Plot 95% confidence bands | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=final_pred + confidence_95, name='95% Confidence Upper', | |
| line=dict(color='red', width=1, dash='dash'), showlegend=False), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=final_pred - confidence_95, name='95% Confidence Lower', | |
| line=dict(color='red', width=1, dash='dash'), fill='tonexty', | |
| fillcolor='rgba(255,0,0,0.1)', showlegend=False), | |
| row=1, col=1 | |
| ) | |
| # Plot 68% confidence bands | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=final_pred + confidence_68, name='68% Confidence Upper', | |
| line=dict(color='darkred', width=1, dash='dot'), showlegend=False), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=final_pred - confidence_68, name='68% Confidence Lower', | |
| line=dict(color='darkred', width=1, dash='dot'), fill='tonexty', | |
| fillcolor='rgba(139,0,0,0.1)', showlegend=False), | |
| row=1, col=1 | |
| ) | |
| # Add Bollinger Bands if available (on price subplot) | |
| if 'BB_Upper' in df.columns and 'BB_Middle' in df.columns and 'BB_Lower' in df.columns: | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['BB_Upper'], name='BB Upper (Historical)', | |
| line=dict(color='gray', width=1, dash='dash')), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['BB_Middle'], name='BB Middle (Historical)', | |
| line=dict(color='gray', width=1)), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['BB_Lower'], name='BB Lower (Historical)', | |
| line=dict(color='gray', width=1, dash='dash')), | |
| row=1, col=1 | |
| ) | |
| # Add predicted Bollinger Bands if available | |
| if 'BB_Upper' in technical_predictions: | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['BB_Upper'], name='BB Upper (Predicted)', | |
| line=dict(color='darkgray', width=2, dash='dash')), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['BB_Middle'], name='BB Middle (Predicted)', | |
| line=dict(color='darkgray', width=2)), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['BB_Lower'], name='BB Lower (Predicted)', | |
| line=dict(color='darkgray', width=2, dash='dash')), | |
| row=1, col=1 | |
| ) | |
| # Add technical indicators with their uncertainties on the same subplot | |
| # RSI with uncertainty | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['RSI'], name='RSI (Historical)', | |
| line=dict(color='purple', width=1)), | |
| row=2, col=1 | |
| ) | |
| if 'RSI' in technical_predictions: | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['RSI'], name='RSI (Predicted)', | |
| line=dict(color='purple', width=2, dash='dash')), | |
| row=2, col=1 | |
| ) | |
| # Add RSI uncertainty bands | |
| if 'RSI' in technical_uncertainties: | |
| rsi_upper = technical_predictions['RSI'] + 2 * technical_uncertainties['RSI'] | |
| rsi_lower = technical_predictions['RSI'] - 2 * technical_uncertainties['RSI'] | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=rsi_upper, name='RSI Upper Bound', | |
| line=dict(color='purple', width=1, dash='dot'), showlegend=False), | |
| row=2, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=rsi_lower, name='RSI Lower Bound', | |
| line=dict(color='purple', width=1, dash='dot'), fill='tonexty', | |
| fillcolor='rgba(128,0,128,0.1)', showlegend=False), | |
| row=2, col=1 | |
| ) | |
| # MACD with uncertainty | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=df['MACD'], name='MACD (Historical)', | |
| line=dict(color='orange', width=1)), | |
| row=2, col=1 | |
| ) | |
| if 'MACD' in technical_predictions: | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['MACD'], name='MACD (Predicted)', | |
| line=dict(color='orange', width=2, dash='dash')), | |
| row=2, col=1 | |
| ) | |
| # Add MACD signal line if available | |
| if 'MACD_Signal' in technical_predictions: | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=technical_predictions['MACD_Signal'], name='MACD Signal (Predicted)', | |
| line=dict(color='red', width=1, dash='dash')), | |
| row=2, col=1 | |
| ) | |
| # Add MACD uncertainty bands | |
| if 'MACD' in technical_uncertainties: | |
| macd_upper = technical_predictions['MACD'] + 2 * technical_uncertainties['MACD'] | |
| macd_lower = technical_predictions['MACD'] - 2 * technical_uncertainties['MACD'] | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=macd_upper, name='MACD Upper Bound', | |
| line=dict(color='orange', width=1, dash='dot'), showlegend=False), | |
| row=2, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=macd_lower, name='MACD Lower Bound', | |
| line=dict(color='orange', width=1, dash='dot'), fill='tonexty', | |
| fillcolor='rgba(255,165,0,0.1)', showlegend=False), | |
| row=2, col=1 | |
| ) | |
| # Add volume with uncertainty on the same subplot | |
| if 'Volume' in df.columns and not df['Volume'].isna().all(): | |
| volume_data = pd.to_numeric(df['Volume'], errors='coerce').fillna(0) | |
| fig.add_trace( | |
| go.Bar(x=df.index, y=volume_data, name='Historical Volume', | |
| marker_color='lightblue', opacity=0.7), | |
| row=3, col=1 | |
| ) | |
| else: | |
| print("Warning: No valid volume data available for plotting") | |
| # Add predicted volume with better handling | |
| if volume_pred is not None and len(volume_pred) > 0: | |
| # Ensure volume prediction is numeric and handle any NaN values | |
| volume_pred_clean = np.array(volume_pred) | |
| volume_pred_clean = np.where(np.isnan(volume_pred_clean), 0, volume_pred_clean) | |
| fig.add_trace( | |
| go.Bar(x=pred_dates, y=volume_pred_clean, name='Predicted Volume', | |
| marker_color='red', opacity=0.7), | |
| row=3, col=1 | |
| ) | |
| # Add volume uncertainty bands if available | |
| if volume_uncertainty is not None and len(volume_uncertainty) > 0: | |
| volume_uncertainty_clean = np.array(volume_uncertainty) | |
| volume_uncertainty_clean = np.where(np.isnan(volume_uncertainty_clean), 0, volume_uncertainty_clean) | |
| volume_upper = volume_pred_clean + 2 * volume_uncertainty_clean | |
| volume_lower = np.maximum(0, volume_pred_clean - 2 * volume_uncertainty_clean) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=volume_upper, name='Volume Upper Bound', | |
| line=dict(color='red', width=1, dash='dot'), showlegend=False), | |
| row=3, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter(x=pred_dates, y=volume_lower, name='Volume Lower Bound', | |
| line=dict(color='red', width=1, dash='dot'), fill='tonexty', | |
| fillcolor='rgba(255,0,0,0.1)', showlegend=False), | |
| row=3, col=1 | |
| ) | |
| # Add reference lines for technical indicators | |
| # RSI overbought/oversold lines | |
| fig.add_hline(y=70, line_dash="dash", line_color="red", | |
| annotation_text="Overbought (70)", row=2, col=1) | |
| fig.add_hline(y=30, line_dash="dash", line_color="green", | |
| annotation_text="Oversold (30)", row=2, col=1) | |
| # MACD zero line | |
| fig.add_hline(y=0, line_dash="dash", line_color="black", | |
| annotation_text="MACD Zero", row=2, col=1) | |
| # Add volume reference line (average volume) | |
| if 'Volume' in df.columns and not df['Volume'].isna().all(): | |
| avg_volume = df['Volume'].mean() | |
| fig.add_hline(y=avg_volume, line_dash="dash", line_color="blue", | |
| annotation_text=f"Avg Volume: {avg_volume:,.0f}", row=3, col=1) | |
| fig.update_layout( | |
| title=dict( | |
| text=f'Enhanced Stock Prediction for {symbol}', | |
| x=0.5, | |
| xanchor='center', | |
| font=dict(size=18, color='black'), | |
| y=0.95 # Moved down slightly to avoid overlap | |
| ), | |
| height=1000, | |
| showlegend=True, | |
| legend=dict( | |
| orientation="h", | |
| yanchor="top", | |
| y=-0.15, # Position legend below all subplots | |
| xanchor="center", | |
| x=0.5, # Center the legend horizontally | |
| bgcolor='rgba(255,255,255,0.9)', | |
| bordercolor='black', | |
| borderwidth=1, | |
| font=dict(size=10) # Smaller font for better fit | |
| ), | |
| margin=dict(t=120, b=150, l=80, r=80), # Increased bottom margin for legend | |
| autosize=True, | |
| hovermode='x unified' | |
| ) | |
| fig.update_xaxes( | |
| title_text="Date", | |
| row=3, col=1, | |
| title_font=dict(size=12, color='black'), | |
| tickfont=dict(size=10) | |
| ) | |
| fig.update_yaxes( | |
| title_text="Price ($)", | |
| row=1, col=1, | |
| title_font=dict(size=12, color='black'), | |
| tickfont=dict(size=10) | |
| ) | |
| fig.update_yaxes( | |
| title_text="Technical Indicators", | |
| row=2, col=1, | |
| title_font=dict(size=12, color='black'), | |
| tickfont=dict(size=10) | |
| ) | |
| fig.update_yaxes( | |
| title_text="Volume", | |
| row=3, col=1, | |
| title_font=dict(size=12, color='black'), | |
| tickfont=dict(size=10) | |
| ) | |
| for i in range(len(fig.layout.annotations)): | |
| if i < 3: | |
| fig.layout.annotations[i].update( | |
| font=dict(size=13, color='darkblue', family='Arial, sans-serif'), | |
| y=fig.layout.annotations[i].y + 0.01, # Reduced adjustment to prevent overlap | |
| bgcolor='rgba(255,255,255,0.9)', | |
| bordercolor='lightgray', | |
| borderwidth=1 | |
| ) | |
| # Create comprehensive trading signals | |
| trading_signals = { | |
| 'prediction': { | |
| 'dates': pred_dates.tolist(), | |
| 'prices': final_pred.tolist(), | |
| 'uncertainty': final_uncertainty.tolist(), | |
| 'volume': volume_pred.tolist() if volume_pred is not None else None | |
| }, | |
| 'historical': { | |
| 'dates': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(), | |
| 'prices': df['Close'].tolist(), | |
| 'volume': df['Volume'].tolist() if 'Volume' in df.columns else None | |
| }, | |
| 'technical_indicators': { | |
| 'predictions': {k: v.tolist() for k, v in technical_predictions.items()}, | |
| 'uncertainties': {k: v.tolist() for k, v in technical_uncertainties.items()} | |
| }, | |
| 'advanced_uncertainties': advanced_uncertainties, | |
| 'regime_info': regime_info, | |
| 'sentiment_data': sentiment_data, | |
| 'market_conditions': market_conditions, | |
| 'covariate_data_available': len(covariate_data) > 0 | |
| } | |
| # Add stress testing results if enabled | |
| stress_test_results = {} | |
| if use_stress_testing: | |
| try: | |
| print("Performing stress testing...") | |
| stress_test_results = stress_test_scenarios(df, final_pred) | |
| trading_signals['stress_test_results'] = stress_test_results | |
| except Exception as stress_error: | |
| print(f"Stress testing failed: {str(stress_error)}") | |
| trading_signals['stress_test_results'] = {"error": str(stress_error)} | |
| # Add advanced trading signals | |
| try: | |
| print("Generating advanced trading signals...") | |
| advanced_signals = advanced_trading_signals(df, regime_info) | |
| trading_signals['advanced_signals'] = advanced_signals | |
| except Exception as advanced_error: | |
| print(f"Advanced trading signals failed: {str(advanced_error)}") | |
| trading_signals['advanced_signals'] = {"error": str(advanced_error)} | |
| # Add basic trading signals | |
| try: | |
| basic_signals = calculate_trading_signals(df) | |
| trading_signals.update(basic_signals) | |
| trading_signals['symbol'] = symbol | |
| trading_signals['timeframe'] = timeframe | |
| trading_signals['strategy_used'] = strategy | |
| trading_signals['ensemble_used'] = use_ensemble | |
| except Exception as basic_error: | |
| print(f"Basic trading signals failed: {str(basic_error)}") | |
| trading_signals['error'] = str(basic_error) | |
| # Debug information for volume and uncertainty | |
| print(f"Volume data info:") | |
| print(f" Historical volume shape: {df['Volume'].shape if 'Volume' in df.columns else 'No volume column'}") | |
| print(f" Historical volume NaN count: {df['Volume'].isna().sum() if 'Volume' in df.columns else 'N/A'}") | |
| print(f" Predicted volume shape: {volume_pred.shape if volume_pred is not None else 'None'}") | |
| print(f" Predicted volume NaN count: {np.isnan(volume_pred).sum() if volume_pred is not None else 'N/A'}") | |
| print(f"Uncertainty data info:") | |
| print(f" Final uncertainty shape: {final_uncertainty.shape if final_uncertainty is not None else 'None'}") | |
| print(f" Final uncertainty NaN count: {np.isnan(final_uncertainty).sum() if final_uncertainty is not None else 'N/A'}") | |
| print(f" Final uncertainty range: [{np.min(final_uncertainty):.4f}, {np.max(final_uncertainty):.4f}]" if final_uncertainty is not None else 'N/A') | |
| return trading_signals, fig | |
| except Exception as e: | |
| print(f"Enhanced prediction error: {str(e)}") | |
| raise | |
def calculate_trading_signals(df: pd.DataFrame) -> Dict:
    """
    Calculate trading signals based on the latest technical indicator values.

    Only the last row of ``df`` is inspected.

    Args:
        df (pd.DataFrame): Must contain the columns 'RSI', 'MACD', 'MACD_Signal',
            'Close', 'BB_Lower', 'BB_Upper', 'SMA_20' and 'SMA_50'.

    Returns:
        Dict: Per-indicator signal labels ("RSI", "MACD", "Bollinger", "SMA")
        plus an "Overall" entry ("Buy", "Sell" or "Hold") decided by a simple
        majority vote of the individual signals.

    Raises:
        KeyError: If one of the required columns is missing.
    """
    rsi = df['RSI'].iloc[-1]
    macd = df['MACD'].iloc[-1]
    macd_signal = df['MACD_Signal'].iloc[-1]
    close = df['Close'].iloc[-1]
    bb_lower = df['BB_Lower'].iloc[-1]
    bb_upper = df['BB_Upper'].iloc[-1]
    sma_20 = df['SMA_20'].iloc[-1]
    sma_50 = df['SMA_50'].iloc[-1]

    signals = {
        "RSI": "Oversold" if rsi < 30 else "Overbought" if rsi > 70 else "Neutral",
        "MACD": "Buy" if macd > macd_signal else "Sell",
        "Bollinger": "Buy" if close < bb_lower else "Sell" if close > bb_upper else "Hold",
        "SMA": "Buy" if sma_20 > sma_50 else "Sell",
    }

    # RSI is reported with descriptive labels rather than Buy/Sell. Translate
    # them into votes so the RSI reading actually contributes to "Overall"
    # (oversold is a buy cue, overbought a sell cue -- the same mean-reversion
    # reading the Bollinger signal uses). "Neutral"/"Hold" cast no vote.
    vote_for = {"Buy": "Buy", "Oversold": "Buy", "Sell": "Sell", "Overbought": "Sell"}
    votes = [vote_for.get(label) for label in signals.values()]
    buy_signals = votes.count("Buy")
    sell_signals = votes.count("Sell")

    if buy_signals > sell_signals:
        signals["Overall"] = "Buy"
    elif sell_signals > buy_signals:
        signals["Overall"] = "Sell"
    else:
        signals["Overall"] = "Hold"
    return signals
def get_market_data(symbol: str = "^GSPC", lookback_days: int = 1095) -> pd.DataFrame:
    """
    Fetch daily market-index data for correlation analysis and regime detection.

    Results are memoised in the module-level ``market_data_cache`` keyed by
    ``"<symbol>_<lookback_days>"``. Each entry stays valid until the timestamp
    stored in ``cache_expiry`` (now + ``CACHE_DURATION``). The download is
    wrapped in ``retry_yfinance_request``.

    Args:
        symbol (str): Market index ticker (defaults to ^GSPC, the S&P 500).
        lookback_days (int): How many days of history to request.

    Returns:
        pd.DataFrame: History with extra 'Returns' (percent change of Close) and
        'Volatility' (20-row rolling std of Returns) columns. It is empty when
        nothing was returned or the fetch raised.
    """
    key = f"{symbol}_{lookback_days}"
    now = time.time()

    # Serve a still-valid cached copy without touching the network.
    if key in market_data_cache and now < cache_expiry.get(key, 0):
        return market_data_cache[key]

    try:
        ticker = yf.Ticker(symbol)
        history_kwargs = {
            "period": f"{lookback_days}d",
            "interval": "1d",
            "prepost": False,
            "actions": False,
            "auto_adjust": True,
            "back_adjust": True,
            "repair": True,
        }

        def fetch_market_history():
            return ticker.history(**history_kwargs)

        df = retry_yfinance_request(fetch_market_history)

        if df is None or df.empty:
            print(f"Warning: No data returned for {symbol}")
            return pd.DataFrame()

        df['Returns'] = df['Close'].pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()

        # Remember the enriched frame until the cache window elapses.
        market_data_cache[key] = df
        cache_expiry[key] = now + CACHE_DURATION
        print(f"Successfully fetched market data for {symbol}: {len(df)} data points")
        return df
    except Exception as e:
        print(f"Warning: Could not fetch market data for {symbol}: {str(e)}")
        return pd.DataFrame()
def detect_market_regime(returns: pd.Series, n_regimes: int = 3) -> Dict:
    """
    Detect the current market regime using a Hidden Markov Model or a
    simplified volatility-clustering method.

    When hmmlearn is available (``HMM_AVAILABLE``), Gaussian HMMs with "full",
    "diag" and "spherical" covariance are tried in that order. The first one
    that converges is used. If hmmlearn is unavailable, or no configuration
    converges, a volatility-based heuristic is used instead.

    Args:
        returns (pd.Series): Price returns
        n_regimes (int): Number of hidden states for the HMM

    Returns:
        Dict: Regime information. Every result contains "regime",
        "regime_index", "probabilities" and "method". HMM results also contain
        "means" and "volatilities"; all other paths contain "volatility".
    """
    def get_regime_name(regime_idx: int, means: List[float], volatilities: List[float]) -> str:
        """
        Convert a regime index to a descriptive name based on its characteristics.

        Args:
            regime_idx (int): Regime index (0, 1, 2)
            means (List[float]): Per-regime mean returns
            volatilities (List[float]): Per-regime volatilities

        Returns:
            str: e.g. "Low Volatility Bull"; "Regime <idx>" unless exactly three
            regimes are described.
        """
        if len(means) != 3 or len(volatilities) != 3:
            return f"Regime {regime_idx}"
        # Rank regimes by volatility (low to high) to pick the volatility label.
        vol_sorted = sorted(range(len(volatilities)), key=lambda i: volatilities[i])
        if regime_idx == vol_sorted[0]:
            level = "Low Volatility"
        elif regime_idx == vol_sorted[2]:
            level = "High Volatility"
        else:
            level = "Moderate"
        direction = "Bull" if means[regime_idx] > 0 else "Bear"
        return f"{level} {direction}"

    def volatility_regime() -> Dict:
        """Classify by comparing the latest 20-period volatility to its 80th percentile."""
        volatility = returns.rolling(window=20).std().dropna()
        vol_percentile = volatility.iloc[-1] / volatility.quantile(0.8)
        if vol_percentile > 1.2:
            regime_name, regime = "High Volatility Market", 2
            probabilities = [0.1, 0.1, 0.8]
        elif vol_percentile < 0.8:
            regime_name, regime = "Low Volatility Market", 0
            probabilities = [0.8, 0.1, 0.1]
        else:
            regime_name, regime = "Normal Market", 1
            probabilities = [0.1, 0.8, 0.1]
        return {
            "regime": regime_name,
            "regime_index": regime,
            "probabilities": probabilities,
            "volatility": volatility.iloc[-1],
            "method": "Volatility-based"
        }

    def fit_hmm() -> Optional[Dict]:
        """Try each HMM configuration; return the first converged result, else None."""
        returns_array = returns.dropna().values
        # Drop NaN and +/-inf once; both fitting and prediction use finite data only.
        returns_clean = returns_array[np.isfinite(returns_array)]
        if len(returns_clean) < 50:
            print("HMM skipped: not enough finite returns, using fallback method")
            return None

        # Remove IQR outliers that tend to break EM convergence (fit data only).
        q1, q3 = np.percentile(returns_clean, [25, 75])
        iqr = q3 - q1
        in_range = (returns_clean >= q1 - 1.5 * iqr) & (returns_clean <= q3 + 1.5 * iqr)
        returns_filtered = returns_clean[in_range]
        if len(returns_filtered) < 50:
            returns_filtered = returns_clean

        configurations = [
            ("full", 500, 1e-4),
            ("diag", 1000, 1e-3),
            ("spherical", 1500, 1e-2),
        ]
        for attempt, (cov_type, n_iter, tol) in enumerate(configurations, start=1):
            try:
                model = hmm.GaussianHMM(
                    n_components=n_regimes,
                    random_state=42,
                    covariance_type=cov_type,
                    n_iter=n_iter,
                    tol=tol,
                    init_params="stmc"
                )
                model.fit(returns_filtered.reshape(-1, 1))
                if not model.monitor_.converged:
                    print(f"HMM did not converge with {cov_type} covariance type, trying next configuration...")
                    continue
                print(f"HMM converged successfully with {cov_type} covariance type")

                observations = returns_clean.reshape(-1, 1)
                regime_probs = model.predict_proba(observations)
                current_regime = int(model.predict(observations)[-1])

                # Single feature: reduce means/covariances to one scalar per state.
                # The reshape accepts any covars_ layout ((n,), (n,1) or (n,1,1)),
                # which differs by covariance type and hmmlearn version.
                regime_means = np.asarray(model.means_).reshape(n_regimes, -1)[:, 0]
                regime_vols = np.sqrt(np.asarray(model.covars_).reshape(n_regimes, -1)[:, 0])

                return {
                    "regime": get_regime_name(current_regime, regime_means.tolist(), regime_vols.tolist()),
                    "regime_index": current_regime,
                    "probabilities": regime_probs[-1].tolist(),
                    "means": regime_means.tolist(),
                    "volatilities": regime_vols.tolist(),
                    "method": f"HMM-{cov_type}"
                }
            except Exception as e:
                print(f"HMM attempt {attempt} ({cov_type}) failed: {str(e)}")
        print("HMM failed to converge with all configurations, using fallback method")
        return None

    if len(returns) < 50:
        return {"regime": "Normal Market", "regime_index": 1, "probabilities": [1.0],
                "volatility": returns.std(), "method": "Insufficient data"}

    try:
        if HMM_AVAILABLE:
            hmm_result = fit_hmm()
            if hmm_result is not None:
                return hmm_result
        # Reached when hmmlearn is missing OR every HMM attempt failed; without
        # this the HMM failure path would return None instead of a dict.
        return volatility_regime()
    except Exception as e:
        print(f"Warning: Regime detection failed: {str(e)}")
        return {"regime": "Normal Market", "regime_index": 1, "probabilities": [1.0],
                "volatility": returns.std(), "method": "Fallback"}
def _market_relative_metrics(returns: pd.Series, market_returns: Optional[pd.Series],
                             min_observations: int = 11) -> Tuple[float, float, float, float]:
    """
    Compute market-relative statistics of a return series.

    Args:
        returns (pd.Series): Stock returns (NaNs already dropped).
        market_returns (pd.Series): Market returns sharing the same kind of index.
        min_observations (int): Minimum number of overlapping dates required.

    Returns:
        Tuple[float, float, float, float]: (beta, daily alpha, correlation,
        daily information ratio). Falls back to the neutral tuple
        (1.0, 0.0, 0.0, 0.0) when market data is missing, fewer than
        ``min_observations`` dates overlap, or a computation fails.
    """
    neutral = (1.0, 0.0, 0.0, 0.0)
    if market_returns is None or len(market_returns) == 0:
        return neutral
    try:
        # Inner-join on the index and drop any date on which either series is
        # NaN, so every stock return is paired with the market return of the
        # SAME date (trimming two separately-cleaned series by length does not
        # guarantee that).
        paired = pd.concat([returns, market_returns], axis=1, join='inner').dropna()
    except Exception as e:
        print(f"Market data alignment error: {str(e)}")
        return neutral
    if len(paired) < min_observations:
        return neutral

    stock = paired.iloc[:, 0].to_numpy(dtype=float)
    market = paired.iloc[:, 1].to_numpy(dtype=float)

    beta, alpha, correlation = 1.0, 0.0, 0.0
    try:
        # Sample covariance matrix (ddof=1). Taking the market variance from its
        # diagonal keeps numerator and denominator on the same ddof; mixing
        # np.cov (ddof=1) with np.var (ddof=0) biases beta by n / (n - 1).
        cov = np.cov(stock, market)
        market_var = cov[1, 1]
        if market_var > 0:
            beta = cov[0, 1] / market_var
            alpha = stock.mean() - beta * market.mean()
            if cov[0, 0] > 0:
                correlation = float(np.clip(cov[0, 1] / np.sqrt(cov[0, 0] * market_var), -1.0, 1.0))
    except Exception as e:
        print(f"Market correlation calculation error: {str(e)}")
        beta, alpha, correlation = 1.0, 0.0, 0.0

    information_ratio = 0.0
    try:
        excess = stock - market
        excess_std = excess.std(ddof=1)  # same ddof as pandas Series.std()
        if excess_std > 0:
            # Per-period (not annualised) information ratio.
            information_ratio = excess.mean() / excess_std
    except Exception as e:
        print(f"Information ratio calculation error: {str(e)}")
        information_ratio = 0.0

    return beta, alpha, correlation, information_ratio


def calculate_advanced_risk_metrics(df: pd.DataFrame, market_returns: pd.Series = None,
                                    risk_free_rate: float = 0.02) -> Dict:
    """
    Calculate advanced risk metrics including tail risk and market correlation.

    Annualisation assumes 252 periods per year.

    Args:
        df (pd.DataFrame): Stock data; must contain a 'Returns' column.
        market_returns (pd.Series): Optional market returns used for beta,
            alpha, correlation and information ratio. Dates are matched on the
            index; when no usable overlap exists those metrics take neutral
            values (beta 1, alpha 0, correlation 0, information ratio 0).
        risk_free_rate (float): Annual risk-free rate.

    Returns:
        Dict: Advanced risk metrics, or ``{"error": ...}`` when fewer than 30
        returns are available or the calculation fails.
    """
    try:
        returns = df['Returns'].dropna()
        if len(returns) < 30:
            return {"error": "Insufficient data for risk calculation"}

        # Basic metrics
        annual_return = returns.mean() * 252
        annual_vol = returns.std() * np.sqrt(252)

        # Market-adjusted metrics (neutral defaults when market data is unusable)
        beta, alpha, correlation, information_ratio = _market_relative_metrics(returns, market_returns)

        # Tail risk metrics (historical VaR / CVaR)
        var_95 = np.percentile(returns, 5)
        var_99 = np.percentile(returns, 1)
        cvar_95 = returns[returns <= var_95].mean()
        cvar_99 = returns[returns <= var_99].mean()

        # Maximum drawdown of the compounded return path
        cumulative_returns = (1 + returns).cumprod()
        rolling_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - rolling_max) / rolling_max
        max_drawdown = drawdown.min()

        # Higher moments
        skewness = stats.skew(returns)
        kurtosis = stats.kurtosis(returns)

        # Risk-adjusted returns
        sharpe_ratio = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0
        downside_std = returns[returns < 0].std()
        sortino_ratio = ((annual_return - risk_free_rate) / (downside_std * np.sqrt(252))
                         if downside_std > 0 else 0)
        calmar_ratio = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0

        return {
            "Annual_Return": annual_return,
            "Annual_Volatility": annual_vol,
            "Sharpe_Ratio": sharpe_ratio,
            "Sortino_Ratio": sortino_ratio,
            "Calmar_Ratio": calmar_ratio,
            "Information_Ratio": information_ratio,
            "Beta": beta,
            "Alpha": alpha * 252,
            "Correlation_with_Market": correlation,
            "VaR_95": var_95,
            "VaR_99": var_99,
            "CVaR_95": cvar_95,
            "CVaR_99": cvar_99,
            "Max_Drawdown": max_drawdown,
            "Skewness": skewness,
            "Kurtosis": kurtosis,
            "Risk_Free_Rate": risk_free_rate
        }
    except Exception as e:
        print(f"Advanced risk metrics calculation error: {str(e)}")
        return {"error": f"Risk calculation failed: {str(e)}"}
def _technical_forecast(df: pd.DataFrame, prediction_days: int) -> Tuple[np.ndarray, np.ndarray]:
    """Trend + mean-reversion forecast from RSI/MACD/SMA_200; empty arrays on failure."""
    try:
        last_price = df['Close'].iloc[-1]
        rsi = df['RSI'].iloc[-1]
        macd = df['MACD'].iloc[-1]
        macd_signal = df['MACD_Signal'].iloc[-1]
        volatility = df['Volatility'].iloc[-1]
        # Up-trend only when RSI and the MACD crossover agree.
        trend = 1 if (rsi > 50 and macd > macd_signal) else -1
        mean_reversion = 0.0
        if 'SMA_200' in df.columns:
            sma_200 = df['SMA_200'].iloc[-1]
            # SMA_200 stays NaN until enough history exists; treat that like a
            # missing column instead of turning the whole forecast into NaN.
            if pd.notna(sma_200):
                mean_reversion = (sma_200 - last_price) / last_price
        steps = np.arange(1, prediction_days + 1)
        prediction = last_price * (1 + trend * volatility * 0.3 + mean_reversion * 0.1 * steps)
        uncertainty = volatility * last_price * steps
        return prediction, uncertainty
    except Exception as e:
        print(f"Technical prediction error: {str(e)}")
        return np.array([]), np.array([])


def _statistical_forecast(df: pd.DataFrame, prediction_days: int) -> Tuple[np.ndarray, np.ndarray]:
    """Momentum (10 vs 30 period MA) forecast with a linear decay; empty arrays on failure."""
    try:
        returns = df['Returns'].dropna()
        if len(returns) <= 10:
            return np.array([]), np.array([])
        ma_short = df['Close'].rolling(window=10).mean().iloc[-1]
        ma_long = df['Close'].rolling(window=30).mean().iloc[-1]
        momentum = (ma_short - ma_long) / ma_long
        # With fewer than 30 rows ma_long is NaN (or ma_long may be 0):
        # no meaningful momentum, so skip this model.
        if not np.isfinite(momentum):
            return np.array([]), np.array([])
        last_price = df['Close'].iloc[-1]
        steps = np.arange(1, prediction_days + 1)
        prediction = last_price * (1 + momentum * 0.5 - 0.001 * steps)  # decay factor
        uncertainty = returns.std() * last_price * np.sqrt(steps)
        return prediction, uncertainty
    except Exception as e:
        print(f"Statistical prediction error: {str(e)}")
        return np.array([]), np.array([])


def _pad_to_length(values: np.ndarray, length: int) -> np.ndarray:
    """Truncate ``values`` to ``length`` or extend it by repeating its last element."""
    if len(values) >= length:
        return values[:length]
    return np.concatenate([values, np.full(length - len(values), values[-1])])


def create_ensemble_prediction(df: pd.DataFrame, prediction_days: int,
                               ensemble_weights: Dict = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create an ensemble prediction combining multiple models.

    Only the "technical" and "statistical" components are computed here; any
    other key in ``ensemble_weights`` (e.g. "chronos") produces no forecast in
    this function and is dropped when the weights are re-normalised.
    Components whose forecast is empty or contains non-finite values are
    excluded.

    Args:
        df (pd.DataFrame): Historical data ('Close', 'Returns', 'RSI', 'MACD',
            'MACD_Signal', 'Volatility', optionally 'SMA_200').
        prediction_days (int): Number of days to predict.
        ensemble_weights (Dict): Weights per model name.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Weighted mean and weighted uncertainty,
        or two empty arrays when no component is usable.
    """
    if ensemble_weights is None:
        ensemble_weights = {"chronos": 0.6, "technical": 0.2, "statistical": 0.2}

    forecasts = {}
    if ensemble_weights.get("technical", 0) > 0:
        forecasts["technical"] = _technical_forecast(df, prediction_days)
    if ensemble_weights.get("statistical", 0) > 0:
        forecasts["statistical"] = _statistical_forecast(df, prediction_days)

    # Keep only components with a non-empty, fully finite mean AND uncertainty.
    valid = {
        name: (pred, unc)
        for name, (pred, unc) in forecasts.items()
        if len(pred) > 0 and len(unc) > 0
        and np.all(np.isfinite(pred)) and np.all(np.isfinite(unc))
    }
    if not valid:
        return np.array([]), np.array([])

    total_weight = sum(ensemble_weights.get(name, 0) for name in valid)
    if total_weight == 0:
        return np.array([]), np.array([])

    horizon = max(len(pred) for pred, _ in valid.values())
    ensemble_mean = np.zeros(horizon)
    ensemble_uncertainty = np.zeros(horizon)
    for name, (pred, unc) in valid.items():
        weight = ensemble_weights.get(name, 0) / total_weight
        ensemble_mean += weight * _pad_to_length(pred, horizon)
        ensemble_uncertainty += weight * _pad_to_length(unc, horizon)
    return ensemble_mean, ensemble_uncertainty
def stress_test_scenarios(df: pd.DataFrame, prediction: np.ndarray,
                          scenarios: Dict = None,
                          random_state: Optional[Union[int, np.random.Generator]] = None) -> Dict:
    """
    Perform stress testing under various market scenarios.

    For each scenario the base prediction is scaled by a linear drift factor
    ``1 + stressed_return * (i + 1) / 252``. It is then multiplied by
    ``1 + N(0, stressed_vol * sqrt((i + 1) / 252))``, with one independent
    draw per step.

    NOTE(review): ``base_return`` is the mean of ``df['Returns']`` (presumably a
    per-period return), while ``return_shock`` is prorated by ``(i + 1) / 252`` as
    if it were annual. Confirm the intended units of both, and of
    ``df['Volatility']``.

    Args:
        df (pd.DataFrame): Historical data with 'Volatility', 'Returns' and 'Close'.
        prediction (np.ndarray): Base prediction.
        scenarios (Dict): Scenario name -> {"volatility_multiplier", "return_shock"}.
        random_state (int | np.random.Generator | None): Seed or generator used
            for the random shocks. ``None`` (default) draws from NumPy's global
            random state, as before; pass a value for reproducible results.

    Returns:
        Dict: Per-scenario results, or ``{"error": ...}`` for a scenario that failed.
    """
    if scenarios is None:
        scenarios = {
            "market_crash": {"volatility_multiplier": 3.0, "return_shock": -0.15},
            "high_volatility": {"volatility_multiplier": 2.0, "return_shock": -0.05},
            "low_volatility": {"volatility_multiplier": 0.5, "return_shock": 0.02},
            "bull_market": {"volatility_multiplier": 1.2, "return_shock": 0.10},
            "interest_rate_shock": {"volatility_multiplier": 1.5, "return_shock": -0.08}
        }
    # np.random (module) exposes the same .normal() as a Generator.
    rng = np.random if random_state is None else np.random.default_rng(random_state)

    base_volatility = df['Volatility'].iloc[-1]
    base_return = df['Returns'].mean()
    last_price = df['Close'].iloc[-1]

    stress_results = {}
    for scenario_name, params in scenarios.items():
        try:
            # Calculate stressed parameters
            stressed_vol = base_volatility * params["volatility_multiplier"]
            stressed_return = base_return + params["return_shock"]

            # Generate stressed prediction
            stressed_pred = []
            for i, pred in enumerate(prediction):
                stress_factor = 1 + stressed_return * (i + 1) / 252
                volatility_impact = rng.normal(0, stressed_vol * np.sqrt((i + 1) / 252))
                stressed_pred.append(pred * stress_factor * (1 + volatility_impact))

            # "var_95" is the 5th percentile of the per-step returns along this
            # single stressed path relative to the last observed price.
            stress_results[scenario_name] = {
                "prediction": np.array(stressed_pred),
                "max_loss": min(stressed_pred) / last_price - 1,
                "volatility": stressed_vol,
                "expected_return": stressed_return,
                "var_95": np.percentile([p / last_price - 1 for p in stressed_pred], 5)
            }
        except Exception as e:
            print(f"Stress test error for {scenario_name}: {str(e)}")
            stress_results[scenario_name] = {"error": str(e)}
    return stress_results
def calculate_skewed_uncertainty(quantiles: np.ndarray, confidence_level: float = 0.9) -> np.ndarray:
    """
    Calculate uncertainty accounting for skewness in return distributions.

    Each step's band width ``upper - lower`` is converted into a sigma-like
    value ``(upper - lower) / (2 * z)``. Here ``z = norm.ppf(confidence_level)``,
    adjusted by the tail asymmetry ratio ``(median - lower) / (upper - median)``.

    Args:
        quantiles (np.ndarray): Quantile predictions indexed as
            ``quantiles[0, step, k]`` with k = 0/1/2 for lower/median/upper.
            The default ``confidence_level`` of 0.9 matches a 10%-90% band
            (presumably the caller's quantile levels; confirm).
        confidence_level (float): Upper quantile level passed to ``norm.ppf``.

    Returns:
        np.ndarray: One uncertainty estimate per prediction step.
    """
    try:
        base_z = stats.norm.ppf(confidence_level)
        lower = np.asarray(quantiles[0, :, 0], dtype=float)
        median = np.asarray(quantiles[0, :, 1], dtype=float)
        upper = np.asarray(quantiles[0, :, 2], dtype=float)

        lower_spread = median - lower
        upper_spread = upper - median
        # Ratio > 1: the lower tail is wider than the upper one; < 1: the
        # upper tail is wider. Degenerate (zero-width) sides count as symmetric.
        has_spread = (upper_spread != 0) & (lower_spread != 0)
        ratio = np.ones_like(median)
        np.divide(lower_spread, upper_spread, out=ratio, where=has_spread)

        z_score = np.where(
            ratio > 1.2,
            base_z * (1 + 0.1 * ratio),
            np.where(ratio < 0.8, base_z * (1 - 0.1 * np.abs(ratio)), base_z),
        )
        return (upper - lower) / (2 * z_score)
    except Exception as e:
        print(f"Skewed uncertainty calculation error: {str(e)}")
        # Fallback: symmetric normal approximation using the SAME z-score as
        # the main path (a fixed 1.645 would correspond to a 5%-95% band).
        return (quantiles[0, :, 2] - quantiles[0, :, 0]) / (2 * stats.norm.ppf(confidence_level))
def adaptive_smoothing(new_pred: np.ndarray, historical_pred: np.ndarray,
                       prediction_uncertainty: np.ndarray) -> np.ndarray:
    """
    Apply adaptive smoothing based on prediction uncertainty.

    Each new point is blended with the matching point of the last
    ``len(new_pred)`` historical values: ``alpha * new + (1 - alpha) * hist``.
    Alpha is chosen per element from the uncertainty relative to the mean
    absolute historical value:
    ratio > 0.1 -> alpha 0.1 (heavy smoothing), ratio < 0.05 -> alpha 0.5,
    otherwise 0.3. A scalar uncertainty applies one alpha to every point.

    Args:
        new_pred (np.ndarray): New predictions.
        historical_pred (np.ndarray): Historical predictions (at least as long
            as ``new_pred`` for smoothing to apply).
        prediction_uncertainty (np.ndarray): Scalar or per-point uncertainty.

    Returns:
        np.ndarray: Smoothed predictions, or ``new_pred`` unchanged when
        smoothing cannot be applied.
    """
    try:
        if (historical_pred is None or prediction_uncertainty is None
                or len(historical_pred) == 0 or len(new_pred) == 0):
            return new_pred
        new_arr = np.asarray(new_pred, dtype=float)
        hist_arr = np.asarray(historical_pred, dtype=float)
        if len(hist_arr) < len(new_arr):
            # Not enough history to pair every new point with a past value.
            return new_pred

        scale = np.mean(np.abs(hist_arr))
        with np.errstate(divide='ignore', invalid='ignore'):
            ratio = np.asarray(prediction_uncertainty, dtype=float) / scale
        # Element-wise choice; comparing an array with `if` would raise.
        alpha = np.where(ratio > 0.1, 0.1, np.where(ratio < 0.05, 0.5, 0.3))

        return alpha * new_arr + (1 - alpha) * hist_arr[-len(new_arr):]
    except Exception as e:
        print(f"Adaptive smoothing error: {str(e)}")
        return new_pred
def advanced_trading_signals(df: pd.DataFrame, regime_info: Dict = None) -> Dict:
    """
    Generate advanced trading signals with confidence levels and regime awareness.

    Each available indicator (RSI, MACD, and Bollinger Bands / SMA when their
    columns exist) yields a label, a strength and a confidence. The overall
    signal compares the summed ``strength * confidence`` of bullish labels
    ("Buy", and RSI "Oversold") against bearish labels ("Sell", and RSI
    "Overbought").

    Args:
        df (pd.DataFrame): Stock data with 'Close', 'RSI', 'MACD', 'MACD_Signal'
            (plus 'Volatility' when regime adjustment applies).
        regime_info (Dict): Market regime information; RSI thresholds are only
            adjusted when it contains a "volatilities" entry.

    Returns:
        Dict: {"signals", "overall_signal", "confidence", "regime_adjusted"},
        or {"error": ...} on failure. "regime_adjusted" reports whether
        ``regime_info`` was supplied.
    """
    try:
        current_price = df['Close'].iloc[-1]
        rsi = df['RSI'].iloc[-1]
        macd = df['MACD'].iloc[-1]
        macd_line_signal = df['MACD_Signal'].iloc[-1]

        rsi_strength = abs(rsi - 50) / 50  # 0-1 scale
        macd_strength = abs(macd - macd_line_signal) / current_price

        # Regime-adjusted thresholds
        if regime_info and "volatilities" in regime_info:
            volatility_regime = df['Volatility'].iloc[-1] / np.mean(regime_info["volatilities"])
        else:
            volatility_regime = 1.0
        rsi_oversold = 30 + (volatility_regime - 1) * 10
        rsi_overbought = 70 - (volatility_regime - 1) * 10

        signals = {}

        # RSI signal
        if rsi < rsi_oversold:
            rsi_label, rsi_confidence = "Oversold", min(0.9, 0.5 + rsi_strength * 0.4)
        elif rsi > rsi_overbought:
            rsi_label, rsi_confidence = "Overbought", min(0.9, 0.5 + rsi_strength * 0.4)
        else:
            rsi_label, rsi_confidence = "Neutral", 0.3
        signals["RSI"] = {
            "signal": rsi_label,
            "strength": rsi_strength,
            "confidence": rsi_confidence,
            "value": rsi
        }

        # MACD signal (separate name so the numeric signal line is not shadowed)
        macd_label = "Buy" if macd > macd_line_signal else "Sell"
        signals["MACD"] = {
            "signal": macd_label,
            "strength": macd_strength,
            "confidence": min(0.8, 0.4 + macd_strength * 40),
            "value": macd
        }

        # Bollinger Bands signal
        if 'BB_Upper' in df.columns and 'BB_Lower' in df.columns:
            bb_upper = df['BB_Upper'].iloc[-1]
            bb_lower = df['BB_Lower'].iloc[-1]
            # Position within the bands (0 = lower band, 1 = upper band)
            bb_position = (current_price - bb_lower) / (bb_upper - bb_lower) if bb_upper != bb_lower else 0.5
            bb_strength = abs(bb_position - 0.5) * 2  # strongest at the edges
            if current_price < bb_lower:
                bb_label, bb_confidence = "Buy", 0.7
            elif current_price > bb_upper:
                bb_label, bb_confidence = "Sell", 0.7
            else:
                bb_label, bb_confidence = "Hold", 0.5
            signals["Bollinger"] = {
                "signal": bb_label,
                "strength": bb_strength,
                "confidence": bb_confidence,
                "position": bb_position
            }

        # SMA crossover signal
        if 'SMA_20' in df.columns and 'SMA_50' in df.columns:
            sma_20 = df['SMA_20'].iloc[-1]
            sma_50 = df['SMA_50'].iloc[-1]
            sma_ratio = sma_20 / sma_50 if sma_50 != 0 else 1.0
            signals["SMA"] = {
                "signal": "Buy" if sma_20 > sma_50 else "Sell",
                "strength": abs(sma_ratio - 1.0),
                "confidence": 0.6,
                "ratio": sma_ratio
            }

        # Weighted overall signal. RSI uses "Oversold"/"Overbought" labels, which
        # must be mapped explicitly or RSI would never influence the result.
        bullish_labels = {"Buy", "Oversold"}
        bearish_labels = {"Sell", "Overbought"}
        weighted_buy = 0.0
        weighted_sell = 0.0
        for signal_data in signals.values():
            score = signal_data.get("strength", 0.5) * signal_data.get("confidence", 0.5)
            if signal_data["signal"] in bullish_labels:
                weighted_buy += score
            elif signal_data["signal"] in bearish_labels:
                weighted_sell += score

        total = weighted_buy + weighted_sell
        if weighted_buy > weighted_sell:
            overall_signal = "Buy"
            overall_confidence = weighted_buy / total if total > 0 else 0
        elif weighted_sell > weighted_buy:
            overall_signal = "Sell"
            overall_confidence = weighted_sell / total if total > 0 else 0
        else:
            overall_signal = "Hold"
            overall_confidence = 0.5

        return {
            "signals": signals,
            "overall_signal": overall_signal,
            "confidence": overall_confidence,
            "regime_adjusted": regime_info is not None
        }
    except Exception as e:
        print(f"Advanced trading signals error: {str(e)}")
        return {"error": str(e)}
def _ema_smooth(values: np.ndarray, alpha: float) -> np.ndarray:
    """Exponential moving average seeded with the first observation."""
    smoothed = np.empty_like(values)
    smoothed[0] = values[0]
    for i in range(1, len(values)):
        smoothed[i] = alpha * values[i] + (1 - alpha) * smoothed[i - 1]
    return smoothed


def _moving_average_smooth(values: np.ndarray, window_size: int) -> np.ndarray:
    """Trailing simple moving average; the first points average the history available so far."""
    return np.array([values[max(0, i - window_size + 1):i + 1].mean() for i in range(len(values))])


def _kalman_smooth(values: np.ndarray, process_noise: float = 0.01,
                   measurement_noise: float = 0.1) -> np.ndarray:
    """Scalar random-walk Kalman filter (constant-level model)."""
    estimate = values[0]
    error = 1.0  # initial estimate error
    smoothed = np.empty_like(values)
    smoothed[0] = estimate
    for i in range(1, len(values)):
        predicted_error = error + process_noise
        gain = predicted_error / (predicted_error + measurement_noise)
        estimate = estimate + gain * (values[i] - estimate)
        error = (1 - gain) * predicted_error
        smoothed[i] = estimate
    return smoothed


def _savgol_smooth(values: np.ndarray, window_size: int, poly_order: int) -> np.ndarray:
    """Savitzky-Golay filter with an odd window; moving average if scipy.signal is unavailable."""
    if window_size % 2 == 0:
        window_size += 1
    if poly_order >= window_size:
        poly_order = window_size - 1
    try:
        from scipy.signal import savgol_filter
    except ImportError:
        if len(values) < window_size:
            return values
        return _moving_average_smooth(values, window_size)
    return savgol_filter(values, window_size, poly_order)


def _holt_smooth(values: np.ndarray, alpha: float) -> np.ndarray:
    """Holt's double exponential smoothing (level + trend), one alpha for both."""
    level = np.empty_like(values)
    trend = np.empty_like(values)
    level[0] = values[0]
    trend[0] = values[1] - values[0]
    for i in range(1, len(values)):
        level[i] = alpha * values[i] + (1 - alpha) * (level[i - 1] + trend[i - 1])
        trend[i] = alpha * (level[i] - level[i - 1]) + (1 - alpha) * trend[i - 1]
    return level


def _holt_winters_smooth(values: np.ndarray, alpha: float) -> np.ndarray:
    """Additive triple exponential smoothing with a short (<= 5) seasonal period."""
    n = len(values)
    period = min(5, n // 2)
    level = np.empty_like(values)
    trend = np.empty_like(values)
    level[0] = values[0]
    trend[0] = (values[period] - values[0]) / period if n > period else 0
    season = values[:period] - level[0]
    for i in range(1, n):
        prev_season = season[(i - 1) % period]
        level[i] = alpha * (values[i] - prev_season) + (1 - alpha) * (level[i - 1] + trend[i - 1])
        trend[i] = alpha * (level[i] - level[i - 1]) + (1 - alpha) * trend[i - 1]
        season[i % period] = alpha * (values[i] - level[i]) + (1 - alpha) * prev_season
    return level


def _adaptive_ema_smooth(values: np.ndarray) -> np.ndarray:
    """EMA whose per-point alpha shrinks as local (5-step) volatility rises."""
    n = len(values)
    returns = np.diff(values) / values[:-1]
    volatility = np.empty(n)
    volatility[0] = np.std(returns) if len(returns) > 0 else 0.01
    for i in range(1, n):
        volatility[i] = np.std(returns[max(0, i - 5):i])
    mean_vol = np.mean(volatility)
    if mean_vol > 0:
        relative_vol = volatility / mean_vol
    else:
        # Flat series: no volatility anywhere, treat every point as average
        # (0/0 would otherwise turn the whole output into NaN).
        relative_vol = np.ones(n)
    vol_factor = np.clip(relative_vol, 0.1, 0.9)
    # Alpha is the weight on the newest observation, so higher relative
    # volatility -> smaller alpha -> heavier smoothing.
    adaptive_alpha = 1 - vol_factor
    smoothed = np.empty_like(values)
    smoothed[0] = values[0]
    for i in range(1, n):
        smoothed[i] = adaptive_alpha[i] * values[i] + (1 - adaptive_alpha[i]) * smoothed[i - 1]
    return smoothed


def apply_financial_smoothing(data: np.ndarray, smoothing_type: str = "exponential",
                              window_size: int = 5, alpha: float = 0.3,
                              poly_order: int = 3, use_smoothing: bool = True) -> np.ndarray:
    """
    Apply financial smoothing algorithms to time series data.

    Args:
        data (np.ndarray): Input time series data
        smoothing_type (str): Type of smoothing to apply
            - 'exponential': Exponential moving average (good for trend following)
            - 'moving_average': Simple moving average (good for noise reduction)
            - 'kalman': Kalman filter (good for adaptive smoothing)
            - 'savitzky_golay': Savitzky-Golay filter (good for preserving peaks/valleys)
            - 'double_exponential': Double exponential smoothing (Holt: level + trend)
            - 'triple_exponential': Triple exponential smoothing (Holt-Winters: level + trend + seasonality)
            - 'adaptive': Adaptive smoothing based on volatility
            - 'none': No smoothing applied
            Unrecognised types fall back to 'exponential'.
        window_size (int): Window size for moving average and Savitzky-Golay
        alpha (float): Smoothing factor for exponential methods (0-1)
        poly_order (int): Polynomial order for Savitzky-Golay filter
        use_smoothing (bool): Whether to apply smoothing

    Returns:
        np.ndarray: Smoothed data as floats. The input is returned unchanged
        when smoothing is disabled, the series is too short for the chosen
        method, or smoothing fails.
    """
    if not use_smoothing or smoothing_type == "none" or len(data) < 3:
        return data
    try:
        # Work on a float copy: filling np.zeros_like(<int array>) would
        # silently truncate every smoothed value.
        values = np.asarray(data, dtype=float)
        n = len(values)
        if smoothing_type == "moving_average":
            return data if n < window_size else _moving_average_smooth(values, window_size)
        if smoothing_type == "kalman":
            return _kalman_smooth(values)
        if smoothing_type == "savitzky_golay":
            return data if n < window_size else _savgol_smooth(values, window_size, poly_order)
        if smoothing_type == "double_exponential":
            return _holt_smooth(values, alpha)
        if smoothing_type == "triple_exponential":
            return _holt_smooth(values, alpha) if n < 6 else _holt_winters_smooth(values, alpha)
        if smoothing_type == "adaptive":
            return data if n < 5 else _adaptive_ema_smooth(values)
        # "exponential" and any unrecognised type
        return _ema_smooth(values, alpha)
    except Exception as e:
        print(f"Smoothing error: {str(e)}")
        return data
| def create_interface(): | |
| """Create the Gradio interface with separate tabs for different timeframes""" | |
| # Enhanced title and descriptions | |
| title = """# 🙋🏻♂️Welcome to 🌟Tonic's 🚀 Stock Prediction with 📦Amazon ⌚Chronos | |
| --- | |
| """ | |
| description = """ | |
| The **Advanced Stock Prediction System** is a cutting-edge AI-powered platform with **580M+ parameters**, designed to analyze and predict stock prices across multiple timeframes. Equipped with **Amazon's Chronos foundation model** and **advanced ensemble methods**, it excels in both short-term trading and long-term investment analysis. The system supports **multi-timeframe analysis**, **real-time market monitoring**, and **comprehensive risk assessment**, enhancing its versatility for all types of traders and investors. | |
| ### Key Features | |
| - **Multi-Timeframe Analysis**: Daily (up to 365 days), Hourly (up to 7 days), and 15-minute (up to 3 days) predictions | |
| - **Real-Time Market Status**: Check if markets are open with one-click monitoring | |
| - **Advanced Ensemble Methods**: Combines Chronos, Technical Analysis, and Statistical Models | |
| - **Enhanced Uncertainty Quantification**: Multiple uncertainty calculation methods for robust predictions | |
| - **Market Regime Detection**: Identifies bull, bear, and sideways market conditions | |
| - **Stress Testing**: Scenario analysis under various market conditions | |
| - **Sentiment Analysis**: News sentiment integration for enhanced predictions | |
| ## Supported Markets | |
| - **US Stock Market** (NYSE, NASDAQ, AMEX): 9:30 AM - 4:00 PM ET | |
| - **European Markets** (London, Frankfurt, Paris): 8:00 AM - 4:30 PM GMT | |
| - **Asian Markets** (Tokyo, Hong Kong, Shanghai): 9:00 AM - 3:30 PM JST | |
| - **Forex Market** (24/7 Global Currency Exchange) | |
| - **Cryptocurrency Market** (24/7 Bitcoin, Ethereum, Altcoins) | |
| - **US Futures Market** (24/7 CME, ICE, CBOT) | |
| - **Commodities Market** (24/7 Gold, Silver, Oil, Natural Gas) | |
| """ | |
| model_info = """ | |
| ## How to Use | |
| 1. **Market Status Check**: Use the dropdown to check if markets are open | |
| 2. **Select Analysis Type**: Choose from Daily, Hourly, or 15-minute analysis | |
| 3. **Enter Stock Symbol**: Input any valid stock symbol (e.g., AAPL, TSLA, GOOGL) | |
| 4. **Configure Parameters**: Adjust prediction days, lookback period, and advanced settings | |
| 5. **Click Analyze**: View comprehensive predictions with uncertainty estimates | |
| ## Model Information | |
| - **Core Model**: Amazon Chronos T5 Foundation Model | |
| - **Ensemble Methods**: Random Forest, Gradient Boosting, SVR, Neural Networks | |
| - **Technical Indicators**: RSI, MACD, Bollinger Bands, Moving Averages | |
| - **Risk Metrics**: Sharpe Ratio, VaR, Maximum Drawdown, Beta | |
| - **Data Sources**: Yahoo Finance, Market Indices, Economic Indicators | |
| - **Environment**: PyTorch 2.1.2 + CUDA Support + Advanced ML Libraries | |
| """ | |
| join_us = """ | |
| ## Join the Community | |
| 🌟 **Advanced Stock Prediction** is continuously evolving! Join our active builder's community 👻 | |
| [](https://discord.gg/qdfnvSPcqP) | |
| [](https://huggingface.co/TeamTonic) | |
| [](https://github.com/Tonic-AI) | |
| 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗 | |
| """ | |
| with gr.Blocks(title="Advanced Stock Prediction Analysis", theme=gr.themes.Base()) as demo: | |
| with gr.Row(): | |
| gr.Markdown(title) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown(description) | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown(model_info) | |
| gr.Markdown(join_us) | |
| gr.Markdown("---") # Add a separator | |
| # System Monitor (CPU, RAM, GPU, Process RSS) | |
| with gr.Accordion("🖥️ System Monitor (CPU & RAM)", open=False): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| system_monitor_md_left = gr.Markdown( | |
| value="Loading system metrics...", | |
| label="System Metrics (Left)" | |
| ) | |
| with gr.Column(scale=1): | |
| system_monitor_md_right = gr.Markdown( | |
| value="Loading GPU/Process metrics...", | |
| label="System Metrics (Right)" | |
| ) | |
| with gr.Row(): | |
| refresh_system_btn = gr.Button("🔄 Refresh", variant="secondary") | |
| # Add comprehensive market information section with nested accordions | |
| with gr.Accordion("🌎 Global Market Information", open=False): | |
| # Quick Market Status Check Section | |
| with gr.Accordion("📊 Quick Market Status Check", open=False): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 📊 Quick Market Status Check") | |
| # Create user-friendly market choices | |
| market_choices = [(config['name'], key) for key, config in MARKET_CONFIGS.items()] | |
| market_dropdown = gr.Dropdown( | |
| choices=market_choices, | |
| label="Select Market", | |
| value="US_STOCKS", | |
| info="Choose a market to check its current status" | |
| ) | |
| check_market_btn = gr.Button("🔍 Check Market Status", variant="primary") | |
| with gr.Column(scale=2): | |
| market_status_result = gr.Markdown( | |
| value="Select a market and click 'Check Market Status' to see current trading status.", | |
| label="Market Status Result" | |
| ) | |
| # Enhanced Market Information Section | |
| with gr.Accordion("🌍 Enhanced Market Information", open=False): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Market Summary") | |
| market_summary_display = gr.JSON(label="Market Summary", value={}) | |
| def update_market_summary(): | |
| """Update market summary with enhanced yfinance data""" | |
| return get_enhanced_market_summary() | |
| update_market_summary_btn = gr.Button("🔄 Update Market Summary") | |
| update_market_summary_btn.click( | |
| fn=update_market_summary, | |
| outputs=[market_summary_display] | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("### Market Types Supported") | |
| gr.Markdown(""" | |
| **📈 Stock Markets:** | |
| - **US Stocks** (NYSE, NASDAQ, AMEX): 9:30 AM - 4:00 PM ET | |
| - **European Markets** (London, Frankfurt, Paris): 8:00 AM - 4:30 PM GMT | |
| - **Asian Markets** (Tokyo, Hong Kong, Shanghai): 9:00 AM - 3:30 PM JST | |
| **📊 24/7 Markets:** | |
| - **Forex** (Global Currency Exchange): 24/7 trading | |
| - **Cryptocurrency** (Bitcoin, Ethereum, Altcoins): 24/7 trading | |
| - **US Futures** (CME, ICE, CBOT): 24/7 trading | |
| - **Commodities** (Gold, Silver, Oil, Natural Gas): 24/7 trading | |
| **💡 Features:** | |
| - Real-time market status updates every 10 minutes | |
| - Timezone-aware calculations | |
| - Market-specific trading hours | |
| - Enhanced data from yfinance API | |
| """) | |
| # Periodic system monitor updater | |
| def _format_bytes(num_bytes: int) -> str: | |
| units = ["B", "KB", "MB", "GB", "TB"] | |
| size = float(num_bytes) | |
| unit_idx = 0 | |
| while size >= 1024 and unit_idx < len(units) - 1: | |
| size /= 1024.0 | |
| unit_idx += 1 | |
| return f"{size:.1f} {units[unit_idx]}" | |
def _probe_gpu() -> Tuple[str, str]:
    """Best-effort lookup of the first GPU's name and current utilization.

    NVML (``pynvml``) is tried first because it can report utilization. If the
    import or any NVML call fails, ``torch.cuda`` is consulted for the device
    name only. Errors are swallowed on purpose: "no usable GPU" is an expected
    state for a status readout, not a failure.

    Returns:
        ``(gpu_name, gpu_usage)``: ``gpu_name`` defaults to ``"No GPU"``;
        ``gpu_usage`` is e.g. ``"37%"``, or ``"N/A"`` when utilization could not
        be read (the old placeholder ``"%"`` displayed a percent sign with no
        number).
    """
    gpu_name = "No GPU"
    gpu_usage = "N/A"
    try:
        import pynvml as nv

        nv.nvmlInit()
        try:
            handle = nv.nvmlDeviceGetHandleByIndex(0)
            raw_name = nv.nvmlDeviceGetName(handle)
            # nvmlDeviceGetName may return bytes or str depending on the pynvml version.
            gpu_name = raw_name.decode() if isinstance(raw_name, bytes) else str(raw_name)
            util = nv.nvmlDeviceGetUtilizationRates(handle)
            gpu_usage = f"{getattr(util, 'gpu', 0)}%"
        finally:
            # Shut down only after a successful nvmlInit(); a failing shutdown
            # is not actionable for a status readout.
            try:
                nv.nvmlShutdown()
            except Exception:
                pass
    except Exception:
        # NVML unavailable or a call failed: fall back to torch for the name only;
        # utilization stays "N/A".
        try:
            if torch.cuda.is_available():
                gpu_name = torch.cuda.get_device_name(0)
        except Exception:
            pass
    return gpu_name, gpu_usage


def update_system_monitor() -> Tuple[str, str]:
    """Collect CPU, RAM, GPU and process-memory stats as two Markdown panels.

    This is the callback for the system monitor (page load, timer tick and the
    manual refresh button). CPU sampling blocks for 0.3 s, plus 1.0 s more if
    the first sample reads exactly 0.0.

    Returns:
        ``(md_left, md_right)``: CPU/RAM Markdown and GPU/process Markdown.
        On any error both elements carry the same error message instead of
        raising, so a transient metrics failure does not surface as a UI error.
    """
    try:
        # Short sample first; a 0.3 s window can legitimately observe no CPU
        # activity, so re-sample over a longer window before reporting 0%.
        cpu_percent = psutil.cpu_percent(interval=0.3)
        if cpu_percent == 0.0:
            cpu_percent = psutil.cpu_percent(interval=1.0)

        vm = psutil.virtual_memory()
        ram_used = _format_bytes(vm.used)
        ram_total = _format_bytes(vm.total)
        ram_percent = vm.percent

        # Resident memory of this Python process (the app itself).
        rss = _format_bytes(psutil.Process().memory_info().rss)

        cpu_name = platform.processor() or platform.uname().processor or "Nebula Processor"
        gpu_name, gpu_usage = _probe_gpu()

        md_left = "\n".join([
            "",
            f"### CPU ({cpu_name})",
            f"**Usage:** {cpu_percent:.1f}%",
            "### RAM",
            f"**Usage:** {ram_percent:.1f}% ({ram_used} / {ram_total})",
            "",
        ])
        md_right = "\n".join([
            "",
            f"### GPU ({gpu_name})",
            f"**Usage:** {gpu_usage}",
            "### Process",
            f"Memory (RSS): {rss}",
            "",
        ])
        return md_left, md_right
    except Exception as e:
        err = f"Error reading system metrics: {str(e)}"
        return err, err
# System monitor wiring: the same callback fills both Markdown panels on page
# load, on every 30-second tick of a gr.Timer, and when the refresh button is
# clicked (registered in that order). Each registration gets a fresh outputs list.
auto_refresh_timer = gr.Timer(30.0)
for _monitor_trigger in (demo.load, auto_refresh_timer.tick, refresh_system_btn.click):
    _monitor_trigger(
        fn=update_system_monitor,
        outputs=[system_monitor_md_left, system_monitor_md_right],
    )
# Market status check: the selected market goes in, the status text comes out.
check_market_btn.click(
    fn=check_market_status_simple,
    inputs=[market_dropdown],
    outputs=[market_status_result],
)
# Advanced Settings: model, ensemble, smoothing and risk options consumed by the
# analysis handlers (e.g. daily_predict_btn.click passes them to daily_analysis).
| # Advanced Settings Accordion | |
| with gr.Accordion("Advanced Settings", open=False): | |
| with gr.Row(): | |
| with gr.Column(): | |
# Feature toggles forwarded (via analyze_stock) to make_prediction_enhanced.
| use_ensemble = gr.Checkbox(label="Use Ensemble Methods", value=True) | |
| use_regime_detection = gr.Checkbox(label="Use Regime Detection", value=True) | |
| use_stress_testing = gr.Checkbox(label="Use Stress Testing", value=True) | |
| use_covariates = gr.Checkbox(label="Use Enhanced Covariate Data", value=True, | |
| info="Include market indices, sectors, and economic indicators") | |
| use_sentiment = gr.Checkbox(label="Use Sentiment Analysis", value=True, | |
| info="Include news sentiment analysis") | |
| use_smoothing = gr.Checkbox(label="Use Smoothing", value=True) | |
# The "none" choice duplicates use_smoothing=False; which one wins is decided downstream.
| smoothing_type = gr.Dropdown( | |
| choices=["exponential", "moving_average", "kalman", "savitzky_golay", | |
| "double_exponential", "triple_exponential", "adaptive", "none"], | |
| label="Smoothing Type", | |
| value="exponential", | |
| info="""Smoothing algorithms: | |
| • Exponential: Trend following (default) | |
| • Moving Average: Noise reduction | |
| • Kalman: Adaptive smoothing | |
| • Savitzky-Golay: Preserves peaks/valleys | |
| • Double Exponential: Trend + level | |
| • Triple Exponential: Complex patterns | |
| • Adaptive: Volatility-based | |
| • None: No smoothing""" | |
| ) | |
# NOTE(review): step=1 allows even window sizes, while Savitzky-Golay filters
# conventionally use an odd window. Confirm the smoothing code handles even values.
| smoothing_window = gr.Slider( | |
| minimum=3, | |
| maximum=21, | |
| value=5, | |
| step=1, | |
| label="Smoothing Window Size", | |
| info="Window size for moving average and Savitzky-Golay filters" | |
| ) | |
| smoothing_alpha = gr.Slider( | |
| minimum=0.1, | |
| maximum=0.9, | |
| value=0.3, | |
| step=0.05, | |
| label="Smoothing Alpha", | |
| info="Smoothing factor for exponential methods (0.1-0.9)" | |
| ) | |
# Annual risk-free rate as a fraction (0.02 == 2%), used for risk-adjusted metrics.
| risk_free_rate = gr.Slider( | |
| minimum=0.0, | |
| maximum=0.1, | |
| value=0.02, | |
| step=0.001, | |
| label="Risk-Free Rate (Annual)" | |
| ) | |
# Benchmark tickers: S&P 500, Dow Jones, NASDAQ Composite, Russell 2000.
| market_index = gr.Dropdown( | |
| choices=["^GSPC", "^DJI", "^IXIC", "^RUT"], | |
| label="Market Index for Correlation", | |
| value="^GSPC" | |
| ) | |
| random_real_points = gr.Slider( | |
| minimum=0, | |
| maximum=16, | |
| value=4, | |
| step=1, | |
| label="Random Real Points in Long-Horizon Context" | |
| ) | |
# Accuracy boost: per the info text, higher levels perform more runs and take longer.
| high_accuracy = gr.Checkbox(label="BOOST Accuracy Mode", value=True) | |
| accuracy_boost_level = gr.Slider( | |
| minimum=1, | |
| maximum=3, | |
| value=3, | |
| step=1, | |
| label="Accuracy Boost Level (1-3)", | |
| info="More runs improve accuracy, but it also requires more time." | |
| ) | |
| with gr.Column(): | |
# Ensemble weights are independent sliders. The defaults sum to 1.0
# (0.6 + 0.2 + 0.2), but nothing here re-normalizes user changes. NOTE(review):
# confirm whether the ensemble code normalizes the weights.
| gr.Markdown("### Ensemble Weights") | |
| chronos_weight = gr.Slider( | |
| minimum=0.0, | |
| maximum=1.0, | |
| value=0.6, | |
| step=0.1, | |
| label="Chronos Weight" | |
| ) | |
| technical_weight = gr.Slider( | |
| minimum=0.0, | |
| maximum=1.0, | |
| value=0.2, | |
| step=0.1, | |
| label="Technical Weight" | |
| ) | |
| statistical_weight = gr.Slider( | |
| minimum=0.0, | |
| maximum=1.0, | |
| value=0.2, | |
| step=0.1, | |
| label="Statistical Weight" | |
| ) | |
# Static feature summary (informational only; no components are created below).
| gr.Markdown("### Enhanced Features") | |
| gr.Markdown(""" | |
| **New Enhanced Features:** | |
| - **Covariate Data**: Market indices, sector ETFs, commodities, currencies | |
| - **Sentiment Analysis**: News sentiment scoring and confidence | |
| - **Advanced Uncertainty**: Multiple uncertainty calculation methods | |
| - **Enhanced Volume Prediction**: Price-volume relationship modeling | |
| - **Regime-Aware Uncertainty**: Market condition adjustments | |
| - **Multi-Algorithm Ensemble**: Random Forest, Gradient Boosting, SVR, Neural Networks | |
| - **Real-Time Market Status**: Updates every 10 minutes with detailed timing information | |
| """) | |
# One tab per timeframe (daily, hourly, 15-minute). Each tab builds its own
# inputs, a plot, and the JSON panels that its analysis handler fills.
| with gr.Tabs() as tabs: | |
| # Daily Analysis Tab | |
| with gr.TabItem("Daily Analysis"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| daily_symbol = gr.Textbox(label="Stock Symbol (e.g., AAPL)", value="AAPL") | |
# Horizon slider: 1-365 days, matching the "Maximum prediction period" in the Markdown below.
| daily_prediction_days = gr.Slider( | |
| minimum=1, | |
| maximum=365, | |
| value=30, | |
| step=1, | |
| label="Days to Predict" | |
| ) | |
# NOTE(review): with minimum=1 and step=5 the valid values are 1, 6, 11, ..., 2186,
# so the default 1825 and the maximum 2190 are off the step grid. Confirm how the
# slider snaps them. The Markdown below also advertises up to 3650 days, more
# than this slider allows.
| daily_lookback_days = gr.Slider( | |
| minimum=1, | |
| maximum=2190, | |
| value=1825, | |
| step=5, | |
| label="Historical Lookback (Days)" | |
| ) | |
| daily_strategy = gr.Dropdown( | |
| choices=["chronos", "technical"], | |
| label="Prediction Strategy", | |
| value="chronos" | |
| ) | |
| daily_predict_btn = gr.Button("Analyze Stock") | |
| gr.Markdown(""" | |
| **Daily Analysis Features:** | |
| - **Extended Data Range**: Up to 10 years of historical data (3650 days) | |
| - **24/7 Availability**: Available regardless of market hours | |
| - **Auto-Adjusted Data**: Automatically adjusted for splits and dividends | |
| - **Comprehensive Financial Ratios**: P/E, PEG, Price-to-Book, Price-to-Sales, and more | |
| - **Advanced Risk Metrics**: Sharpe ratio, VaR, drawdown analysis, market correlation | |
| - **Market Regime Detection**: Identifies bull/bear/sideways market conditions | |
| - **Stress Testing**: Scenario analysis under various market conditions | |
| - **Ensemble Methods**: Combines multiple prediction models for improved accuracy | |
| - **Maximum prediction period**: 365 days | |
| - **Ideal for**: Medium to long-term investment analysis, portfolio management, and strategic planning | |
| - **Technical Indicators**: RSI, MACD, Bollinger Bands, moving averages optimized for daily data | |
| - **Volume Analysis**: Average daily volume, volume volatility, and liquidity metrics | |
| - **Sector Analysis**: Industry classification, market cap ranking, and sector-specific metrics | |
| """) | |
# Output column: chart plus the raw historical/predicted series as JSON.
| with gr.Column(): | |
| daily_plot = gr.Plot(label="Analysis and Prediction") | |
| daily_historical_json = gr.JSON(label="Historical Data") | |
| daily_predicted_json = gr.JSON(label="Predicted Data") | |
# Metric panels. Together with the plot/JSON above they are the 11 outputs of
# daily_analysis (wired by daily_predict_btn.click).
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Structured Product Metrics") | |
| daily_metrics = gr.JSON(label="Product Metrics") | |
| gr.Markdown("### Advanced Risk Analysis") | |
| daily_risk_metrics = gr.JSON(label="Risk Metrics") | |
| gr.Markdown("### Market Regime Analysis") | |
| daily_regime_metrics = gr.JSON(label="Regime Metrics") | |
| gr.Markdown("### Trading Signals") | |
| daily_signals = gr.JSON(label="Trading Signals") | |
| gr.Markdown("### Advanced Trading Signals") | |
| daily_signals_advanced = gr.JSON(label="Advanced Trading Signals") | |
| with gr.Column(): | |
| gr.Markdown("### Sector & Financial Analysis") | |
| daily_sector_metrics = gr.JSON(label="Sector Metrics") | |
| gr.Markdown("### Stress Test Results") | |
| daily_stress_results = gr.JSON(label="Stress Test Results") | |
| gr.Markdown("### Ensemble Analysis") | |
| daily_ensemble_metrics = gr.JSON(label="Ensemble Metrics") | |
| # Hourly Analysis Tab | |
| with gr.TabItem("Hourly Analysis"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| hourly_symbol = gr.Textbox(label="Stock Symbol (e.g., AAPL)", value="AAPL") | |
# NOTE(review): this slider allows up to 365 days, but the Markdown below states a
# 7-day maximum for hourly predictions. Confirm which limit the backend enforces.
| hourly_prediction_days = gr.Slider( | |
| minimum=1, | |
| maximum=365, # Limited to 365 days for hourly predictions | |
| value=3, | |
| step=1, | |
| label="Days to Predict" | |
| ) | |
# NOTE(review): the Markdown below says "Up to 60 days" of hourly history, but this
# slider allows 2190. Also, with minimum=1 and step=2 only odd values are on the
# grid, so the default 14 and the maximum 2190 are off-grid. Confirm how the
# slider snaps them.
| hourly_lookback_days = gr.Slider( | |
| minimum=1, | |
| maximum=2190, # Slider cap: 2190 days (~6 years); see NOTE above about the 60-day Markdown | |
| value=14, | |
| step=2, | |
| label="Historical Lookback (Days)" | |
| ) | |
| hourly_strategy = gr.Dropdown( | |
| choices=["chronos", "technical"], | |
| label="Prediction Strategy", | |
| value="chronos" | |
| ) | |
| hourly_predict_btn = gr.Button("Analyze Stock") | |
| gr.Markdown(""" | |
| **Hourly Analysis Features:** | |
| - **Extended Data Range**: Up to 60 days of historical data | |
| - **Pre/Post Market Data**: Includes extended hours trading data | |
| - **Auto-Adjusted Data**: Automatically adjusted for splits and dividends | |
| - **Metrics**: Intraday volatility, volume analysis, and momentum indicators | |
| - **Comprehensive Financial Ratios**: P/E, PEG, Price-to-Book, and more | |
| - **Maximum prediction period**: 7 days | |
| - **Data available during market hours only** | |
| """) | |
# Output column. Unlike the daily tab, the basic trading-signals JSON lives here,
# next to the plot.
| with gr.Column(): | |
| hourly_plot = gr.Plot(label="Analysis and Prediction") | |
| hourly_signals = gr.JSON(label="Trading Signals") | |
| hourly_historical_json = gr.JSON(label="Historical Data") | |
| hourly_predicted_json = gr.JSON(label="Predicted Data") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Structured Product Metrics") | |
| hourly_metrics = gr.JSON(label="Product Metrics") | |
| gr.Markdown("### Advanced Risk Analysis") | |
| hourly_risk_metrics = gr.JSON(label="Risk Metrics") | |
| gr.Markdown("### Market Regime Analysis") | |
| hourly_regime_metrics = gr.JSON(label="Regime Metrics") | |
# NOTE(review): this heading reads "Trading Signals", but the component under it
# is the *advanced* signals JSON. The daily tab uses "### Advanced Trading Signals"
# for the same panel.
| gr.Markdown("### Trading Signals") | |
| hourly_signals_advanced = gr.JSON(label="Advanced Trading Signals") | |
| with gr.Column(): | |
| gr.Markdown("### Sector & Financial Analysis") | |
| hourly_sector_metrics = gr.JSON(label="Sector Metrics") | |
| gr.Markdown("### Stress Test Results") | |
| hourly_stress_results = gr.JSON(label="Stress Test Results") | |
| gr.Markdown("### Ensemble Analysis") | |
| hourly_ensemble_metrics = gr.JSON(label="Ensemble Metrics") | |
| # 15-Minute Analysis Tab | |
| with gr.TabItem("15-Minute Analysis"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| min15_symbol = gr.Textbox(label="Stock Symbol (e.g., AAPL)", value="AAPL") | |
# NOTE(review): the slider allows up to 365 days, but the Markdown below states a
# 2-day maximum. Confirm which limit the backend enforces.
| min15_prediction_days = gr.Slider( | |
| minimum=1, | |
| maximum=365, # Slider cap is 365 days (not 2); see NOTE above | |
| value=1, | |
| step=1, | |
| label="Days to Predict" | |
| ) | |
# NOTE(review): the Markdown below says "Up to 7 days" of 15-minute history, but
# this slider allows 2190. With minimum=1 and step=2 the maximum 2190 is off the
# odd-valued step grid.
| min15_lookback_days = gr.Slider( | |
| minimum=1, | |
| maximum=2190, # Slider cap is 2190 days (not 7); see NOTE above | |
| value=3, | |
| step=2, | |
| label="Historical Lookback (Days)" | |
| ) | |
| min15_strategy = gr.Dropdown( | |
| choices=["chronos", "technical"], | |
| label="Prediction Strategy", | |
| value="chronos" | |
| ) | |
| min15_predict_btn = gr.Button("Analyze Stock") | |
| gr.Markdown(""" | |
| **15-Minute Analysis Features:** | |
| - **Data Range**: Up to 7 days of historical data (vs 5 days previously) | |
| - **High-Frequency Metrics**: Intraday volatility, volume-price trends, momentum analysis | |
| - **Pre/Post Market Data**: Includes extended hours trading data | |
| - **Auto-Adjusted Data**: Automatically adjusted for splits and dividends | |
| - **Enhanced Technical Indicators**: Optimized for short-term trading | |
| - **Maximum prediction period**: 2 days | |
| - **Requires at least 64 data points for Chronos predictions** | |
| - **Data available during market hours only** | |
| """) | |
# Output column: plot, basic trading signals, and raw historical/predicted JSON.
| with gr.Column(): | |
| min15_plot = gr.Plot(label="Analysis and Prediction") | |
| min15_signals = gr.JSON(label="Trading Signals") | |
| min15_historical_json = gr.JSON(label="Historical Data") | |
| min15_predicted_json = gr.JSON(label="Predicted Data") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Structured Product Metrics") | |
| min15_metrics = gr.JSON(label="Product Metrics") | |
| gr.Markdown("### Advanced Risk Analysis") | |
| min15_risk_metrics = gr.JSON(label="Risk Metrics") | |
| gr.Markdown("### Market Regime Analysis") | |
| min15_regime_metrics = gr.JSON(label="Regime Metrics") | |
# NOTE(review): the heading says "Trading Signals" over the *advanced* signals JSON
# (same mismatch as the hourly tab).
| gr.Markdown("### Trading Signals") | |
| min15_signals_advanced = gr.JSON(label="Advanced Trading Signals") | |
| with gr.Column(): | |
| gr.Markdown("### Sector & Financial Analysis") | |
| min15_sector_metrics = gr.JSON(label="Sector Metrics") | |
| gr.Markdown("### Stress Test Results") | |
| min15_stress_results = gr.JSON(label="Stress Test Results") | |
| gr.Markdown("### Ensemble Analysis") | |
| min15_ensemble_metrics = gr.JSON(label="Ensemble Metrics") | |
def analyze_stock(symbol, timeframe, prediction_days, lookback_days, strategy,
                  use_ensemble, use_regime_detection, use_stress_testing,
                  risk_free_rate, market_index, chronos_weight, technical_weight, statistical_weight,
                  random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
                  high_accuracy, accuracy_boost_level,
                  use_covariates=True, use_sentiment=True):
    """Run the full analysis pipeline for one symbol/timeframe and shape it for the UI.

    Calls ``make_prediction_enhanced`` for signals and the figure. It then fetches
    benchmark data (``get_market_data``), price history
    (``get_historical_data``) and fundamentals (``get_fundamental_data``) to
    build the metric panels.

    Args:
        symbol: Ticker symbol, e.g. ``"AAPL"``.
        timeframe: Bar interval. The daily handler passes ``"1d"``, and ``"1h"`` /
            ``"15m"`` additionally get the intraday metrics below.
        prediction_days: Forecast horizon in days.
        lookback_days: History window in days, used for both the symbol and the
            benchmark index.
        strategy: Prediction strategy (the UI offers ``"chronos"`` / ``"technical"``).
        use_ensemble, use_regime_detection, use_stress_testing, use_covariates,
            use_sentiment: Feature toggles forwarded to ``make_prediction_enhanced``.
        risk_free_rate: Annual risk-free rate for ``calculate_advanced_risk_metrics``.
        market_index: Benchmark ticker used for correlation/risk metrics.
        chronos_weight, technical_weight, statistical_weight: Ensemble weights,
            forwarded as a dict. They are not normalized here.
        random_real_points, use_smoothing, smoothing_type, smoothing_window,
            smoothing_alpha, high_accuracy, accuracy_boost_level: Forwarded
            unchanged to ``make_prediction_enhanced``.

    Returns:
        11-tuple ``(basic_signals, fig, product_metrics, risk_metrics,
        sector_metrics, regime_metrics, stress_results, ensemble_metrics,
        advanced_signals, historical, predicted)``, in the order the UI
        ``outputs`` lists expect.

    Raises:
        gr.Error: Wraps any exception from the pipeline (chained via ``from``).
            Closed markets, too few data points and missing price data get
            friendlier messages.
    """
    try:
        ensemble_weights = {
            "chronos": chronos_weight,
            "technical": technical_weight,
            "statistical": statistical_weight,
        }
        # Benchmark returns for correlation-style risk metrics; None when no data came back.
        market_df = get_market_data(market_index, lookback_days)
        market_returns = market_df['Returns'] if not market_df.empty else None

        signals, fig = make_prediction_enhanced(
            symbol=symbol,
            timeframe=timeframe,
            prediction_days=prediction_days,
            strategy=strategy,
            use_ensemble=use_ensemble,
            use_regime_detection=use_regime_detection,
            use_stress_testing=use_stress_testing,
            risk_free_rate=risk_free_rate,
            ensemble_weights=ensemble_weights,
            market_index=market_index,
            use_covariates=use_covariates,
            use_sentiment=use_sentiment,
            random_real_points=random_real_points,
            use_smoothing=use_smoothing,
            smoothing_type=smoothing_type,
            smoothing_window=smoothing_window,
            smoothing_alpha=smoothing_alpha,
            high_accuracy=high_accuracy,
            accuracy_boost_level=accuracy_boost_level
        )

        # Price history and fundamentals for the metric panels.
        df = get_historical_data(symbol, timeframe, lookback_days)
        fundamentals = get_fundamental_data(symbol)

        # Average daily volume: first fundamentals field that is set, else the
        # mean of the price history's Volume column.
        avg_daily_volume = (
            fundamentals.get("averageDailyVolume3Month")
            or fundamentals.get("averageDailyVolume10Day")
            or fundamentals.get("averageVolume10days")
        )
        if avg_daily_volume is None and 'Volume' in df.columns:
            try:
                avg_daily_volume = float(pd.to_numeric(df['Volume'], errors='coerce').dropna().mean())
            except Exception:
                avg_daily_volume = None

        # Bucket inputs. A key that is present but None used to reach the
        # comparisons below and raise TypeError (None > 1e10), aborting the whole
        # analysis. Missing and None are now both treated as 0.
        market_cap = fundamentals.get("marketCap") or 0
        # Prefer an explicit "averageDailyVolume" field (the original source) and
        # fall back to the volume resolved above.
        liquidity_volume = fundamentals.get("averageDailyVolume") or avg_daily_volume or 0

        product_metrics = {
            "Market_Cap": fundamentals.get("marketCap"),
            "Sector": fundamentals.get("sector"),
            "Industry": fundamentals.get("industry"),
            "Dividend_Yield": fundamentals.get("dividendYield"),
            "Avg_Daily_Volume": avg_daily_volume,
            "Volume_Volatility": df['Volume'].rolling(window=20, min_periods=1).std().iloc[-1] if 'Volume' in df.columns else None,
            "Enterprise_Value": fundamentals.get("enterpriseValue"),
            "P/E_Ratio": fundamentals.get("trailingPE"),
            "Forward_P/E": fundamentals.get("forwardPE"),
            "PEG_Ratio": fundamentals.get("pegRatio"),
            "Price_to_Book": fundamentals.get("priceToBook"),
            "Price_to_Sales": fundamentals.get("priceToSalesTrailing12Months"),
        }

        risk_metrics = calculate_advanced_risk_metrics(df, market_returns, risk_free_rate)

        sector_metrics = {
            "Sector": fundamentals.get("sector"),
            "Industry": fundamentals.get("industry"),
            "Market_Cap_Rank": "Large" if market_cap > 1e10 else "Mid" if market_cap > 1e9 else "Small",
            "Liquidity_Score": "High" if liquidity_volume > 1e6 else "Medium" if liquidity_volume > 1e5 else "Low",
            "Gross_Margin": fundamentals.get("grossMargins"),
            "Operating_Margin": fundamentals.get("operatingMargins"),
            # NOTE(review): yfinance usually exposes net margin as "profitMargins";
            # confirm get_fundamental_data provides "netMargins".
            "Net_Margin": fundamentals.get("netMargins"),
        }

        enhanced_metrics = {
            "covariate_data_used": signals.get("covariate_data_available", False),
            # Echoes the request flag, not whether sentiment data was actually obtained.
            "sentiment_analysis_used": use_sentiment,
            "advanced_uncertainty_methods": list(signals.get("advanced_uncertainties", {}).keys()),
            "regime_aware_uncertainty": use_regime_detection,
            "enhanced_volume_prediction": signals.get("prediction", {}).get("volume") is not None
        }

        # Intraday-only metrics; the columns are optional, so absent ones report 0.
        if timeframe in ["1h", "15m"]:
            intraday_metrics = {
                "Intraday_Volatility": df['Intraday_Volatility'].iloc[-1] if 'Intraday_Volatility' in df.columns else 0,
                "Volume_Ratio": df['Volume_Ratio'].iloc[-1] if 'Volume_Ratio' in df.columns else 0,
                "Price_Momentum": df['Price_Momentum'].iloc[-1] if 'Price_Momentum' in df.columns else 0,
                "Volume_Momentum": df['Volume_Momentum'].iloc[-1] if 'Volume_Momentum' in df.columns else 0,
                "Volume_Price_Trend": df['Volume_Price_Trend'].iloc[-1] if 'Volume_Price_Trend' in df.columns else 0
            }
            product_metrics.update(intraday_metrics)

        regime_metrics = signals.get("regime_info", {})
        stress_results = signals.get("stress_test_results", {})
        ensemble_metrics = {
            "ensemble_used": signals.get("ensemble_used", False),
            "ensemble_weights": ensemble_weights,
            "enhanced_features": enhanced_metrics
        }

        # Basic signals get UI-friendly defaults; advanced signals pass through as-is.
        basic_signals = {
            "RSI": signals.get("RSI", "Neutral"),
            "MACD": signals.get("MACD", "Hold"),
            "Bollinger": signals.get("Bollinger", "Hold"),
            "SMA": signals.get("SMA", "Hold"),
            "Overall": signals.get("Overall", "Hold"),
            "symbol": signals.get("symbol", symbol),
            "timeframe": signals.get("timeframe", timeframe),
            "strategy_used": signals.get("strategy_used", strategy)
        }
        advanced_signals = signals.get("advanced_signals", {})
        historical = signals.get('historical', {})
        predicted = signals.get('prediction', {})
        return (basic_signals, fig, product_metrics, risk_metrics, sector_metrics, regime_metrics,
                stress_results, ensemble_metrics, advanced_signals, historical, predicted)
    except Exception as e:
        error_message = str(e)
        if "Market is currently closed" in error_message:
            error_message = f"{error_message}. Please try again during market hours or use daily timeframe."
        elif "Insufficient data points" in error_message:
            error_message = f"Not enough data available for {symbol} in {timeframe} timeframe. Please try a different timeframe or symbol."
        elif "no price data found" in error_message:
            error_message = f"No data available for {symbol} in {timeframe} timeframe. Please try a different timeframe or symbol."
        # Chain the original exception so the real traceback survives in server logs.
        raise gr.Error(error_message) from e
| # Daily analysis button click | |
def daily_analysis(s: str, pd: int, ld: int, st: str, ue: bool, urd: bool, ust: bool,
                   rfr: float, mi: str, cw: float, tw: float, sw: float,
                   rrp: int, usm: bool, smt: str, sww: float, sa: float,
                   ha: bool, abl: float,
                   uc: bool, us: bool) -> Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]:
    """
    Process daily timeframe stock analysis with enhanced features.

    This function performs stock analysis using daily ("1d") data, with support for
    multiple prediction strategies, ensemble methods, regime detection, stress testing,
    covariate data, and sentiment analysis. It is intended for medium to long-term
    analysis with up to 365 days of prediction. All work is delegated to analyze_stock.

    Args:
        s (str): Stock Symbol (e.g., AAPL) - The input value from the "Stock Symbol" Textbox component
        pd (int): Days to Predict - The input value from the "Days to Predict" Slider component (1-365)
        ld (int): Historical Lookback (Days) - The input value from the "Historical Lookback (Days)" Slider component (1-2190, step 5)
        st (str): Prediction Strategy - The input value from the "Prediction Strategy" Dropdown component
            Options: "chronos" (Chronos forecasting model) or "technical" (traditional technical analysis)
        ue (bool): Use Ensemble Methods - The input value from the "Use Ensemble Methods" Checkbox component
            When True, combines multiple prediction models
        urd (bool): Use Regime Detection - The input value from the "Use Regime Detection" Checkbox component
            When True, detects market regimes (bull/bear/sideways) to adjust predictions
        ust (bool): Use Stress Testing - The input value from the "Use Stress Testing" Checkbox component
            When True, performs scenario analysis under various market conditions
        rfr (float): Risk-Free Rate (Annual) - The input value from the "Risk-Free Rate (Annual)" Slider component (0.0-0.1)
            Annual risk-free rate used for risk-adjusted return calculations
        mi (str): Market Index for Correlation - The input value from the "Market Index for Correlation" Dropdown component
            Options: "^GSPC" (S&P 500), "^DJI" (Dow Jones), "^IXIC" (NASDAQ), "^RUT" (Russell 2000)
        cw (float): Chronos Weight - The input value from the "Chronos Weight" Slider component (0.0-1.0)
            Weight given to Chronos model predictions in ensemble methods
        tw (float): Technical Weight - The input value from the "Technical Weight" Slider component (0.0-1.0)
            Weight given to technical analysis predictions in ensemble methods
        sw (float): Statistical Weight - The input value from the "Statistical Weight" Slider component (0.0-1.0)
            Weight given to statistical model predictions in ensemble methods
        rrp (int): Random Real Points in Long-Horizon Context - The input value from the "Random Real Points in Long-Horizon Context" Slider component (0-16)
            Number of random real points to include in the long-horizon context
        usm (bool): Use Smoothing - The input value from the "Use Smoothing" Checkbox component
            When True, applies smoothing to predictions to reduce noise and improve continuity
        smt (str): Smoothing Type - The input value from the "Smoothing Type" Dropdown component
            Options: "exponential", "moving_average", "kalman", "savitzky_golay", "double_exponential", "triple_exponential", "adaptive", "none"
        sww (float): Smoothing Window Size - The input value from the "Smoothing Window Size" Slider component (3-21)
            Window size for moving average and Savitzky-Golay smoothing methods
        sa (float): Smoothing Alpha - The input value from the "Smoothing Alpha" Slider component (0.1-0.9)
            Alpha parameter for exponential smoothing methods
        ha (bool): BOOST Accuracy Mode - The input value from the "BOOST Accuracy Mode" Checkbox component
            When True, enables the accuracy-boost mode of the prediction pipeline
        abl (float): Accuracy Boost Level (1-3) - The input value from the "Accuracy Boost Level (1-3)" Slider component (integer steps 1-3)
            Higher levels perform more runs, which takes more time
        uc (bool): Use Enhanced Covariate Data - The input value from the "Use Enhanced Covariate Data" Checkbox component
            When True, includes market indices, sectors, and economic indicators in analysis
        us (bool): Use Sentiment Analysis - The input value from the "Use Sentiment Analysis" Checkbox component
            When True, includes news sentiment analysis in the prediction model

    Returns:
        Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]: Analysis results containing:
            [0] Dict: Trading Signals - Output value for the "Trading Signals" Json component
                Contains RSI, MACD, Bollinger Bands, SMA, and overall trading signals
            [1] go.Figure: Analysis and Prediction - Output value for the "Analysis and Prediction" Plot component
                Interactive plot with historical data, predictions, and confidence intervals
            [2] Dict: Product Metrics - Output value for the "Product Metrics" Json component
                Structured product metrics including Market Cap, P/E ratios, and financial ratios
            [3] Dict: Risk Metrics - Output value for the "Risk Metrics" Json component
                Advanced risk metrics including Sharpe ratio, VaR, drawdown, and correlation analysis
            [4] Dict: Sector Metrics - Output value for the "Sector Metrics" Json component
                Sector and industry analysis metrics
            [5] Dict: Regime Metrics - Output value for the "Regime Metrics" Json component
                Market regime detection results and analysis
            [6] Dict: Stress Test Results - Output value for the "Stress Test Results" Json component
                Stress testing scenario results under various market conditions
            [7] Dict: Ensemble Metrics - Output value for the "Ensemble Metrics" Json component
                Ensemble method configuration and performance results
            [8] Dict: Advanced Trading Signals - Output value for the "Advanced Trading Signals" Json component
                Advanced trading signals with confidence levels and sophisticated indicators
            [9] Dict: Historical Data - Output value for the "Historical Data" Json component
                Historical data for the selected stock
            [10] Dict: Predicted Data - Output value for the "Predicted Data" Json component
                Predicted data for the selected stock

    Raises:
        gr.Error: If data cannot be fetched, insufficient data points, or other analysis errors
            Common errors include invalid symbols, market closure, or insufficient historical data

    Example:
        >>> signals, plot, metrics, risk, sector, regime, stress, ensemble, advanced, historical, predicted = daily_analysis(
        ...     "AAPL", 30, 365, "chronos", True, True, True, 0.02, "^GSPC", 0.6, 0.2, 0.2,
        ...     4, True, "exponential", 5, 0.3, True, 3, True, True
        ... )

    Notes:
        - Daily analysis is available 24/7 regardless of market hours
        - Maximum prediction period is 365 days
        - The lookback slider allows up to 2190 days (~6 years) of history
        - Ensemble weights (cw + tw + sw) should sum to 1.0 for optimal results; they are not normalized here
        - Risk-free rate is typically between 0.02-0.05 (2-5% annually)
        - Smoothing helps reduce prediction noise but may reduce responsiveness to sudden changes
        - Enhanced covariate data includes market indices, sector ETFs, commodities, and currencies
        - Sentiment analysis provides news sentiment scoring and confidence levels
        - The parameter ``pd`` shadows the module-level pandas alias inside this function only;
          the short names are kept because Gradio passes the inputs positionally
    """
    return analyze_stock(s, "1d", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, ha, abl, uc, us)
# Daily tab wiring. Gradio passes `inputs` positionally, so this list must follow
# daily_analysis's parameter order (s, pd, ld, st, ue, urd, ust, rfr, mi, cw, tw,
# sw, rrp, usm, smt, sww, sa, ha, abl, uc, us). `outputs` must follow its
# 11-element return order.
_daily_inputs = [
    daily_symbol, daily_prediction_days, daily_lookback_days, daily_strategy,
    use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
    chronos_weight, technical_weight, statistical_weight,
    random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
    high_accuracy, accuracy_boost_level,
    use_covariates, use_sentiment,
]
_daily_outputs = [
    daily_signals, daily_plot, daily_metrics, daily_risk_metrics, daily_sector_metrics,
    daily_regime_metrics, daily_stress_results, daily_ensemble_metrics, daily_signals_advanced,
    daily_historical_json, daily_predicted_json,
]
daily_predict_btn.click(fn=daily_analysis, inputs=_daily_inputs, outputs=_daily_outputs)
| # Hourly analysis button click | |
| def hourly_analysis(s: str, pd: int, ld: int, st: str, ue: bool, urd: bool, ust: bool, | |
| rfr: float, mi: str, cw: float, tw: float, sw: float, | |
| rrp: int, usm: bool, smt: str, sww: float, sa: float, | |
| ha: bool, abl: float, | |
| uc: bool, us: bool) -> Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]: | |
| """ | |
| Process hourly timeframe stock analysis with enhanced features. | |
| This function performs high-frequency stock analysis using hourly data, ideal for | |
| short to medium-term trading strategies. It includes intraday volatility analysis, | |
| volume-price trends, momentum indicators, covariate data, and sentiment analysis | |
| optimized for hourly timeframes. | |
| Args: | |
| s (str): Stock Symbol (e.g., AAPL) - The input value from the "Stock Symbol" Textbox component | |
| pd (int): Days to Predict - The input value from the "Days to Predict" Slider component (1-7) | |
| Limited to 7 days due to Yahoo Finance hourly data constraints | |
| ld (int): Historical Lookback (Days) - The input value from the "Historical Lookback (Days)" Slider component (1-60) | |
| Enhanced to 60 days for hourly data (vs standard 30 days) | |
| st (str): Prediction Strategy - The input value from the "Prediction Strategy" Dropdown component | |
| Options: "chronos" (uses Amazon's Chronos T5 model optimized for hourly data) or "technical" (technical indicators adjusted for hourly timeframes) | |
| ue (bool): Use Ensemble Methods - The input value from the "Use Ensemble Methods" Checkbox component | |
| Combines multiple models for improved short-term prediction accuracy | |
| urd (bool): Use Regime Detection - The input value from the "Use Regime Detection" Checkbox component | |
| Detects intraday market regimes and volatility patterns | |
| ust (bool): Use Stress Testing - The input value from the "Use Stress Testing" Checkbox component | |
| Performs scenario analysis for short-term market shocks | |
| rfr (float): Risk-Free Rate (Annual) - The input value from the "Risk-Free Rate (Annual)" Slider component (0.0-0.1) | |
| Annual risk-free rate for risk-adjusted calculations | |
| mi (str): Market Index for Correlation - The input value from the "Market Index for Correlation" Dropdown component | |
| Options: "^GSPC" (S&P 500), "^DJI" (Dow Jones), "^IXIC" (NASDAQ), "^RUT" (Russell 2000) | |
| cw (float): Chronos Weight - The input value from the "Chronos Weight" Slider component (0.0-1.0) | |
| Weight for Chronos model in ensemble predictions | |
| tw (float): Technical Weight - The input value from the "Technical Weight" Slider component (0.0-1.0) | |
| Weight for technical analysis in ensemble predictions | |
| sw (float): Statistical Weight - The input value from the "Statistical Weight" Slider component (0.0-1.0) | |
| Weight for statistical models in ensemble predictions | |
| rrp (int): Random Real Points in Long-Horizon Context - The input value from the "Random Real Points in Long-Horizon Context" Slider component | |
| Number of random real points to include in long-horizon context for improved predictions | |
| usm (bool): Use Smoothing - The input value from the "Use Smoothing" Checkbox component | |
| When True, applies smoothing to predictions to reduce noise and improve continuity | |
| smt (str): Smoothing Type - The input value from the "Smoothing Type" Dropdown component | |
| Options: "exponential", "moving_average", "kalman", "savitzky_golay", "double_exponential", "triple_exponential", "adaptive", "none" | |
| sww (float): Smoothing Window Size - The input value from the "Smoothing Window Size" Slider component | |
| Window size for moving average and Savitzky-Golay smoothing methods | |
| sa (float): Smoothing Alpha - The input value from the "Smoothing Alpha" Slider component (0.1-0.9) | |
| Alpha parameter for exponential smoothing methods | |
| uc (bool): Use Enhanced Covariate Data - The input value from the "Use Enhanced Covariate Data" Checkbox component | |
| When True, includes market indices, sectors, and economic indicators in analysis | |
| us (bool): Use Sentiment Analysis - The input value from the "Use Sentiment Analysis" Checkbox component | |
| When True, includes news sentiment analysis in the prediction model | |
| Returns: | |
| Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]: Analysis results containing: | |
| [0] Dict: Trading Signals - Output value for the "Trading Signals" Json component | |
| Basic trading signals optimized for hourly timeframes | |
| [1] go.Figure: Analysis and Prediction - Output value for the "Analysis and Prediction" Plot component | |
| Interactive plot with hourly data, predictions, and intraday patterns | |
| [2] Dict: Product Metrics - Output value for the "Product Metrics" Json component | |
| Product metrics including intraday volatility and volume analysis | |
| [3] Dict: Risk Metrics - Output value for the "Risk Metrics" Json component | |
| Risk metrics adjusted for hourly data frequency | |
| [4] Dict: Sector Metrics - Output value for the "Sector Metrics" Json component | |
| Sector analysis with intraday-specific metrics | |
| [5] Dict: Regime Metrics - Output value for the "Regime Metrics" Json component | |
| Market regime detection for hourly patterns | |
| [6] Dict: Stress Test Results - Output value for the "Stress Test Results" Json component | |
| Stress testing results for short-term scenarios | |
| [7] Dict: Ensemble Metrics - Output value for the "Ensemble Metrics" Json component | |
| Ensemble analysis configuration and results | |
| [8] Dict: Advanced Trading Signals - Output value for the "Advanced Trading Signals" Json component | |
| Advanced signals with intraday-specific indicators | |
| [9] Dict: Historical Data - Output value for the "Historical Data" Json component | |
| Historical data for the selected stock | |
| [10] Dict: Predicted Data - Output value for the "Predicted Data" Json component | |
| Predicted data for the selected stock | |
| Raises: | |
| gr.Error: If market is closed, insufficient data, or analysis errors | |
| Hourly data is only available during market hours (9:30 AM - 4:00 PM ET) | |
| Example: | |
| >>> signals, plot, metrics, risk, sector, regime, stress, ensemble, advanced, historical, predicted = hourly_analysis( | |
| ... "AAPL", 3, 14, "chronos", True, True, True, 0.02, "^GSPC", 0.6, 0.2, 0.2, 4, True, "exponential", 5, 0.3, True, True | |
| ... ) | |
| Notes: | |
| - Only available during market hours (9:30 AM - 4:00 PM ET, weekdays) | |
| - Maximum prediction period is 7 days (168 hours) | |
| - Historical data limited to 60 days due to Yahoo Finance constraints | |
| - Includes pre/post market data for extended hours analysis | |
| - Optimized for day trading and swing trading strategies | |
| - Requires high-liquidity stocks for reliable hourly analysis | |
| - Smoothing helps reduce prediction noise but may reduce responsiveness to sudden changes | |
| - Enhanced covariate data includes market indices, sector ETFs, commodities, and currencies | |
| - Sentiment analysis provides news sentiment scoring and confidence levels | |
| """ | |
| return analyze_stock(s, "1h", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, ha, abl, uc, us) | |
# Hourly "Predict" button. Gradio hands the component values to `fn` positionally,
# so the order of `inputs` must mirror the parameter order of hourly_analysis.
hourly_predict_btn.click(
    fn=hourly_analysis,
    inputs=[
        # s, pd, ld, st: symbol, horizon, lookback, strategy
        hourly_symbol, hourly_prediction_days, hourly_lookback_days, hourly_strategy,
        # ue, urd, ust, rfr, mi: analysis toggles and market settings
        use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
        # cw, tw, sw: ensemble weights
        chronos_weight, technical_weight, statistical_weight,
        # rrp, usm, smt, sww, sa: long-horizon context and smoothing
        random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
        # ha, abl: accuracy options
        high_accuracy, accuracy_boost_level,
        # uc, us: covariate data and sentiment analysis
        use_covariates, use_sentiment,
    ],
    outputs=[
        hourly_signals, hourly_plot, hourly_metrics, hourly_risk_metrics, hourly_sector_metrics,
        hourly_regime_metrics, hourly_stress_results, hourly_ensemble_metrics,
        hourly_signals_advanced, hourly_historical_json, hourly_predicted_json,
    ],
)
| # 15-minute analysis button click | |
def min15_analysis(s: str, pd: int, ld: int, st: str, ue: bool, urd: bool, ust: bool,
                   rfr: float, mi: str, cw: float, tw: float, sw: float,
                   rrp: int, usm: bool, smt: str, sww: float, sa: float,
                   ha: bool, abl: float,
                   uc: bool = False, us: bool = False) -> Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]:
    """
    Process 15-minute timeframe stock analysis with enhanced features.
    This function performs ultra-high-frequency stock analysis using 15-minute data, ideal for
    scalping and very short-term trading strategies. It includes micro-volatility analysis,
    volume-price relationships, momentum indicators, covariate data, and sentiment analysis
    optimized for 15-minute timeframes. All arguments are forwarded unchanged to
    analyze_stock with the "15m" timeframe.
    Args:
        s (str): Stock Symbol (e.g., AAPL) - The input value from the "Stock Symbol" Textbox component
        pd (int): Days to Predict - The input value from the "Days to Predict" Slider component (1-3)
            Limited to 3 days due to Yahoo Finance 15-minute data constraints
        ld (int): Historical Lookback (Days) - The input value from the "Historical Lookback (Days)" Slider component (1-7)
            Limited to 7 days for 15-minute data
        st (str): Prediction Strategy - The input value from the "Prediction Strategy" Dropdown component
            Options: "chronos" (uses Amazon's Chronos T5 model optimized for 15-minute data) or "technical" (technical indicators adjusted for 15-minute timeframes)
        ue (bool): Use Ensemble Methods - The input value from the "Use Ensemble Methods" Checkbox component
            Combines multiple models for improved ultra-short-term prediction accuracy
        urd (bool): Use Regime Detection - The input value from the "Use Regime Detection" Checkbox component
            Detects micro-market regimes and volatility patterns
        ust (bool): Use Stress Testing - The input value from the "Use Stress Testing" Checkbox component
            Performs scenario analysis for micro-market shocks and intraday volatility spikes
        rfr (float): Risk-Free Rate (Annual) - The input value from the "Risk-Free Rate (Annual)" Slider component (0.0-0.1)
            Annual risk-free rate for risk-adjusted calculations (less relevant for 15m analysis)
        mi (str): Market Index for Correlation - The input value from the "Market Index for Correlation" Dropdown component
            Options: "^GSPC" (S&P 500), "^DJI" (Dow Jones), "^IXIC" (NASDAQ), "^RUT" (Russell 2000)
        cw (float): Chronos Weight - The input value from the "Chronos Weight" Slider component (0.0-1.0)
            Weight for Chronos model in ensemble predictions
        tw (float): Technical Weight - The input value from the "Technical Weight" Slider component (0.0-1.0)
            Weight for technical analysis in ensemble predictions
        sw (float): Statistical Weight - The input value from the "Statistical Weight" Slider component (0.0-1.0)
            Weight for statistical models in ensemble predictions
        rrp (int): Random Real Points in Long-Horizon Context - The input value from the "Random Real Points in Long-Horizon Context" Slider component
            Number of random real points to include in long-horizon context for improved predictions
        usm (bool): Use Smoothing - The input value from the "Use Smoothing" Checkbox component
            When True, applies smoothing to predictions to reduce noise and improve continuity
        smt (str): Smoothing Type - The input value from the "Smoothing Type" Dropdown component
            Options: "exponential", "moving_average", "kalman", "savitzky_golay", "double_exponential", "triple_exponential", "adaptive", "none"
        sww (float): Smoothing Window Size - The input value from the "Smoothing Window Size" Slider component
            Window size for moving average and Savitzky-Golay smoothing methods
        sa (float): Smoothing Alpha - The input value from the "Smoothing Alpha" Slider component (0.1-0.9)
            Alpha parameter for exponential smoothing methods
        ha (bool): High Accuracy - The input value from the high-accuracy component
            Forwarded unchanged to analyze_stock
        abl (float): Accuracy Boost Level - The input value from the accuracy-boost-level component
            Forwarded unchanged to analyze_stock
        uc (bool): Use Enhanced Covariate Data - The input value from the "Use Enhanced Covariate Data" Checkbox component
            When True, includes market indices, sectors, and economic indicators in analysis. Defaults to False.
        us (bool): Use Sentiment Analysis - The input value from the "Use Sentiment Analysis" Checkbox component
            When True, includes news sentiment analysis in the prediction model. Defaults to False.
    Returns:
        Tuple[Dict, go.Figure, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict, Dict]: Analysis results containing:
            [0] Dict: Trading Signals - Output value for the "Trading Signals" Json component
                Basic trading signals optimized for 15-minute timeframes
            [1] go.Figure: Analysis and Prediction - Output value for the "Analysis and Prediction" Plot component
                Interactive plot with 15-minute data, predictions, and micro-patterns
            [2] Dict: Product Metrics - Output value for the "Product Metrics" Json component
                Product metrics including high-frequency volatility and volume analysis
            [3] Dict: Risk Metrics - Output value for the "Risk Metrics" Json component
                Risk metrics adjusted for 15-minute data frequency
            [4] Dict: Sector Metrics - Output value for the "Sector Metrics" Json component
                Sector analysis with ultra-short-term metrics
            [5] Dict: Regime Metrics - Output value for the "Regime Metrics" Json component
                Market regime detection for 15-minute patterns
            [6] Dict: Stress Test Results - Output value for the "Stress Test Results" Json component
                Stress testing results for intraday scenarios
            [7] Dict: Ensemble Metrics - Output value for the "Ensemble Metrics" Json component
                Ensemble analysis configuration and results
            [8] Dict: Advanced Trading Signals - Output value for the "Advanced Trading Signals" Json component
                Advanced signals with 15-minute-specific indicators
            [9] Dict: Historical Data - Output value for the "Historical Data" Json component
                Historical data for the selected stock
            [10] Dict: Predicted Data - Output value for the "Predicted Data" Json component
                Predicted data for the selected stock
    Raises:
        gr.Error: If market is closed, insufficient data points, or analysis errors
            15-minute data requires at least 64 data points and is only available during market hours
    Example:
        >>> signals, plot, metrics, risk, sector, regime, stress, ensemble, advanced, historical, predicted = min15_analysis(
        ...     "AAPL", 1, 3, "chronos", True, True, True, 0.02, "^GSPC", 0.6, 0.2, 0.2, 4, True, "exponential", 5, 0.3,
        ...     False, 1.0, True, True
        ... )
    Notes:
        - Only available during market hours (9:30 AM - 4:00 PM ET, weekdays)
        - Maximum prediction period is 2 days (192 15-minute intervals)
        - Historical data limited to 7 days due to Yahoo Finance constraints
        - Requires minimum 64 data points for reliable Chronos predictions
        - Optimized for scalping and very short-term trading strategies
        - Includes specialized indicators for intraday momentum and volume analysis
        - Higher transaction costs and slippage considerations for 15-minute strategies
        - Best suited for highly liquid large-cap stocks with tight bid-ask spreads
        - Smoothing helps reduce prediction noise but may reduce responsiveness to sudden changes
        - uc/us default to False so callers that wire only the first 19 inputs still work
    """
    return analyze_stock(s, "15m", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, ha, abl, uc, us)
# 15-minute "Predict" button. Gradio passes the component values to `fn`
# positionally, so `inputs` must list one component per min15_analysis parameter
# (21 in total, ending with uc/us). The previous list stopped after
# accuracy_boost_level, so min15_analysis was called with 19 arguments and
# could not bind its required uc/us parameters. The list now matches the
# hourly wiring.
min15_predict_btn.click(
    fn=min15_analysis,
    inputs=[min15_symbol, min15_prediction_days, min15_lookback_days, min15_strategy,
            use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
            chronos_weight, technical_weight, statistical_weight,
            random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
            high_accuracy, accuracy_boost_level,
            use_covariates, use_sentiment],
    outputs=[min15_signals, min15_plot, min15_metrics, min15_risk_metrics, min15_sector_metrics,
             min15_regime_metrics, min15_stress_results, min15_ensemble_metrics, min15_signals_advanced,
             min15_historical_json, min15_predicted_json]
)
| return demo | |
def get_enhanced_covariate_data(symbol: str, timeframe: str = "1d", lookback_days: int = 365) -> Dict[str, pd.DataFrame]:
    """
    Collect enhanced covariate data including market indices, sectors, commodities, and economic indicators.

    Closing prices are fetched for every ticker in COVARIATE_SOURCES['market_indices'],
    COVARIATE_SOURCES['sectors'] and ECONOMIC_INDICATORS. A ticker that fails or
    returns no rows is logged and skipped, and the remaining tickers are still fetched.

    Args:
        symbol (str): Stock symbol (not used by the fetch logic in this function)
        timeframe (str): Data timeframe, passed as ``interval`` to ``Ticker.history``
        lookback_days (int): Number of days to look back from now

    Returns:
        Dict[str, pd.DataFrame]: Up to three frames of closing prices, under the keys
        'market_indices', 'sectors' and 'economic_indicators'. A group with no data is
        omitted. Returns an empty dict on an unexpected error.
    """
    try:
        covariate_data = {}
        # Calculate date range
        end_date = datetime.now()
        start_date = end_date - timedelta(days=lookback_days)
        # Each entry holds: result key, progress message, shape label, empty message,
        # and (column key, ticker symbol, log label) targets.
        collection_plan = [
            ('market_indices', "Collecting market indices data...", "Market indices data shape",
             "No market indices data collected",
             [(index, index, index) for index in COVARIATE_SOURCES['market_indices']]),
            ('sectors', "Collecting sector data...", "Sector data shape",
             "No sector data collected",
             [(sector, sector, sector) for sector in COVARIATE_SOURCES['sectors']]),
            ('economic_indicators', "Collecting economic indicators...", "Economic indicators data shape",
             "No economic indicators data collected",
             [(indicator, ticker_symbol, f"{indicator} ({ticker_symbol})")
              for indicator, ticker_symbol in ECONOMIC_INDICATORS.items()]),
        ]
        for key, progress_msg, shape_label, empty_msg, targets in collection_plan:
            print(progress_msg)
            closes = _collect_close_prices(targets, start_date, end_date, timeframe)
            if closes:
                covariate_data[key] = pd.DataFrame(closes)
                print(f"{shape_label}: {covariate_data[key].shape}")
            else:
                print(empty_msg)
        # Return whatever data we were able to collect
        if not covariate_data:
            print("Warning: No covariate data collected, returning empty dict")
        return covariate_data
    except Exception as e:
        print(f"Error collecting covariate data: {str(e)}")
        # Return empty dict instead of failing completely
        return {}


def _collect_close_prices(targets: List[Tuple[str, str, str]], start_date: datetime,
                          end_date: datetime, timeframe: str) -> Dict[str, pd.Series]:
    """
    Fetch the 'Close' column for each target. Per-ticker failures are logged and skipped.

    Args:
        targets: (key, ticker_symbol, label) triples. ``key`` names the result entry,
            ``ticker_symbol`` is passed to ``yf.Ticker``, and ``label`` is used in log lines.
        start_date: Start of the history window passed to ``Ticker.history``.
        end_date: End of the history window passed to ``Ticker.history``.
        timeframe: ``interval`` passed to ``Ticker.history``.

    Returns:
        Dict[str, pd.Series]: Close series keyed by ``key``. Tickers that returned no
        rows or raised are omitted.
    """
    closes = {}
    for key, ticker_symbol, label in targets:
        try:
            ticker = yf.Ticker(ticker_symbol)
            # Bind the ticker as a default so the callable never picks up a later loop value.
            data = retry_yfinance_request(lambda t=ticker: t.history(
                start=start_date, end=end_date, interval=timeframe
            ))
            if not data.empty:
                closes[key] = data['Close']
                print(f" Successfully collected {label}: {len(data)} data points")
            else:
                print(f" No data for {label}")
        except Exception as e:
            # Continue with the remaining tickers even if one fails
            print(f" Error fetching {label}: {str(e)}")
    return closes
def calculate_market_sentiment(symbol: str, lookback_days: int = 30) -> Dict[str, float]:
    """
    Calculate market sentiment by scoring recent news text with TextBlob polarity.

    The first 10 items returned by ``yf.Ticker(symbol).news`` are considered. Items with
    no title or summary text are skipped; they are not counted as neutral 0.0 samples,
    which would skew the average and inflate the confidence.

    Args:
        symbol (str): Stock symbol
        lookback_days (int): Currently unused; kept for API compatibility.

    Returns:
        Dict[str, float]:
            - 'sentiment_score': mean polarity of the scored articles (0.0 when none).
            - 'sentiment_confidence': min(0.9, samples / 10).
            - 'sentiment_samples': number of scored articles.
            - 'error': the message, only present on an unexpected failure.
    """
    if not SENTIMENT_AVAILABLE:
        return {'sentiment_score': 0.0, 'sentiment_confidence': 0.0, 'sentiment_samples': 0}
    try:
        sentiment_scores = []
        # Get news sentiment (simplified approach using yfinance news)
        try:
            ticker = yf.Ticker(symbol)
            news = retry_yfinance_request(lambda: ticker.news)
            for article in (news or [])[:10]:  # first 10 items as returned by yfinance
                text = _news_article_text(article)
                if not text:
                    continue
                sentiment_scores.append(TextBlob(text).sentiment.polarity)
        except Exception as e:
            print(f"Error fetching news sentiment: {str(e)}")
            # Don't fail completely, just log the error
        if sentiment_scores:
            avg_sentiment = float(np.mean(sentiment_scores))
            sentiment_confidence = min(0.9, len(sentiment_scores) / 10.0)
        else:
            avg_sentiment = 0.0
            sentiment_confidence = 0.0
        return {
            'sentiment_score': avg_sentiment,
            'sentiment_confidence': sentiment_confidence,
            'sentiment_samples': len(sentiment_scores)
        }
    except Exception as e:
        print(f"Error calculating sentiment: {str(e)}")
        # Return neutral sentiment on error
        return {
            'sentiment_score': 0.0,
            'sentiment_confidence': 0.0,
            'sentiment_samples': 0,
            'error': str(e)
        }


def _news_article_text(article) -> str:
    """
    Return "title summary" for one yfinance news item, or '' if it has no text.

    Supports both the flat layout ({'title': ..., 'summary': ...}) and the layout used
    by newer yfinance releases, where those fields are nested under 'content'.
    Missing or None fields are treated as empty strings.
    """
    if not isinstance(article, dict):
        return ''
    content = article.get('content')
    if not isinstance(content, dict):
        content = article
    title = content.get('title') or ''
    summary = content.get('summary') or ''
    return f"{title} {summary}".strip()
def calculate_advanced_uncertainty(quantiles: np.ndarray, historical_volatility: float,
                                   market_conditions: Dict = None, confidence_level: float = 0.9) -> Dict[str, np.ndarray]:
    """
    Calculate advanced uncertainty estimates using multiple methods.

    The band ``quantiles[0, :, 2] - quantiles[0, :, 0]`` is converted into a
    sigma-like width by dividing by ``2 * stats.norm.ppf(confidence_level)``.
    Every other estimate is derived from that basic width. The optional
    skew-adjusted estimate is computed in isolation, so a failure there no longer
    discards the remaining estimates.

    Args:
        quantiles (np.ndarray): Quantile predictions from Chronos, indexed as
            ``quantiles[0, step, k]`` with k = 0 (lower) and k = 2 (upper).
            Assumes at least 3 entries on the last axis.
        historical_volatility (float): Historical volatility; 'volatility_scaled' is
            ``basic * (1 + historical_volatility)``.
        market_conditions (Dict): Market condition indicators. Only 'vix' is read
            (default 20.0); 'market_adjusted' is ``basic * vix / 20``.
        confidence_level (float): Level passed to ``stats.norm.ppf`` to get the z-score.

    Returns:
        Dict[str, np.ndarray]: Per-step estimates keyed by method:
            - 'basic', 'volatility_scaled', 'market_adjusted', 'time_decay', 'ensemble'.
            - 'skewed', only if calculate_skewed_uncertainty succeeds and returns an
              array shaped like 'basic'.
            - 'ensemble' is the element-wise mean of the other estimates.
        If a derived step fails, only {'basic': ...} is returned.

    Raises:
        IndexError / TypeError: If ``quantiles`` cannot be indexed as described.
    """
    lower = quantiles[0, :, 0]
    upper = quantiles[0, :, 2]
    # One z-score for every path. The old fallback hard-coded 1.645 (= ppf(0.95)),
    # which disagreed with the main path at the default confidence_level of 0.9.
    z_score = stats.norm.ppf(confidence_level)
    # 1. Basic quantile-based uncertainty
    basic_uncertainty = (upper - lower) / (2 * z_score)
    try:
        uncertainties = {'basic': basic_uncertainty}
        # 2. Skewness-adjusted uncertainty: isolated so its failure keeps the rest.
        try:
            skewed = np.asarray(calculate_skewed_uncertainty(quantiles, confidence_level), dtype=float)
            if skewed.shape == basic_uncertainty.shape:
                uncertainties['skewed'] = skewed
            else:
                print(f"Skewed uncertainty shape {skewed.shape} does not match "
                      f"{basic_uncertainty.shape}; skipping it")
        except Exception as e:
            print(f"Skewed uncertainty calculation error: {str(e)}")
        # 3. Volatility-scaled uncertainty
        uncertainties['volatility_scaled'] = basic_uncertainty * (1 + historical_volatility)
        # 4. Market condition adjusted uncertainty (VIX normalized to a typical level of 20)
        if market_conditions:
            vix_factor = market_conditions.get('vix', 20.0) / 20.0
            uncertainties['market_adjusted'] = basic_uncertainty * vix_factor
        else:
            uncertainties['market_adjusted'] = basic_uncertainty
        # 5. Time-decay uncertainty: +10% of the basic width per prediction step
        steps = np.arange(len(basic_uncertainty))
        uncertainties['time_decay'] = basic_uncertainty * (1 + 0.1 * steps)
        # 6. Ensemble uncertainty: mean of whichever estimates are available
        components = [uncertainties[name] for name in
                      ('basic', 'skewed', 'volatility_scaled', 'market_adjusted', 'time_decay')
                      if name in uncertainties]
        uncertainties['ensemble'] = np.mean(components, axis=0)
        return uncertainties
    except Exception as e:
        print(f"Advanced uncertainty calculation error: {str(e)}")
        # Fallback to the basic estimate, computed with the same z-score as above
        return {'basic': basic_uncertainty}
| def create_enhanced_ensemble_model(df: pd.DataFrame, covariate_data: Dict, | |
| prediction_days: int) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Create an enhanced ensemble model using multiple algorithms and covariate data. | |
| Args: | |
| df (pd.DataFrame): Stock data | |
| covariate_data (Dict): Covariate data | |
| prediction_days (int): Number of days to predict | |
| Returns: | |
| Tuple[np.ndarray, np.ndarray]: Ensemble predictions and uncertainties | |
| """ | |
| if not ENSEMBLE_AVAILABLE: | |
| return np.array([]), np.array([]) | |
| try: | |
| # Prepare features | |
| features = [] | |
| target = df['Close'].values | |
| # Technical indicators as features | |
| features.append(df['RSI'].fillna(50).values) # Fill NaN with neutral RSI value | |
| features.append(df['MACD'].fillna(0).values) # Fill NaN with zero | |
| features.append(df['Volatility'].fillna(df['Volatility'].mean()).values) # Fill with mean volatility | |
| if 'BB_Upper' in df.columns: | |
| bb_position = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower']) | |
| bb_position = bb_position.fillna(0.5) # Fill NaN with neutral position | |
| features.append(bb_position.values) | |
| # Add lagged price features to capture temporal patterns | |
| for lag in [1, 2, 3, 5, 10]: | |
| if len(target) > lag: | |
| lagged_prices = np.pad(target[:-lag], (lag, 0), mode='edge') | |
| features.append(lagged_prices) | |
| # Add rolling statistics to capture trends | |
| for window in [5, 10, 20]: | |
| if len(target) >= window: | |
| rolling_mean = pd.Series(target).rolling(window=window, min_periods=1).mean().values | |
| rolling_std = pd.Series(target).rolling(window=window, min_periods=1).std().fillna(0).values | |
| features.append(rolling_mean) | |
| features.append(rolling_std) | |
| # Add price momentum features | |
| if len(target) > 1: | |
| price_momentum_1d = np.pad(np.diff(target), (1, 0), mode='constant', constant_values=0) | |
| price_momentum_5d = np.pad(np.diff(target, n=5), (5, 0), mode='constant', constant_values=0) | |
| features.append(price_momentum_1d) | |
| features.append(price_momentum_5d) | |
| # Add covariate data | |
| if 'market_indices' in covariate_data: | |
| for col in covariate_data['market_indices'].columns: | |
| covariate_series = covariate_data['market_indices'][col] | |
| # Align covariate data to target length | |
| if len(covariate_series) == len(target): | |
| # Fill NaN values with forward fill, then backward fill | |
| filled_series = covariate_series.fillna(method='ffill').fillna(method='bfill') | |
| features.append(filled_series.values) | |
| elif len(covariate_series) > len(target): | |
| # Truncate to target length | |
| truncated_series = covariate_series.tail(len(target)) | |
| filled_series = truncated_series.fillna(method='ffill').fillna(method='bfill') | |
| features.append(filled_series.values) | |
| else: | |
| # Pad with last value | |
| padded_values = np.pad(covariate_series.values, | |
| (len(target) - len(covariate_series), 0), | |
| mode='edge') | |
| # Fill any remaining NaN values | |
| filled_values = pd.Series(padded_values).fillna(method='ffill').fillna(method='bfill').values | |
| features.append(filled_values) | |
| if 'economic_indicators' in covariate_data: | |
| for col in covariate_data['economic_indicators'].columns: | |
| covariate_series = covariate_data['economic_indicators'][col] | |
| # Align covariate data to target length | |
| if len(covariate_series) == len(target): | |
| # Fill NaN values with forward fill, then backward fill | |
| filled_series = covariate_series.fillna(method='ffill').fillna(method='bfill') | |
| features.append(filled_series.values) | |
| elif len(covariate_series) > len(target): | |
| # Truncate to target length | |
| truncated_series = covariate_series.tail(len(target)) | |
| filled_series = truncated_series.fillna(method='ffill').fillna(method='bfill') | |
| features.append(filled_series.values) | |
| else: | |
| # Pad with last value | |
| padded_values = np.pad(covariate_series.values, | |
| (len(target) - len(covariate_series), 0), | |
| mode='edge') | |
| # Fill any remaining NaN values | |
| filled_values = pd.Series(padded_values).fillna(method='ffill').fillna(method='bfill').values | |
| features.append(filled_values) | |
| # Create feature matrix | |
| X = np.column_stack(features) | |
| y = target | |
| # Validate feature matrix | |
| print(f"Feature matrix shape: {X.shape}, Target shape: {y.shape}") | |
| # Ensure all features have the same length | |
| feature_lengths = [len(feature) for feature in features] | |
| if len(set(feature_lengths)) > 1: | |
| print(f"Warning: Feature lengths are inconsistent: {feature_lengths}") | |
| # Find the minimum length and truncate all features | |
| min_length = min(feature_lengths) | |
| X = X[:min_length] | |
| y = y[:min_length] | |
| print(f"Truncated to minimum length: {min_length}") | |
| # Remove any NaN values | |
| print(f"Checking for NaN values in feature matrix...") | |
| nan_rows = np.isnan(X).any(axis=1) | |
| nan_count = nan_rows.sum() | |
| print(f"Found {nan_count} rows with NaN values out of {len(X)} total rows") | |
| if nan_count > 0: | |
| # Check which features have NaN values | |
| for i, feature_name in enumerate(['RSI', 'MACD', 'Volatility', 'BB_Position'] + | |
| [f'Market_{j}' for j in range(len(features)-4)]): | |
| if i < X.shape[1]: | |
| nan_count_feature = np.isnan(X[:, i]).sum() | |
| if nan_count_feature > 0: | |
| print(f" Feature {feature_name}: {nan_count_feature} NaN values") | |
| # Only remove rows if there are still NaN values after filling | |
| if nan_count > 0: | |
| mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y) | |
| X = X[mask] | |
| y = y[mask] | |
| print(f"Removed {nan_count} rows with NaN values") | |
| else: | |
| print("No NaN values found in feature matrix") | |
| print(f"After NaN removal - X shape: {X.shape}, y shape: {y.shape}") | |
| if len(X) < 50: # Need sufficient data | |
| print(f"Insufficient data after preprocessing: {len(X)} samples") | |
| print("This might be due to:") | |
| print("1. Too many NaN values in covariate data") | |
| print("2. Insufficient historical data") | |
| print("3. Data alignment issues") | |
| print("Trying fallback with technical indicators only...") | |
| # Fallback: Use only technical indicators | |
| try: | |
| fallback_features = [] | |
| fallback_features.append(df['RSI'].fillna(50).values) | |
| fallback_features.append(df['MACD'].fillna(0).values) | |
| fallback_features.append(df['Volatility'].fillna(df['Volatility'].mean()).values) | |
| if 'BB_Upper' in df.columns: | |
| bb_position = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower']) | |
| bb_position = bb_position.fillna(0.5) | |
| fallback_features.append(bb_position.values) | |
| X_fallback = np.column_stack(fallback_features) | |
| y_fallback = df['Close'].values | |
| # Remove any remaining NaN values | |
| mask = ~np.isnan(X_fallback).any(axis=1) & ~np.isnan(y_fallback) | |
| X_fallback = X_fallback[mask] | |
| y_fallback = y_fallback[mask] | |
| print(f"Fallback feature matrix shape: {X_fallback.shape}") | |
| if len(X_fallback) >= 50: | |
| print("Using fallback ensemble with technical indicators only") | |
| # Use the fallback data for the rest of the function | |
| X = X_fallback | |
| y = y_fallback | |
| else: | |
| print("Fallback also failed, returning empty arrays") | |
| return np.array([]), np.array([]) | |
| except Exception as fallback_error: | |
| print(f"Fallback failed: {str(fallback_error)}") | |
| return np.array([]), np.array([]) | |
| # Initialize models | |
| models = { | |
| 'random_forest': RandomForestRegressor(n_estimators=100, random_state=42), | |
| 'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=42), | |
| 'ridge': Ridge(alpha=1.0), | |
| 'lasso': Lasso(alpha=0.1), | |
| 'svr': SVR(kernel='rbf', C=1.0, gamma='scale'), | |
| 'mlp': MLPRegressor(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42) | |
| } | |
| # Train models using time series cross-validation | |
| predictions = {} | |
| uncertainties = {} | |
| for name, model in models.items(): | |
| try: | |
| # Use ALL available data for training (no train/test split) | |
| # This maximizes the use of historical information | |
| print(f"Training {name} on all {len(X)} data points...") | |
| # Train model on all available data | |
| model.fit(X, y) | |
| # Generate predictions for the full prediction period | |
| # Use the most recent data points to generate future predictions | |
| if len(X) >= prediction_days: | |
| # Use the last prediction_days data points for prediction | |
| # This ensures we're using the most recent patterns and trends | |
| X_pred = X[-prediction_days:] | |
| pred = model.predict(X_pred) | |
| print(f" {name}: Generated {len(pred)} predictions using last {prediction_days} data points") | |
| else: | |
| # If we don't have enough data, use all available data and extrapolate | |
| pred = model.predict(X) | |
| print(f" {name}: Generated {len(pred)} predictions using all {len(X)} data points") | |
| if len(pred) < prediction_days: | |
| # Extend with trend-based predictions | |
| if len(pred) > 0: | |
| # Calculate trend from last few predictions | |
| trend_window = min(5, len(pred)) | |
| if trend_window > 1: | |
| trend = np.mean(np.diff(pred[-trend_window:])) | |
| else: | |
| trend = 0 | |
| # Extend predictions using the trend | |
| last_pred = pred[-1] if len(pred) > 0 else y[-1] | |
| for i in range(len(pred), prediction_days): | |
| next_pred = last_pred + trend | |
| pred = np.append(pred, next_pred) | |
| last_pred = next_pred | |
| print(f" {name}: Extended to {len(pred)} predictions using trend extrapolation") | |
| else: | |
| # No predictions available, use simple extrapolation | |
| last_price = y[-1] | |
| pred = np.array([last_price * (1 + 0.001 * i) for i in range(prediction_days)]) | |
| print(f" {name}: Generated {len(pred)} predictions using simple extrapolation") | |
| # Calculate uncertainty using cross-validation on all data | |
| # This gives us a better estimate of model performance | |
| try: | |
| cv_scores = cross_val_score(model, X, y, cv=min(5, len(X)//10), scoring='neg_mean_squared_error') | |
| mse = -np.mean(cv_scores) | |
| uncertainty = np.sqrt(mse) * np.ones(prediction_days) | |
| print(f" {name} CV MSE: {mse:.6f}") | |
| except Exception as cv_error: | |
| print(f" {name} CV failed, using fallback uncertainty: {str(cv_error)}") | |
| # Fallback uncertainty based on historical volatility | |
| uncertainty = np.std(y) * np.ones(prediction_days) | |
| # Ensure prediction is the right length | |
| if len(pred) != prediction_days: | |
| print(f"Warning: {name} produced {len(pred)} predictions, expected {prediction_days}") | |
| if len(pred) > prediction_days: | |
| pred = pred[:prediction_days] | |
| uncertainty = uncertainty[:prediction_days] | |
| else: | |
| # Extend with trend-based predictions | |
| if len(pred) > 0: | |
| # Calculate trend from last few predictions | |
| trend_window = min(5, len(pred)) | |
| if trend_window > 1: | |
| trend = np.mean(np.diff(pred[-trend_window:])) | |
| else: | |
| trend = 0 | |
| # Extend predictions using the trend | |
| last_pred = pred[-1] if len(pred) > 0 else y[-1] | |
| for i in range(len(pred), prediction_days): | |
| next_pred = last_pred + trend | |
| pred = np.append(pred, next_pred) | |
| last_pred = next_pred | |
| else: | |
| # No predictions available, use simple extrapolation | |
| last_price = y[-1] | |
| pred = np.array([last_price * (1 + 0.001 * i) for i in range(prediction_days)]) | |
| # Validate prediction | |
| if len(pred) == prediction_days and len(uncertainty) == prediction_days: | |
| predictions[name] = pred | |
| uncertainties[name] = uncertainty | |
| else: | |
| print(f"Warning: {name} prediction validation failed. Skipping.") | |
| continue | |
| except Exception as e: | |
| print(f"Error training {name}: {str(e)}") | |
| continue | |
| if not predictions: | |
| return np.array([]), np.array([]) | |
| # Debug: Print prediction lengths | |
| print(f"Ensemble model debugging - Expected prediction_days: {prediction_days}") | |
| for name, pred in predictions.items(): | |
| print(f" {name}: prediction length = {len(pred)}, uncertainty length = {len(uncertainties.get(name, []))}") | |
| # Combine predictions using weighted average | |
| weights = {} | |
| model_performances = {} | |
| for name in predictions.keys(): | |
| if name in uncertainties: | |
| # Calculate model performance on training data | |
| try: | |
| # Use cross-validation score as performance metric | |
| cv_scores = cross_val_score(models[name], X, y, cv=min(5, len(X)//10), scoring='r2') | |
| performance = np.mean(cv_scores) | |
| model_performances[name] = performance | |
| # Weight based on both performance and uncertainty | |
| # Higher performance and lower uncertainty = higher weight | |
| uncertainty_factor = 1.0 / np.mean(uncertainties[name]) | |
| performance_factor = max(0, performance) # Ensure non-negative | |
| # Combine factors (you can adjust the balance) | |
| weights[name] = (0.7 * performance_factor + 0.3 * uncertainty_factor) | |
| print(f" {name}: Performance={performance:.4f}, Uncertainty={np.mean(uncertainties[name]):.4f}, Weight={weights[name]:.4f}") | |
| except Exception as e: | |
| print(f" {name}: Performance calculation failed, using uncertainty only: {str(e)}") | |
| # Fallback to uncertainty-based weighting | |
| weights[name] = 1.0 / np.mean(uncertainties[name]) | |
| model_performances[name] = 0.0 | |
| else: | |
| # Equal weight if no uncertainty available | |
| weights[name] = 1.0 | |
| model_performances[name] = 0.0 | |
| # Normalize weights | |
| total_weight = sum(weights.values()) | |
| if total_weight > 0: | |
| weights = {k: v / total_weight for k, v in weights.items()} | |
| else: | |
| # Equal weights if all uncertainties are zero | |
| weights = {k: 1.0 / len(weights) for k in weights.keys()} | |
| # Calculate ensemble prediction | |
| ensemble_pred = np.zeros(prediction_days) | |
| ensemble_uncertainty = np.zeros(prediction_days) | |
| for name, pred in predictions.items(): | |
| if name in weights: | |
| # Ensure prediction is the right length | |
| pred_array = np.array(pred) | |
| uncertainty_array = np.array(uncertainties[name]) | |
| # Handle different prediction lengths | |
| if len(pred_array) >= prediction_days: | |
| # Truncate to prediction_days | |
| pred_array = pred_array[:prediction_days] | |
| uncertainty_array = uncertainty_array[:prediction_days] | |
| elif len(pred_array) < prediction_days: | |
| # Extend with the last value | |
| last_pred = pred_array[-1] if len(pred_array) > 0 else 0 | |
| last_uncertainty = uncertainty_array[-1] if len(uncertainty_array) > 0 else 1.0 | |
| # Pad with last values | |
| pred_array = np.pad(pred_array, (0, prediction_days - len(pred_array)), | |
| mode='constant', constant_values=last_pred) | |
| uncertainty_array = np.pad(uncertainty_array, (0, prediction_days - len(uncertainty_array)), | |
| mode='constant', constant_values=last_uncertainty) | |
| # Ensure arrays are the correct shape | |
| if len(pred_array) != prediction_days: | |
| print(f"Warning: {name} prediction length mismatch. Expected {prediction_days}, got {len(pred_array)}") | |
| continue | |
| if len(uncertainty_array) != prediction_days: | |
| print(f"Warning: {name} uncertainty length mismatch. Expected {prediction_days}, got {len(uncertainty_array)}") | |
| continue | |
| # Add weighted contribution | |
| ensemble_pred += weights[name] * pred_array | |
| ensemble_uncertainty += weights[name] * uncertainty_array | |
| return ensemble_pred, ensemble_uncertainty | |
| except Exception as e: | |
| print(f"Enhanced ensemble model error: {str(e)}") | |
| print("Falling back to simple ensemble prediction...") | |
| # Fallback: Simple ensemble using basic models | |
| try: | |
| # Use simple linear regression as fallback | |
| X = np.arange(len(df)).reshape(-1, 1) | |
| y = df['Close'].values | |
| # Simple linear trend | |
| slope = np.polyfit(X.flatten(), y, 1)[0] | |
| last_price = y[-1] | |
| # Generate simple predictions | |
| ensemble_pred = np.array([last_price + slope * i for i in range(1, prediction_days + 1)]) | |
| ensemble_uncertainty = np.std(y) * np.ones(prediction_days) | |
| return ensemble_pred, ensemble_uncertainty | |
| except Exception as fallback_error: | |
| print(f"Fallback ensemble also failed: {str(fallback_error)}") | |
| return np.array([]), np.array([]) | |
def calculate_volume_prediction_enhanced(df: pd.DataFrame, price_prediction: np.ndarray,
                                         covariate_data: Dict = None,
                                         rng: Optional[np.random.Generator] = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Enhanced volume prediction driven by the predicted price path.

    The first output step is the last observed volume. Each later step takes the
    previous step's volume and:
      * scales it by a multiplier chosen from the *relative* change between
        consecutive predicted prices (> 2%: 1.5, > 1%: 1.2, otherwise 0.8),
      * nudges it by the mean volume change over the last 10 observed periods,
      * adds Gaussian noise with std = 10% of the historical volume std,
    and is clipped at 0.

    Args:
        df (pd.DataFrame): Stock data; the 'Volume' column is used.
        price_prediction (np.ndarray): Predicted prices, one per output step.
        covariate_data (Dict): Covariate data (currently unused; kept for API
            compatibility).
        rng (np.random.Generator, optional): Noise source. Defaults to the
            global ``np.random`` state (previous behavior); pass a seeded
            generator for reproducible output.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Volume predictions and uncertainties,
        both of length ``len(price_prediction)``. Uncertainty is the historical
        volume std, widened by 10% per step of horizon.
    """
    try:
        price_prediction = np.asarray(price_prediction, dtype=float)
        horizon = len(price_prediction)
        if horizon == 0:
            return np.array([]), np.array([])

        volume_data = df['Volume'].values
        volume_volatility = np.std(volume_data)
        volume_momentum = np.diff(volume_data)
        # Mean change over the last 10 periods; 0 when there is no history to diff.
        avg_volume_momentum = np.mean(volume_momentum[-10:]) if volume_momentum.size else 0.0
        sampler = np.random if rng is None else rng

        # Step 0 is anchored at the last actual volume; every later step builds
        # on the step before it.
        predicted_volume = [volume_data[-1]]
        for prev_price, next_price in zip(price_prediction[:-1], price_prediction[1:]):
            base_volume = predicted_volume[-1]
            price_change = next_price - prev_price
            # The thresholds are fractions (1%, 2%), so compare the relative
            # move rather than the absolute price difference.
            if prev_price != 0:
                rel_change = abs(price_change / prev_price)
            else:
                rel_change = 0.0 if price_change == 0 else np.inf
            if rel_change > 0.02:
                volume_multiplier = 1.5
            elif rel_change > 0.01:
                volume_multiplier = 1.2
            else:
                volume_multiplier = 0.8
            # Skip the momentum term when volume has collapsed to 0 (avoids 0-division).
            if base_volume > 0:
                momentum_effect = 1.0 + (avg_volume_momentum / base_volume) * 0.1
            else:
                momentum_effect = 1.0
            pred_vol = base_volume * volume_multiplier * momentum_effect
            noise = sampler.normal(0, volume_volatility * 0.1)
            predicted_volume.append(max(0, pred_vol + noise))

        # Uncertainty widens linearly with the forecast horizon.
        volume_uncertainty = volume_volatility * (1 + 0.1 * np.arange(horizon))
        return np.array(predicted_volume), volume_uncertainty
    except Exception as e:
        print(f"Enhanced volume prediction error: {str(e)}")
        # Fallback to simple prediction
        last_volume = df['Volume'].iloc[-1]
        return np.full(len(price_prediction), last_volume), np.full(len(price_prediction), last_volume * 0.2)
def calculate_regime_aware_uncertainty(quantiles: np.ndarray, regime_info: Dict,
                                       market_conditions: Dict = None) -> np.ndarray:
    """
    Calculate uncertainty that accounts for market regime changes.

    Starts from ``calculate_skewed_uncertainty(quantiles)`` and scales it by two
    factors:
      * ``current_volatility / mean(volatilities)`` taken from ``regime_info``
        when it has a 'volatilities' entry ('current_volatility' defaults to
        that mean, i.e. a factor of 1.0);
      * ``vix / 20.0`` taken from ``market_conditions`` ('vix' defaults to 20.0,
        i.e. a factor of 1.0).
    A factor whose inputs are missing, non-numeric, non-finite or non-positive
    falls back to 1.0. Previously such inputs produced inf/NaN or zero
    uncertainty, or raised and silently discarded the regime adjustment.

    Args:
        quantiles (np.ndarray): Quantile predictions
        regime_info (Dict): Market regime information
        market_conditions (Dict): Current market conditions

    Returns:
        np.ndarray: Regime-aware uncertainty estimates. On any unexpected
        error, the unadjusted ``calculate_skewed_uncertainty(quantiles)``.
    """
    def _is_positive_finite(value) -> bool:
        # True only for a real, finite number strictly greater than zero.
        try:
            return value is not None and bool(np.isfinite(value)) and value > 0
        except (TypeError, ValueError):
            return False

    try:
        # Get basic uncertainty
        basic_uncertainty = calculate_skewed_uncertainty(quantiles)

        # Regime factor: how volatile the current regime is relative to average.
        volatility_ratio = 1.0
        if regime_info and 'volatilities' in regime_info:
            volatilities = np.asarray(regime_info['volatilities'], dtype=float)
            if volatilities.size:
                avg_volatility = float(np.mean(volatilities))
                current_volatility = regime_info.get('current_volatility', avg_volatility)
                if _is_positive_finite(avg_volatility) and _is_positive_finite(current_volatility):
                    volatility_ratio = float(current_volatility) / avg_volatility
        regime_adjusted = basic_uncertainty * volatility_ratio

        # Market factor: VIX relative to a 20.0 baseline.
        if market_conditions:
            vix_level = market_conditions.get('vix', 20.0)
            if _is_positive_finite(vix_level):
                regime_adjusted = regime_adjusted * (float(vix_level) / 20.0)
        return regime_adjusted
    except Exception as e:
        print(f"Regime-aware uncertainty calculation error: {str(e)}")
        return calculate_skewed_uncertainty(quantiles)
# Quote currencies that mark a '-'-separated symbol as a crypto pair (e.g. 'BTC-USD').
_CRYPTO_QUOTE_CURRENCIES = ('USD', 'USDT', 'EUR', 'GBP', 'JPY', 'BTC', 'ETH')


def _lacks_corporate_data(symbol: str) -> bool:
    """Return True for symbols that have no earnings/recommendations/dividends.

    Matches indices ('^GSPC'), FX and futures ('EURUSD=X', 'GC=F') and crypto
    pairs ('BTC-USD'). Plain equity tickers return False, including tickers
    that merely contain letters such as 'F' or 'X' ('MSFT', 'XOM', 'F') and
    share-class tickers such as 'BRK-B'.
    """
    sym = symbol.upper()
    if sym.startswith('^') or '=' in sym:
        return True
    _, sep, quote = sym.rpartition('-')
    return bool(sep) and quote in _CRYPTO_QUOTE_CURRENCIES


def _tail_records(data, n: int) -> List[Dict]:
    """Return the last ``n`` rows of a pandas Series/DataFrame as a list of dicts.

    A Series (e.g. ``Ticker.dividends``) is reset to a DataFrame first, so its
    index becomes a column. ``Series.to_dict`` has no 'records' orient.
    ``None``, objects without ``.empty``, and empty input yield ``[]``.
    """
    if data is None or not hasattr(data, 'empty') or data.empty:
        return []
    tail = data.tail(n)
    if isinstance(tail, pd.Series):
        tail = tail.reset_index()
    return tail.to_dict('records')


def get_market_info_from_yfinance(symbol: str) -> Dict:
    """
    Get market information from yfinance.

    Args:
        symbol (str): Market symbol (e.g., '^GSPC', 'EURUSD=X', 'BTC-USD')

    Returns:
        Dict: On success, a dict with 'symbol', 'info', 'market_data',
        'additional_data' and 'timestamp'. If creating the ticker or fetching
        its info fails, a dict with 'symbol', 'error' and 'timestamp'.
        The other requests (history, news, recommendations, earnings,
        dividends, splits) are best-effort: a failure is printed, and the
        corresponding entry is left out, empty, or zero.
    """
    try:
        ticker = yf.Ticker(symbol)
        # Ticker.info (fetched with retry) is the only request whose failure
        # aborts the whole lookup.
        info = retry_yfinance_request(lambda: ticker.info)

        def fetch_tail(label: str, getter, n: int) -> List[Dict]:
            # Best-effort fetch of the last n rows; errors are logged and yield [].
            try:
                return _tail_records(retry_yfinance_request(getter), n)
            except Exception as exc:
                print(f"Error fetching {label} for {symbol}: {str(exc)}")
                return []

        # Get current market data with retry
        try:
            hist = retry_yfinance_request(lambda: ticker.history(period="1d"))
        except Exception as e:
            print(f"Error fetching history for {symbol}: {str(e)}")
            hist = None

        market_data = {}
        if hist is not None and not hist.empty:
            open_price = hist['Open'].iloc[-1]
            close_price = hist['Close'].iloc[-1]
            change = close_price - open_price
            market_data.update({
                'current_price': close_price,
                'open_price': open_price,
                'high_price': hist['High'].iloc[-1],
                'low_price': hist['Low'].iloc[-1],
                'volume': hist['Volume'].iloc[-1],
                'change': change,
                # Guard the percentage against a zero open price.
                'change_percent': (change / open_price) * 100 if open_price else 0.0,
            })

        # Get news if available with retry
        try:
            news = retry_yfinance_request(lambda: ticker.news)
            market_data['news_count'] = len(news) if news else 0
        except Exception as e:
            print(f"Error fetching news for {symbol}: {str(e)}")
            market_data['news_count'] = 0

        additional_data = {}
        if _lacks_corporate_data(symbol):
            market_data['earnings'] = []
            market_data['recommendations'] = []
        else:
            market_data['recommendations'] = fetch_tail(
                'recommendations', lambda: ticker.recommendations, 5)
            # NOTE(review): newer yfinance releases deprecate Ticker.earnings in
            # favour of the income statement; confirm this still returns data.
            market_data['earnings'] = fetch_tail('earnings', lambda: ticker.earnings, 4)
            additional_data['dividends'] = fetch_tail('dividends', lambda: ticker.dividends, 4)
            additional_data['splits'] = fetch_tail('splits', lambda: ticker.splits, 4)

        return {
            'symbol': symbol,
            'info': info,
            'market_data': market_data,
            'additional_data': additional_data,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error fetching market info for {symbol}: {str(e)}")
        return {
            'symbol': symbol,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
def get_enhanced_market_summary() -> Dict:
    """
    Build a per-market snapshot for every entry in MARKET_CONFIGS.

    For each market key, the entry holds:
      * the market's config;
      * its current trading status from ``market_status_manager``;
      * the 'market_data' and 'info' sections returned by
        ``get_market_info_from_yfinance`` (empty dicts when absent).
    If anything raises while an entry is being built, that entry holds only the
    config and the error message. Every entry carries a 'last_updated' ISO
    timestamp.

    Returns:
        Dict: Mapping of market key -> summary dict.
    """
    summary = {}
    for key, cfg in MARKET_CONFIGS.items():
        try:
            fetched = get_market_info_from_yfinance(cfg['symbol'])
            state = market_status_manager.get_status(key)
            status_block = {
                'is_open': state.is_open,
                'status_text': state.status_text,
                'current_time': state.current_time_et,
                'next_trading_day': state.next_trading_day
            }
            entry = {
                'config': cfg,
                'status': status_block,
                'market_data': fetched.get('market_data', {}),
                'info': fetched.get('info', {}),
                'last_updated': datetime.now().isoformat()
            }
        except Exception as exc:
            entry = {
                'config': cfg,
                'error': str(exc),
                'last_updated': datetime.now().isoformat()
            }
        summary[key] = entry
    return summary
def check_market_status_simple(market_key: str) -> str:
    """
    Render a Markdown status report for a single market.

    Args:
        market_key (str): Market key from MARKET_CONFIGS

    Returns:
        str: Markdown describing open/closed state, timing details and the
        configured description. If anything raises, an error line prefixed
        with "❌" is returned instead.
    """
    try:
        status = market_status_manager.get_status(market_key)
        if status.is_open:
            status_emoji, status_text = "🟢", "OPEN"
        else:
            status_emoji, status_text = "🔴", "CLOSED"
        return f"""
## {status_emoji} {status.market_name} Status: {status_text}
**Current Status:** {status.status_text}
**Market Details:**
- **Type:** {status.market_type.title()}
- **Symbol:** {status.market_symbol}
- **Current Time:** {status.current_time_et}
- **Last Updated:** {status.last_updated}
**Trading Information:**
- **Next Trading Day:** {status.next_trading_day}
- **Time Until Open:** {status.time_until_open}
- **Time Until Close:** {status.time_until_close}
**Market Description:** {MARKET_CONFIGS[market_key]['description']}
"""
    except Exception as e:
        return f"❌ Error checking market status: {str(e)}"
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate technical indicators for a given DataFrame.

    Adds SMA_20/50/200, RSI, MACD/MACD_Signal, Bollinger bands,
    Annualized_Vol (only when a 'Volatility' column exists), running drawdown
    metrics, and 20-period volume liquidity metrics. Remaining NaNs are then
    forward- and back-filled.

    Note: indicator columns are assigned onto ``df`` in place before the filled
    copy is returned, so the caller's frame is modified as well.

    Args:
        df (pd.DataFrame): DataFrame with OHLCV data ('Close' and 'Volume'
            required; 'Volatility' optional).

    Returns:
        pd.DataFrame: DataFrame with technical indicators added. On error, the
        (possibly partially updated) input frame is returned without filling.
    """
    try:
        close = df['Close']

        # Moving averages
        for window in (20, 50, 200):
            df[f'SMA_{window}'] = close.rolling(window=window, min_periods=1).mean()

        # Momentum / band indicators (helpers defined elsewhere in the module)
        df['RSI'] = calculate_rsi(close)
        df['MACD'], df['MACD_Signal'] = calculate_macd(close)
        df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(close)

        # Annualize only if an upstream step produced a Volatility column;
        # otherwise skip it instead of aborting every later indicator.
        if 'Volatility' in df.columns:
            df['Annualized_Vol'] = df['Volatility'] * np.sqrt(252)
        else:
            print("Volatility column missing; skipping Annualized_Vol")

        # Drawdown from the running peak. cummax/cummin match an expanding
        # window (and, unlike rolling(window=len(df)), also work on empty frames).
        df['Rolling_Max'] = close.cummax()
        df['Drawdown'] = (close - df['Rolling_Max']) / df['Rolling_Max']
        df['Max_Drawdown'] = df['Drawdown'].cummin()

        # Liquidity metrics
        df['Avg_Daily_Volume'] = df['Volume'].rolling(window=20, min_periods=1).mean()
        df['Volume_Volatility'] = df['Volume'].rolling(window=20, min_periods=1).std()

        # fillna(method=...) is deprecated in pandas; ffill/bfill are the replacements.
        df = df.ffill().bfill()
        return df
    except Exception as e:
        print(f"Error calculating technical indicators: {str(e)}")
        return df
def get_fundamental_data(symbol: str) -> dict:
    """
    Fetch fundamental data for a symbol via yfinance's ``info`` property.

    The ``info`` request goes through ``retry_yfinance_request``. If the result
    is not a dict, or if the lookup raises, a message is printed and an empty
    dict is returned.

    Args:
        symbol (str): Ticker symbol to look up.

    Returns:
        dict: The fundamental info mapping, or {} when unavailable.
    """
    try:
        ticker = yf.Ticker(symbol)
        fundamentals = retry_yfinance_request(lambda: ticker.info)
    except Exception as e:
        print(f"Error fetching fundamental data for {symbol}: {str(e)}")
        return {}  # Fallback to empty dict

    if not isinstance(fundamentals, dict):
        print(f"No fundamental info available for {symbol}")
        return {}
    return fundamentals
if __name__ == "__main__":
    import signal
    import atexit
    import sys

    _cleanup_done = False

    def _cleanup_once():
        """Run cleanup_on_exit at most once, whichever exit path reaches it first."""
        global _cleanup_done
        if _cleanup_done:
            return
        # Mark done before running so a signal arriving mid-cleanup cannot re-enter.
        _cleanup_done = True
        cleanup_on_exit()

    # atexit covers interpreter shutdown. The explicit calls below run cleanup
    # promptly on the other exit paths; the guard prevents a second run.
    atexit.register(_cleanup_once)

    # Handle SIGINT and SIGTERM for graceful shutdown
    def signal_handler(signum, frame):
        print(f"\nReceived signal {signum}. Shutting down gracefully...")
        _cleanup_once()
        # sys.exit is always available; the bare exit() builtin is injected by
        # the site module and may be missing (e.g. python -S, frozen apps).
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        demo = create_interface()
        print("Starting Advanced Stock Prediction Analysis with real-time market status updates...")
        print("Market status will update every 10 minutes automatically.")
        demo.launch(
            server_name="0.0.0.0",
            server_port=int(os.getenv("PORT", "7860")),
            ssr_mode=False,
            mcp_server=True
        )
    except KeyboardInterrupt:
        print("\nApplication interrupted by user. Shutting down...")
        _cleanup_once()
    except Exception as e:
        print(f"Error starting application: {str(e)}")
        _cleanup_once()
        raise