# Spaces:
# Runtime error
# Runtime error
"""
AI Engine Service
Handles AI-powered trading signal generation, market analysis, and ML models
"""
import asyncio
import logging
import random
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

from models.signals import (
    AISignal,
    BacktestResult,
    FundamentalMetric,
    MarketAnalysisResponse,
    SentimentIndicator,
    SignalResponse,
    SignalStrength,
    SignalType,
    TechnicalIndicator,
    TimeFrame,
)
logger = logging.getLogger(__name__)


class AIEngine:
    """Service for AI-powered trading analysis and signal generation.

    Every analysis step in this class is currently a mock: indicator values,
    model predictions, sentiment scores and backtest figures are drawn from
    ``random`` instead of being computed from real market data.
    """

    def __init__(self):
        # Static per-model metadata. Only "confidence" is read in this class:
        # it is the upper bound of the random confidence in _run_ai_models.
        self.models = {
            "technical_model": {"accuracy": 0.72, "confidence": 0.85},
            "fundamental_model": {"accuracy": 0.68, "confidence": 0.78},
            "sentiment_model": {"accuracy": 0.65, "confidence": 0.70},
            "ensemble_model": {"accuracy": 0.76, "confidence": 0.82},
        }

    async def generate_trading_signals(
        self,
        market_data: List[Dict[str, Any]],
        user_preferences: Optional[Dict[str, Any]] = None
    ) -> List[SignalResponse]:
        """Generate AI trading signals for given market data.

        Args:
            market_data: One dict per instrument; each is passed unchanged to
                ``_generate_single_signal``.
            user_preferences: Forwarded to ``_generate_single_signal``, which
                does not read it at present.

        Returns:
            The generated signals in input order. Instruments for which
            ``_generate_single_signal`` returned ``None`` are omitted.

        Raises:
            Exception: Anything raised while iterating is logged and re-raised.
        """
        try:
            signals = []
            for data in market_data:
                signal = await self._generate_single_signal(data, user_preferences)
                # Explicit None check: "no signal" is reported as None, and the
                # result should not depend on the model's truthiness.
                if signal is not None:
                    signals.append(signal)
            return signals
        except Exception as e:
            logger.error("Error generating trading signals: %s", e)
            raise

    async def _generate_single_signal(
        self,
        market_data: Dict[str, Any],
        user_preferences: Optional[Dict[str, Any]] = None
    ) -> Optional[SignalResponse]:
        """Generate a single trading signal.

        Runs the technical, fundamental, sentiment and AI-model steps, combines
        them, and wraps the outcome in a ``SignalResponse``.

        Args:
            market_data: Instrument data. The keys read here are ``symbol``,
                ``ltp`` (used as the current price), ``security_id`` and
                ``exchange_segment``; all have defaults.
            user_preferences: Accepted for interface compatibility; unused.

        Returns:
            The signal, or ``None`` when the combined score is below the
            generation threshold or when any step raises (best-effort: the
            error is logged, not propagated).
        """
        # Bound before the try so the except handler can always name the
        # symbol, even when the very first lookup fails (e.g. market_data is
        # not a mapping).
        symbol = "UNKNOWN"
        try:
            symbol = market_data.get("symbol", "UNKNOWN")
            current_price = market_data.get("ltp", 0)

            # Technical Analysis
            technical_indicators = self._analyze_technical_indicators(market_data)

            # Fundamental Analysis
            fundamental_metrics = self._analyze_fundamentals(market_data)

            # Sentiment Analysis
            sentiment_indicators = self._analyze_sentiment(symbol)

            # AI Model Predictions
            ai_predictions = self._run_ai_models(market_data)

            # Combine all analyses to generate final signal
            final_signal = self._combine_analyses(
                technical_indicators, fundamental_metrics,
                sentiment_indicators, ai_predictions
            )
            if not final_signal:
                return None

            # Calculate target and stop loss
            target_price, stop_loss = self._calculate_price_targets(
                current_price, final_signal["signal_type"], final_signal["strength"]
            )

            # One clock reading so the id, generated_at and valid_until all
            # describe the same instant.
            now = datetime.utcnow()

            return SignalResponse(
                signal_id=f"SIG_{symbol}_{int(now.timestamp())}",
                security_id=market_data.get("security_id", ""),
                symbol=symbol,
                exchange_segment=market_data.get("exchange_segment", "NSE_EQ"),
                signal_type=final_signal["signal_type"],
                signal_strength=final_signal["strength"],
                confidence_score=final_signal["confidence"],
                time_frame=TimeFrame.SHORT_TERM,
                current_price=current_price,
                target_price=target_price,
                stop_loss=stop_loss,
                technical_analysis=technical_indicators,
                fundamental_analysis=fundamental_metrics,
                sentiment_analysis=sentiment_indicators,
                ai_analysis=ai_predictions,
                risk_rating=self._calculate_risk_rating(final_signal["confidence"]),
                expected_return=self._calculate_expected_return(current_price, target_price),
                probability_of_success=final_signal["confidence"],
                market_condition="bullish",  # Mock
                generated_at=now,
                valid_until=now + timedelta(days=7),
                summary=self._generate_signal_summary(final_signal, symbol),
                reasoning=self._generate_reasoning(technical_indicators, fundamental_metrics),
                key_catalysts=self._identify_key_catalysts(symbol)
            )
        except Exception as e:
            logger.error("Error generating signal for %s: %s", symbol, e)
            return None

    def _analyze_technical_indicators(self, market_data: Dict[str, Any]) -> List[TechnicalIndicator]:
        """Analyze technical indicators.

        Returns three mock indicators, in order: RSI, MACD and SMA_50. Only
        ``market_data["ltp"]`` is read (default 0), to scale the SMA value.
        """
        indicators = []

        # Mock RSI analysis
        rsi_value = random.uniform(30, 70)
        if rsi_value < 40:
            rsi_signal, rsi_condition = SignalType.BUY, "oversold"
        elif rsi_value > 60:
            rsi_signal, rsi_condition = SignalType.SELL, "overbought"
        else:
            rsi_signal, rsi_condition = SignalType.HOLD, "neutral"
        # NOTE(review): with the sample range 30-70 the distance from 50 never
        # exceeds 20, so the STRONG branch cannot be taken today.
        rsi_strength = SignalStrength.STRONG if abs(rsi_value - 50) > 20 else SignalStrength.MODERATE
        indicators.append(TechnicalIndicator(
            name="RSI",
            value=rsi_value,
            signal=rsi_signal,
            strength=rsi_strength,
            description=f"RSI at {rsi_value:.1f} indicates {rsi_condition} conditions"
        ))

        # Mock MACD analysis
        macd_value = random.uniform(-5, 5)
        macd_is_bullish = macd_value > 0
        macd_signal = SignalType.BUY if macd_is_bullish else SignalType.SELL
        macd_strength = SignalStrength.STRONG if abs(macd_value) > 3 else SignalStrength.MODERATE
        indicators.append(TechnicalIndicator(
            name="MACD",
            value=macd_value,
            signal=macd_signal,
            strength=macd_strength,
            description=f"MACD at {macd_value:.2f} shows {'bullish' if macd_is_bullish else 'bearish'} momentum"
        ))

        # Mock Moving Average analysis
        ma_signal = random.choice([SignalType.BUY, SignalType.SELL, SignalType.HOLD])
        ma_strength = random.choice([SignalStrength.WEAK, SignalStrength.MODERATE, SignalStrength.STRONG])
        indicators.append(TechnicalIndicator(
            name="SMA_50",
            # Mock average: the last traded price moved by up to +/-5%.
            value=market_data.get("ltp", 0) * random.uniform(0.95, 1.05),
            signal=ma_signal,
            strength=ma_strength,
            description="Price action relative to 50-day moving average"
        ))

        return indicators

    def _analyze_fundamentals(self, market_data: Dict[str, Any]) -> List[FundamentalMetric]:
        """Analyze fundamental metrics.

        Returns two mock metrics, in order: P/E Ratio and ROE. ``market_data``
        is not read at present.
        """
        metrics = []

        # Mock P/E ratio analysis
        pe_ratio = random.uniform(15, 35)
        metrics.append(FundamentalMetric(
            metric_name="P/E Ratio",
            current_value=pe_ratio,
            industry_average=22.5,
            percentile_rank=random.uniform(0.2, 0.8),
            trend="improving" if pe_ratio < 25 else "deteriorating",
            impact="positive" if pe_ratio < 20 else "negative"
        ))

        # Mock ROE analysis
        roe = random.uniform(10, 25)
        metrics.append(FundamentalMetric(
            metric_name="ROE",
            current_value=roe,
            industry_average=18.0,
            percentile_rank=random.uniform(0.3, 0.9),
            trend="stable",
            impact="positive" if roe > 15 else "neutral"
        ))

        return metrics

    def _analyze_sentiment(self, symbol: str) -> List[SentimentIndicator]:
        """Analyze market sentiment.

        Returns two mock indicators, in order: "News Analysis" and
        "Social Media". ``symbol`` is not read at present.
        """
        indicators = []

        # Mock news sentiment
        news_score = random.uniform(-0.5, 0.8)
        indicators.append(SentimentIndicator(
            source="News Analysis",
            score=news_score,
            confidence=random.uniform(0.6, 0.9),
            key_factors=["earnings report", "sector outlook", "market conditions"],
            news_count=random.randint(10, 50)
        ))

        # Mock social media sentiment
        social_score = random.uniform(-0.3, 0.6)
        indicators.append(SentimentIndicator(
            source="Social Media",
            score=social_score,
            confidence=random.uniform(0.5, 0.8),
            key_factors=["trader discussions", "volume alerts", "price action"],
            news_count=random.randint(100, 500)
        ))

        return indicators

    def _run_ai_models(self, market_data: Dict[str, Any]) -> List[AISignal]:
        """Run AI/ML models for predictions.

        Produces one mock ``AISignal`` per entry of ``self.models``: a random
        BUY/SELL/HOLD call with a confidence drawn between 0.6 and that
        model's configured "confidence". ``market_data`` is not read.
        """
        predictions = []
        for model_name, model_info in self.models.items():
            prediction = random.choice([SignalType.BUY, SignalType.SELL, SignalType.HOLD])
            confidence = random.uniform(0.6, model_info["confidence"])
            predictions.append(AISignal(
                model_name=model_name,
                prediction=prediction,
                confidence_score=confidence,
                feature_importance={
                    "price_momentum": random.uniform(0.1, 0.3),
                    "volume_profile": random.uniform(0.1, 0.3),
                    "market_correlation": random.uniform(0.1, 0.3),
                    "volatility": random.uniform(0.1, 0.3)
                },
                prediction_horizon="5-7 days"
            ))
        return predictions

    def _combine_analyses(
        self,
        technical: List[TechnicalIndicator],
        fundamental: List[FundamentalMetric],
        sentiment: List[SentimentIndicator],
        ai_predictions: List[AISignal]
    ) -> Optional[Dict[str, Any]]:
        """Combine all analyses to generate final signal.

        Accumulates weighted buy / sell / hold scores and picks the largest.

        Returns:
            ``None`` when the winning score is below 0.4; otherwise a dict
            with ``signal_type``, ``strength`` and ``confidence`` (the winning
            score). On a tie buy is preferred over sell, and sell over hold.
        """
        # Weight different analysis types
        weights = {
            "technical": 0.4,
            "fundamental": 0.3,
            "sentiment": 0.1,
            "ai": 0.2
        }

        # Calculate weighted scores for each signal type
        buy_score = 0
        sell_score = 0
        hold_score = 0

        # Technical indicators: each contributes an equal share of the
        # technical weight, scaled by its strength.
        for indicator in technical:
            if indicator.strength == SignalStrength.STRONG:
                score = 1.0
            elif indicator.strength == SignalStrength.MODERATE:
                score = 0.7
            else:
                score = 0.3
            contribution = score * weights["technical"] / len(technical)
            if indicator.signal == SignalType.BUY:
                buy_score += contribution
            elif indicator.signal == SignalType.SELL:
                sell_score += contribution
            else:
                hold_score += contribution

        # AI predictions: equal share of the AI weight, scaled by confidence.
        for prediction in ai_predictions:
            contribution = prediction.confidence_score * weights["ai"] / len(ai_predictions)
            if prediction.prediction == SignalType.BUY:
                buy_score += contribution
            elif prediction.prediction == SignalType.SELL:
                sell_score += contribution
            else:
                hold_score += contribution

        # Sentiment analysis: the mean score votes with the full weight.
        avg_sentiment = sum(s.score for s in sentiment) / len(sentiment) if sentiment else 0
        if avg_sentiment > 0.2:
            buy_score += weights["sentiment"]
        elif avg_sentiment < -0.2:
            sell_score += weights["sentiment"]
        else:
            hold_score += weights["sentiment"]

        # Fundamental contribution (simplified)
        # NOTE(review): `fundamental` itself is not inspected; half of the
        # fundamental weight is always credited to the buy side.
        buy_score += weights["fundamental"] * 0.5  # Mock positive fundamentals

        # Determine final signal
        max_score = max(buy_score, sell_score, hold_score)
        if max_score < 0.4:  # Threshold for signal generation
            return None

        if buy_score == max_score:
            signal_type = SignalType.STRONG_BUY if max_score > 0.8 else SignalType.BUY
        elif sell_score == max_score:
            signal_type = SignalType.STRONG_SELL if max_score > 0.8 else SignalType.SELL
        else:
            signal_type = SignalType.HOLD

        if max_score > 0.8:
            strength = SignalStrength.STRONG
        elif max_score > 0.6:
            strength = SignalStrength.MODERATE
        else:
            strength = SignalStrength.WEAK

        return {
            "signal_type": signal_type,
            "strength": strength,
            "confidence": max_score
        }

    def _calculate_price_targets(
        self,
        current_price: float,
        signal_type: SignalType,
        strength: SignalStrength
    ) -> Tuple[Optional[float], Optional[float]]:
        """Calculate target price and stop loss.

        Returns:
            ``(target_price, stop_loss)`` as fixed percentage offsets from
            ``current_price`` (wider for STRONG signals), or ``(None, None)``
            for any signal type that is neither a buy nor a sell.
        """
        is_strong = strength == SignalStrength.STRONG

        if signal_type in [SignalType.BUY, SignalType.STRONG_BUY]:
            target_multiplier = 1.08 if is_strong else 1.05
            stop_multiplier = 0.95 if is_strong else 0.97
        elif signal_type in [SignalType.SELL, SignalType.STRONG_SELL]:
            # Short side: target below, stop above the current price.
            target_multiplier = 0.92 if is_strong else 0.95
            stop_multiplier = 1.05 if is_strong else 1.03
        else:
            return None, None

        return current_price * target_multiplier, current_price * stop_multiplier

    def _calculate_risk_rating(self, confidence: float) -> str:
        """Calculate risk rating based on confidence.

        Returns "Low" above 0.8, "Medium" above 0.6, otherwise "High".
        """
        if confidence > 0.8:
            return "Low"
        if confidence > 0.6:
            return "Medium"
        return "High"

    def _calculate_expected_return(self, current_price: float, target_price: Optional[float]) -> Optional[float]:
        """Calculate expected return percentage.

        Returns the percentage move from ``current_price`` to ``target_price``
        rounded to two decimals, or ``None`` when either price is missing or
        zero (which also avoids dividing by zero).
        """
        if target_price and current_price:
            return round(((target_price - current_price) / current_price) * 100, 2)
        return None

    def _generate_signal_summary(self, signal: Dict[str, Any], symbol: str) -> str:
        """Generate human-readable signal summary.

        ``signal`` is the dict produced by ``_combine_analyses``; its
        ``signal_type`` and ``strength`` entries are read.
        """
        signal_type = signal["signal_type"]
        strength = signal["strength"]

        if signal_type in [SignalType.BUY, SignalType.STRONG_BUY]:
            return f"{strength.value.title()} BUY signal for {symbol} based on positive technical and AI analysis"
        if signal_type in [SignalType.SELL, SignalType.STRONG_SELL]:
            return f"{strength.value.title()} SELL signal for {symbol} due to bearish indicators"
        return f"HOLD recommendation for {symbol} with mixed signals"

    def _generate_reasoning(self, technical: List[TechnicalIndicator], fundamental: List[FundamentalMetric]) -> str:
        """Generate detailed reasoning.

        Collects one clause per non-HOLD technical indicator and per
        fundamental metric with a "positive" impact, and joins the first
        three into a sentence list.

        Returns:
            The joined reasons, or a fixed fallback sentence when nothing
            qualifies (instead of a lone ".").
        """
        reasons = []

        # Technical reasons
        for indicator in technical:
            if indicator.signal != SignalType.HOLD:
                reasons.append(f"{indicator.name} shows {indicator.signal.value.lower()} signal")

        # Fundamental reasons
        for metric in fundamental:
            if metric.impact == "positive":
                reasons.append(f"{metric.metric_name} is favorable")

        if not reasons:
            return "No decisive technical or fundamental factors identified."
        return ". ".join(reasons[:3]) + "."

    def _identify_key_catalysts(self, symbol: str) -> List[str]:
        """Identify key catalysts for the signal.

        Returns two to four distinct entries picked at random from a fixed
        list; ``symbol`` is not read at present.
        """
        catalysts = [
            "Strong quarterly earnings",
            "Positive sector outlook",
            "Technical breakout pattern",
            "Improved fundamentals",
            "Market momentum"
        ]
        return random.sample(catalysts, random.randint(2, 4))

    async def generate_market_analysis(self) -> MarketAnalysisResponse:
        """Generate comprehensive market analysis.

        Returns a hard-coded mock response stamped with the current UTC time.

        Raises:
            Exception: Anything raised while building the response is logged
                and re-raised.
        """
        try:
            # This would typically involve complex market analysis
            # For now, returning mock data
            return MarketAnalysisResponse(
                overall_market_sentiment="Bullish",
                market_trend={
                    "trend_direction": "Upward",
                    "trend_strength": SignalStrength.MODERATE,
                    "support_levels": [21800, 21600, 21400],
                    "resistance_levels": [22200, 22400, 22600],
                    "trend_duration": 15
                },
                volatility_index=18.5,
                fear_greed_index=65,
                sector_analysis=[
                    {
                        "sector_name": "Information Technology",
                        "performance": 2.1,
                        "outlook": "Positive",
                        "key_drivers": ["Strong earnings", "Export growth"],
                        "top_stocks": ["TCS", "INFY", "WIPRO"],
                        "recommendation": SignalType.BUY
                    }
                ],
                macro_factors=["GDP growth", "Inflation control", "FII inflows"],
                market_events=["Q4 earnings season", "Budget announcements"],
                generated_at=datetime.utcnow()
            )
        except Exception as e:
            logger.error("Error generating market analysis: %s", e)
            raise

    async def run_backtesting(self, symbols: List[str], period: str, strategy_config: Dict[str, Any]) -> BacktestResult:
        """Run backtesting on trading strategies.

        Returns mock results covering the 365 days up to now. ``symbols``,
        ``period`` and ``strategy_config`` are not read at present.

        Raises:
            Exception: Anything raised while building the result is logged
                and re-raised.
        """
        try:
            # Mock backtesting results
            total_signals = random.randint(50, 200)
            successful_signals = int(total_signals * random.uniform(0.6, 0.8))

            # Single clock reading so the window is exactly 365 days long.
            now = datetime.utcnow()

            return BacktestResult(
                start_date=now - timedelta(days=365),
                end_date=now,
                total_signals=total_signals,
                successful_signals=successful_signals,
                # total_signals is at least 50, so this never divides by zero.
                accuracy_rate=successful_signals / total_signals,
                total_return=random.uniform(8, 25),
                annualized_return=random.uniform(12, 18),
                max_drawdown=random.uniform(8, 15),
                sharpe_ratio=random.uniform(1.2, 2.1),
                win_rate=random.uniform(60, 75),
                average_win=random.uniform(3, 8),
                # Bounds are given high-to-low; random.uniform accepts either
                # order and the value still lies between -5 and -2.
                average_loss=random.uniform(-2, -5),
                profit_factor=random.uniform(1.5, 2.5)
            )
        except Exception as e:
            logger.error("Error running backtesting: %s", e)
            raise

    async def analyze_stock_sentiment(self, symbol: str) -> Dict[str, Any]:
        """Analyze sentiment for a specific stock.

        Returns:
            A dict with ``score`` (mock, between -0.5 and 0.8), the matching
            ``label``, a ``confidence``, a fixed ``factors`` list and a
            ``news_summary`` sentence naming ``symbol``.

        Raises:
            Exception: Anything raised is logged and re-raised.
        """
        try:
            # Mock sentiment analysis
            sentiment_score = random.uniform(-0.5, 0.8)

            if sentiment_score > 0.5:
                label = "Very Positive"
            elif sentiment_score > 0.2:
                label = "Positive"
            elif sentiment_score > -0.2:
                label = "Neutral"
            elif sentiment_score > -0.5:
                label = "Negative"
            else:
                # NOTE(review): only reachable when the draw is exactly -0.5,
                # the lower bound of the sample range.
                label = "Very Negative"

            return {
                "score": sentiment_score,
                "label": label,
                "confidence": random.uniform(0.7, 0.95),
                "factors": ["earnings report", "analyst upgrades", "sector performance"],
                "news_summary": f"Recent news about {symbol} has been generally {label.lower()}"
            }
        except Exception as e:
            logger.error("Error analyzing sentiment for %s: %s", symbol, e)
            raise

    async def get_available_strategies(self) -> List[Dict[str, Any]]:
        """Get list of available AI trading strategies.

        Returns a fixed catalogue; each entry has ``id``, ``name``,
        ``description``, ``accuracy`` and ``risk_level``.
        """
        return [
            {
                "id": "momentum_strategy",
                "name": "Momentum Trading",
                "description": "Identifies stocks with strong price momentum",
                "accuracy": 72.5,
                "risk_level": "Medium"
            },
            {
                "id": "mean_reversion",
                "name": "Mean Reversion",
                "description": "Identifies oversold/overbought conditions",
                "accuracy": 68.3,
                "risk_level": "Low"
            },
            {
                "id": "breakout_strategy",
                "name": "Breakout Detection",
                "description": "Identifies technical breakout patterns",
                "accuracy": 75.1,
                "risk_level": "Medium-High"
            }
        ]

    async def create_custom_strategy(self, user_id: str, config: Dict[str, Any]) -> str:
        """Create a custom AI trading strategy.

        Only an identifier is generated and logged; ``config`` is not read
        and nothing is stored.

        Returns:
            ``custom_<user_id>_<unix timestamp>``.

        Raises:
            Exception: Anything raised is logged and re-raised.
        """
        try:
            # In a real implementation, would train/configure the strategy
            strategy_id = f"custom_{user_id}_{int(datetime.utcnow().timestamp())}"

            # Mock strategy creation
            logger.info("Created custom strategy %s for user %s", strategy_id, user_id)
            return strategy_id
        except Exception as e:
            logger.error("Error creating custom strategy: %s", e)
            raise