IDX-Predict / app.py
omniverse1's picture
Update app.py
c986f11 verified
import gradio as gr
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import yfinance as yf
import torch
from chronos import ChronosPipeline
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import plotly.express as px
from typing import Dict, List, Tuple, Optional, Union
import json
import spaces
import gc
import pytz
import time
import random
from scipy import stats
from scipy.optimize import minimize
import warnings
import threading
from dataclasses import dataclass
from transformers import GenerationConfig
warnings.filterwarnings('ignore')
# Additional imports for advanced features
try:
from hmmlearn import hmm
HMM_AVAILABLE = True
except ImportError:
HMM_AVAILABLE = False
print("Warning: hmmlearn not available. Regime detection will use simplified methods.")
try:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.metrics import mean_squared_error, mean_absolute_error
ENSEMBLE_AVAILABLE = True
except ImportError:
ENSEMBLE_AVAILABLE = False
print("Warning: scikit-learn not available. Ensemble methods will be simplified.")
# Additional imports for enhanced features
try:
import requests
import re
from textblob import TextBlob
SENTIMENT_AVAILABLE = True
except ImportError:
SENTIMENT_AVAILABLE = False
print("Warning: sentiment analysis not available.")
try:
from arch import arch_model
GARCH_AVAILABLE = True
except ImportError:
GARCH_AVAILABLE = False
print("Warning: arch not available. GARCH modeling will be simplified.")
# Market status management
@dataclass
class MarketStatus:
"""Data class to hold market status information"""
is_open: bool
status_text: str
next_trading_day: str
last_updated: str
time_until_open: str
time_until_close: str
current_time_et: str
market_name: str
market_type: str
market_symbol: str
# Global configuration for market status updates
MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10 # Update every 10 minutes
# Market configurations for different exchanges
MARKET_CONFIGS = {
'IDX_STOCKS': {
'name': 'Indonesia Stock Exchange (IDX)',
'symbol': '^JKSE',
'type': 'stocks',
'timezone': 'Asia/Jakarta',
'open_time': '09:00',
'close_time': '15:00',
'days': [0, 1, 2, 3, 4], # Monday - Friday
'description': 'Indonesia Stock Exchange Stock Market'
},
'GOLD': {
'name': 'Gold (XAU/USD)',
'symbol': 'XAUUSD=X',
'type': 'commodities',
'timezone': 'UTC',
'open_time': '00:00',
'close_time': '23:59',
'days': [0, 1, 2, 3, 4, 5, 6], # Market 24/7
'description': 'Gold Market'
},
'CRYPTO': {
'name': 'Bitcoin (BTC/USD)',
'symbol': 'BTC-USD',
'type': 'crypto',
'timezone': 'UTC',
'open_time': '00:00',
'close_time': '23:59',
'days': [0, 1, 2, 3, 4, 5, 6], # Pasar 24/7
'description': 'Bitcoin to USD'
}
}
class MarketStatusManager:
"""Manages market status with periodic updates for multiple markets"""
def __init__(self):
self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60 # Convert to seconds
self._statuses = {}
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._update_thread = None
self._start_update_thread()
def _get_current_market_status(self, market_key: str = 'US_STOCKS') -> MarketStatus:
"""Get current market status with detailed information for specific market"""
config = MARKET_CONFIGS[market_key]
now = datetime.now()
# Convert to market timezone
market_tz = pytz.timezone(config['timezone'])
market_time = now.astimezone(market_tz)
# Parse market hours
open_hour, open_minute = map(int, config['open_time'].split(':'))
close_hour, close_minute = map(int, config['close_time'].split(':'))
# Check if it's a trading day
is_trading_day = market_time.weekday() in config['days']
# Create market open/close times
market_open_time = market_time.replace(hour=open_hour, minute=open_minute, second=0, microsecond=0)
market_close_time = market_time.replace(hour=close_hour, minute=close_minute, second=0, microsecond=0)
# Handle 24/7 markets and overnight sessions
if config['type'] in ['futures', 'forex', 'crypto', 'commodities']:
# 24/7 markets are always considered open
is_open = True
status_text = f"{config['name']} is currently open (24/7)"
else:
# Traditional markets
if not is_trading_day:
is_open = False
status_text = f"{config['name']} is closed (Weekend)"
elif market_open_time <= market_time <= market_close_time:
is_open = True
status_text = f"{config['name']} is currently open"
else:
is_open = False
if market_time < market_open_time:
status_text = f"{config['name']} is closed (Before opening)"
else:
status_text = f"{config['name']} is closed (After closing)"
# Calculate next trading day
next_trading_day = self._get_next_trading_day(market_time, config['days'])
# Calculate time until open/close
time_until_open = self._get_time_until_open(market_time, market_open_time, config)
time_until_close = self._get_time_until_close(market_time, market_close_time, config)
return MarketStatus(
is_open=is_open,
status_text=status_text,
next_trading_day=next_trading_day.strftime('%Y-%m-%d'),
last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
time_until_open=time_until_open,
time_until_close=time_until_close,
current_time_et=market_time.strftime('%H:%M:%S %Z'),
market_name=config['name'],
market_type=config['type'],
market_symbol=config['symbol']
)
def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime:
"""Get the next trading day for a specific market"""
next_day = current_time + timedelta(days=1)
# Skip non-trading days
while next_day.weekday() not in trading_days:
next_day += timedelta(days=1)
return next_day
def _get_time_until_open(self, current_time: datetime, market_open_time: datetime, config: dict) -> str:
"""Calculate time until market opens"""
if config['type'] in ['futures', 'forex', 'crypto', 'commodities']:
return "N/A (24/7 Market)"
if current_time.weekday() not in config['days']:
# Weekend - calculate to next trading day
days_until_next = 1
while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']:
days_until_next += 1
next_trading_day = current_time + timedelta(days=days_until_next)
next_open = next_trading_day.replace(
hour=int(config['open_time'].split(':')[0]),
minute=int(config['open_time'].split(':')[1]),
second=0, microsecond=0
)
time_diff = next_open - current_time
else:
if current_time < market_open_time:
time_diff = market_open_time - current_time
else:
# Market already opened today, next opening is tomorrow
tomorrow = current_time + timedelta(days=1)
if tomorrow.weekday() in config['days']:
next_open = tomorrow.replace(
hour=int(config['open_time'].split(':')[0]),
minute=int(config['open_time'].split(':')[1]),
second=0, microsecond=0
)
time_diff = next_open - current_time
else:
# Next day is weekend, calculate to next trading day
days_until_next = 1
while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']:
days_until_next += 1
next_trading_day = current_time + timedelta(days=days_until_next)
next_open = next_trading_day.replace(
hour=int(config['open_time'].split(':')[0]),
minute=int(config['open_time'].split(':')[1]),
second=0, microsecond=0
)
time_diff = next_open - current_time
return self._format_time_delta(time_diff)
def _get_time_until_close(self, current_time: datetime, market_close_time: datetime, config: dict) -> str:
"""Calculate time until market closes"""
if config['type'] in ['futures', 'forex', 'crypto', 'commodities']:
return "N/A (24/7 Market)"
if current_time.weekday() not in config['days']:
return "N/A (Weekend)"
if current_time < market_close_time:
time_diff = market_close_time - current_time
return self._format_time_delta(time_diff)
else:
return "Market closed for today"
def _format_time_delta(self, time_diff: timedelta) -> str:
"""Format timedelta into human-readable string"""
total_seconds = int(time_diff.total_seconds())
if total_seconds < 0:
return "N/A"
days = total_seconds // 86400
hours = (total_seconds % 86400) // 3600
minutes = (total_seconds % 3600) // 60
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def _update_loop(self):
"""Background thread loop for updating market status"""
while not self._stop_event.is_set():
try:
new_statuses = {}
for market_key in MARKET_CONFIGS.keys():
new_statuses[market_key] = self._get_current_market_status(market_key)
with self._lock:
self._statuses = new_statuses
time.sleep(self.update_interval)
except Exception as e:
print(f"Error updating market status: {str(e)}")
time.sleep(60) # Wait 1 minute before retrying
def _start_update_thread(self):
"""Start the background update thread"""
if self._update_thread is None or not self._update_thread.is_alive():
self._stop_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def get_status(self, market_key: str = 'US_STOCKS') -> MarketStatus:
"""Get current market status (thread-safe)"""
with self._lock:
if market_key not in self._statuses:
# Initialize if not exists
self._statuses[market_key] = self._get_current_market_status(market_key)
return self._statuses[market_key]
def get_all_markets_status(self) -> Dict[str, MarketStatus]:
"""Get status for all markets"""
with self._lock:
return self._statuses.copy()
def stop(self):
"""Stop the update thread"""
self._stop_event.set()
if self._update_thread and self._update_thread.is_alive():
self._update_thread.join(timeout=5)
# Initialize global variables
pipeline = None
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit_transform([[-1, 1]])
# Global market data cache
market_data_cache = {}
cache_expiry = {}
CACHE_DURATION = 3600 # 1 hour cache
# Initialize market status manager
market_status_manager = MarketStatusManager()
# Enhanced covariate data sources
COVARIATE_SOURCES = {
'market_indices': ['^JKSE'], # Hanya gunakan Indeks Harga Saham Gabungan (IHSG)
'commodities': ['GC=F', 'CL=F'], # Emas Futures, Minyak Mentah Futures
'currencies': ['IDR=X', 'USDJPY=X'] # Rupiah ke USD, USD ke JPY
}
# Economic indicators (using yfinance symbols for simplicity)
ECONOMIC_INDICATORS = {
'ihsg_volatility': '^JKSE',
'rupiah_vs_dollar': 'IDR=X',
'gold': 'GLD',
'oil': 'USO'
}
def retry_yfinance_request(func, max_retries=3, initial_delay=1):
"""
Retry mechanism for yfinance requests with exponential backoff up to 8 seconds.
Args:
func: Function to retry
max_retries: Maximum number of retry attempts
initial_delay: Initial delay in seconds before first retry
Returns:
Result of the function call if successful, or None if all attempts fail
"""
for attempt in range(max_retries):
try:
result = func()
# Check if result is None (common with yfinance for unavailable data)
if result is None:
if attempt == max_retries - 1:
print(f"Function returned None after {max_retries} attempts")
return None
else:
print(f"Function returned None (attempt {attempt + 1}/{max_retries}), retrying...")
time.sleep(initial_delay * (2 ** attempt))
continue
return result
except Exception as e:
error_str = str(e).lower()
# Check if this is the last attempt
if attempt == max_retries - 1:
print(f"Final attempt failed after {max_retries} retries: {str(e)}")
return None # Return None instead of raising to avoid crashes
# Determine delay based on error type and attempt number
if "401" in error_str or "unauthorized" in error_str:
# Authentication errors - longer delay
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 2))
print(f"Authentication error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
elif "429" in error_str or "rate limit" in error_str or "too many requests" in error_str:
# Rate limiting - longer delay
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(1, 3))
print(f"Rate limit error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
elif "500" in error_str or "502" in error_str or "503" in error_str or "504" in error_str:
# Server errors - moderate delay
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0.5, 1.5))
print(f"Server error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
elif "timeout" in error_str or "connection" in error_str:
# Network errors - shorter delay
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1))
print(f"Network error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
else:
# Generic errors - standard exponential backoff
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1))
print(f"Generic error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
time.sleep(delay)
def clear_gpu_memory():
"""Clear memory cache (CPU version)"""
# Skip GPU cleanup since we're using CPU
gc.collect()
# @spaces.GPU()
def load_pipeline():
"""Load the Chronos model preferring CPU over GPU"""
global pipeline
try:
if pipeline is None:
print("Loading Chronos-Bolt model on CPU...")
# Force CPU usage
device = "cpu"
# Try different loading approaches for compatibility
try:
# First attempt: Basic loading without problematic parameters
pipeline = ChronosPipeline.from_pretrained(
"amazon/chronos-bolt-base",
device_map={"": device}, # Force CPU device
torch_dtype=torch.float32, # Use float32 for better compatibility
trust_remote_code=True
)
except Exception as e1:
print(f"First loading attempt failed: {str(e1)}")
try:
# Second attempt: Minimal configuration
pipeline = ChronosPipeline.from_pretrained(
"amazon/chronos-bolt-base"
)
except Exception as e2:
print(f"Second loading attempt failed: {str(e2)}")
try:
# Third attempt: Try alternative model name
pipeline = ChronosPipeline.from_pretrained(
"amazon/chronos-t5-small"
)
print("Using chronos-t5-small as fallback")
except Exception as e3:
print(f"All loading attempts failed:")
print(f" Attempt 1: {str(e1)}")
print(f" Attempt 2: {str(e2)}")
print(f" Attempt 3: {str(e3)}")
raise RuntimeError(f"All Chronos model loading attempts failed")
# Set model to evaluation mode
if hasattr(pipeline, 'model'):
pipeline.model = pipeline.model.eval()
# Disable gradient computation
for param in pipeline.model.parameters():
param.requires_grad = False
print("Chronos model loaded successfully")
return pipeline
except Exception as e:
print(f"Error loading pipeline: {str(e)}")
print(f"Error type: {type(e)}")
print(f"Error details: {str(e)}")
raise RuntimeError(f"Failed to load Chronos model: {str(e)}")
def is_market_open() -> bool:
"""Check if the US stock market is currently open (legacy function for backward compatibility)"""
return market_status_manager.get_status('US_STOCKS').is_open
def get_next_trading_day() -> datetime:
"""Get the next trading day for US stocks (legacy function for backward compatibility)"""
next_day_str = market_status_manager.get_status('US_STOCKS').next_trading_day
return datetime.strptime(next_day_str, '%Y-%m-%d')
def get_market_status_display() -> str:
"""Get formatted market status for display with multiple markets"""
all_statuses = market_status_manager.get_all_markets_status()
if not all_statuses:
# Fallback to US stocks only
status = market_status_manager.get_status('US_STOCKS')
return _format_single_market_status(status)
# Create comprehensive market status display
status_message = "## 🌍 Global Market Status\n\n"
# Group markets by type
market_groups = {
'stocks': ['US_STOCKS', 'EUROPE', 'ASIA'],
'24/7': ['FOREX', 'CRYPTO', 'US_FUTURES', 'COMMODITIES']
}
for group_name, market_keys in market_groups.items():
status_message += f"### {group_name.upper()} MARKETS\n\n"
for market_key in market_keys:
if market_key in all_statuses:
status = all_statuses[market_key]
status_message += _format_market_status_line(status)
status_message += "\n"
# Add summary
open_markets = sum(1 for status in all_statuses.values() if status.is_open)
total_markets = len(all_statuses)
status_message += f"**Summary:** {open_markets}/{total_markets} markets currently open\n\n"
status_message += f"*Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*"
return status_message
def _format_single_market_status(status: MarketStatus) -> str:
"""Format single market status for display"""
status_icon = "🟢" if status.is_open else "🔴"
status_message = f"""
### {status_icon} {status.market_name}: {status.status_text}
**Current Time ({status.current_time_et.split()[-1]}):** {status.current_time_et}
**Next Trading Day:** {status.next_trading_day}
**Last Updated:** {status.last_updated}
"""
if status.is_open:
status_message += f"**Time Until Close:** {status.time_until_close}"
else:
status_message += f"**Time Until Open:** {status.time_until_open}"
return status_message
def _format_market_status_line(status: MarketStatus) -> str:
"""Format a single line for market status display"""
status_icon = "🟢" if status.is_open else "🔴"
market_type_icon = {
'stocks': '📈',
'futures': '📊',
'forex': '💱',
'crypto': '₿',
'commodities': '🏭'
}.get(status.market_type, '📊')
time_info = ""
if status.is_open:
if status.time_until_close != "N/A (24/7 Market)":
time_info = f" | Closes in: {status.time_until_close}"
else:
if status.time_until_open != "N/A (24/7 Market)":
time_info = f" | Opens in: {status.time_until_open}"
return f"{status_icon} {market_type_icon} **{status.market_name}** ({status.market_symbol}){time_info}\n"
def update_market_status() -> str:
"""Function to update market status display (called by Gradio every parameter)"""
return get_market_status_display()
def cleanup_on_exit():
"""Cleanup function to stop market status manager when application exits"""
try:
market_status_manager.stop()
print("Market status manager stopped successfully")
except Exception as e:
print(f"Error stopping market status manager: {str(e)}")
def get_historical_data(symbol: str, timeframe: str = "1d", lookback_days: int = 365) -> pd.DataFrame:
"""
Fetch historical data using yfinance with enhanced support for intraday data.
Uses recommended API methods for better reliability.
Args:
symbol (str): The stock symbol (e.g., 'AAPL')
timeframe (str): The timeframe for data ('1d', '1h', '15m')
lookback_days (int): Number of days to look back
Returns:
pd.DataFrame: Historical data with OHLCV and technical indicators
"""
try:
# Check if market is open for intraday data
if timeframe in ["1h", "15m"] and not is_market_open():
next_trading_day = get_next_trading_day()
raise Exception(f"Market is currently closed. Next trading day is {next_trading_day.strftime('%Y-%m-%d')}")
# Map timeframe to yfinance interval and adjust lookback period
tf_map = {
"1d": {"interval": "1d", "period": f"{lookback_days}d"},
"1h": {"interval": "1h", "period": f"{min(lookback_days * 24, 730)}h"}, # Max 730 hours (30 days)
"15m": {"interval": "15m", "period": f"{min(lookback_days * 96, 60)}d"} # Max 60 days for 15m
}
if timeframe not in tf_map:
raise ValueError(f"Unsupported timeframe: {timeframe}. Supported: {list(tf_map.keys())}")
interval_config = tf_map[timeframe]
# Create ticker object
ticker = yf.Ticker(symbol)
# Fetch historical data with retry mechanism
def fetch_history():
return ticker.history(
period=interval_config["period"],
interval=interval_config["interval"],
prepost=True,
actions=True,
auto_adjust=True,
back_adjust=True,
repair=True
)
df = retry_yfinance_request(fetch_history)
if df is None or df.empty:
raise Exception(f"No data returned for {symbol}")
# Validate data quality
if len(df) < 10:
raise Exception(f"Insufficient data for {symbol}: only {len(df)} data points")
# Check for missing values in critical columns
critical_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
missing_data = df[critical_columns].isnull().sum()
if missing_data.sum() > len(df) * 0.1: # More than 10% missing data
print(f"Warning: Significant missing data for {symbol}: {missing_data.to_dict()}")
# Fill any remaining missing values with forward fill then backward fill
df = df.fillna(method='ffill').fillna(method='bfill')
# Calculate returns and volatility
df['Returns'] = df['Close'].pct_change()
df['Volatility'] = df['Returns'].rolling(window=20).std()
# Calculate technical indicators
df = calculate_technical_indicators(df)
print(f"Successfully fetched {len(df)} data points for {symbol} ({timeframe})")
return df
except Exception as e:
print(f"Error fetching historical data for {symbol}: {str(e)}")
raise
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
"""Calculate Relative Strength Index"""
# Handle None values by forward filling
prices = prices.ffill().bfill()
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series]:
"""Calculate MACD and Signal line"""
# Handle None values by forward filling
prices = prices.ffill().bfill()
exp1 = prices.ewm(span=fast, adjust=False).mean()
exp2 = prices.ewm(span=slow, adjust=False).mean()
macd = exp1 - exp2
signal_line = macd.ewm(span=signal, adjust=False).mean()
return macd, signal_line
def calculate_bollinger_bands(prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""Calculate Bollinger Bands"""
# Handle None values by forward filling
prices = prices.ffill().bfill()
middle_band = prices.rolling(window=period).mean()
std = prices.rolling(window=period).std()
upper_band = middle_band + (std * std_dev)
lower_band = middle_band - (std * std_dev)
return upper_band, middle_band, lower_band
def predict_technical_indicators(df: pd.DataFrame, price_prediction: np.ndarray,
timeframe: str = "1d") -> Dict[str, np.ndarray]:
"""
Predict technical indicators based on price predictions.
Args:
df (pd.DataFrame): Historical data with technical indicators
price_prediction (np.ndarray): Predicted prices
timeframe (str): Data timeframe for scaling
Returns:
Dict[str, np.ndarray]: Predicted technical indicators
"""
try:
predictions = {}
# Get last values for initialization
last_rsi = df['RSI'].iloc[-1]
last_macd = df['MACD'].iloc[-1]
last_macd_signal = df['MACD_Signal'].iloc[-1]
last_bb_upper = df['BB_Upper'].iloc[-1] if 'BB_Upper' in df.columns else None
last_bb_middle = df['BB_Middle'].iloc[-1] if 'BB_Middle' in df.columns else None
last_bb_lower = df['BB_Lower'].iloc[-1] if 'BB_Lower' in df.columns else None
# Predict RSI
rsi_predictions = []
for i, pred_price in enumerate(price_prediction):
# RSI tends to mean-revert, so we use a simple mean reversion model
if last_rsi > 70: # Overbought
rsi_change = -0.5 * (i + 1) # Gradual decline
elif last_rsi < 30: # Oversold
rsi_change = 0.5 * (i + 1) # Gradual rise
else: # Neutral zone
rsi_change = 0.1 * np.random.normal(0, 1) # Small random change
predicted_rsi = max(0, min(100, last_rsi + rsi_change))
rsi_predictions.append(predicted_rsi)
predictions['RSI'] = np.array(rsi_predictions)
# Predict MACD
macd_predictions = []
macd_signal_predictions = []
# MACD follows price momentum
price_changes = np.diff(price_prediction, prepend=price_prediction[0])
for i, price_change in enumerate(price_changes):
# MACD change based on price momentum
macd_change = price_change / price_prediction[i] * 100 # Scale to MACD range
predicted_macd = last_macd + macd_change * 0.1 # Dampened change
# MACD signal line (slower moving average of MACD)
signal_change = macd_change * 0.05 # Even slower
predicted_signal = last_macd_signal + signal_change
macd_predictions.append(predicted_macd)
macd_signal_predictions.append(predicted_signal)
predictions['MACD'] = np.array(macd_predictions)
predictions['MACD_Signal'] = np.array(macd_signal_predictions)
# Predict Bollinger Bands if available
if all(x is not None for x in [last_bb_upper, last_bb_middle, last_bb_lower]):
bb_upper_predictions = []
bb_middle_predictions = []
bb_lower_predictions = []
# Calculate rolling statistics for Bollinger Bands
window_size = 20
for i in range(len(price_prediction)):
if i < window_size:
# Use historical data + predictions for calculation
window_prices = np.concatenate([df['Close'].values[-window_size+i:], price_prediction[:i+1]])
else:
# Use only predictions
window_prices = price_prediction[i-window_size+1:i+1]
# Calculate Bollinger Bands for this window
window_mean = np.mean(window_prices)
window_std = np.std(window_prices)
bb_middle = window_mean
bb_upper = window_mean + (window_std * 2)
bb_lower = window_mean - (window_std * 2)
bb_upper_predictions.append(bb_upper)
bb_middle_predictions.append(bb_middle)
bb_lower_predictions.append(bb_lower)
predictions['BB_Upper'] = np.array(bb_upper_predictions)
predictions['BB_Middle'] = np.array(bb_middle_predictions)
predictions['BB_Lower'] = np.array(bb_lower_predictions)
# Predict moving averages
sma_predictions = {}
for period in [20, 50, 200]:
if f'SMA_{period}' in df.columns:
last_sma = df[f'SMA_{period}'].iloc[-1]
sma_predictions[f'SMA_{period}'] = []
for i in range(len(price_prediction)):
# Simple moving average prediction
if i < period:
# Use historical data + predictions
window_prices = np.concatenate([df['Close'].values[-period+i+1:], price_prediction[:i+1]])
else:
# Use only predictions
window_prices = price_prediction[i-period+1:i+1]
predicted_sma = np.mean(window_prices)
sma_predictions[f'SMA_{period}'].append(predicted_sma)
predictions[f'SMA_{period}'] = np.array(sma_predictions[f'SMA_{period}'])
return predictions
except Exception as e:
print(f"Error predicting technical indicators: {str(e)}")
# Return empty predictions on error
return {}
def calculate_technical_uncertainty(df: pd.DataFrame, indicator_predictions: Dict[str, np.ndarray],
price_prediction: np.ndarray) -> Dict[str, np.ndarray]:
"""
Calculate uncertainty for technical indicator predictions.
Args:
df (pd.DataFrame): Historical data
indicator_predictions (Dict[str, np.ndarray]): Predicted indicators
price_prediction (np.ndarray): Predicted prices
Returns:
Dict[str, np.ndarray]: Uncertainty estimates for each indicator
"""
try:
uncertainties = {}
# Calculate historical volatility of each indicator
for indicator_name, predictions in indicator_predictions.items():
if indicator_name in df.columns:
# Calculate historical volatility
historical_values = df[indicator_name].dropna()
if len(historical_values) > 10:
volatility = historical_values.std()
# Scale uncertainty by prediction horizon
indicator_uncertainty = []
for i in range(len(predictions)):
# Uncertainty increases with prediction horizon
uncertainty = volatility * np.sqrt(i + 1)
indicator_uncertainty.append(uncertainty)
uncertainties[indicator_name] = np.array(indicator_uncertainty)
else:
# Use a default uncertainty if insufficient data
uncertainties[indicator_name] = np.array([0.1 * abs(predictions[0])] * len(predictions))
else:
# For new indicators, use a percentage of the prediction value
uncertainties[indicator_name] = np.array([0.05 * abs(pred) for pred in predictions])
return uncertainties
except Exception as e:
print(f"Error calculating technical uncertainty: {str(e)}")
return {}
# @spaces.GPU()
def make_prediction_enhanced(symbol: str, timeframe: str = "1d", prediction_days: int = 5, strategy: str = "chronos",
use_ensemble: bool = True, use_regime_detection: bool = True, use_stress_testing: bool = True,
risk_free_rate: float = 0.02, ensemble_weights: Dict = None,
market_index: str = "^GSPC", use_covariates: bool = True, use_sentiment: bool = True,
random_real_points: int = 4, use_smoothing: bool = True,
smoothing_type: str = "exponential", smoothing_window: int = 5,
smoothing_alpha: float = 0.3) -> Tuple[Dict, go.Figure]:
"""
Enhanced prediction using Chronos with covariate data, advanced uncertainty calculations, and improved algorithms.
Args:
symbol (str): Stock symbol
timeframe (str): Data timeframe ('1d', '1h', '15m')
prediction_days (int): Number of days to predict
strategy (str): Prediction strategy to use
use_ensemble (bool): Whether to use ensemble methods
use_regime_detection (bool): Whether to use regime detection
use_stress_testing (bool): Whether to perform stress testing
risk_free_rate (float): Risk-free rate for calculations
ensemble_weights (Dict): Weights for ensemble models
market_index (str): Market index for correlation analysis
use_covariates (bool): Whether to use covariate data
use_sentiment (bool): Whether to use sentiment analysis
random_real_points (int): Number of random real points to include in long-horizon context
use_smoothing (bool): Whether to apply smoothing to predictions
smoothing_type (str): Type of smoothing to apply ('exponential', 'moving_average', 'kalman', 'savitzky_golay', 'none')
Returns:
Tuple[Dict, go.Figure]: Trading signals and visualization plot
"""
try:
# Get historical data
df = get_historical_data(symbol, timeframe)
# Initialize variables that might not be set in all strategy paths
advanced_uncertainties = {}
volume_pred = None
volume_uncertainty = None
technical_predictions = {}
technical_uncertainties = {}
# Collect enhanced covariate data
covariate_data = {}
market_conditions = {}
if use_covariates:
print("Collecting enhanced covariate data...")
covariate_data = get_enhanced_covariate_data(symbol, timeframe, 365)
# Extract market conditions from covariate data
if 'economic_indicators' in covariate_data:
vix_data = covariate_data['economic_indicators'].get('volatility', None)
if vix_data is not None and len(vix_data) > 0:
market_conditions['vix'] = vix_data.iloc[-1]
# Calculate market sentiment
sentiment_data = {}
if use_sentiment:
print("Calculating market sentiment...")
sentiment_data = calculate_market_sentiment(symbol, 30)
# Detect market regime
regime_info = {}
if use_regime_detection:
print("Detecting market regime...")
returns = df['Close'].pct_change().dropna()
regime_info = detect_market_regime(returns)
if strategy == "chronos":
try:
# Prepare data for Chronos
prices = df['Close'].values
chronos_context_size = 64 # Chronos model's context window size (fixed at 64)
input_context_size = len(prices) # Available input data can be much larger
# Use a larger range for scaler fitting to get better normalization
scaler_range = min(input_context_size, chronos_context_size * 2) # Use up to 128 points for scaler
# Select the most recent chronos_context_size points for the model input
context_window = prices[-chronos_context_size:]
scaler = MinMaxScaler(feature_range=(-1, 1))
# Fit scaler on a larger range for better normalization
scaler.fit(prices[-scaler_range:].reshape(-1, 1))
normalized_prices = scaler.transform(context_window.reshape(-1, 1)).flatten()
# Ensure we have enough data points for Chronos
min_data_points = chronos_context_size
if len(normalized_prices) < min_data_points:
padding = np.full(min_data_points - len(normalized_prices), normalized_prices[-1])
normalized_prices = np.concatenate([padding, normalized_prices])
elif len(normalized_prices) > min_data_points:
normalized_prices = normalized_prices[-min_data_points:]
# Load pipeline and move to GPU
pipe = load_pipeline()
# Force CPU usage
device = torch.device("cpu")
dtype = torch.float32
print(f"Model device: {device}")
print(f"Model dtype: {dtype}")
# Convert to tensor and ensure proper shape and device
context = torch.tensor(normalized_prices, dtype=dtype, device=device)
# Validate context data
if torch.isnan(context).any() or torch.isinf(context).any():
print("Warning: Context contains NaN or Inf values, replacing with zeros")
context = torch.nan_to_num(context, nan=0.0, posinf=0.0, neginf=0.0)
# Ensure context is finite and reasonable
if torch.abs(context).max() > 1000:
print("Warning: Context values are very large, normalizing")
context = torch.clamp(context, -1000, 1000)
print(f"Context validation - Shape: {context.shape}, Min: {context.min():.4f}, Max: {context.max():.4f}, Mean: {context.mean():.4f}")
# Adjust prediction length based on timeframe
if timeframe == "1d":
max_prediction_length = chronos_context_size # 64 days
actual_prediction_length = min(prediction_days, max_prediction_length)
trim_length = prediction_days
elif timeframe == "1h":
max_prediction_length = chronos_context_size # 64 hours
actual_prediction_length = min(prediction_days * 24, max_prediction_length)
trim_length = prediction_days * 24
else: # 15m
max_prediction_length = chronos_context_size # 64 intervals
actual_prediction_length = min(prediction_days * 96, max_prediction_length)
trim_length = prediction_days * 96
# Ensure prediction length is valid (must be positive and reasonable)
actual_prediction_length = max(1, min(actual_prediction_length, 64))
print(f"Prediction length: {actual_prediction_length}")
print(f"Context length: {len(context)}")
# Use predict_quantiles with proper formatting (CPU only)
# No autocast for CPU to avoid issues
# Ensure all inputs are on CPU
context = context.to(device)
# Ensure context is properly shaped and on CPU
if len(context.shape) == 1:
context = context.unsqueeze(0)
context = context.to(device)
# Move model to evaluation mode
pipe.model.eval()
# Move all model parameters and buffers to GPU
for param in pipe.model.parameters():
param.data = param.data.to(device)
for buffer in pipe.model.buffers():
buffer.data = buffer.data.to(device)
# Move all model submodules to GPU
for module in pipe.model.modules():
if hasattr(module, 'to'):
module.to(device)
# Move all model attributes to GPU
for name, value in pipe.model.__dict__.items():
if isinstance(value, torch.Tensor):
pipe.model.__dict__[name] = value.to(device)
# Move all model config tensors to GPU
if hasattr(pipe.model, 'config'):
for key, value in pipe.model.config.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(pipe.model.config, key, value.to(device))
# Move all pipeline tensors to GPU
for name, value in pipe.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(pipe, name, value.to(device))
# Ensure all model states are on GPU
if hasattr(pipe.model, 'state_dict'):
state_dict = pipe.model.state_dict()
for key in state_dict:
if isinstance(state_dict[key], torch.Tensor):
state_dict[key] = state_dict[key].to(device)
pipe.model.load_state_dict(state_dict)
# Move any additional components to GPU
if hasattr(pipe, 'tokenizer'):
# Move tokenizer to GPU if it supports it
if hasattr(pipe.tokenizer, 'to'):
pipe.tokenizer = pipe.tokenizer.to(device)
# Move all tokenizer tensors to GPU
for name, value in pipe.tokenizer.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(pipe.tokenizer, name, value.to(device))
# Handle MeanScaleUniformBins specific attributes
if hasattr(pipe.tokenizer, 'bins'):
if isinstance(pipe.tokenizer.bins, torch.Tensor):
pipe.tokenizer.bins = pipe.tokenizer.bins.to(device)
if hasattr(pipe.tokenizer, 'scale'):
if isinstance(pipe.tokenizer.scale, torch.Tensor):
pipe.tokenizer.scale = pipe.tokenizer.scale.to(device)
if hasattr(pipe.tokenizer, 'mean'):
if isinstance(pipe.tokenizer.mean, torch.Tensor):
pipe.tokenizer.mean = pipe.tokenizer.mean.to(device)
# Move any additional tensors in the tokenizer's attributes to GPU
for name, value in pipe.tokenizer.__dict__.items():
if isinstance(value, torch.Tensor):
pipe.tokenizer.__dict__[name] = value.to(device)
# Remove the EOS token handling since MeanScaleUniformBins doesn't use it
if hasattr(pipe.tokenizer, '_append_eos_token'):
# Create a wrapper that just returns the input tensors
def wrapped_append_eos(token_ids, attention_mask):
return token_ids, attention_mask
pipe.tokenizer._append_eos_token = wrapped_append_eos
# Skip CUDA synchronization since we're using CPU
# torch.cuda.synchronize() # Commented out for CPU usage
# Ensure all model components are in eval mode
pipe.model.eval()
# Fix generation configuration to prevent min_length errors
if hasattr(pipe.model, 'config'):
# Ensure generation config is properly set
if hasattr(pipe.model.config, 'generation_config'):
# Reset generation config to safe defaults
pipe.model.config.generation_config.min_length = 0
pipe.model.config.generation_config.max_length = 512
pipe.model.config.generation_config.do_sample = False
pipe.model.config.generation_config.num_beams = 1
else:
# Create a safe generation config if it doesn't exist
pipe.model.config.generation_config = GenerationConfig(
min_length=0,
max_length=512,
do_sample=False,
num_beams=1
)
# Move any additional tensors in the model's config to GPU
if hasattr(pipe.model, 'config'):
for key, value in pipe.model.config.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(pipe.model.config, key, value.to(device))
# Move any additional tensors in the model's state dict to GPU
if hasattr(pipe.model, 'state_dict'):
state_dict = pipe.model.state_dict()
for key in state_dict:
if isinstance(state_dict[key], torch.Tensor):
state_dict[key] = state_dict[key].to(device)
pipe.model.load_state_dict(state_dict)
# Move any additional tensors in the model's buffers to GPU
for name, buffer in pipe.model.named_buffers():
if buffer is not None:
pipe.model.register_buffer(name, buffer.to(device))
# Move any additional tensors in the model's parameters to GPU
for name, param in pipe.model.named_parameters():
if param is not None:
param.data = param.data.to(device)
# Move any additional tensors in the model's attributes to GPU
for name, value in pipe.model.__dict__.items():
if isinstance(value, torch.Tensor):
pipe.model.__dict__[name] = value.to(device)
# Move any additional tensors in the model's modules to GPU
for name, module in pipe.model.named_modules():
if hasattr(module, 'to'):
module.to(device)
# Move any tensors in the module's __dict__
for key, value in module.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(module, key, value.to(device))
# Skip CUDA synchronization since we're using CPU
# torch.cuda.synchronize() # Commented out for CPU usage
# Ensure tokenizer is on GPU and all its tensors are on GPU
if hasattr(pipe, 'tokenizer'):
# Move tokenizer to GPU if it supports it
if hasattr(pipe.tokenizer, 'to'):
pipe.tokenizer = pipe.tokenizer.to(device)
# Move all tokenizer tensors to GPU
for name, value in pipe.tokenizer.__dict__.items():
if isinstance(value, torch.Tensor):
setattr(pipe.tokenizer, name, value.to(device))
# Handle MeanScaleUniformBins specific attributes
if hasattr(pipe.tokenizer, 'bins'):
if isinstance(pipe.tokenizer.bins, torch.Tensor):
pipe.tokenizer.bins = pipe.tokenizer.bins.to(device)
if hasattr(pipe.tokenizer, 'scale'):
if isinstance(pipe.tokenizer.scale, torch.Tensor):
pipe.tokenizer.scale = pipe.tokenizer.scale.to(device)
if hasattr(pipe.tokenizer, 'mean'):
if isinstance(pipe.tokenizer.mean, torch.Tensor):
pipe.tokenizer.mean = pipe.tokenizer.mean.to(device)
# Move any additional tensors in the tokenizer's attributes to GPU
for name, value in pipe.tokenizer.__dict__.items():
if isinstance(value, torch.Tensor):
pipe.tokenizer.__dict__[name] = value.to(device)
# Skip CUDA synchronization since we're using CPU
# torch.cuda.synchronize() # Commented out for CPU usage
# Make prediction with proper parameters
# Use the standard quantile levels as per Chronos documentation
try:
quantiles, mean = pipe.predict_quantiles(
context=context,
prediction_length=actual_prediction_length,
quantile_levels=[0.1, 0.5, 0.9]
)
except Exception as prediction_error:
print(f"Chronos prediction failed: {str(prediction_error)}")
print(f"Context shape: {context.shape}")
print(f"Context dtype: {context.dtype}")
print(f"Context device: {context.device}")
print(f"Prediction length: {actual_prediction_length}")
print(f"Model device: {next(pipe.model.parameters()).device}")
print(f"Model dtype: {next(pipe.model.parameters()).dtype}")
# Try with a smaller prediction length as fallback
if actual_prediction_length > 1:
print(f"Retrying with prediction length 1...")
actual_prediction_length = 1
quantiles, mean = pipe.predict_quantiles(
context=context,
prediction_length=actual_prediction_length,
quantile_levels=[0.1, 0.5, 0.9]
)
else:
raise prediction_error
if quantiles is None or mean is None or len(quantiles) == 0 or len(mean) == 0:
raise ValueError("Chronos returned empty prediction")
print(f"Quantiles shape: {quantiles.shape}, Mean shape: {mean.shape}")
# Convert to numpy arrays
quantiles = quantiles.detach().cpu().numpy()
mean = mean.detach().cpu().numpy()
# Denormalize predictions using the same scaler as context
mean_pred = scaler.inverse_transform(mean.reshape(-1, 1)).flatten()
lower_bound = scaler.inverse_transform(quantiles[0, :, 0].reshape(-1, 1)).flatten()
upper_bound = scaler.inverse_transform(quantiles[0, :, 2].reshape(-1, 1)).flatten()
# Calculate uncertainty using advanced methods
historical_volatility = df['Volatility'].iloc[-1]
advanced_uncertainties = calculate_advanced_uncertainty(
quantiles, historical_volatility, market_conditions
)
std_pred = advanced_uncertainties.get('ensemble',
(upper_bound - lower_bound) / (2 * 1.645))
# Apply continuity correction
last_actual = df['Close'].iloc[-1]
first_pred = mean_pred[0]
discontinuity_threshold = max(1e-6, 0.02 * abs(last_actual)) # 2% threshold
if abs(first_pred - last_actual) > discontinuity_threshold:
print(f"Warning: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})")
print(f"Discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)")
# Apply improved continuity correction
if len(mean_pred) > 1:
# Calculate the overall trend from the original predictions
original_trend = mean_pred[-1] - first_pred
total_steps = len(mean_pred) - 1
# Calculate the desired trend per step to reach the final prediction
if total_steps > 0:
trend_per_step = original_trend / total_steps
else:
trend_per_step = 0
# Apply smooth transition starting from last actual
# Use a gradual transition that preserves the overall trend
transition_length = min(5, len(mean_pred)) # Longer transition for smoother curve
for i in range(transition_length):
if i == 0:
# First prediction should be very close to last actual
mean_pred[i] = last_actual + trend_per_step * 0.1
else:
# Gradually increase the trend contribution
transition_factor = min(1.0, i / transition_length)
trend_contribution = trend_per_step * i * transition_factor
mean_pred[i] = last_actual + trend_contribution
# For remaining predictions, maintain the original relative differences
if len(mean_pred) > transition_length:
# Calculate the scale factor to maintain relative relationships
original_diff = mean_pred[transition_length] - mean_pred[transition_length-1]
if original_diff != 0:
# Scale the remaining predictions to maintain continuity
for i in range(transition_length, len(mean_pred)):
if i == transition_length:
# Ensure smooth transition at the boundary
mean_pred[i] = mean_pred[i-1] + original_diff * 0.5
else:
# Maintain the original relative differences
original_diff_i = mean_pred[i] - mean_pred[i-1]
mean_pred[i] = mean_pred[i-1] + original_diff_i
print(f"Applied continuity correction: First prediction adjusted from {first_pred:.4f} to {mean_pred[0]:.4f}")
# Apply financial smoothing if enabled
if use_smoothing:
mean_pred = apply_financial_smoothing(mean_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing)
else:
# Single prediction case - set to last actual
mean_pred[0] = last_actual
print(f"Single prediction case: Set to last actual value {last_actual:.4f}")
# If we had to limit the prediction length, extend the prediction recursively
if actual_prediction_length < trim_length:
extended_mean_pred = mean_pred.copy()
extended_std_pred = std_pred.copy()
# Store the original scaler for consistency
original_scaler = scaler
# Calculate the number of extension steps needed
remaining_steps = trim_length - actual_prediction_length
steps_needed = (remaining_steps + actual_prediction_length - 1) // actual_prediction_length
for step in range(steps_needed):
# Use all available datapoints for context, including predictions
# This allows the model to build upon its own predictions for better long-horizon forecasting
all_available_data = np.concatenate([prices, extended_mean_pred])
# If we have more data than chronos_context_size, use the most recent chronos_context_size points
# Otherwise, use all available data (this allows for longer context when available)
if len(all_available_data) > chronos_context_size:
context_window = all_available_data[-chronos_context_size:]
else:
context_window = all_available_data
# Use the original scaler to maintain consistency - fit on historical data only
# but transform the combined context window
normalized_context = original_scaler.transform(context_window.reshape(-1, 1)).flatten()
context = torch.tensor(normalized_context, dtype=dtype, device=device)
if len(context.shape) == 1:
context = context.unsqueeze(0)
next_length = min(max_prediction_length, remaining_steps)
# Ensure next_length is valid (must be positive and reasonable)
next_length = max(1, min(next_length, 64))
# No autocast for CPU
try:
next_quantiles, next_mean = pipe.predict_quantiles(
context=context,
prediction_length=next_length,
quantile_levels=[0.1, 0.5, 0.9]
)
except Exception as extension_error:
print(f"Chronos extension prediction failed: {str(extension_error)}")
print(f"Extension context shape: {context.shape}")
print(f"Extension prediction length: {next_length}")
# Try with a smaller prediction length as fallback
if next_length > 1:
print(f"Retrying extension with prediction length 1...")
next_length = 1
next_quantiles, next_mean = pipe.predict_quantiles(
context=context,
prediction_length=next_length,
quantile_levels=[0.1, 0.5, 0.9]
)
else:
raise extension_error
# Convert predictions to numpy and denormalize using original scaler
next_mean = next_mean.detach().cpu().numpy()
next_quantiles = next_quantiles.detach().cpu().numpy()
# Denormalize predictions using the original scaler
next_mean_pred = original_scaler.inverse_transform(next_mean.reshape(-1, 1)).flatten()
# Calculate uncertainty for extended predictions
next_uncertainties = calculate_advanced_uncertainty(
next_quantiles, historical_volatility, market_conditions
)
next_std_pred = next_uncertainties.get('ensemble',
(next_quantiles[0, :, 2] - next_quantiles[0, :, 0]) / (2 * 1.645))
# Check for discontinuity and apply continuity correction
if abs(next_mean_pred[0] - extended_mean_pred[-1]) > max(1e-6, 0.02 * abs(extended_mean_pred[-1])):
print(f"Warning: Discontinuity detected between last prediction ({extended_mean_pred[-1]:.4f}) and next prediction ({next_mean_pred[0]:.4f})")
print(f"Extension discontinuity magnitude: {abs(next_mean_pred[0] - extended_mean_pred[-1]):.4f}")
# Apply improved continuity correction for extensions
if len(next_mean_pred) > 1:
# Calculate the overall trend from the original predictions
original_trend = next_mean_pred[-1] - next_mean_pred[0]
total_steps = len(next_mean_pred) - 1
# Calculate the desired trend per step
if total_steps > 0:
trend_per_step = original_trend / total_steps
else:
trend_per_step = 0
# Apply smooth transition starting from last extended prediction
transition_length = min(5, len(next_mean_pred)) # Longer transition for smoother curve
for i in range(transition_length):
if i == 0:
# First prediction should be very close to last extended prediction
next_mean_pred[i] = extended_mean_pred[-1] + trend_per_step * 0.1
else:
# Gradually increase the trend contribution
transition_factor = min(1.0, i / transition_length)
trend_contribution = trend_per_step * i * transition_factor
next_mean_pred[i] = extended_mean_pred[-1] + trend_contribution
# For remaining predictions, maintain the original relative differences
if len(next_mean_pred) > transition_length:
# Calculate the scale factor to maintain relative relationships
original_diff = next_mean_pred[transition_length] - next_mean_pred[transition_length-1]
if original_diff != 0:
# Scale the remaining predictions to maintain continuity
for i in range(transition_length, len(next_mean_pred)):
if i == transition_length:
# Ensure smooth transition at the boundary
next_mean_pred[i] = next_mean_pred[i-1] + original_diff * 0.5
else:
# Maintain the original relative differences
original_diff_i = next_mean_pred[i] - next_mean_pred[i-1]
next_mean_pred[i] = next_mean_pred[i-1] + original_diff_i
print(f"Applied extension continuity correction: First extension prediction adjusted from {next_mean_pred[0]:.4f} to {next_mean_pred[0]:.4f}")
else:
# Single prediction case - set to last extended prediction
next_mean_pred[0] = extended_mean_pred[-1]
print(f"Single extension prediction case: Set to last extended prediction value {extended_mean_pred[-1]:.4f}")
# Apply financial smoothing if enabled
if use_smoothing and len(next_mean_pred) > 1:
next_mean_pred = apply_financial_smoothing(next_mean_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing)
# Append predictions
extended_mean_pred = np.concatenate([extended_mean_pred, next_mean_pred])
extended_std_pred = np.concatenate([extended_std_pred, next_std_pred])
remaining_steps -= len(next_mean_pred)
if remaining_steps <= 0:
break
# Trim to exact prediction length if needed
mean_pred = extended_mean_pred[:trim_length]
std_pred = extended_std_pred[:trim_length]
# Enhanced volume prediction
volume_pred, volume_uncertainty = calculate_volume_prediction_enhanced(
df, mean_pred, covariate_data
)
# Ensure volume prediction is properly handled
if volume_pred is None or len(volume_pred) == 0:
print("Warning: Volume prediction failed, using fallback")
volume_pred = np.full(len(mean_pred), df['Volume'].iloc[-1])
volume_uncertainty = np.full(len(mean_pred), df['Volume'].iloc[-1] * 0.2)
elif len(volume_pred) != len(mean_pred):
print(f"Warning: Volume prediction length mismatch. Expected {len(mean_pred)}, got {len(volume_pred)}")
# Pad or truncate to match
if len(volume_pred) < len(mean_pred):
last_vol = volume_pred[-1] if len(volume_pred) > 0 else df['Volume'].iloc[-1]
volume_pred = np.pad(volume_pred, (0, len(mean_pred) - len(volume_pred)),
mode='constant', constant_values=last_vol)
volume_uncertainty = np.pad(volume_uncertainty, (0, len(mean_pred) - len(volume_uncertainty)),
mode='constant', constant_values=volume_uncertainty[-1] if len(volume_uncertainty) > 0 else df['Volume'].iloc[-1] * 0.2)
else:
volume_pred = volume_pred[:len(mean_pred)]
volume_uncertainty = volume_uncertainty[:len(mean_pred)]
# Predict technical indicators
print("Predicting technical indicators...")
technical_predictions = predict_technical_indicators(df, mean_pred, timeframe)
technical_uncertainties = calculate_technical_uncertainty(df, technical_predictions, mean_pred)
# Create ensemble prediction if enabled
ensemble_pred = np.array([])
ensemble_uncertainty = np.array([])
if use_ensemble and covariate_data:
print("Creating enhanced ensemble model...")
ensemble_pred, ensemble_uncertainty = create_enhanced_ensemble_model(
df, covariate_data, trim_length
)
# Combine Chronos and ensemble predictions
if len(ensemble_pred) > 0:
# Weighted combination
chronos_weight = 0.7
ensemble_weight = 0.3
final_pred = chronos_weight * mean_pred + ensemble_weight * ensemble_pred
final_uncertainty = np.sqrt(
(chronos_weight * std_pred)**2 + (ensemble_weight * ensemble_uncertainty)**2
)
else:
final_pred = mean_pred
final_uncertainty = std_pred
# Final continuity validation and correction
print("Performing final continuity validation...")
last_actual = df['Close'].iloc[-1]
first_pred = final_pred[0]
discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check
if abs(first_pred - last_actual) > discontinuity_threshold:
print(f"Final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})")
print(f"Final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)")
# Apply final continuity correction
if len(final_pred) > 1:
# Calculate the overall trend
overall_trend = final_pred[-1] - first_pred
total_steps = len(final_pred) - 1
if total_steps > 0:
trend_per_step = overall_trend / total_steps
else:
trend_per_step = 0
# Apply very smooth transition
transition_length = min(8, len(final_pred)) # Longer transition for final correction
for i in range(transition_length):
if i == 0:
# First prediction should be extremely close to last actual
final_pred[i] = last_actual + trend_per_step * 0.05
else:
# Very gradual transition
transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing
trend_contribution = trend_per_step * i * transition_factor
final_pred[i] = last_actual + trend_contribution
# Maintain relative relationships for remaining predictions
if len(final_pred) > transition_length:
for i in range(transition_length, len(final_pred)):
if i == transition_length:
# Smooth transition at boundary
final_pred[i] = final_pred[i-1] + trend_per_step * 0.8
else:
# Maintain original relative differences
original_diff = final_pred[i] - final_pred[i-1]
final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened
print(f"Final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}")
# Apply additional smoothing for final correction
if use_smoothing:
final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha * 0.5, 3, use_smoothing)
else:
# Single prediction case
final_pred[0] = last_actual
print(f"Final single prediction correction: Set to last actual value {last_actual:.4f}")
# Verify final continuity
final_first_pred = final_pred[0]
final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100
print(f"Final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)")
if final_discontinuity <= 1.0:
print("✓ Continuity validation passed - predictions are smooth")
else:
print(f"⚠ Continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains")
except Exception as e:
print(f"Chronos prediction error: {str(e)}")
raise
elif strategy == "technical":
# Technical analysis fallback strategy
print("Using technical analysis strategy...")
try:
# Use the same MinMaxScaler for consistency
prices = df['Close'].values
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(prices.reshape(-1, 1))
# Calculate technical indicators for prediction
last_price = df['Close'].iloc[-1]
last_rsi = df['RSI'].iloc[-1]
last_macd = df['MACD'].iloc[-1]
last_macd_signal = df['MACD_Signal'].iloc[-1]
last_volatility = df['Volatility'].iloc[-1]
# Get trend direction from technical indicators
rsi_trend = 1 if last_rsi > 50 else -1
macd_trend = 1 if last_macd > last_macd_signal else -1
# Calculate momentum and mean reversion factors
sma_20 = df['SMA_20'].iloc[-1] if 'SMA_20' in df.columns else last_price
sma_50 = df['SMA_50'].iloc[-1] if 'SMA_50' in df.columns else last_price
sma_200 = df['SMA_200'].iloc[-1] if 'SMA_200' in df.columns else last_price
# Mean reversion factor (distance from long-term average)
mean_reversion = (sma_200 - last_price) / last_price
# Momentum factor (short vs long-term trend)
momentum = (sma_20 - sma_50) / sma_50
# Combine technical signals
technical_score = (rsi_trend * 0.3 + macd_trend * 0.3 +
np.sign(momentum) * 0.2 + np.sign(mean_reversion) * 0.2)
# Generate price predictions
final_pred = []
final_uncertainty = []
for i in range(1, prediction_days + 1):
# Base prediction with trend and mean reversion
trend_factor = technical_score * last_volatility * 0.5
mean_reversion_factor = mean_reversion * 0.1 * i
momentum_factor = momentum * 0.05 * i
# Combine factors with decay
prediction = last_price * (1 + trend_factor + mean_reversion_factor + momentum_factor)
final_pred.append(prediction)
# Uncertainty based on volatility and prediction horizon
uncertainty = last_volatility * last_price * np.sqrt(i)
final_uncertainty.append(uncertainty)
final_pred = np.array(final_pred)
final_uncertainty = np.array(final_uncertainty)
# Apply smoothing if enabled
if use_smoothing:
final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha, 3, use_smoothing)
# Enhanced volume prediction
volume_pred, volume_uncertainty = calculate_volume_prediction_enhanced(
df, final_pred, covariate_data
)
# Ensure volume prediction is properly handled
if volume_pred is None or len(volume_pred) == 0:
print("Warning: Volume prediction failed, using fallback")
volume_pred = np.full(len(final_pred), df['Volume'].iloc[-1])
volume_uncertainty = np.full(len(final_pred), df['Volume'].iloc[-1] * 0.2)
elif len(volume_pred) != len(final_pred):
print(f"Warning: Volume prediction length mismatch. Expected {len(final_pred)}, got {len(volume_pred)}")
# Pad or truncate to match
if len(volume_pred) < len(final_pred):
last_vol = volume_pred[-1] if len(volume_pred) > 0 else df['Volume'].iloc[-1]
volume_pred = np.pad(volume_pred, (0, len(final_pred) - len(volume_pred)),
mode='constant', constant_values=last_vol)
volume_uncertainty = np.pad(volume_uncertainty, (0, len(final_pred) - len(volume_uncertainty)),
mode='constant', constant_values=volume_uncertainty[-1] if len(volume_uncertainty) > 0 else df['Volume'].iloc[-1] * 0.2)
else:
volume_pred = volume_pred[:len(final_pred)]
volume_uncertainty = volume_uncertainty[:len(final_pred)]
# Predict technical indicators
print("Predicting technical indicators for technical strategy...")
technical_predictions = predict_technical_indicators(df, final_pred, timeframe)
technical_uncertainties = calculate_technical_uncertainty(df, technical_predictions, final_pred)
# Create ensemble prediction if enabled
ensemble_pred = np.array([])
ensemble_uncertainty = np.array([])
if use_ensemble and covariate_data:
print("Creating enhanced ensemble model for technical strategy...")
ensemble_pred, ensemble_uncertainty = create_enhanced_ensemble_model(
df, covariate_data, prediction_days
)
# Combine technical and ensemble predictions
if len(ensemble_pred) > 0:
technical_weight = 0.7
ensemble_weight = 0.3
final_pred = technical_weight * final_pred + ensemble_weight * ensemble_pred
final_uncertainty = np.sqrt(
(technical_weight * final_uncertainty)**2 + (ensemble_weight * ensemble_uncertainty)**2
)
# Calculate advanced uncertainties for technical strategy
historical_volatility = df['Volatility'].iloc[-1]
# Create dummy quantiles for technical strategy (since we don't have quantiles from Chronos)
dummy_quantiles = np.array([[
[final_pred[i] - 2 * final_uncertainty[i], final_pred[i], final_pred[i] + 2 * final_uncertainty[i]]
for i in range(len(final_pred))
]])
advanced_uncertainties = calculate_advanced_uncertainty(
dummy_quantiles, historical_volatility, market_conditions
)
print(f"Technical strategy completed: {len(final_pred)} predictions generated")
# Final continuity validation for technical strategy
print("Performing final continuity validation for technical strategy...")
last_actual = df['Close'].iloc[-1]
first_pred = final_pred[0]
discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check
if abs(first_pred - last_actual) > discontinuity_threshold:
print(f"Technical strategy final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})")
print(f"Technical strategy final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)")
# Apply final continuity correction for technical strategy
if len(final_pred) > 1:
# Calculate the overall trend
overall_trend = final_pred[-1] - first_pred
total_steps = len(final_pred) - 1
if total_steps > 0:
trend_per_step = overall_trend / total_steps
else:
trend_per_step = 0
# Apply very smooth transition
transition_length = min(8, len(final_pred)) # Longer transition for final correction
for i in range(transition_length):
if i == 0:
# First prediction should be extremely close to last actual
final_pred[i] = last_actual + trend_per_step * 0.05
else:
# Very gradual transition
transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing
trend_contribution = trend_per_step * i * transition_factor
final_pred[i] = last_actual + trend_contribution
# Maintain relative relationships for remaining predictions
if len(final_pred) > transition_length:
for i in range(transition_length, len(final_pred)):
if i == transition_length:
# Smooth transition at boundary
final_pred[i] = final_pred[i-1] + trend_per_step * 0.8
else:
# Maintain original relative differences
original_diff = final_pred[i] - final_pred[i-1]
final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened
print(f"Technical strategy final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}")
# Apply additional smoothing for final correction
if use_smoothing:
final_pred = apply_financial_smoothing(final_pred, smoothing_type, smoothing_window, smoothing_alpha * 0.5, 3, use_smoothing)
else:
# Single prediction case
final_pred[0] = last_actual
print(f"Technical strategy final single prediction correction: Set to last actual value {last_actual:.4f}")
# Verify final continuity for technical strategy
final_first_pred = final_pred[0]
final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100
print(f"Technical strategy final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)")
if final_discontinuity <= 1.0:
print("✓ Technical strategy continuity validation passed - predictions are smooth")
else:
print(f"⚠ Technical strategy continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains")
except Exception as e:
print(f"Technical strategy error: {str(e)}")
# Fallback to simple moving average prediction
print("Falling back to simple moving average prediction...")
try:
last_price = df['Close'].iloc[-1]
volatility = df['Volatility'].iloc[-1]
final_pred = np.array([last_price * (1 + 0.001 * i) for i in range(1, prediction_days + 1)])
final_uncertainty = np.array([volatility * last_price * np.sqrt(i) for i in range(1, prediction_days + 1)])
volume_pred = None
volume_uncertainty = None
ensemble_pred = np.array([])
ensemble_uncertainty = np.array([])
# Calculate advanced uncertainties for fallback case
dummy_quantiles = np.array([[
[final_pred[i] - 2 * final_uncertainty[i], final_pred[i], final_pred[i] + 2 * final_uncertainty[i]]
for i in range(len(final_pred))
]])
advanced_uncertainties = calculate_advanced_uncertainty(
dummy_quantiles, volatility, market_conditions
)
# Final continuity validation for fallback case
print("Performing final continuity validation for fallback prediction...")
last_actual = df['Close'].iloc[-1]
first_pred = final_pred[0]
discontinuity_threshold = max(1e-6, 0.01 * abs(last_actual)) # Stricter 1% threshold for final check
if abs(first_pred - last_actual) > discontinuity_threshold:
print(f"Fallback final check: Discontinuity detected between last actual ({last_actual:.4f}) and first prediction ({first_pred:.4f})")
print(f"Fallback final discontinuity magnitude: {abs(first_pred - last_actual):.4f} ({abs(first_pred - last_actual)/last_actual*100:.2f}%)")
# Apply final continuity correction for fallback case
if len(final_pred) > 1:
# Calculate the overall trend
overall_trend = final_pred[-1] - first_pred
total_steps = len(final_pred) - 1
if total_steps > 0:
trend_per_step = overall_trend / total_steps
else:
trend_per_step = 0
# Apply very smooth transition
transition_length = min(8, len(final_pred)) # Longer transition for final correction
for i in range(transition_length):
if i == 0:
# First prediction should be extremely close to last actual
final_pred[i] = last_actual + trend_per_step * 0.05
else:
# Very gradual transition
transition_factor = min(1.0, (i / transition_length) ** 2) # Quadratic easing
trend_contribution = trend_per_step * i * transition_factor
final_pred[i] = last_actual + trend_contribution
# Maintain relative relationships for remaining predictions
if len(final_pred) > transition_length:
for i in range(transition_length, len(final_pred)):
if i == transition_length:
# Smooth transition at boundary
final_pred[i] = final_pred[i-1] + trend_per_step * 0.8
else:
# Maintain original relative differences
original_diff = final_pred[i] - final_pred[i-1]
final_pred[i] = final_pred[i-1] + original_diff * 0.9 # Slightly dampened
print(f"Fallback final continuity correction applied: First prediction adjusted from {first_pred:.4f} to {final_pred[0]:.4f}")
else:
# Single prediction case
final_pred[0] = last_actual
print(f"Fallback final single prediction correction: Set to last actual value {last_actual:.4f}")
# Verify final continuity for fallback case
final_first_pred = final_pred[0]
final_discontinuity = abs(final_first_pred - last_actual) / last_actual * 100
print(f"Fallback final continuity check: Discontinuity = {final_discontinuity:.3f}% (threshold: 1.0%)")
if final_discontinuity <= 1.0:
print("✓ Fallback continuity validation passed - predictions are smooth")
else:
print(f"⚠ Fallback continuity validation warning - discontinuity of {final_discontinuity:.3f}% remains")
except Exception as fallback_error:
print(f"Fallback prediction error: {str(fallback_error)}")
raise
else:
raise ValueError(f"Unknown strategy: {strategy}. Supported strategies: 'chronos', 'technical'")
# Create prediction dates
last_date = df.index[-1]
if timeframe == "1d":
pred_dates = pd.date_range(start=last_date + timedelta(days=1), periods=prediction_days)
elif timeframe == "1h":
pred_dates = pd.date_range(start=last_date + timedelta(hours=1), periods=prediction_days * 24)
else: # 15m
pred_dates = pd.date_range(start=last_date + timedelta(minutes=15), periods=prediction_days * 96)
# Create enhanced visualization with uncertainties integrated
fig = make_subplots(rows=3, cols=1,
shared_xaxes=True,
vertical_spacing=0.12,
subplot_titles=('Price Prediction with Uncertainty', 'Technical Indicators with Uncertainty', 'Volume with Uncertainty'))
# Add historical price
fig.add_trace(
go.Scatter(x=df.index, y=df['Close'], name='Historical Price',
line=dict(color='blue', width=2)),
row=1, col=1
)
# Add predicted price with integrated uncertainty bands
fig.add_trace(
go.Scatter(x=pred_dates, y=final_pred, name='Predicted Price',
line=dict(color='red', width=3)),
row=1, col=1
)
# Add price uncertainty bands on the same subplot
if final_uncertainty is not None and len(final_uncertainty) > 0:
uncertainty_clean = np.array(final_uncertainty)
uncertainty_clean = np.where(np.isnan(uncertainty_clean), 0, uncertainty_clean)
# Create confidence bands
confidence_68 = uncertainty_clean # 1 standard deviation (68% confidence)
confidence_95 = 2 * uncertainty_clean # 2 standard deviations (95% confidence)
# Plot 95% confidence bands
fig.add_trace(
go.Scatter(x=pred_dates, y=final_pred + confidence_95, name='95% Confidence Upper',
line=dict(color='red', width=1, dash='dash'), showlegend=False),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=final_pred - confidence_95, name='95% Confidence Lower',
line=dict(color='red', width=1, dash='dash'), fill='tonexty',
fillcolor='rgba(255,0,0,0.1)', showlegend=False),
row=1, col=1
)
# Plot 68% confidence bands
fig.add_trace(
go.Scatter(x=pred_dates, y=final_pred + confidence_68, name='68% Confidence Upper',
line=dict(color='darkred', width=1, dash='dot'), showlegend=False),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=final_pred - confidence_68, name='68% Confidence Lower',
line=dict(color='darkred', width=1, dash='dot'), fill='tonexty',
fillcolor='rgba(139,0,0,0.1)', showlegend=False),
row=1, col=1
)
# Add Bollinger Bands if available (on price subplot)
if 'BB_Upper' in df.columns and 'BB_Middle' in df.columns and 'BB_Lower' in df.columns:
fig.add_trace(
go.Scatter(x=df.index, y=df['BB_Upper'], name='BB Upper (Historical)',
line=dict(color='gray', width=1, dash='dash')),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=df.index, y=df['BB_Middle'], name='BB Middle (Historical)',
line=dict(color='gray', width=1)),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=df.index, y=df['BB_Lower'], name='BB Lower (Historical)',
line=dict(color='gray', width=1, dash='dash')),
row=1, col=1
)
# Add predicted Bollinger Bands if available
if 'BB_Upper' in technical_predictions:
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['BB_Upper'], name='BB Upper (Predicted)',
line=dict(color='darkgray', width=2, dash='dash')),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['BB_Middle'], name='BB Middle (Predicted)',
line=dict(color='darkgray', width=2)),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['BB_Lower'], name='BB Lower (Predicted)',
line=dict(color='darkgray', width=2, dash='dash')),
row=1, col=1
)
# Add technical indicators with their uncertainties on the same subplot
# RSI with uncertainty
fig.add_trace(
go.Scatter(x=df.index, y=df['RSI'], name='RSI (Historical)',
line=dict(color='purple', width=1)),
row=2, col=1
)
if 'RSI' in technical_predictions:
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['RSI'], name='RSI (Predicted)',
line=dict(color='purple', width=2, dash='dash')),
row=2, col=1
)
# Add RSI uncertainty bands
if 'RSI' in technical_uncertainties:
rsi_upper = technical_predictions['RSI'] + 2 * technical_uncertainties['RSI']
rsi_lower = technical_predictions['RSI'] - 2 * technical_uncertainties['RSI']
fig.add_trace(
go.Scatter(x=pred_dates, y=rsi_upper, name='RSI Upper Bound',
line=dict(color='purple', width=1, dash='dot'), showlegend=False),
row=2, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=rsi_lower, name='RSI Lower Bound',
line=dict(color='purple', width=1, dash='dot'), fill='tonexty',
fillcolor='rgba(128,0,128,0.1)', showlegend=False),
row=2, col=1
)
# MACD with uncertainty
fig.add_trace(
go.Scatter(x=df.index, y=df['MACD'], name='MACD (Historical)',
line=dict(color='orange', width=1)),
row=2, col=1
)
if 'MACD' in technical_predictions:
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['MACD'], name='MACD (Predicted)',
line=dict(color='orange', width=2, dash='dash')),
row=2, col=1
)
# Add MACD signal line if available
if 'MACD_Signal' in technical_predictions:
fig.add_trace(
go.Scatter(x=pred_dates, y=technical_predictions['MACD_Signal'], name='MACD Signal (Predicted)',
line=dict(color='red', width=1, dash='dash')),
row=2, col=1
)
# Add MACD uncertainty bands
if 'MACD' in technical_uncertainties:
macd_upper = technical_predictions['MACD'] + 2 * technical_uncertainties['MACD']
macd_lower = technical_predictions['MACD'] - 2 * technical_uncertainties['MACD']
fig.add_trace(
go.Scatter(x=pred_dates, y=macd_upper, name='MACD Upper Bound',
line=dict(color='orange', width=1, dash='dot'), showlegend=False),
row=2, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=macd_lower, name='MACD Lower Bound',
line=dict(color='orange', width=1, dash='dot'), fill='tonexty',
fillcolor='rgba(255,165,0,0.1)', showlegend=False),
row=2, col=1
)
# Add volume with uncertainty on the same subplot
if 'Volume' in df.columns and not df['Volume'].isna().all():
volume_data = pd.to_numeric(df['Volume'], errors='coerce').fillna(0)
fig.add_trace(
go.Bar(x=df.index, y=volume_data, name='Historical Volume',
marker_color='lightblue', opacity=0.7),
row=3, col=1
)
else:
print("Warning: No valid volume data available for plotting")
# Add predicted volume with better handling
if volume_pred is not None and len(volume_pred) > 0:
# Ensure volume prediction is numeric and handle any NaN values
volume_pred_clean = np.array(volume_pred)
volume_pred_clean = np.where(np.isnan(volume_pred_clean), 0, volume_pred_clean)
fig.add_trace(
go.Bar(x=pred_dates, y=volume_pred_clean, name='Predicted Volume',
marker_color='red', opacity=0.7),
row=3, col=1
)
# Add volume uncertainty bands if available
if volume_uncertainty is not None and len(volume_uncertainty) > 0:
volume_uncertainty_clean = np.array(volume_uncertainty)
volume_uncertainty_clean = np.where(np.isnan(volume_uncertainty_clean), 0, volume_uncertainty_clean)
volume_upper = volume_pred_clean + 2 * volume_uncertainty_clean
volume_lower = np.maximum(0, volume_pred_clean - 2 * volume_uncertainty_clean)
fig.add_trace(
go.Scatter(x=pred_dates, y=volume_upper, name='Volume Upper Bound',
line=dict(color='red', width=1, dash='dot'), showlegend=False),
row=3, col=1
)
fig.add_trace(
go.Scatter(x=pred_dates, y=volume_lower, name='Volume Lower Bound',
line=dict(color='red', width=1, dash='dot'), fill='tonexty',
fillcolor='rgba(255,0,0,0.1)', showlegend=False),
row=3, col=1
)
# Add reference lines for technical indicators
# RSI overbought/oversold lines
fig.add_hline(y=70, line_dash="dash", line_color="red",
annotation_text="Overbought (70)", row=2, col=1)
fig.add_hline(y=30, line_dash="dash", line_color="green",
annotation_text="Oversold (30)", row=2, col=1)
# MACD zero line
fig.add_hline(y=0, line_dash="dash", line_color="black",
annotation_text="MACD Zero", row=2, col=1)
# Add volume reference line (average volume)
if 'Volume' in df.columns and not df['Volume'].isna().all():
avg_volume = df['Volume'].mean()
fig.add_hline(y=avg_volume, line_dash="dash", line_color="blue",
annotation_text=f"Avg Volume: {avg_volume:,.0f}", row=3, col=1)
fig.update_layout(
title=dict(
text=f'Enhanced Stock Prediction for {symbol}',
x=0.5,
xanchor='center',
font=dict(size=18, color='black'),
y=0.95 # Moved down slightly to avoid overlap
),
# height=1000, <-- HAPUS ATAU COMMENT BARIS INI
showlegend=True,
legend=dict(
orientation="h",
yanchor="top",
y=-0.15, # Position legend below all subplots
xanchor="center",
x=0.5, # Center the legend horizontally
bgcolor='rgba(255,255,255,0.9)',
bordercolor='black',
borderwidth=1,
font=dict(size=10) # Smaller font for better fit
),
margin=dict(t=120, b=150, l=80, r=80), # Increased bottom margin for legend
autosize=True,
hovermode='x unified'
)
fig.update_xaxes(
title_text="Date",
row=3, col=1,
title_font=dict(size=12, color='black'),
tickfont=dict(size=10)
)
fig.update_yaxes(
title_text="Price ($)",
row=1, col=1,
title_font=dict(size=12, color='black'),
tickfont=dict(size=10)
)
fig.update_yaxes(
title_text="Technical Indicators",
row=2, col=1,
title_font=dict(size=12, color='black'),
tickfont=dict(size=10)
)
fig.update_yaxes(
title_text="Volume",
row=3, col=1,
title_font=dict(size=12, color='black'),
tickfont=dict(size=10)
)
for i in range(len(fig.layout.annotations)):
if i < 3:
fig.layout.annotations[i].update(
font=dict(size=13, color='darkblue', family='Arial, sans-serif'),
y=fig.layout.annotations[i].y + 0.01, # Reduced adjustment to prevent overlap
bgcolor='rgba(255,255,255,0.9)',
bordercolor='lightgray',
borderwidth=1
)
# Create comprehensive trading signals
trading_signals = {
'prediction': {
'dates': pred_dates.tolist(),
'prices': final_pred.tolist(),
'uncertainty': final_uncertainty.tolist(),
'volume': volume_pred.tolist() if volume_pred is not None else None
},
'historical': {
'dates': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(),
'prices': df['Close'].tolist(),
'volume': df['Volume'].tolist() if 'Volume' in df.columns else None
},
'technical_indicators': {
'predictions': {k: v.tolist() for k, v in technical_predictions.items()},
'uncertainties': {k: v.tolist() for k, v in technical_uncertainties.items()}
},
'advanced_uncertainties': advanced_uncertainties,
'regime_info': regime_info,
'sentiment_data': sentiment_data,
'market_conditions': market_conditions,
'covariate_data_available': len(covariate_data) > 0
}
# Add stress testing results if enabled
stress_test_results = {}
if use_stress_testing:
try:
print("Performing stress testing...")
stress_test_results = stress_test_scenarios(df, final_pred)
trading_signals['stress_test_results'] = stress_test_results
except Exception as stress_error:
print(f"Stress testing failed: {str(stress_error)}")
trading_signals['stress_test_results'] = {"error": str(stress_error)}
# Add advanced trading signals
try:
print("Generating advanced trading signals...")
advanced_signals = advanced_trading_signals(df, regime_info)
trading_signals['advanced_signals'] = advanced_signals
except Exception as advanced_error:
print(f"Advanced trading signals failed: {str(advanced_error)}")
trading_signals['advanced_signals'] = {"error": str(advanced_error)}
# Add basic trading signals
try:
basic_signals = calculate_trading_signals(df)
trading_signals.update(basic_signals)
trading_signals['symbol'] = symbol
trading_signals['timeframe'] = timeframe
trading_signals['strategy_used'] = strategy
trading_signals['ensemble_used'] = use_ensemble
except Exception as basic_error:
print(f"Basic trading signals failed: {str(basic_error)}")
trading_signals['error'] = str(basic_error)
# Debug information for volume and uncertainty
print(f"Volume data info:")
print(f" Historical volume shape: {df['Volume'].shape if 'Volume' in df.columns else 'No volume column'}")
print(f" Historical volume NaN count: {df['Volume'].isna().sum() if 'Volume' in df.columns else 'N/A'}")
print(f" Predicted volume shape: {volume_pred.shape if volume_pred is not None else 'None'}")
print(f" Predicted volume NaN count: {np.isnan(volume_pred).sum() if volume_pred is not None else 'N/A'}")
print(f"Uncertainty data info:")
print(f" Final uncertainty shape: {final_uncertainty.shape if final_uncertainty is not None else 'None'}")
print(f" Final uncertainty NaN count: {np.isnan(final_uncertainty).sum() if final_uncertainty is not None else 'N/A'}")
print(f" Final uncertainty range: [{np.min(final_uncertainty):.4f}, {np.max(final_uncertainty):.4f}]" if final_uncertainty is not None else 'N/A')
return trading_signals, fig
except Exception as e:
print(f"Enhanced prediction error: {str(e)}")
raise
def calculate_trading_signals(df: pd.DataFrame) -> Dict:
"""Calculate trading signals based on technical indicators"""
signals = {
"RSI": "Oversold" if df['RSI'].iloc[-1] < 30 else "Overbought" if df['RSI'].iloc[-1] > 70 else "Neutral",
"MACD": "Buy" if df['MACD'].iloc[-1] > df['MACD_Signal'].iloc[-1] else "Sell",
"Bollinger": "Buy" if df['Close'].iloc[-1] < df['BB_Lower'].iloc[-1] else "Sell" if df['Close'].iloc[-1] > df['BB_Upper'].iloc[-1] else "Hold",
"SMA": "Buy" if df['SMA_20'].iloc[-1] > df['SMA_50'].iloc[-1] else "Sell"
}
# Calculate overall signal
buy_signals = sum(1 for signal in signals.values() if signal == "Buy")
sell_signals = sum(1 for signal in signals.values() if signal == "Sell")
if buy_signals > sell_signals:
signals["Overall"] = "Buy"
elif sell_signals > buy_signals:
signals["Overall"] = "Sell"
else:
signals["Overall"] = "Hold"
return signals
def get_market_data(symbol: str = "^GSPC", lookback_days: int = 365) -> pd.DataFrame:
"""
Fetch market data (S&P 500 by default) for correlation analysis and regime detection.
Uses recommended yfinance API methods for better reliability.
Args:
symbol (str): Market index symbol (default: ^GSPC for S&P 500)
lookback_days (int): Number of days to look back
Returns:
pd.DataFrame: Market data with returns
"""
cache_key = f"{symbol}_{lookback_days}"
current_time = time.time()
# Check cache
if cache_key in market_data_cache and current_time < cache_expiry.get(cache_key, 0):
return market_data_cache[cache_key]
try:
ticker = yf.Ticker(symbol)
def fetch_market_history():
return ticker.history(
period=f"{lookback_days}d",
interval="1d",
prepost=False,
actions=False,
auto_adjust=True,
back_adjust=True,
repair=True
)
df = retry_yfinance_request(fetch_market_history)
if df is not None and not df.empty:
df['Returns'] = df['Close'].pct_change()
df['Volatility'] = df['Returns'].rolling(window=20).std()
# Cache the data
market_data_cache[cache_key] = df
cache_expiry[cache_key] = current_time + CACHE_DURATION
print(f"Successfully fetched market data for {symbol}: {len(df)} data points")
else:
print(f"Warning: No data returned for {symbol}")
df = pd.DataFrame()
return df
except Exception as e:
print(f"Warning: Could not fetch market data for {symbol}: {str(e)}")
return pd.DataFrame()
def detect_market_regime(returns: pd.Series, n_regimes: int = 3) -> Dict:
"""
Detect market regime using Hidden Markov Model or simplified methods.
Args:
returns (pd.Series): Price returns
n_regimes (int): Number of regimes to detect
Returns:
Dict: Regime information including probabilities and characteristics
"""
def get_regime_name(regime_idx: int, means: List[float], volatilities: List[float]) -> str:
"""
Convert regime index to descriptive name based on characteristics.
Args:
regime_idx (int): Regime index (0, 1, 2)
means (List[float]): List of regime means
volatilities (List[float]): List of regime volatilities
Returns:
str: Descriptive regime name
"""
if len(means) != 3 or len(volatilities) != 3:
return f"Regime {regime_idx}"
# Sort regimes by volatility (low to high)
vol_sorted = sorted(range(len(volatilities)), key=lambda i: volatilities[i])
# Sort regimes by mean return (low to high)
mean_sorted = sorted(range(len(means)), key=lambda i: means[i])
# Determine regime characteristics
if regime_idx == vol_sorted[0]: # Lowest volatility
if means[regime_idx] > 0:
return "Low Volatility Bull"
else:
return "Low Volatility Bear"
elif regime_idx == vol_sorted[2]: # Highest volatility
if means[regime_idx] > 0:
return "High Volatility Bull"
else:
return "High Volatility Bear"
else: # Medium volatility
if means[regime_idx] > 0:
return "Moderate Bull"
else:
return "Moderate Bear"
if len(returns) < 50:
return {"regime": "Normal Market", "probabilities": [1.0], "volatility": returns.std()}
try:
if HMM_AVAILABLE:
# Use HMM for regime detection
# Convert pandas Series to numpy array for reshape
returns_array = returns.dropna().values
# Try different HMM configurations if convergence fails
for attempt in range(3):
try:
if attempt == 0:
model = hmm.GaussianHMM(
n_components=n_regimes,
random_state=42,
covariance_type="full",
n_iter=500,
tol=1e-4,
init_params="stmc"
)
elif attempt == 1:
model = hmm.GaussianHMM(
n_components=n_regimes,
random_state=42,
covariance_type="diag",
n_iter=1000,
tol=1e-3,
init_params="stmc"
)
else:
model = hmm.GaussianHMM(
n_components=n_regimes,
random_state=42,
covariance_type="spherical",
n_iter=1500,
tol=1e-2,
init_params="stmc"
)
# Add data preprocessing to improve convergence
returns_clean = returns_array[~np.isnan(returns_array)]
returns_clean = returns_clean[~np.isinf(returns_clean)]
# Remove outliers that might cause convergence issues
q1, q3 = np.percentile(returns_clean, [25, 75])
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
returns_filtered = returns_clean[(returns_clean >= lower_bound) & (returns_clean <= upper_bound)]
# Ensure we have enough data
if len(returns_filtered) < 50:
returns_filtered = returns_clean
# Fit the model with filtered data
model.fit(returns_filtered.reshape(-1, 1))
# Check if model converged
if model.monitor_.converged:
print(f"HMM converged successfully with {model.covariance_type} covariance type")
else:
print(f"HMM did not converge with {model.covariance_type} covariance type, trying next configuration...")
if attempt < 2: # Not the last attempt
continue
else:
print("HMM failed to converge with all configurations, using fallback method")
raise Exception("HMM convergence failed")
# Get regime probabilities for the last observation
regime_probs = model.predict_proba(returns_array.reshape(-1, 1))
current_regime = model.predict(returns_array.reshape(-1, 1))[-1]
# Calculate regime characteristics
regime_means = model.means_.flatten()
regime_vols = np.sqrt(model.covars_.diagonal(axis1=1, axis2=2)) if model.covariance_type == "full" else np.sqrt(model.covars_)
# Convert regime index to descriptive name
regime_name = get_regime_name(int(current_regime), regime_means.tolist(), regime_vols.tolist())
return {
"regime": regime_name,
"regime_index": int(current_regime),
"probabilities": regime_probs[-1].tolist(),
"means": regime_means.tolist(),
"volatilities": regime_vols.tolist(),
"method": f"HMM-{model.covariance_type}"
}
except Exception as e:
if attempt == 2: # Last attempt failed
print(f"HMM failed after {attempt + 1} attempts: {str(e)}")
break
continue
else:
# Simplified regime detection using volatility clustering
volatility = returns.rolling(window=20).std().dropna()
vol_percentile = volatility.iloc[-1] / volatility.quantile(0.8)
if vol_percentile > 1.2:
regime_name = "High Volatility Market"
regime = 2 # High volatility regime
elif vol_percentile < 0.8:
regime_name = "Low Volatility Market"
regime = 0 # Low volatility regime
else:
regime_name = "Normal Market"
regime = 1 # Normal regime
return {
"regime": regime_name,
"regime_index": regime,
"probabilities": [0.1, 0.8, 0.1] if regime == 1 else [0.8, 0.1, 0.1] if regime == 0 else [0.1, 0.1, 0.8],
"volatility": volatility.iloc[-1],
"method": "Volatility-based"
}
except Exception as e:
print(f"Warning: Regime detection failed: {str(e)}")
return {"regime": "Normal Market", "regime_index": 1, "probabilities": [1.0], "volatility": returns.std(), "method": "Fallback"}
def calculate_advanced_risk_metrics(df: pd.DataFrame, market_returns: pd.Series = None,
risk_free_rate: float = 0.02) -> Dict:
"""
Calculate advanced risk metrics including tail risk and market correlation.
Args:
df (pd.DataFrame): Stock data
market_returns (pd.Series): Market returns for correlation analysis
risk_free_rate (float): Annual risk-free rate
Returns:
Dict: Advanced risk metrics
"""
try:
returns = df['Returns'].dropna()
if len(returns) < 30:
return {"error": "Insufficient data for risk calculation"}
# Basic metrics
annual_return = returns.mean() * 252
annual_vol = returns.std() * np.sqrt(252)
# Market-adjusted metrics
beta = 1.0
alpha = 0.0
correlation = 0.0
aligned_returns = None
aligned_market = None
if market_returns is not None and len(market_returns) > 0:
try:
# Align dates
aligned_returns = returns.reindex(market_returns.index).dropna()
aligned_market = market_returns.reindex(aligned_returns.index).dropna()
# Ensure both arrays have the same length
if len(aligned_returns) > 10 and len(aligned_market) > 10:
# Find the common length
min_length = min(len(aligned_returns), len(aligned_market))
aligned_returns = aligned_returns.iloc[-min_length:]
aligned_market = aligned_market.iloc[-min_length:]
# Ensure they have the same length
if len(aligned_returns) == len(aligned_market) and len(aligned_returns) > 10:
try:
beta = np.cov(aligned_returns, aligned_market)[0,1] / np.var(aligned_market)
alpha = aligned_returns.mean() - beta * aligned_market.mean()
correlation = np.corrcoef(aligned_returns, aligned_market)[0,1]
except Exception as e:
print(f"Market correlation calculation error: {str(e)}")
beta = 1.0
alpha = 0.0
correlation = 0.0
else:
beta = 1.0
alpha = 0.0
correlation = 0.0
else:
beta = 1.0
alpha = 0.0
correlation = 0.0
except Exception as e:
print(f"Market data alignment error: {str(e)}")
beta = 1.0
alpha = 0.0
correlation = 0.0
aligned_returns = None
aligned_market = None
# Tail risk metrics
var_95 = np.percentile(returns, 5)
var_99 = np.percentile(returns, 1)
cvar_95 = returns[returns <= var_95].mean()
cvar_99 = returns[returns <= var_99].mean()
# Maximum drawdown
cumulative_returns = (1 + returns).cumprod()
rolling_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - rolling_max) / rolling_max
max_drawdown = drawdown.min()
# Skewness and kurtosis
skewness = stats.skew(returns)
kurtosis = stats.kurtosis(returns)
# Risk-adjusted returns
sharpe_ratio = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0
sortino_ratio = (annual_return - risk_free_rate) / (returns[returns < 0].std() * np.sqrt(252)) if returns[returns < 0].std() > 0 else 0
calmar_ratio = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0
# Information ratio (if market data available)
information_ratio = 0
if aligned_returns is not None and aligned_market is not None:
try:
if len(aligned_returns) > 10 and len(aligned_market) > 10:
min_length = min(len(aligned_returns), len(aligned_market))
aligned_returns_for_ir = aligned_returns.iloc[-min_length:]
aligned_market_for_ir = aligned_market.iloc[-min_length:]
if len(aligned_returns_for_ir) == len(aligned_market_for_ir):
excess_returns = aligned_returns_for_ir - aligned_market_for_ir
information_ratio = excess_returns.mean() / excess_returns.std() if excess_returns.std() > 0 else 0
else:
information_ratio = 0
else:
information_ratio = 0
except Exception as e:
print(f"Information ratio calculation error: {str(e)}")
information_ratio = 0
return {
"Annual_Return": annual_return,
"Annual_Volatility": annual_vol,
"Sharpe_Ratio": sharpe_ratio,
"Sortino_Ratio": sortino_ratio,
"Calmar_Ratio": calmar_ratio,
"Information_Ratio": information_ratio,
"Beta": beta,
"Alpha": alpha * 252,
"Correlation_with_Market": correlation,
"VaR_95": var_95,
"VaR_99": var_99,
"CVaR_95": cvar_95,
"CVaR_99": cvar_99,
"Max_Drawdown": max_drawdown,
"Skewness": skewness,
"Kurtosis": kurtosis,
"Risk_Free_Rate": risk_free_rate
}
except Exception as e:
print(f"Advanced risk metrics calculation error: {str(e)}")
return {"error": f"Risk calculation failed: {str(e)}"}
def create_ensemble_prediction(df: pd.DataFrame, prediction_days: int,
ensemble_weights: Dict = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Create ensemble prediction combining multiple models.
Args:
df (pd.DataFrame): Historical data
prediction_days (int): Number of days to predict
ensemble_weights (Dict): Weights for different models
Returns:
Tuple[np.ndarray, np.ndarray]: Mean and uncertainty predictions
"""
if ensemble_weights is None:
ensemble_weights = {"chronos": 0.6, "technical": 0.2, "statistical": 0.2}
predictions = {}
uncertainties = {}
# Chronos prediction (placeholder - will be filled by main prediction function)
predictions["chronos"] = np.array([])
uncertainties["chronos"] = np.array([])
# Technical prediction
if ensemble_weights.get("technical", 0) > 0:
try:
last_price = df['Close'].iloc[-1]
rsi = df['RSI'].iloc[-1]
macd = df['MACD'].iloc[-1]
macd_signal = df['MACD_Signal'].iloc[-1]
volatility = df['Volatility'].iloc[-1]
# Enhanced technical prediction
trend = 1 if (rsi > 50 and macd > macd_signal) else -1
mean_reversion = (df['SMA_200'].iloc[-1] - last_price) / last_price if 'SMA_200' in df.columns else 0
tech_pred = []
for i in range(1, prediction_days + 1):
# Combine trend and mean reversion
prediction = last_price * (1 + trend * volatility * 0.3 + mean_reversion * 0.1 * i)
tech_pred.append(prediction)
predictions["technical"] = np.array(tech_pred)
uncertainties["technical"] = np.array([volatility * last_price * i for i in range(1, prediction_days + 1)])
except Exception as e:
print(f"Technical prediction error: {str(e)}")
predictions["technical"] = np.array([])
uncertainties["technical"] = np.array([])
# Statistical prediction (ARIMA-like)
if ensemble_weights.get("statistical", 0) > 0:
try:
returns = df['Returns'].dropna()
if len(returns) > 10:
# Simple moving average with momentum
ma_short = df['Close'].rolling(window=10).mean().iloc[-1]
ma_long = df['Close'].rolling(window=30).mean().iloc[-1]
momentum = (ma_short - ma_long) / ma_long
last_price = df['Close'].iloc[-1]
stat_pred = []
for i in range(1, prediction_days + 1):
# Mean reversion with momentum
prediction = last_price * (1 + momentum * 0.5 - 0.001 * i) # Decay factor
stat_pred.append(prediction)
predictions["statistical"] = np.array(stat_pred)
uncertainties["statistical"] = np.array([returns.std() * last_price * np.sqrt(i) for i in range(1, prediction_days + 1)])
else:
predictions["statistical"] = np.array([])
uncertainties["statistical"] = np.array([])
except Exception as e:
print(f"Statistical prediction error: {str(e)}")
predictions["statistical"] = np.array([])
uncertainties["statistical"] = np.array([])
# Combine predictions
valid_predictions = {k: v for k, v in predictions.items() if len(v) > 0}
valid_uncertainties = {k: v for k, v in uncertainties.items() if len(v) > 0}
if not valid_predictions:
return np.array([]), np.array([])
# Weighted ensemble
total_weight = sum(ensemble_weights.get(k, 0) for k in valid_predictions.keys())
if total_weight == 0:
return np.array([]), np.array([])
# Normalize weights
normalized_weights = {k: ensemble_weights.get(k, 0) / total_weight for k in valid_predictions.keys()}
# Calculate weighted mean and uncertainty
max_length = max(len(v) for v in valid_predictions.values())
ensemble_mean = np.zeros(max_length)
ensemble_uncertainty = np.zeros(max_length)
for model, pred in valid_predictions.items():
weight = normalized_weights[model]
if len(pred) < max_length:
# Extend prediction using last value
extended_pred = np.concatenate([pred, np.full(max_length - len(pred), pred[-1])])
extended_unc = np.concatenate([valid_uncertainties[model], np.full(max_length - len(pred), valid_uncertainties[model][-1])])
else:
extended_pred = pred[:max_length]
extended_unc = valid_uncertainties[model][:max_length]
ensemble_mean += weight * extended_pred
ensemble_uncertainty += weight * extended_unc
return ensemble_mean, ensemble_uncertainty
def stress_test_scenarios(df: pd.DataFrame, prediction: np.ndarray,
scenarios: Dict = None) -> Dict:
"""
Perform stress testing under various market scenarios.
Args:
df (pd.DataFrame): Historical data
prediction (np.ndarray): Base prediction
scenarios (Dict): Stress test scenarios
Returns:
Dict: Stress test results
"""
if scenarios is None:
scenarios = {
"market_crash": {"volatility_multiplier": 3.0, "return_shock": -0.15},
"high_volatility": {"volatility_multiplier": 2.0, "return_shock": -0.05},
"low_volatility": {"volatility_multiplier": 0.5, "return_shock": 0.02},
"bull_market": {"volatility_multiplier": 1.2, "return_shock": 0.10},
"interest_rate_shock": {"volatility_multiplier": 1.5, "return_shock": -0.08}
}
base_volatility = df['Volatility'].iloc[-1]
base_return = df['Returns'].mean()
last_price = df['Close'].iloc[-1]
stress_results = {}
for scenario_name, params in scenarios.items():
try:
# Calculate stressed parameters
stressed_vol = base_volatility * params["volatility_multiplier"]
stressed_return = base_return + params["return_shock"]
# Generate stressed prediction
stressed_pred = []
for i, pred in enumerate(prediction):
# Apply stress factors
stress_factor = 1 + stressed_return * (i + 1) / 252
volatility_impact = np.random.normal(0, stressed_vol * np.sqrt((i + 1) / 252))
stressed_price = pred * stress_factor * (1 + volatility_impact)
stressed_pred.append(stressed_price)
# Calculate stress metrics
stress_results[scenario_name] = {
"prediction": np.array(stressed_pred),
"max_loss": min(stressed_pred) / last_price - 1,
"volatility": stressed_vol,
"expected_return": stressed_return,
"var_95": np.percentile([p / last_price - 1 for p in stressed_pred], 5)
}
except Exception as e:
print(f"Stress test error for {scenario_name}: {str(e)}")
stress_results[scenario_name] = {"error": str(e)}
return stress_results
def calculate_skewed_uncertainty(quantiles: np.ndarray, confidence_level: float = 0.9) -> np.ndarray:
"""
Calculate uncertainty accounting for skewness in return distributions.
Args:
quantiles (np.ndarray): Quantile predictions from Chronos
confidence_level (float): Confidence level for uncertainty calculation
Returns:
np.ndarray: Uncertainty estimates
"""
try:
lower = quantiles[0, :, 0]
median = quantiles[0, :, 1]
upper = quantiles[0, :, 2]
# Calculate skewness for each prediction point
uncertainties = []
for i in range(len(lower)):
# Calculate skewness
if upper[i] != median[i] and median[i] != lower[i]:
skewness = (median[i] - lower[i]) / (upper[i] - median[i])
else:
skewness = 1.0
# Adjust z-score based on skewness
if skewness > 1.2: # Right-skewed
z_score = stats.norm.ppf(confidence_level) * (1 + 0.1 * skewness)
elif skewness < 0.8: # Left-skewed
z_score = stats.norm.ppf(confidence_level) * (1 - 0.1 * abs(skewness))
else:
z_score = stats.norm.ppf(confidence_level)
# Calculate uncertainty
uncertainty = (upper[i] - lower[i]) / (2 * z_score)
uncertainties.append(uncertainty)
return np.array(uncertainties)
except Exception as e:
print(f"Skewed uncertainty calculation error: {str(e)}")
# Fallback to simple calculation
return (quantiles[0, :, 2] - quantiles[0, :, 0]) / (2 * 1.645)
def adaptive_smoothing(new_pred: np.ndarray, historical_pred: np.ndarray,
prediction_uncertainty: np.ndarray) -> np.ndarray:
"""
Apply adaptive smoothing based on prediction uncertainty.
Args:
new_pred (np.ndarray): New predictions
historical_pred (np.ndarray): Historical predictions
prediction_uncertainty (np.ndarray): Prediction uncertainty
Returns:
np.ndarray: Smoothed predictions
"""
try:
if len(historical_pred) == 0:
return new_pred
# Calculate adaptive alpha based on uncertainty
uncertainty_ratio = prediction_uncertainty / np.mean(np.abs(historical_pred))
if uncertainty_ratio > 0.1: # High uncertainty
alpha = 0.1 # More smoothing
elif uncertainty_ratio < 0.05: # Low uncertainty
alpha = 0.5 # Less smoothing
else:
alpha = 0.3 # Default
# Apply weighted smoothing
smoothed = alpha * new_pred + (1 - alpha) * historical_pred[-len(new_pred):]
return smoothed
except Exception as e:
print(f"Adaptive smoothing error: {str(e)}")
return new_pred
def advanced_trading_signals(df: pd.DataFrame, regime_info: Dict = None) -> Dict:
"""
Generate advanced trading signals with confidence levels and regime awareness.
Args:
df (pd.DataFrame): Stock data
regime_info (Dict): Market regime information
Returns:
Dict: Advanced trading signals
"""
try:
# Calculate signal strength and confidence
rsi = df['RSI'].iloc[-1]
macd = df['MACD'].iloc[-1]
macd_signal = df['MACD_Signal'].iloc[-1]
rsi_strength = abs(rsi - 50) / 50 # 0-1 scale
macd_strength = abs(macd - macd_signal) / df['Close'].iloc[-1]
# Regime-adjusted thresholds
if regime_info and "volatilities" in regime_info:
volatility_regime = df['Volatility'].iloc[-1] / np.mean(regime_info["volatilities"])
else:
volatility_regime = 1.0
# Adjust RSI thresholds based on volatility
rsi_oversold = 30 + (volatility_regime - 1) * 10
rsi_overbought = 70 - (volatility_regime - 1) * 10
# Calculate signals with confidence
signals = {}
# RSI signal
if rsi < rsi_oversold:
rsi_signal = "Oversold"
rsi_confidence = min(0.9, 0.5 + rsi_strength * 0.4)
elif rsi > rsi_overbought:
rsi_signal = "Overbought"
rsi_confidence = min(0.9, 0.5 + rsi_strength * 0.4)
else:
rsi_signal = "Neutral"
rsi_confidence = 0.3
signals["RSI"] = {
"signal": rsi_signal,
"strength": rsi_strength,
"confidence": rsi_confidence,
"value": rsi
}
# MACD signal
if macd > macd_signal:
macd_signal = "Buy"
macd_confidence = min(0.8, 0.4 + macd_strength * 40)
else:
macd_signal = "Sell"
macd_confidence = min(0.8, 0.4 + macd_strength * 40)
signals["MACD"] = {
"signal": macd_signal,
"strength": macd_strength,
"confidence": macd_confidence,
"value": macd
}
# Bollinger Bands signal
if 'BB_Upper' in df.columns and 'BB_Lower' in df.columns:
current_price = df['Close'].iloc[-1]
bb_upper = df['BB_Upper'].iloc[-1]
bb_lower = df['BB_Lower'].iloc[-1]
# Calculate position within Bollinger Bands (0-1 scale)
bb_position = (current_price - bb_lower) / (bb_upper - bb_lower) if bb_upper != bb_lower else 0.5
bb_strength = abs(bb_position - 0.5) * 2 # 0-1 scale, strongest at edges
if current_price < bb_lower:
bb_signal = "Buy"
bb_confidence = 0.7
elif current_price > bb_upper:
bb_signal = "Sell"
bb_confidence = 0.7
else:
bb_signal = "Hold"
bb_confidence = 0.5
signals["Bollinger"] = {
"signal": bb_signal,
"strength": bb_strength,
"confidence": bb_confidence,
"position": bb_position
}
# SMA signal
if 'SMA_20' in df.columns and 'SMA_50' in df.columns:
sma_20 = df['SMA_20'].iloc[-1]
sma_50 = df['SMA_50'].iloc[-1]
# Calculate SMA strength based on ratio
sma_ratio = sma_20 / sma_50 if sma_50 != 0 else 1.0
sma_strength = abs(sma_ratio - 1.0) # 0-1 scale, strongest when ratio differs most from 1
if sma_20 > sma_50:
sma_signal = "Buy"
sma_confidence = 0.6
else:
sma_signal = "Sell"
sma_confidence = 0.6
signals["SMA"] = {
"signal": sma_signal,
"strength": sma_strength,
"confidence": sma_confidence,
"ratio": sma_ratio
}
# Calculate weighted overall signal
buy_signals = []
sell_signals = []
for signal_name, signal_data in signals.items():
# Get strength with default value if not present
strength = signal_data.get("strength", 0.5) # Default strength of 0.5
confidence = signal_data.get("confidence", 0.5) # Default confidence of 0.5
if signal_data["signal"] == "Buy":
buy_signals.append(strength * confidence)
elif signal_data["signal"] == "Sell":
sell_signals.append(strength * confidence)
weighted_buy = sum(buy_signals) if buy_signals else 0
weighted_sell = sum(sell_signals) if sell_signals else 0
if weighted_buy > weighted_sell:
overall_signal = "Buy"
overall_confidence = weighted_buy / (weighted_buy + weighted_sell) if (weighted_buy + weighted_sell) > 0 else 0
elif weighted_sell > weighted_buy:
overall_signal = "Sell"
overall_confidence = weighted_sell / (weighted_buy + weighted_sell) if (weighted_buy + weighted_sell) > 0 else 0
else:
overall_signal = "Hold"
overall_confidence = 0.5
return {
"signals": signals,
"overall_signal": overall_signal,
"confidence": overall_confidence,
"regime_adjusted": regime_info is not None
}
except Exception as e:
print(f"Advanced trading signals error: {str(e)}")
return {"error": str(e)}
def apply_financial_smoothing(data: np.ndarray, smoothing_type: str = "exponential",
window_size: int = 5, alpha: float = 0.3,
poly_order: int = 3, use_smoothing: bool = True) -> np.ndarray:
"""
Apply financial smoothing algorithms to time series data.
Args:
data (np.ndarray): Input time series data
smoothing_type (str): Type of smoothing to apply
- 'exponential': Exponential moving average (good for trend following)
- 'moving_average': Simple moving average (good for noise reduction)
- 'kalman': Kalman filter (good for adaptive smoothing)
- 'savitzky_golay': Savitzky-Golay filter (good for preserving peaks/valleys)
- 'double_exponential': Double exponential smoothing (good for trend + seasonality)
- 'triple_exponential': Triple exponential smoothing (Holt-Winters, good for complex patterns)
- 'adaptive': Adaptive smoothing based on volatility
- 'none': No smoothing applied
window_size (int): Window size for moving average and Savitzky-Golay
alpha (float): Smoothing factor for exponential methods (0-1)
poly_order (int): Polynomial order for Savitzky-Golay filter
use_smoothing (bool): Whether to apply smoothing
Returns:
np.ndarray: Smoothed data
"""
if not use_smoothing or smoothing_type == "none" or len(data) < 3:
return data
try:
if smoothing_type == "exponential":
# Exponential Moving Average - good for trend following
smoothed = np.zeros_like(data)
smoothed[0] = data[0]
for i in range(1, len(data)):
smoothed[i] = alpha * data[i] + (1 - alpha) * smoothed[i-1]
return smoothed
elif smoothing_type == "moving_average":
# Simple Moving Average - good for noise reduction
if len(data) < window_size:
return data
smoothed = np.zeros_like(data)
# Handle the beginning of the series
for i in range(min(window_size - 1, len(data))):
smoothed[i] = np.mean(data[:i+1])
# Apply moving average for the rest
for i in range(window_size - 1, len(data)):
smoothed[i] = np.mean(data[i-window_size+1:i+1])
return smoothed
elif smoothing_type == "kalman":
# Kalman Filter - adaptive smoothing
if len(data) < 2:
return data
# Initialize Kalman filter parameters
Q = 0.01 # Process noise
R = 0.1 # Measurement noise
P = 1.0 # Initial estimate error
x = data[0] # Initial state estimate
smoothed = np.zeros_like(data)
smoothed[0] = x
for i in range(1, len(data)):
# Prediction step
x_pred = x
P_pred = P + Q
# Update step
K = P_pred / (P_pred + R) # Kalman gain
x = x_pred + K * (data[i] - x_pred)
P = (1 - K) * P_pred
smoothed[i] = x
return smoothed
elif smoothing_type == "savitzky_golay":
# Savitzky-Golay filter - preserves peaks and valleys
if len(data) < window_size:
return data
# Ensure window_size is odd
if window_size % 2 == 0:
window_size += 1
# Ensure polynomial order is less than window_size
if poly_order >= window_size:
poly_order = window_size - 1
try:
from scipy.signal import savgol_filter
return savgol_filter(data, window_size, poly_order)
except ImportError:
# Fallback to simple moving average if scipy not available
return apply_financial_smoothing(data, "moving_average", window_size)
elif smoothing_type == "double_exponential":
# Double Exponential Smoothing (Holt's method) - trend + level
if len(data) < 3:
return data
smoothed = np.zeros_like(data)
trend = np.zeros_like(data)
# Initialize
smoothed[0] = data[0]
trend[0] = data[1] - data[0] if len(data) > 1 else 0
# Apply double exponential smoothing
for i in range(1, len(data)):
prev_smoothed = smoothed[i-1]
prev_trend = trend[i-1]
smoothed[i] = alpha * data[i] + (1 - alpha) * (prev_smoothed + prev_trend)
trend[i] = alpha * (smoothed[i] - prev_smoothed) + (1 - alpha) * prev_trend
return smoothed
elif smoothing_type == "triple_exponential":
# Triple Exponential Smoothing (Holt-Winters) - trend + level + seasonality
if len(data) < 6:
return apply_financial_smoothing(data, "double_exponential", window_size, alpha)
# For simplicity, we'll use a seasonal period of 5 (common for financial data)
season_period = min(5, len(data) // 2)
smoothed = np.zeros_like(data)
trend = np.zeros_like(data)
season = np.zeros_like(data)
# Initialize
smoothed[0] = data[0]
trend[0] = (data[season_period] - data[0]) / season_period if len(data) > season_period else 0
# Initialize seasonal components
for i in range(season_period):
season[i] = data[i] - smoothed[0]
# Apply triple exponential smoothing
for i in range(1, len(data)):
prev_smoothed = smoothed[i-1]
prev_trend = trend[i-1]
prev_season = season[(i-1) % season_period]
smoothed[i] = alpha * (data[i] - prev_season) + (1 - alpha) * (prev_smoothed + prev_trend)
trend[i] = alpha * (smoothed[i] - prev_smoothed) + (1 - alpha) * prev_trend
season[i % season_period] = alpha * (data[i] - smoothed[i]) + (1 - alpha) * prev_season
return smoothed
elif smoothing_type == "adaptive":
# Adaptive smoothing based on volatility
if len(data) < 5:
return data
# Calculate rolling volatility
returns = np.diff(data) / data[:-1]
volatility = np.zeros_like(data)
volatility[0] = np.std(returns) if len(returns) > 0 else 0.01
for i in range(1, len(data)):
if i < 5:
volatility[i] = np.std(returns[:i]) if i > 0 else 0.01
else:
volatility[i] = np.std(returns[i-5:i])
# Normalize volatility to smoothing factor
vol_factor = np.clip(volatility / np.mean(volatility), 0.1, 0.9)
adaptive_alpha = 1 - vol_factor # Higher volatility = less smoothing
# Apply adaptive exponential smoothing
smoothed = np.zeros_like(data)
smoothed[0] = data[0]
for i in range(1, len(data)):
current_alpha = adaptive_alpha[i]
smoothed[i] = current_alpha * data[i] + (1 - current_alpha) * smoothed[i-1]
return smoothed
else:
# Default to exponential smoothing
return apply_financial_smoothing(data, "exponential", window_size, alpha)
except Exception as e:
print(f"Smoothing error: {str(e)}")
return data
def create_interface():
"""Create the Gradio interface with separate tabs for different timeframes"""
with gr.Blocks(title="Advanced Stock Prediction Analysis", theme=gr.themes.Base()) as demo:
# Add comprehensive market information section with nested accordions
with gr.Accordion("🌎 Market Information", open=True):
# Quick Market Status Check Section
with gr.Accordion("📊 Quick Market Status Check", open=False):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 📊 Quick Market Status Check")
# Create user-friendly market choices
market_choices = [(config['name'], key) for key, config in MARKET_CONFIGS.items()]
market_dropdown = gr.Dropdown(
choices=market_choices,
label="Select Market",
value="IDX_STOCKS",
info="Choose a market to check its current status"
)
check_market_btn = gr.Button("🔍 Check Market Status", variant="primary")
with gr.Column(scale=2):
market_status_result = gr.Markdown(
value="Select a market and click 'Check Market Status' to see current trading status.",
label="Market Status Result"
)
# Enhanced Market Information Section
with gr.Accordion("🌍 Enhanced Market Information", open=False):
with gr.Row():
with gr.Column():
gr.Markdown("### Market Summary")
market_summary_display = gr.JSON(label="Market Summary", value={})
def update_market_summary():
"""Update market summary with enhanced yfinance data"""
return get_enhanced_market_summary()
update_market_summary_btn = gr.Button("🔄 Update Market Summary")
update_market_summary_btn.click(
fn=update_market_summary,
outputs=[market_summary_display]
)
with gr.Column():
gr.Markdown("### Market Types Supported")
gr.Markdown("""
**📈 Indonesia Stock Exchange (IDX):**
- **Open Hours**: 09:00 - 15:00 JKT
- **Days**: Monday - Friday
**📊 24/7 Markets:**
- **Gold (XAU/USD)**: 24/7 Trading
- **Bitcoin (BTC/USD)**: 24/7 Trading
**💡 Features:**
- Real-time market status
- Timezone-aware calculations
- Data from yfinance API
""")
# Connect the button to the function
check_market_btn.click(
fn=check_market_status_simple,
inputs=[market_dropdown],
outputs=[market_status_result]
)
# Advanced Settings Accordion
with gr.Accordion("Advanced Settings", open=False):
with gr.Row():
with gr.Column():
use_ensemble = gr.Checkbox(label="Use Ensemble Methods", value=True)
use_regime_detection = gr.Checkbox(label="Use Regime Detection", value=True)
use_stress_testing = gr.Checkbox(label="Use Stress Testing", value=True)
use_covariates = gr.Checkbox(label="Use Enhanced Covariate Data", value=True,
info="Include market indices, sectors, and economic indicators")
use_sentiment = gr.Checkbox(label="Use Sentiment Analysis", value=True,
info="Include news sentiment analysis")
use_smoothing = gr.Checkbox(label="Use Smoothing", value=True)
smoothing_type = gr.Dropdown(
choices=["exponential", "moving_average", "kalman", "savitzky_golay",
"double_exponential", "triple_exponential", "adaptive", "none"],
label="Smoothing Type",
value="exponential"
)
smoothing_window = gr.Slider(
minimum=3, maximum=21, value=5, step=1,
label="Smoothing Window Size",
info="Window size for moving average and Savitzky-Golay filters"
)
smoothing_alpha = gr.Slider(
minimum=0.1, maximum=0.9, value=0.3, step=0.05,
label="Smoothing Alpha",
info="Smoothing factor for exponential methods (0.1-0.9)"
)
risk_free_rate = gr.Slider(
minimum=0.0, maximum=0.1, value=0.02, step=0.001,
label="Risk-Free Rate (Annual)"
)
market_index = gr.Dropdown(
choices=["^JKSE"],
label="Market Index for Correlation",
value="^JKSE"
)
random_real_points = gr.Slider(
minimum=0, maximum=16, value=4, step=1,
label="Random Real Points in Long-Horizon Context"
)
with gr.Column():
gr.Markdown("### Ensemble Weights")
chronos_weight = gr.Slider(minimum=0.0, maximum=1.0, value=0.6, step=0.1, label="Chronos Weight")
technical_weight = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, step=0.1, label="Technical Weight")
statistical_weight = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, step=0.1, label="Statistical Weight")
with gr.Tabs() as tabs:
# Daily Analysis Tab
with gr.TabItem("Daily Analysis"):
with gr.Row():
with gr.Column(scale=1):
daily_symbol = gr.Textbox(label="Stock Symbol (e.g., BBCA.JK)", value="BBCA.JK")
daily_prediction_days = gr.Slider(minimum=1, maximum=365, value=30, step=1, label="Days to Predict")
daily_lookback_days = gr.Slider(minimum=1, maximum=3650, value=365, step=1, label="Historical Lookback (Days)")
daily_strategy = gr.Dropdown(choices=["chronos", "technical"], label="Prediction Strategy", value="chronos")
daily_predict_btn = gr.Button("Analyze Stock")
with gr.Column(scale=2):
daily_plot = gr.Plot(label="Analysis and Prediction")
with gr.Row():
with gr.Column():
gr.Markdown("### Trading Signals")
daily_signals_advanced = gr.JSON(label="Advanced Trading Signals")
gr.Markdown("### Risk & Regime Analysis")
daily_risk_metrics = gr.JSON(label="Risk Metrics")
daily_regime_metrics = gr.JSON(label="Regime Metrics")
with gr.Column():
gr.Markdown("### Financial & Stress Test")
daily_sector_metrics = gr.JSON(label="Sector & Financial Metrics")
daily_stress_results = gr.JSON(label="Stress Test Results")
# Hourly Analysis Tab
with gr.TabItem("Hourly Analysis"):
with gr.Row():
with gr.Column(scale=1):
hourly_symbol = gr.Textbox(label="Stock Symbol (e.g., BBCA.JK)", value="BBCA.JK")
hourly_prediction_days = gr.Slider(minimum=1, maximum=7, value=3, step=1, label="Days to Predict")
hourly_lookback_days = gr.Slider(minimum=1, maximum=60, value=14, step=1, label="Historical Lookback (Days)")
hourly_strategy = gr.Dropdown(choices=["chronos", "technical"], label="Prediction Strategy", value="chronos")
hourly_predict_btn = gr.Button("Analyze Stock")
with gr.Column(scale=2):
hourly_plot = gr.Plot(label="Analysis and Prediction")
with gr.Row():
with gr.Column():
gr.Markdown("### Trading Signals")
hourly_signals_advanced = gr.JSON(label="Advanced Trading Signals")
gr.Markdown("### Risk & Regime Analysis")
hourly_risk_metrics = gr.JSON(label="Risk Metrics")
hourly_regime_metrics = gr.JSON(label="Regime Metrics")
with gr.Column():
gr.Markdown("### Financial & Stress Test")
hourly_sector_metrics = gr.JSON(label="Sector & Financial Metrics")
hourly_stress_results = gr.JSON(label="Stress Test Results")
# 15-Minute Analysis Tab
with gr.TabItem("15-Minute Analysis"):
with gr.Row():
with gr.Column(scale=1):
min15_symbol = gr.Textbox(label="Stock Symbol (e.g., BBCA.JK)", value="BBCA.JK")
min15_prediction_days = gr.Slider(minimum=1, maximum=2, value=1, step=1, label="Days to Predict")
min15_lookback_days = gr.Slider(minimum=1, maximum=7, value=3, step=1, label="Historical Lookback (Days)")
min15_strategy = gr.Dropdown(choices=["chronos", "technical"], label="Prediction Strategy", value="chronos")
min15_predict_btn = gr.Button("Analyze Stock")
with gr.Column(scale=2):
min15_plot = gr.Plot(label="Analysis and Prediction")
with gr.Row():
with gr.Column():
gr.Markdown("### Trading Signals")
min15_signals_advanced = gr.JSON(label="Advanced Trading Signals")
gr.Markdown("### Risk & Regime Analysis")
min15_risk_metrics = gr.JSON(label="Risk Metrics")
min15_regime_metrics = gr.JSON(label="Regime Metrics")
with gr.Column():
gr.Markdown("### Financial & Stress Test")
min15_sector_metrics = gr.JSON(label="Sector & Financial Metrics")
min15_stress_results = gr.JSON(label="Stress Test Results")
def analyze_stock(symbol, timeframe, prediction_days, lookback_days, strategy,
use_ensemble, use_regime_detection, use_stress_testing,
risk_free_rate, market_index, chronos_weight, technical_weight, statistical_weight,
random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
use_covariates=True, use_sentiment=True):
try:
ensemble_weights = {
"chronos": chronos_weight, "technical": technical_weight, "statistical": statistical_weight
}
market_df = get_market_data(market_index, lookback_days)
market_returns = market_df['Returns'] if not market_df.empty else None
signals, fig = make_prediction_enhanced(
symbol=symbol, timeframe=timeframe, prediction_days=prediction_days,
strategy=strategy, use_ensemble=use_ensemble, use_regime_detection=use_regime_detection,
use_stress_testing=use_stress_testing, risk_free_rate=risk_free_rate,
ensemble_weights=ensemble_weights, market_index=market_index, use_covariates=use_covariates,
use_sentiment=use_sentiment, random_real_points=random_real_points, use_smoothing=use_smoothing,
smoothing_type=smoothing_type, smoothing_window=smoothing_window, smoothing_alpha=smoothing_alpha
)
df = get_historical_data(symbol, timeframe, lookback_days)
fundamentals = get_fundamental_data(symbol)
risk_metrics = calculate_advanced_risk_metrics(df, market_returns, risk_free_rate)
sector_metrics = {
"Sector": fundamentals.get("sector"), "Industry": fundamentals.get("industry"),
"Market_Cap": fundamentals.get("marketCap"), "P/E_Ratio": fundamentals.get("trailingPE"),
"Dividend_Yield": fundamentals.get("dividendYield")
}
regime_metrics = signals.get("regime_info", {})
stress_results = signals.get("stress_test_results", {})
advanced_signals = signals.get("advanced_signals", {})
return fig, advanced_signals, risk_metrics, sector_metrics, regime_metrics, stress_results
except Exception as e:
raise gr.Error(str(e))
# Wrapper functions for each tab
def daily_analysis_wrapper(s, pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us):
return analyze_stock(s, "1d", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us)
def hourly_analysis_wrapper(s, pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us):
return analyze_stock(s, "1h", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us)
def min15_analysis_wrapper(s, pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us):
return analyze_stock(s, "15m", pd, ld, st, ue, urd, ust, rfr, mi, cw, tw, sw, rrp, usm, smt, sww, sa, uc, us)
# Connect buttons to their respective wrapper functions
daily_predict_btn.click(
fn=daily_analysis_wrapper,
inputs=[daily_symbol, daily_prediction_days, daily_lookback_days, daily_strategy,
use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
chronos_weight, technical_weight, statistical_weight,
random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
use_covariates, use_sentiment],
outputs=[daily_plot, daily_signals_advanced, daily_risk_metrics, daily_sector_metrics,
daily_regime_metrics, daily_stress_results]
)
hourly_predict_btn.click(
fn=hourly_analysis_wrapper,
inputs=[hourly_symbol, hourly_prediction_days, hourly_lookback_days, hourly_strategy,
use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
chronos_weight, technical_weight, statistical_weight,
random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
use_covariates, use_sentiment],
outputs=[hourly_plot, hourly_signals_advanced, hourly_risk_metrics, hourly_sector_metrics,
hourly_regime_metrics, hourly_stress_results]
)
min15_predict_btn.click(
fn=min15_analysis_wrapper,
inputs=[min15_symbol, min15_prediction_days, min15_lookback_days, min15_strategy,
use_ensemble, use_regime_detection, use_stress_testing, risk_free_rate, market_index,
chronos_weight, technical_weight, statistical_weight,
random_real_points, use_smoothing, smoothing_type, smoothing_window, smoothing_alpha,
use_covariates, use_sentiment],
outputs=[min15_plot, min15_signals_advanced, min15_risk_metrics, min15_sector_metrics,
min15_regime_metrics, min15_stress_results]
)
return demo
def get_enhanced_covariate_data(symbol: str, timeframe: str = "1d", lookback_days: int = 365) -> Dict[str, pd.DataFrame]:
"""
Collect enhanced covariate data including market indices, sectors, commodities, and economic indicators.
Args:
symbol (str): Stock symbol
timeframe (str): Data timeframe
lookback_days (int): Number of days to look back
Returns:
Dict[str, pd.DataFrame]: Dictionary of covariate dataframes
"""
try:
covariate_data = {}
# Calculate date range
end_date = datetime.now()
start_date = end_date - timedelta(days=lookback_days)
# Collect market indices data
print("Collecting market indices data...")
market_data = {}
for index in COVARIATE_SOURCES['market_indices']:
try:
ticker = yf.Ticker(index)
data = retry_yfinance_request(lambda: ticker.history(
start=start_date, end=end_date, interval=timeframe
))
if not data.empty:
market_data[index] = data['Close']
print(f" Successfully collected {index}: {len(data)} data points")
else:
print(f" No data for {index}")
except Exception as e:
print(f" Error fetching {index}: {str(e)}")
# Continue with other indices even if one fails
if market_data:
covariate_data['market_indices'] = pd.DataFrame(market_data)
print(f"Market indices data shape: {covariate_data['market_indices'].shape}")
else:
print("No market indices data collected")
# Collect sector data (skip if not configured)
print("Collecting sector data...")
sector_data = {}
if 'sectors' in COVARIATE_SOURCES:
for sector in COVARIATE_SOURCES['sectors']:
try:
ticker = yf.Ticker(sector)
data = retry_yfinance_request(lambda: ticker.history(
start=start_date, end=end_date, interval=timeframe
))
if not data.empty:
sector_data[sector] = data['Close']
print(f" Successfully collected {sector}: {len(data)} data points")
else:
print(f" No data for {sector}")
except Exception as e:
print(f" Error fetching {sector}: {str(e)}")
# Continue with other sectors even if one fails
if sector_data:
covariate_data['sectors'] = pd.DataFrame(sector_data)
print(f"Sector data shape: {covariate_data['sectors'].shape}")
else:
print("No sector data collected")
else:
print("Sector data source not configured, skipping")
# Collect economic indicators
print("Collecting economic indicators...")
economic_data = {}
for indicator, ticker_symbol in ECONOMIC_INDICATORS.items():
try:
ticker = yf.Ticker(ticker_symbol)
data = retry_yfinance_request(lambda: ticker.history(
start=start_date, end=end_date, interval=timeframe
))
if not data.empty:
economic_data[indicator] = data['Close']
print(f" Successfully collected {indicator} ({ticker_symbol}): {len(data)} data points")
else:
print(f" No data for {indicator} ({ticker_symbol})")
except Exception as e:
print(f" Error fetching {indicator} ({ticker_symbol}): {str(e)}")
# Continue with other indicators even if one fails
if economic_data:
covariate_data['economic_indicators'] = pd.DataFrame(economic_data)
print(f"Economic indicators data shape: {covariate_data['economic_indicators'].shape}")
else:
print("No economic indicators data collected")
# Return whatever data we were able to collect
if not covariate_data:
print("Warning: No covariate data collected, returning empty dict")
return covariate_data
except Exception as e:
print(f"Error collecting covariate data: {str(e)}")
# Return empty dict instead of failing completely
return {}
def calculate_market_sentiment(symbol: str, lookback_days: int = 30) -> Dict[str, float]:
"""
Calculate market sentiment using news sentiment analysis and social media data.
Args:
symbol (str): Stock symbol
lookback_days (int): Number of days to look back
Returns:
Dict[str, float]: Sentiment metrics
"""
if not SENTIMENT_AVAILABLE:
return {'sentiment_score': 0.0, 'sentiment_confidence': 0.0}
try:
sentiment_scores = []
# Get news sentiment (simplified approach using yfinance news)
try:
ticker = yf.Ticker(symbol)
news = retry_yfinance_request(lambda: ticker.news)
if news:
for article in news[:10]: # Analyze last 10 news articles
title = article.get('title', '')
summary = article.get('summary', '')
text = f"{title} {summary}"
# Calculate sentiment using TextBlob
blob = TextBlob(text)
sentiment_scores.append(blob.sentiment.polarity)
except Exception as e:
print(f"Error fetching news sentiment: {str(e)}")
# Don't fail completely, just log the error
# Calculate average sentiment
if sentiment_scores:
avg_sentiment = np.mean(sentiment_scores)
sentiment_confidence = min(0.9, len(sentiment_scores) / 10.0)
else:
avg_sentiment = 0.0
sentiment_confidence = 0.0
return {
'sentiment_score': avg_sentiment,
'sentiment_confidence': sentiment_confidence,
'sentiment_samples': len(sentiment_scores)
}
except Exception as e:
print(f"Error calculating sentiment: {str(e)}")
# Return neutral sentiment on error
return {
'sentiment_score': 0.0,
'sentiment_confidence': 0.0,
'sentiment_samples': 0,
'error': str(e)
}
def calculate_advanced_uncertainty(quantiles: np.ndarray, historical_volatility: float,
market_conditions: Dict = None, confidence_level: float = 0.9) -> Dict[str, np.ndarray]:
"""
Calculate advanced uncertainty estimates using multiple methods.
Args:
quantiles (np.ndarray): Quantile predictions from Chronos
historical_volatility (float): Historical volatility
market_conditions (Dict): Market condition indicators
confidence_level (float): Confidence level for uncertainty calculation
Returns:
Dict[str, np.ndarray]: Multiple uncertainty estimates
"""
try:
lower = quantiles[0, :, 0]
median = quantiles[0, :, 1]
upper = quantiles[0, :, 2]
uncertainties = {}
# 1. Basic quantile-based uncertainty
basic_uncertainty = (upper - lower) / (2 * stats.norm.ppf(confidence_level))
uncertainties['basic'] = basic_uncertainty
# 2. Skewness-adjusted uncertainty
skewed_uncertainty = calculate_skewed_uncertainty(quantiles, confidence_level)
uncertainties['skewed'] = skewed_uncertainty
# 3. Volatility-scaled uncertainty
volatility_scaled = basic_uncertainty * (1 + historical_volatility)
uncertainties['volatility_scaled'] = volatility_scaled
# 4. Market condition adjusted uncertainty
if market_conditions:
vix_level = market_conditions.get('vix', 20.0)
vix_factor = vix_level / 20.0 # Normalize to typical VIX level
market_adjusted = basic_uncertainty * vix_factor
uncertainties['market_adjusted'] = market_adjusted
else:
uncertainties['market_adjusted'] = basic_uncertainty
# 5. Time-decay uncertainty (uncertainty increases with prediction horizon)
time_decay = np.array([basic_uncertainty[i] * (1 + 0.1 * i) for i in range(len(basic_uncertainty))])
uncertainties['time_decay'] = time_decay
# 6. Ensemble uncertainty (combine all methods)
ensemble_uncertainty = np.mean([
uncertainties['basic'],
uncertainties['skewed'],
uncertainties['volatility_scaled'],
uncertainties['market_adjusted'],
uncertainties['time_decay']
], axis=0)
uncertainties['ensemble'] = ensemble_uncertainty
return uncertainties
except Exception as e:
print(f"Advanced uncertainty calculation error: {str(e)}")
# Fallback to basic calculation
return {'basic': (quantiles[0, :, 2] - quantiles[0, :, 0]) / (2 * 1.645)}
def create_enhanced_ensemble_model(df: pd.DataFrame, covariate_data: Dict,
prediction_days: int) -> Tuple[np.ndarray, np.ndarray]:
"""
Create an enhanced ensemble model using multiple algorithms and covariate data.
Args:
df (pd.DataFrame): Stock data
covariate_data (Dict): Covariate data
prediction_days (int): Number of days to predict
Returns:
Tuple[np.ndarray, np.ndarray]: Ensemble predictions and uncertainties
"""
if not ENSEMBLE_AVAILABLE:
return np.array([]), np.array([])
try:
# Prepare features
features = []
target = df['Close'].values
# Technical indicators as features
features.append(df['RSI'].fillna(50).values) # Fill NaN with neutral RSI value
features.append(df['MACD'].fillna(0).values) # Fill NaN with zero
features.append(df['Volatility'].fillna(df['Volatility'].mean()).values) # Fill with mean volatility
if 'BB_Upper' in df.columns:
bb_position = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower'])
bb_position = bb_position.fillna(0.5) # Fill NaN with neutral position
features.append(bb_position.values)
# Add lagged price features to capture temporal patterns
for lag in [1, 2, 3, 5, 10]:
if len(target) > lag:
lagged_prices = np.pad(target[:-lag], (lag, 0), mode='edge')
features.append(lagged_prices)
# Add rolling statistics to capture trends
for window in [5, 10, 20]:
if len(target) >= window:
rolling_mean = pd.Series(target).rolling(window=window, min_periods=1).mean().values
rolling_std = pd.Series(target).rolling(window=window, min_periods=1).std().fillna(0).values
features.append(rolling_mean)
features.append(rolling_std)
# Add price momentum features
if len(target) > 1:
price_momentum_1d = np.pad(np.diff(target), (1, 0), mode='constant', constant_values=0)
price_momentum_5d = np.pad(np.diff(target, n=5), (5, 0), mode='constant', constant_values=0)
features.append(price_momentum_1d)
features.append(price_momentum_5d)
# Add covariate data
if 'market_indices' in covariate_data:
for col in covariate_data['market_indices'].columns:
covariate_series = covariate_data['market_indices'][col]
# Align covariate data to target length
if len(covariate_series) == len(target):
# Fill NaN values with forward fill, then backward fill
filled_series = covariate_series.fillna(method='ffill').fillna(method='bfill')
features.append(filled_series.values)
elif len(covariate_series) > len(target):
# Truncate to target length
truncated_series = covariate_series.tail(len(target))
filled_series = truncated_series.fillna(method='ffill').fillna(method='bfill')
features.append(filled_series.values)
else:
# Pad with last value
padded_values = np.pad(covariate_series.values,
(len(target) - len(covariate_series), 0),
mode='edge')
# Fill any remaining NaN values
filled_values = pd.Series(padded_values).fillna(method='ffill').fillna(method='bfill').values
features.append(filled_values)
if 'economic_indicators' in covariate_data:
for col in covariate_data['economic_indicators'].columns:
covariate_series = covariate_data['economic_indicators'][col]
# Align covariate data to target length
if len(covariate_series) == len(target):
# Fill NaN values with forward fill, then backward fill
filled_series = covariate_series.fillna(method='ffill').fillna(method='bfill')
features.append(filled_series.values)
elif len(covariate_series) > len(target):
# Truncate to target length
truncated_series = covariate_series.tail(len(target))
filled_series = truncated_series.fillna(method='ffill').fillna(method='bfill')
features.append(filled_series.values)
else:
# Pad with last value
padded_values = np.pad(covariate_series.values,
(len(target) - len(covariate_series), 0),
mode='edge')
# Fill any remaining NaN values
filled_values = pd.Series(padded_values).fillna(method='ffill').fillna(method='bfill').values
features.append(filled_values)
# Create feature matrix
X = np.column_stack(features)
y = target
# Validate feature matrix
print(f"Feature matrix shape: {X.shape}, Target shape: {y.shape}")
# Ensure all features have the same length
feature_lengths = [len(feature) for feature in features]
if len(set(feature_lengths)) > 1:
print(f"Warning: Feature lengths are inconsistent: {feature_lengths}")
# Find the minimum length and truncate all features
min_length = min(feature_lengths)
X = X[:min_length]
y = y[:min_length]
print(f"Truncated to minimum length: {min_length}")
# Remove any NaN values
print(f"Checking for NaN values in feature matrix...")
nan_rows = np.isnan(X).any(axis=1)
nan_count = nan_rows.sum()
print(f"Found {nan_count} rows with NaN values out of {len(X)} total rows")
if nan_count > 0:
# Check which features have NaN values
for i, feature_name in enumerate(['RSI', 'MACD', 'Volatility', 'BB_Position'] +
[f'Market_{j}' for j in range(len(features)-4)]):
if i < X.shape[1]:
nan_count_feature = np.isnan(X[:, i]).sum()
if nan_count_feature > 0:
print(f" Feature {feature_name}: {nan_count_feature} NaN values")
# Only remove rows if there are still NaN values after filling
if nan_count > 0:
mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y)
X = X[mask]
y = y[mask]
print(f"Removed {nan_count} rows with NaN values")
else:
print("No NaN values found in feature matrix")
print(f"After NaN removal - X shape: {X.shape}, y shape: {y.shape}")
if len(X) < 50: # Need sufficient data
print(f"Insufficient data after preprocessing: {len(X)} samples")
print("This might be due to:")
print("1. Too many NaN values in covariate data")
print("2. Insufficient historical data")
print("3. Data alignment issues")
print("Trying fallback with technical indicators only...")
# Fallback: Use only technical indicators
try:
fallback_features = []
fallback_features.append(df['RSI'].fillna(50).values)
fallback_features.append(df['MACD'].fillna(0).values)
fallback_features.append(df['Volatility'].fillna(df['Volatility'].mean()).values)
if 'BB_Upper' in df.columns:
bb_position = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower'])
bb_position = bb_position.fillna(0.5)
fallback_features.append(bb_position.values)
X_fallback = np.column_stack(fallback_features)
y_fallback = df['Close'].values
# Remove any remaining NaN values
mask = ~np.isnan(X_fallback).any(axis=1) & ~np.isnan(y_fallback)
X_fallback = X_fallback[mask]
y_fallback = y_fallback[mask]
print(f"Fallback feature matrix shape: {X_fallback.shape}")
if len(X_fallback) >= 50:
print("Using fallback ensemble with technical indicators only")
# Use the fallback data for the rest of the function
X = X_fallback
y = y_fallback
else:
print("Fallback also failed, returning empty arrays")
return np.array([]), np.array([])
except Exception as fallback_error:
print(f"Fallback failed: {str(fallback_error)}")
return np.array([]), np.array([])
# Initialize models
models = {
'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
'ridge': Ridge(alpha=1.0),
'lasso': Lasso(alpha=0.1),
'svr': SVR(kernel='rbf', C=1.0, gamma='scale'),
'mlp': MLPRegressor(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42)
}
# Train models using time series cross-validation
predictions = {}
uncertainties = {}
for name, model in models.items():
try:
# Use ALL available data for training (no train/test split)
# This maximizes the use of historical information
print(f"Training {name} on all {len(X)} data points...")
# Train model on all available data
model.fit(X, y)
# Generate predictions for the full prediction period
# Use the most recent data points to generate future predictions
if len(X) >= prediction_days:
# Use the last prediction_days data points for prediction
# This ensures we're using the most recent patterns and trends
X_pred = X[-prediction_days:]
pred = model.predict(X_pred)
print(f" {name}: Generated {len(pred)} predictions using last {prediction_days} data points")
else:
# If we don't have enough data, use all available data and extrapolate
pred = model.predict(X)
print(f" {name}: Generated {len(pred)} predictions using all {len(X)} data points")
if len(pred) < prediction_days:
# Extend with trend-based predictions
if len(pred) > 0:
# Calculate trend from last few predictions
trend_window = min(5, len(pred))
if trend_window > 1:
trend = np.mean(np.diff(pred[-trend_window:]))
else:
trend = 0
# Extend predictions using the trend
last_pred = pred[-1] if len(pred) > 0 else y[-1]
for i in range(len(pred), prediction_days):
next_pred = last_pred + trend
pred = np.append(pred, next_pred)
last_pred = next_pred
print(f" {name}: Extended to {len(pred)} predictions using trend extrapolation")
else:
# No predictions available, use simple extrapolation
last_price = y[-1]
pred = np.array([last_price * (1 + 0.001 * i) for i in range(prediction_days)])
print(f" {name}: Generated {len(pred)} predictions using simple extrapolation")
# Calculate uncertainty using cross-validation on all data
# This gives us a better estimate of model performance
try:
cv_scores = cross_val_score(model, X, y, cv=min(5, len(X)//10), scoring='neg_mean_squared_error')
mse = -np.mean(cv_scores)
uncertainty = np.sqrt(mse) * np.ones(prediction_days)
print(f" {name} CV MSE: {mse:.6f}")
except Exception as cv_error:
print(f" {name} CV failed, using fallback uncertainty: {str(cv_error)}")
# Fallback uncertainty based on historical volatility
uncertainty = np.std(y) * np.ones(prediction_days)
# Ensure prediction is the right length
if len(pred) != prediction_days:
print(f"Warning: {name} produced {len(pred)} predictions, expected {prediction_days}")
if len(pred) > prediction_days:
pred = pred[:prediction_days]
uncertainty = uncertainty[:prediction_days]
else:
# Extend with trend-based predictions
if len(pred) > 0:
# Calculate trend from last few predictions
trend_window = min(5, len(pred))
if trend_window > 1:
trend = np.mean(np.diff(pred[-trend_window:]))
else:
trend = 0
# Extend predictions using the trend
last_pred = pred[-1] if len(pred) > 0 else y[-1]
for i in range(len(pred), prediction_days):
next_pred = last_pred + trend
pred = np.append(pred, next_pred)
last_pred = next_pred
else:
# No predictions available, use simple extrapolation
last_price = y[-1]
pred = np.array([last_price * (1 + 0.001 * i) for i in range(prediction_days)])
# Validate prediction
if len(pred) == prediction_days and len(uncertainty) == prediction_days:
predictions[name] = pred
uncertainties[name] = uncertainty
else:
print(f"Warning: {name} prediction validation failed. Skipping.")
continue
except Exception as e:
print(f"Error training {name}: {str(e)}")
continue
if not predictions:
return np.array([]), np.array([])
# Debug: Print prediction lengths
print(f"Ensemble model debugging - Expected prediction_days: {prediction_days}")
for name, pred in predictions.items():
print(f" {name}: prediction length = {len(pred)}, uncertainty length = {len(uncertainties.get(name, []))}")
# Combine predictions using weighted average
weights = {}
model_performances = {}
for name in predictions.keys():
if name in uncertainties:
# Calculate model performance on training data
try:
# Use cross-validation score as performance metric
cv_scores = cross_val_score(models[name], X, y, cv=min(5, len(X)//10), scoring='r2')
performance = np.mean(cv_scores)
model_performances[name] = performance
# Weight based on both performance and uncertainty
# Higher performance and lower uncertainty = higher weight
uncertainty_factor = 1.0 / np.mean(uncertainties[name])
performance_factor = max(0, performance) # Ensure non-negative
# Combine factors (you can adjust the balance)
weights[name] = (0.7 * performance_factor + 0.3 * uncertainty_factor)
print(f" {name}: Performance={performance:.4f}, Uncertainty={np.mean(uncertainties[name]):.4f}, Weight={weights[name]:.4f}")
except Exception as e:
print(f" {name}: Performance calculation failed, using uncertainty only: {str(e)}")
# Fallback to uncertainty-based weighting
weights[name] = 1.0 / np.mean(uncertainties[name])
model_performances[name] = 0.0
else:
# Equal weight if no uncertainty available
weights[name] = 1.0
model_performances[name] = 0.0
# Normalize weights
total_weight = sum(weights.values())
if total_weight > 0:
weights = {k: v / total_weight for k, v in weights.items()}
else:
# Equal weights if all uncertainties are zero
weights = {k: 1.0 / len(weights) for k in weights.keys()}
# Calculate ensemble prediction
ensemble_pred = np.zeros(prediction_days)
ensemble_uncertainty = np.zeros(prediction_days)
for name, pred in predictions.items():
if name in weights:
# Ensure prediction is the right length
pred_array = np.array(pred)
uncertainty_array = np.array(uncertainties[name])
# Handle different prediction lengths
if len(pred_array) >= prediction_days:
# Truncate to prediction_days
pred_array = pred_array[:prediction_days]
uncertainty_array = uncertainty_array[:prediction_days]
elif len(pred_array) < prediction_days:
# Extend with the last value
last_pred = pred_array[-1] if len(pred_array) > 0 else 0
last_uncertainty = uncertainty_array[-1] if len(uncertainty_array) > 0 else 1.0
# Pad with last values
pred_array = np.pad(pred_array, (0, prediction_days - len(pred_array)),
mode='constant', constant_values=last_pred)
uncertainty_array = np.pad(uncertainty_array, (0, prediction_days - len(uncertainty_array)),
mode='constant', constant_values=last_uncertainty)
# Ensure arrays are the correct shape
if len(pred_array) != prediction_days:
print(f"Warning: {name} prediction length mismatch. Expected {prediction_days}, got {len(pred_array)}")
continue
if len(uncertainty_array) != prediction_days:
print(f"Warning: {name} uncertainty length mismatch. Expected {prediction_days}, got {len(uncertainty_array)}")
continue
# Add weighted contribution
ensemble_pred += weights[name] * pred_array
ensemble_uncertainty += weights[name] * uncertainty_array
return ensemble_pred, ensemble_uncertainty
except Exception as e:
print(f"Enhanced ensemble model error: {str(e)}")
print("Falling back to simple ensemble prediction...")
# Fallback: Simple ensemble using basic models
try:
# Use simple linear regression as fallback
X = np.arange(len(df)).reshape(-1, 1)
y = df['Close'].values
# Simple linear trend
slope = np.polyfit(X.flatten(), y, 1)[0]
last_price = y[-1]
# Generate simple predictions
ensemble_pred = np.array([last_price + slope * i for i in range(1, prediction_days + 1)])
ensemble_uncertainty = np.std(y) * np.ones(prediction_days)
return ensemble_pred, ensemble_uncertainty
except Exception as fallback_error:
print(f"Fallback ensemble also failed: {str(fallback_error)}")
return np.array([]), np.array([])
def calculate_volume_prediction_enhanced(df: pd.DataFrame, price_prediction: np.ndarray,
covariate_data: Dict = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Enhanced volume prediction using multiple factors and relationships.
Args:
df (pd.DataFrame): Stock data
price_prediction (np.ndarray): Price predictions
covariate_data (Dict): Covariate data
Returns:
Tuple[np.ndarray, np.ndarray]: Volume predictions and uncertainties
"""
try:
# Get historical volume and price data
volume_data = df['Volume'].values
price_data = df['Close'].values
# Calculate volume-price relationship
volume_price_ratio = volume_data / price_data
volume_volatility = np.std(volume_data)
# Calculate volume momentum
volume_momentum = np.diff(volume_data)
avg_volume_momentum = np.mean(volume_momentum[-10:]) # Last 10 periods
# Predict volume based on price movement
price_changes = np.diff(price_prediction)
predicted_volume = []
for i, price_change in enumerate(price_changes):
if i == 0:
# First prediction based on last actual volume
base_volume = volume_data[-1]
else:
# Subsequent predictions based on price movement and momentum
base_volume = predicted_volume[-1]
# Adjust volume based on price movement
if abs(price_change) > 0.01: # Significant price movement
volume_multiplier = 1.5 if abs(price_change) > 0.02 else 1.2
else:
volume_multiplier = 0.8
# Add momentum effect
momentum_effect = 1.0 + (avg_volume_momentum / base_volume) * 0.1
# Calculate predicted volume
pred_vol = base_volume * volume_multiplier * momentum_effect
# Add some randomness based on historical volatility
noise = np.random.normal(0, volume_volatility * 0.1)
pred_vol = max(0, pred_vol + noise)
predicted_volume.append(pred_vol)
# Add first prediction
predicted_volume.insert(0, volume_data[-1])
# Calculate uncertainty
volume_uncertainty = volume_volatility * np.ones(len(predicted_volume))
# Adjust uncertainty based on prediction horizon
for i in range(len(volume_uncertainty)):
volume_uncertainty[i] *= (1 + 0.1 * i)
return np.array(predicted_volume), np.array(volume_uncertainty)
except Exception as e:
print(f"Enhanced volume prediction error: {str(e)}")
# Fallback to simple prediction
last_volume = df['Volume'].iloc[-1]
return np.full(len(price_prediction), last_volume), np.full(len(price_prediction), last_volume * 0.2)
def calculate_regime_aware_uncertainty(quantiles: np.ndarray, regime_info: Dict,
market_conditions: Dict = None) -> np.ndarray:
"""
Calculate uncertainty that accounts for market regime changes.
Args:
quantiles (np.ndarray): Quantile predictions
regime_info (Dict): Market regime information
market_conditions (Dict): Current market conditions
Returns:
np.ndarray: Regime-aware uncertainty estimates
"""
try:
# Get basic uncertainty
basic_uncertainty = calculate_skewed_uncertainty(quantiles)
# Get regime information
if regime_info and 'volatilities' in regime_info:
current_volatility = regime_info.get('current_volatility', np.mean(regime_info['volatilities']))
avg_volatility = np.mean(regime_info['volatilities'])
volatility_ratio = current_volatility / avg_volatility
else:
volatility_ratio = 1.0
# Adjust uncertainty based on regime
regime_adjusted = basic_uncertainty * volatility_ratio
# Add market condition adjustments
if market_conditions:
vix_level = market_conditions.get('vix', 20.0)
vix_factor = vix_level / 20.0
regime_adjusted *= vix_factor
return regime_adjusted
except Exception as e:
print(f"Regime-aware uncertainty calculation error: {str(e)}")
return calculate_skewed_uncertainty(quantiles)
def get_market_info_from_yfinance(symbol: str) -> Dict:
"""
Get market information from yfinance using the recommended API methods.
Args:
symbol (str): Market symbol (e.g., '^GSPC', 'EURUSD=X', 'BTC-USD')
Returns:
Dict: Market information including current price, volume, and other metrics
"""
try:
ticker = yf.Ticker(symbol)
# Get basic info with retry - use get_info() method as recommended
info = retry_yfinance_request(lambda: ticker.info)
# Get current market data with retry
try:
hist = retry_yfinance_request(lambda: ticker.history(period="1d"))
except Exception as e:
print(f"Error fetching history for {symbol}: {str(e)}")
hist = None
# Get additional market data
market_data = {}
if hist is not None and not hist.empty:
market_data.update({
'current_price': hist['Close'].iloc[-1],
'open_price': hist['Open'].iloc[-1],
'high_price': hist['High'].iloc[-1],
'low_price': hist['Low'].iloc[-1],
'volume': hist['Volume'].iloc[-1],
'change': hist['Close'].iloc[-1] - hist['Open'].iloc[-1],
'change_percent': ((hist['Close'].iloc[-1] - hist['Open'].iloc[-1]) / hist['Open'].iloc[-1]) * 100
})
# Get news if available with retry
try:
news = retry_yfinance_request(lambda: ticker.news)
market_data['news_count'] = len(news) if news else 0
except Exception as e:
print(f"Error fetching news for {symbol}: {str(e)}")
market_data['news_count'] = 0
# Skip earnings and recommendations for symbols that typically don't have them
symbols_without_earnings = ['^', '=', 'F', 'X', 'USD', 'EUR', 'GBP', 'JPY', 'BTC', 'ETH', 'GC', 'SI', 'CL', 'NG']
skip_earnings = any(symbol in symbol.upper() for symbol in symbols_without_earnings)
additional_data = {}
if skip_earnings:
market_data['earnings'] = []
market_data['recommendations'] = []
else:
# Get recommendations if available with retry
try:
recommendations = retry_yfinance_request(lambda: ticker.recommendations)
if recommendations is not None and hasattr(recommendations, 'empty') and not recommendations.empty:
market_data['recommendations'] = recommendations.tail(5).to_dict('records')
else:
market_data['recommendations'] = []
except Exception as e:
print(f"Error fetching recommendations for {symbol}: {str(e)}")
market_data['recommendations'] = []
# Get earnings info if available with retry
try:
earnings = retry_yfinance_request(lambda: ticker.earnings)
# Check if earnings is None or empty before accessing .empty
if earnings is not None and hasattr(earnings, 'empty') and not earnings.empty:
market_data['earnings'] = earnings.tail(4).to_dict('records')
else:
market_data['earnings'] = []
except Exception as e:
print(f"Error fetching earnings for {symbol}: {str(e)}")
market_data['earnings'] = []
# For stocks, try to get dividends and splits
try:
dividends = retry_yfinance_request(lambda: ticker.dividends)
if dividends is not None and hasattr(dividends, 'empty') and not dividends.empty:
additional_data['dividends'] = dividends.tail(4).to_dict('records')
else:
additional_data['dividends'] = []
except Exception as e:
print(f"Error fetching dividends for {symbol}: {str(e)}")
additional_data['dividends'] = []
try:
splits = retry_yfinance_request(lambda: ticker.splits)
if splits is not None and hasattr(splits, 'empty') and not splits.empty:
additional_data['splits'] = splits.tail(4).to_dict('records')
else:
additional_data['splits'] = []
except Exception as e:
print(f"Error fetching splits for {symbol}: {str(e)}")
additional_data['splits'] = []
# Combine all data
result = {
'symbol': symbol,
'info': info,
'market_data': market_data,
'additional_data': additional_data,
'timestamp': datetime.now().isoformat()
}
return result
except Exception as e:
print(f"Error fetching market info for {symbol}: {str(e)}")
return {
'symbol': symbol,
'error': str(e),
'timestamp': datetime.now().isoformat()
}
def get_enhanced_market_summary() -> Dict:
"""
Get enhanced market summary for all configured markets using yfinance.
Returns:
Dict: Summary of all markets with current data
"""
market_summary = {}
for market_key, config in MARKET_CONFIGS.items():
try:
symbol = config['symbol']
market_info = get_market_info_from_yfinance(symbol)
market_status = market_status_manager.get_status(market_key)
market_summary[market_key] = {
'config': config,
'status': {
'is_open': market_status.is_open,
'status_text': market_status.status_text,
'current_time': market_status.current_time_et,
'next_trading_day': market_status.next_trading_day
},
'market_data': market_info.get('market_data', {}),
'info': market_info.get('info', {}),
'last_updated': datetime.now().isoformat()
}
except Exception as e:
market_summary[market_key] = {
'config': config,
'error': str(e),
'last_updated': datetime.now().isoformat()
}
return market_summary
def check_market_status_simple(market_key: str) -> str:
"""
Simple function to check market status for a specific market.
Args:
market_key (str): Market key from MARKET_CONFIGS
Returns:
str: Formatted market status information
"""
try:
# Get market status
status = market_status_manager.get_status(market_key)
# Create a user-friendly display
status_emoji = "🟢" if status.is_open else "🔴"
status_text = "OPEN" if status.is_open else "CLOSED"
result = f"""
## {status_emoji} {status.market_name} Status: {status_text}
**Current Status:** {status.status_text}
**Market Details:**
- **Type:** {status.market_type.title()}
- **Symbol:** {status.market_symbol}
- **Current Time:** {status.current_time_et}
- **Last Updated:** {status.last_updated}
**Trading Information:**
- **Next Trading Day:** {status.next_trading_day}
- **Time Until Open:** {status.time_until_open}
- **Time Until Close:** {status.time_until_close}
**Market Description:** {MARKET_CONFIGS[market_key]['description']}
"""
return result
except Exception as e:
return f"❌ Error checking market status: {str(e)}"
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate technical indicators for a given DataFrame.
Args:
df (pd.DataFrame): DataFrame with OHLCV data
Returns:
pd.DataFrame: DataFrame with technical indicators added
"""
try:
# Calculate moving averages
df['SMA_20'] = df['Close'].rolling(window=20, min_periods=1).mean()
df['SMA_50'] = df['Close'].rolling(window=50, min_periods=1).mean()
df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean()
# Calculate RSI
df['RSI'] = calculate_rsi(df['Close'])
# Calculate MACD
df['MACD'], df['MACD_Signal'] = calculate_macd(df['Close'])
# Calculate Bollinger Bands
df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['Close'])
# Calculate additional volatility metrics
df['Annualized_Vol'] = df['Volatility'] * np.sqrt(252)
# Calculate drawdown metrics
df['Rolling_Max'] = df['Close'].rolling(window=len(df), min_periods=1).max()
df['Drawdown'] = (df['Close'] - df['Rolling_Max']) / df['Rolling_Max']
df['Max_Drawdown'] = df['Drawdown'].rolling(window=len(df), min_periods=1).min()
# Calculate liquidity metrics
df['Avg_Daily_Volume'] = df['Volume'].rolling(window=20, min_periods=1).mean()
df['Volume_Volatility'] = df['Volume'].rolling(window=20, min_periods=1).std()
# Fill any remaining NaN values
df = df.fillna(method='ffill').fillna(method='bfill')
return df
except Exception as e:
print(f"Error calculating technical indicators: {str(e)}")
return df
def get_fundamental_data(symbol: str) -> dict:
"""
Fetch fundamental data for a given symbol using yfinance's info property.
Returns a dictionary of fundamental data, or an empty dict if unavailable.
"""
try:
ticker = yf.Ticker(symbol)
info = retry_yfinance_request(lambda: ticker.info)
if info is not None and isinstance(info, dict):
return info
else:
print(f"No fundamental info available for {symbol}")
return {}
except Exception as e:
print(f"Error fetching fundamental data for {symbol}: {str(e)}")
return {} # Fallback to empty dict
if __name__ == "__main__":
import signal
import atexit
# Register cleanup function
atexit.register(cleanup_on_exit)
# Handle SIGINT and SIGTERM for graceful shutdown
def signal_handler(signum, frame):
print(f"\nReceived signal {signum}. Shutting down gracefully...")
cleanup_on_exit()
exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
demo = create_interface()
print("Starting Advanced Stock Prediction Analysis with real-time market status updates...")
print("Market status will update every 15 minutes automatically.")
demo.launch(ssr_mode=False, mcp_server=True)
except KeyboardInterrupt:
print("\nApplication interrupted by user. Shutting down...")
cleanup_on_exit()
except Exception as e:
print(f"Error starting application: {str(e)}")
cleanup_on_exit()
raise