Spaces:
Sleeping
Sleeping
| # trade_analysis/enhanced_api.py | |
| import os | |
| from fastapi import FastAPI, Query, HTTPException | |
| from pydantic import BaseModel | |
| import httpx | |
| from typing import Dict, Any, List | |
| import pandas as pd | |
| import numpy as np | |
| import asyncio | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Import only modules that still exist | |
| from .data import UnifiedDataProvider | |
| from .indicators import enrich_with_indicators, identify_current_setup | |
| from .enhanced_sentiment import EnhancedFinancialSentimentAnalyzer, analyze_momentum_sentiment | |
| from .momentum_trading_engine import IntegratedMomentumEngine | |
| from .enhanced_llm import EnhancedLLMEngine, generate_enhanced_llm_signal | |
| from .tft_model import GapPredictionTFT | |
| from .agent import TradingAgent, analyze_agent_performance | |
| # Global dictionary to store TFT models | |
| api_tft_models = {} | |
| trading_agent = None | |
| def sanitize_for_json(data: any) -> any: | |
| """Recursively converts numpy and pandas types to JSON-serializable types.""" | |
| if isinstance(data, dict): | |
| return {key: sanitize_for_json(value) for key, value in data.items()} | |
| elif isinstance(data, list): | |
| return [sanitize_for_json(item) for item in data] | |
| elif isinstance(data, np.bool_): | |
| return bool(data) | |
| elif isinstance(data, (np.integer, np.int64)): | |
| return int(data) | |
| elif isinstance(data, np.floating): | |
| return float(data) | |
| elif isinstance(data, pd.Timestamp): | |
| return data.isoformat() | |
| elif isinstance(data, (pd.Series, pd.Index, np.ndarray)): | |
| return data.tolist() | |
| return data | |
| class EnhancedSignalResponse(BaseModel): | |
| """Enhanced response model with momentum and LLM analysis""" | |
| symbol: str | |
| signal: str | |
| confidence: float | |
| reasoning: str | |
| position_size: float | |
| status: str | |
| details: Dict[str, Any] | |
| # Enhanced fields | |
| momentum_analysis: Dict[str, Any] = {} | |
| llm_ensemble: Dict[str, Any] = {} | |
| options_strategy: Dict[str, Any] = {} | |
| timeframe_recommendation: str = "15m" | |
| expected_hold_time: str = "Unknown" | |
| # Enhanced FastAPI App | |
| app = FastAPI( | |
| title="Enhanced Intraday Momentum Engine", | |
| version="2.0.0", | |
| description="SOTA Financial AI with multi-LLM ensemble and momentum analysis" | |
| ) | |
| # Initialize enhanced components | |
| data_provider = UnifiedDataProvider() | |
| sentiment_analyzer = EnhancedFinancialSentimentAnalyzer() | |
| momentum_engine = IntegratedMomentumEngine() | |
| llm_engine = EnhancedLLMEngine() | |
| tft_predictor = GapPredictionTFT(context_length=96, prediction_length=1) | |
| async def startup_event(): | |
| """Initialize all AI models on startup and launch the agent.""" | |
| print("🚀 Starting Enhanced Trading Engine...") | |
| # --- This new logic checks the environment before loading models --- | |
| from .deploy import DeploymentConfig | |
| config = DeploymentConfig.auto_detect() | |
| # Load sentiment models regardless of environment | |
| print("📊 Loading sentiment models...") | |
| sentiment_analyzer.initialize_models() | |
| # Only load LLMs if we are NOT on a CPU | |
| if config.device != "cpu": | |
| print("🧠 Loading LLM ensemble...") | |
| llm_engine.initialize_llm_models() | |
| else: | |
| print("🚫 CPU environment detected. Skipping LLM loading.") | |
| # Load TFT models | |
| print("🤖 Loading TFT models...") | |
| # (Your existing TFT model loading logic here, ensure it writes to /tmp if needed) | |
| symbols = ['QQQ', 'SPY', 'MSFT', 'TSLA', 'NVDA', 'META'] | |
| for symbol in symbols: | |
| model_path = f"/tmp/tft_{symbol}_validated.pth" # Use /tmp for models | |
| tft_instance = GapPredictionTFT() | |
| # (The rest of your TFT loading logic...) | |
| api_tft_models[symbol] = tft_instance | |
| # Initialize and run the agent as a background task | |
| global trading_agent | |
| trading_agent = TradingAgent(api_url="http://localhost:7860") | |
| print("🤖 Launching Trading Agent as a background task...") | |
| asyncio.create_task(trading_agent.run()) | |
| print("✅ Enhanced Trading Engine startup complete!") | |
| def read_root(): | |
| """Enhanced root endpoint with system info""" | |
| import torch | |
| gpu_info = "CPU only" | |
| if torch.cuda.is_available(): | |
| gpu_name = torch.cuda.get_device_name(0) | |
| gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 | |
| gpu_info = f"{gpu_name} ({gpu_memory:.1f} GB)" | |
| return { | |
| "status": "operational", | |
| "engine": "Enhanced Intraday Momentum Engine v2.0.0", | |
| "gpu_info": gpu_info, | |
| "features": [ | |
| "Multi-LLM Ensemble Analysis", | |
| "Advanced Sentiment Analysis (10+ models)", | |
| "High-Frequency Momentum Engine", | |
| "Options Strategy Generation", | |
| "TFT Gap Prediction", | |
| "Autonomous Trading Agent" | |
| ], | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def predict_enhanced_signal( | |
| symbol: str = Query(..., description="Stock symbol (e.g., QQQ, SPY)"), | |
| timeframe: str = Query("5m", description="Trading timeframe: 1m, 5m, 15m, 1h"), | |
| strategy_mode: str = Query("momentum", description="Strategy: momentum, scalp, gap, swing") | |
| ): | |
| """ | |
| Enhanced prediction endpoint with full AI stack | |
| """ | |
| try: | |
| start_time = datetime.now() | |
| # Fetch market data | |
| async with httpx.AsyncClient() as client: | |
| print(f"📈 Fetching data for {symbol}...") | |
| # Multi-timeframe OHLCV data | |
| ohlcv_data = await data_provider.fetch_multi_timeframe_stock_data(symbol) | |
| # News and social data - FIXED SYNTAX | |
| news_data, _ = await data_provider.fetch_news(symbol, client) | |
| reddit_data, _ = await data_provider.fetch_reddit_data(symbol) | |
| # Alternative data | |
| alt_data = data_provider.get_alternative_data(symbol) | |
| # Process dataframes | |
| news_df = pd.DataFrame(news_data) if news_data else pd.DataFrame() | |
| reddit_df = pd.DataFrame(reddit_data) if reddit_data else pd.DataFrame() | |
| # Technical analysis for each timeframe | |
| tech_setups = {} | |
| for tf, df in ohlcv_data.items(): | |
| if not df.empty: | |
| enriched_df = enrich_with_indicators(df.copy(), tf) | |
| tech_setups[tf] = identify_current_setup(enriched_df, tf) | |
| print("🔄 Running AI analysis...") | |
| # 1. Enhanced Sentiment Analysis | |
| sentiment_analysis = await asyncio.get_event_loop().run_in_executor( | |
| None, | |
| analyze_momentum_sentiment, | |
| news_df, reddit_df, symbol, timeframe | |
| ) | |
| # 2. Momentum Analysis | |
| momentum_analysis = momentum_engine.generate_enhanced_signal( | |
| ohlcv_data, sentiment_analysis, alt_data | |
| ) | |
| # 3. TFT Prediction | |
| daily_df = ohlcv_data.get("daily") | |
| tft_prediction = None | |
| tft_model = api_tft_models.get(symbol.upper()) | |
| if daily_df is not None and len(daily_df) >= 96 and tft_model: | |
| if tft_model.is_trained: | |
| tft_prediction = tft_model.predict_gap_probability(daily_df) | |
| print(f"🚀 Using pretrained TFT model for {symbol}") | |
| else: | |
| print(f"🤖 Training TFT model for {symbol}...") | |
| tft_model.train(daily_df, epochs=20) | |
| tft_prediction = tft_model.predict_gap_probability(daily_df) | |
| else: | |
| if tft_model: | |
| tft_prediction = tft_model._default_prediction() | |
| else: | |
| temp_tft = GapPredictionTFT() | |
| tft_prediction = temp_tft._default_prediction() | |
| # 4. LLM Ensemble Analysis | |
| llm_analysis = {} | |
| try: | |
| llm_analysis = llm_engine.generate_enhanced_trading_signal( | |
| ohlcv_data, sentiment_analysis, momentum_analysis, alt_data | |
| ) | |
| except Exception as e: | |
| print(f"LLM analysis failed: {e}") | |
| conditions = { | |
| "is_vix_high": alt_data.get('vix_level', 0) > 25, | |
| "is_15m_rsi_bullish": tech_setups.get("15m", {}).get('rsi', 50) > 65, | |
| "is_15m_rsi_bearish": tech_setups.get("15m", {}).get('rsi', 50) < 35, | |
| "is_15m_volume_spike": tech_setups.get("15m", {}).get('volume_spike', False), | |
| "is_hourly_trend_bullish": tech_setups.get("hourly", {}).get('direction') == 'up', | |
| "is_hourly_trend_bearish": tech_setups.get("hourly", {}).get('direction') == 'down' | |
| } | |
| llm_analysis = generate_enhanced_llm_signal(conditions) | |
| # 5. Master Signal Generation - FIXED FUNCTION NAME | |
| master_signal = _generate_master_signal( | |
| momentum_analysis, llm_analysis, sentiment_analysis, tft_prediction, | |
| timeframe, strategy_mode | |
| ) | |
| # 6. Options Strategy - FIXED FUNCTION NAME | |
| options_strategy = _generate_options_strategy( | |
| master_signal, momentum_analysis, alt_data, timeframe, strategy_mode | |
| ) | |
| # Calculate processing time | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| # Prepare response | |
| sanitized_details = sanitize_for_json({ | |
| "tech_setups": tech_setups, | |
| "sentiment": sentiment_analysis, | |
| "alternative_data": alt_data, | |
| "tft_prediction": tft_prediction, | |
| "processing_time_seconds": processing_time, | |
| "data_sources": { | |
| "news_articles": len(news_df), | |
| "social_posts": len(reddit_df), | |
| "timeframes_analyzed": list(ohlcv_data.keys()) | |
| } | |
| }) | |
| return EnhancedSignalResponse( | |
| symbol=symbol, | |
| signal=master_signal["signal"], | |
| confidence=master_signal["confidence"], | |
| reasoning=master_signal["reasoning"], | |
| position_size=master_signal["position_size"], | |
| status="Success", | |
| details=sanitized_details, | |
| momentum_analysis=sanitize_for_json(momentum_analysis), | |
| llm_ensemble=sanitize_for_json(llm_analysis), | |
| options_strategy=sanitize_for_json(options_strategy), | |
| timeframe_recommendation=master_signal.get("timeframe", timeframe), | |
| expected_hold_time=master_signal.get("hold_time", "Unknown") | |
| ) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=f"Enhanced analysis failed: {e}") | |
| async def start_agent(): | |
| """Start the autonomous agent""" | |
| if trading_agent: | |
| asyncio.create_task(trading_agent.run()) | |
| return {"status": "Agent started"} | |
| return {"status": "Agent not initialized"} | |
| async def get_agent_stats(): | |
| """Get agent's performance stats""" | |
| if trading_agent: | |
| return trading_agent.get_stats() | |
| return {"error": "Agent not initialized"} | |
| async def get_agent_positions(): | |
| """Get current positions""" | |
| if trading_agent: | |
| return {"positions": trading_agent.positions} | |
| return {"positions": {}} | |
| async def analyze_agent(): | |
| """Analyze agent's performance""" | |
| try: | |
| analyze_agent_performance() | |
| return {"status": "Analysis complete - check console output"} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def _generate_master_signal(momentum_analysis: Dict, llm_analysis: Dict, | |
| sentiment_analysis: Dict, tft_prediction: Dict, | |
| timeframe: str = "15m", strategy_mode: str = "momentum") -> Dict: | |
| """Generate master trading signal from all analyses - FIXED VERSION""" | |
| # Extract signals | |
| momentum_signal = momentum_analysis.get("signal", "HOLD") | |
| momentum_confidence = momentum_analysis.get("confidence", 50) | |
| # Use the actual momentum analysis results | |
| momentum_master = momentum_analysis.get("momentum_analysis", {}).get("master_signal", {}) | |
| momentum_strategy = momentum_master.get("strategy", "WAIT") | |
| llm_signal = llm_analysis.get("signal", "HOLD") | |
| sentiment_composite = sentiment_analysis.get("composite_score", 0) | |
| tft_direction = tft_prediction.get("expected_direction", "FLAT") if tft_prediction else "FLAT" | |
| # TIMEFRAME-SPECIFIC THRESHOLDS | |
| timeframe_configs = { | |
| "1m": { | |
| "threshold": 0.2, | |
| "min_confidence": 70, | |
| "hold_time": "1-2 minutes", | |
| "position_multiplier": 0.5 | |
| }, | |
| "5m": { | |
| "threshold": 0.25, | |
| "min_confidence": 65, | |
| "hold_time": "2-5 minutes", | |
| "position_multiplier": 0.7 | |
| }, | |
| "15m": { | |
| "threshold": 0.3, | |
| "min_confidence": 60, | |
| "hold_time": "10-30 minutes", | |
| "position_multiplier": 1.0 | |
| }, | |
| "1h": { | |
| "threshold": 0.35, | |
| "min_confidence": 55, | |
| "hold_time": "30-60 minutes", | |
| "position_multiplier": 1.2 | |
| } | |
| } | |
| config = timeframe_configs.get(timeframe, timeframe_configs["15m"]) | |
| # STRATEGY MODE ADJUSTMENTS | |
| if strategy_mode == "scalp": | |
| config["threshold"] *= 0.8 | |
| config["hold_time"] = "1-3 minutes" | |
| elif strategy_mode == "gap" and tft_prediction: | |
| if tft_direction != "FLAT" and tft_prediction.get("gap_probability", 50) > 70: | |
| config["min_confidence"] -= 10 | |
| # Calculate weighted score | |
| if momentum_strategy in ["AGGRESSIVE_SCALP", "STANDARD_MOMENTUM"]: | |
| weighted_score = momentum_master.get("conviction", 0) | |
| weighted_confidence = momentum_confidence | |
| else: | |
| weights = { | |
| "momentum": 0.4, | |
| "llm": 0.25, | |
| "sentiment": 0.2, | |
| "tft": 0.15 | |
| } | |
| signal_scores = {} | |
| signal_scores["momentum"] = 1.0 if momentum_signal == "CALLS" else -1.0 if momentum_signal == "PUTS" else 0.0 | |
| signal_scores["llm"] = 1.0 if llm_signal == "CALLS" else -1.0 if llm_signal == "PUTS" else 0.0 | |
| signal_scores["sentiment"] = np.clip(sentiment_composite, -1, 1) | |
| signal_scores["tft"] = 0.7 if tft_direction == "UP" else -0.7 if tft_direction == "DOWN" else 0.0 | |
| weighted_score = sum(signal_scores[k] * weights[k] for k in weights) | |
| weighted_confidence = (momentum_confidence * 0.4 + | |
| llm_analysis.get("conviction", 50) * 0.3 + | |
| (sentiment_analysis.get("confidence", "LOW") == "HIGH") * 80 * 0.3) | |
| # Generate final signal | |
| if weighted_score > config["threshold"] and weighted_confidence > config["min_confidence"]: | |
| final_signal = "CALLS" | |
| position_size = min(0.5, (weighted_confidence / 100) * config["position_multiplier"]) | |
| elif weighted_score < -config["threshold"] and weighted_confidence > config["min_confidence"]: | |
| final_signal = "PUTS" | |
| position_size = min(0.5, (weighted_confidence / 100) * config["position_multiplier"]) | |
| else: | |
| final_signal = "HOLD" | |
| position_size = 0.0 | |
| config["hold_time"] = "Wait for better setup" | |
| # Build reasoning | |
| reasoning = [] | |
| reasoning.append(f"{strategy_mode.upper()} {timeframe}: {final_signal}") | |
| reasoning.append(f"Confidence: {weighted_confidence:.0f}%") | |
| if momentum_strategy != "WAIT": | |
| reasoning.append(f"Momentum: {momentum_strategy}") | |
| if abs(sentiment_composite) > 0.3: | |
| reasoning.append(f"Sentiment: {'Bullish' if sentiment_composite > 0 else 'Bearish'}") | |
| if tft_prediction and tft_prediction.get("gap_probability", 50) > 70: | |
| reasoning.append(f"Gap probability: {tft_prediction['gap_probability']:.0f}%") | |
| return { | |
| "signal": final_signal, | |
| "confidence": int(weighted_confidence), | |
| "reasoning": ". ".join(reasoning), | |
| "position_size": position_size, | |
| "timeframe": timeframe, | |
| "hold_time": config["hold_time"], | |
| "weighted_score": weighted_score, | |
| "strategy_mode": strategy_mode, | |
| "momentum_strategy": momentum_strategy | |
| } | |
| def _generate_options_strategy(master_signal: Dict, momentum_analysis: Dict, | |
| alt_data: Dict, timeframe: str = "15m", | |
| strategy_mode: str = "momentum") -> Dict: | |
| """Generate options strategy with timeframe awareness""" | |
| signal = master_signal["signal"] | |
| confidence = master_signal["confidence"] | |
| vix_level = alt_data.get("vix_level", 20) | |
| if signal == "HOLD": | |
| return { | |
| "strategy": "WAIT", | |
| "reasoning": "No clear directional bias", | |
| "contracts": [], | |
| "risk_management": "Wait for better setup" | |
| } | |
| # TIMEFRAME-SPECIFIC STRATEGIES | |
| if timeframe in ["1m", "5m"] and strategy_mode == "scalp": | |
| strategy = { | |
| "strategy": "0DTE_SCALP", | |
| "reasoning": f"{timeframe} scalp: {signal} with {confidence}% confidence", | |
| "contracts": [ | |
| { | |
| "type": "CALL" if signal == "CALLS" else "PUT", | |
| "strike": "ATM", | |
| "quantity": min(int(confidence / 8), 15), | |
| "dte": 0, | |
| "target_profit": 20, | |
| "stop_loss": 10 | |
| } | |
| ], | |
| "max_hold_time": f"{timeframe} bars (max 5 minutes)", | |
| "risk_management": "Ultra-tight stops, quick exits" | |
| } | |
| elif timeframe == "15m" and confidence > 70: | |
| strategy = { | |
| "strategy": "MOMENTUM_15M", | |
| "reasoning": f"15-minute momentum {signal} play, {confidence}% confidence", | |
| "contracts": [ | |
| { | |
| "type": "CALL" if signal == "CALLS" else "PUT", | |
| "strike": "1% ITM", | |
| "quantity": min(int(confidence / 12), 8), | |
| "dte": 1, | |
| "target_profit": 40, | |
| "stop_loss": 20 | |
| } | |
| ], | |
| "max_hold_time": "30 minutes", | |
| "risk_management": "Standard momentum stops" | |
| } | |
| elif timeframe == "1h": | |
| strategy = { | |
| "strategy": "HOURLY_SWING", | |
| "reasoning": f"Hourly swing {signal}, {confidence}% confidence", | |
| "contracts": [ | |
| { | |
| "type": "CALL_SPREAD" if signal == "CALLS" else "PUT_SPREAD", | |
| "long_strike": "ATM", | |
| "short_strike": "3% OTM", | |
| "quantity": min(int(confidence / 15), 5), | |
| "dte": 3, | |
| "target_profit": 35, | |
| "stop_loss": 25 | |
| } | |
| ], | |
| "max_hold_time": "2-4 hours", | |
| "risk_management": "Defined risk spreads" | |
| } | |
| else: | |
| strategy = { | |
| "strategy": "CONSERVATIVE", | |
| "reasoning": f"Lower conviction {signal}, using conservative approach", | |
| "contracts": [ | |
| { | |
| "type": "CALL_SPREAD" if signal == "CALLS" else "PUT_SPREAD", | |
| "long_strike": "ATM", | |
| "short_strike": "5% OTM", | |
| "quantity": 3, | |
| "dte": 7, | |
| "target_profit": 25, | |
| "stop_loss": 20 | |
| } | |
| ], | |
| "max_hold_time": "End of day", | |
| "risk_management": "Limited risk, defined reward" | |
| } | |
| # VIX adjustments | |
| if vix_level > 30: | |
| strategy["reasoning"] += f". High VIX ({vix_level}) - reduced size" | |
| for contract in strategy["contracts"]: | |
| contract["quantity"] = max(1, contract["quantity"] // 2) | |
| return strategy | |
| async def enhanced_backtest( | |
| symbol: str = Query(..., description="Stock symbol"), | |
| start_date: str = Query(..., description="Start date (YYYY-MM-DD)"), | |
| end_date: str = Query(..., description="End date (YYYY-MM-DD)"), | |
| strategy_mode: str = Query("momentum", description="Strategy mode"), | |
| initial_capital: float = Query(100000, description="Initial capital") | |
| ): | |
| """Enhanced backtesting with momentum strategies""" | |
| try: | |
| return { | |
| "status": "success", | |
| "message": "Enhanced backtesting ready", | |
| "features": [ | |
| "Multi-timeframe momentum analysis", | |
| "LLM ensemble signal validation", | |
| "Options strategy backtesting", | |
| "Risk-adjusted performance metrics", | |
| "Slippage and commission modeling" | |
| ] | |
| } | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| async def detailed_health_check(): | |
| """Detailed system health check""" | |
| import torch | |
| health_status = { | |
| "timestamp": datetime.now().isoformat(), | |
| "overall_status": "healthy", | |
| "components": {} | |
| } | |
| # Check GPU | |
| if torch.cuda.is_available(): | |
| gpu_memory_used = torch.cuda.memory_allocated(0) / 1e9 | |
| gpu_memory_total = torch.cuda.get_device_properties(0).total_memory / 1e9 | |
| health_status["components"]["gpu"] = { | |
| "status": "available", | |
| "device": torch.cuda.get_device_name(0), | |
| "memory_used_gb": gpu_memory_used, | |
| "memory_total_gb": gpu_memory_total, | |
| "utilization": f"{gpu_memory_used/gpu_memory_total*100:.1f}%" | |
| } | |
| else: | |
| health_status["components"]["gpu"] = {"status": "not_available"} | |
| # Check model status | |
| health_status["components"]["sentiment_models"] = { | |
| "loaded": len(sentiment_analyzer.models), | |
| "status": "ready" if sentiment_analyzer.models else "not_loaded" | |
| } | |
| health_status["components"]["llm_models"] = { | |
| "loaded": len(llm_engine.models), | |
| "status": "ready" if llm_engine.models else "not_loaded" | |
| } | |
| health_status["components"]["tft_model"] = { | |
| "status": "trained" if tft_predictor.is_trained else "not_trained" | |
| } | |
| health_status["components"]["agent"] = { | |
| "status": "initialized" if trading_agent else "not_initialized" | |
| } | |
| return health_status | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |