# Pradeep Rajan
# Initial deployment of Zyon Traders Backend10
# dc68ce6
"""
AI Engine Service
Handles AI-powered trading signal generation, market analysis, and ML models
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import asyncio
import random
from models.signals import (
SignalResponse, SignalType, SignalStrength, TimeFrame,
TechnicalIndicator, FundamentalMetric, SentimentIndicator,
AISignal, MarketAnalysisResponse, BacktestResult
)
logger = logging.getLogger(__name__)
class AIEngine:
    """Service for AI-powered trading analysis and signal generation.

    Every analysis step in this class is currently a mock: indicator values,
    model predictions, sentiment scores and back-test statistics are drawn
    from ``random`` instead of being computed from the supplied market data.
    """

    # Reference point used to turn a naive-UTC datetime into epoch seconds.
    _UNIX_EPOCH = datetime(1970, 1, 1)

    def __init__(self):
        """Register the mock models and their nominal accuracy/confidence."""
        # "confidence" is used as the upper bound of the random confidence
        # drawn in _run_ai_models; "accuracy" is not read in this class.
        self.models = {
            "technical_model": {"accuracy": 0.72, "confidence": 0.85},
            "fundamental_model": {"accuracy": 0.68, "confidence": 0.78},
            "sentiment_model": {"accuracy": 0.65, "confidence": 0.70},
            "ensemble_model": {"accuracy": 0.76, "confidence": 0.82}
        }

    @staticmethod
    def _epoch_seconds(moment: datetime) -> int:
        """Return whole seconds since the Unix epoch for a naive UTC datetime.

        ``datetime.timestamp()`` interprets naive datetimes as *local* time,
        so calling it on ``datetime.utcnow()`` is off by the host's UTC
        offset. Subtracting the epoch directly avoids that.
        """
        return int((moment - AIEngine._UNIX_EPOCH).total_seconds())

    async def generate_trading_signals(
        self,
        market_data: List[Dict[str, Any]],
        user_preferences: Optional[Dict[str, Any]] = None
    ) -> List[SignalResponse]:
        """Generate AI trading signals for given market data.

        Args:
            market_data: One dict per instrument; each is passed to
                ``_generate_single_signal``.
            user_preferences: Forwarded unchanged to
                ``_generate_single_signal`` (which does not read it yet).

        Returns:
            The signals that could be produced, in input order. Instruments
            for which no signal was produced are skipped.

        Raises:
            Exception: Any error raised while iterating is logged and
                re-raised.
        """
        try:
            signals = []
            for data in market_data:
                signal = await self._generate_single_signal(data, user_preferences)
                if signal is not None:
                    signals.append(signal)
            return signals
        except Exception as e:
            logger.error("Error generating trading signals: %s", e)
            raise

    async def _generate_single_signal(
        self,
        market_data: Dict[str, Any],
        user_preferences: Optional[Dict[str, Any]] = None
    ) -> Optional[SignalResponse]:
        """Generate a single trading signal.

        Reads ``symbol``, ``ltp``, ``security_id`` and ``exchange_segment``
        from ``market_data`` (with defaults), runs the mock analyses, and
        combines them into one ``SignalResponse``.

        Returns:
            The signal, or ``None`` when the combined score is below the
            generation threshold or when any step raises (the error is
            logged with its traceback, not propagated).
        """
        # Bound before the try block so the except handler can always log it,
        # even when reading market_data itself is what failed.
        symbol = "UNKNOWN"
        try:
            symbol = market_data.get("symbol", "UNKNOWN")
            current_price = market_data.get("ltp", 0)

            # Technical Analysis
            technical_indicators = self._analyze_technical_indicators(market_data)
            # Fundamental Analysis
            fundamental_metrics = self._analyze_fundamentals(market_data)
            # Sentiment Analysis
            sentiment_indicators = self._analyze_sentiment(symbol)
            # AI Model Predictions
            ai_predictions = self._run_ai_models(market_data)

            # Combine all analyses to generate final signal
            final_signal = self._combine_analyses(
                technical_indicators, fundamental_metrics,
                sentiment_indicators, ai_predictions
            )
            if not final_signal:
                return None

            # Calculate target and stop loss
            target_price, stop_loss = self._calculate_price_targets(
                current_price, final_signal["signal_type"], final_signal["strength"]
            )

            # Read the clock once so the id, generated_at and valid_until all
            # describe the same instant.
            now = datetime.utcnow()

            return SignalResponse(
                signal_id=f"SIG_{symbol}_{self._epoch_seconds(now)}",
                security_id=market_data.get("security_id", ""),
                symbol=symbol,
                exchange_segment=market_data.get("exchange_segment", "NSE_EQ"),
                signal_type=final_signal["signal_type"],
                signal_strength=final_signal["strength"],
                confidence_score=final_signal["confidence"],
                time_frame=TimeFrame.SHORT_TERM,
                current_price=current_price,
                target_price=target_price,
                stop_loss=stop_loss,
                technical_analysis=technical_indicators,
                fundamental_analysis=fundamental_metrics,
                sentiment_analysis=sentiment_indicators,
                ai_analysis=ai_predictions,
                risk_rating=self._calculate_risk_rating(final_signal["confidence"]),
                expected_return=self._calculate_expected_return(current_price, target_price),
                probability_of_success=final_signal["confidence"],
                market_condition="bullish",  # Mock
                generated_at=now,
                valid_until=now + timedelta(days=7),
                summary=self._generate_signal_summary(final_signal, symbol),
                reasoning=self._generate_reasoning(technical_indicators, fundamental_metrics),
                key_catalysts=self._identify_key_catalysts(symbol)
            )
        except Exception as e:
            # Best-effort: one bad instrument must not abort the whole batch,
            # but keep the traceback so the failure can be diagnosed.
            logger.exception("Error generating signal for %s: %s", symbol, e)
            return None

    def _analyze_technical_indicators(self, market_data: Dict[str, Any]) -> List[TechnicalIndicator]:
        """Analyze technical indicators.

        Returns three mock indicators (RSI, MACD, SMA_50). Only ``ltp`` is
        read from ``market_data``, as the base for the mock SMA value.
        """
        indicators = []

        # Mock RSI analysis
        rsi_value = random.uniform(30, 70)
        if rsi_value < 40:
            rsi_signal, rsi_condition = SignalType.BUY, "oversold"
        elif rsi_value > 60:
            rsi_signal, rsi_condition = SignalType.SELL, "overbought"
        else:
            rsi_signal, rsi_condition = SignalType.HOLD, "neutral"
        # NOTE: with values drawn from [30, 70] the distance from 50 never
        # exceeds 20, so the mock RSI is effectively always MODERATE.
        rsi_strength = SignalStrength.STRONG if abs(rsi_value - 50) > 20 else SignalStrength.MODERATE
        indicators.append(TechnicalIndicator(
            name="RSI",
            value=rsi_value,
            signal=rsi_signal,
            strength=rsi_strength,
            description=f"RSI at {rsi_value:.1f} indicates {rsi_condition} conditions"
        ))

        # Mock MACD analysis
        macd_value = random.uniform(-5, 5)
        macd_bullish = macd_value > 0
        macd_signal = SignalType.BUY if macd_bullish else SignalType.SELL
        macd_strength = SignalStrength.STRONG if abs(macd_value) > 3 else SignalStrength.MODERATE
        indicators.append(TechnicalIndicator(
            name="MACD",
            value=macd_value,
            signal=macd_signal,
            strength=macd_strength,
            description=f"MACD at {macd_value:.2f} shows {'bullish' if macd_bullish else 'bearish'} momentum"
        ))

        # Mock Moving Average analysis
        ma_signal = random.choice([SignalType.BUY, SignalType.SELL, SignalType.HOLD])
        ma_strength = random.choice([SignalStrength.WEAK, SignalStrength.MODERATE, SignalStrength.STRONG])
        indicators.append(TechnicalIndicator(
            name="SMA_50",
            value=market_data.get("ltp", 0) * random.uniform(0.95, 1.05),
            signal=ma_signal,
            strength=ma_strength,
            description="Price action relative to 50-day moving average"
        ))

        return indicators

    def _analyze_fundamentals(self, market_data: Dict[str, Any]) -> List[FundamentalMetric]:
        """Analyze fundamental metrics.

        Returns two mock metrics (P/E Ratio, ROE). ``market_data`` is not
        read by the current mock implementation.
        """
        metrics = []

        # Mock P/E ratio analysis
        pe_ratio = random.uniform(15, 35)
        metrics.append(FundamentalMetric(
            metric_name="P/E Ratio",
            current_value=pe_ratio,
            industry_average=22.5,
            percentile_rank=random.uniform(0.2, 0.8),
            trend="improving" if pe_ratio < 25 else "deteriorating",
            impact="positive" if pe_ratio < 20 else "negative"
        ))

        # Mock ROE analysis
        roe = random.uniform(10, 25)
        metrics.append(FundamentalMetric(
            metric_name="ROE",
            current_value=roe,
            industry_average=18.0,
            percentile_rank=random.uniform(0.3, 0.9),
            trend="stable",
            impact="positive" if roe > 15 else "neutral"
        ))

        return metrics

    def _analyze_sentiment(self, symbol: str) -> List[SentimentIndicator]:
        """Analyze market sentiment.

        Returns two mock indicators (news and social media). ``symbol`` is
        not read by the current mock implementation.
        """
        indicators = []

        # Mock news sentiment
        news_score = random.uniform(-0.5, 0.8)
        indicators.append(SentimentIndicator(
            source="News Analysis",
            score=news_score,
            confidence=random.uniform(0.6, 0.9),
            key_factors=["earnings report", "sector outlook", "market conditions"],
            news_count=random.randint(10, 50)
        ))

        # Mock social media sentiment
        social_score = random.uniform(-0.3, 0.6)
        indicators.append(SentimentIndicator(
            source="Social Media",
            score=social_score,
            confidence=random.uniform(0.5, 0.8),
            key_factors=["trader discussions", "volume alerts", "price action"],
            news_count=random.randint(100, 500)
        ))

        return indicators

    def _run_ai_models(self, market_data: Dict[str, Any]) -> List[AISignal]:
        """Run AI/ML models for predictions.

        Produces one mock ``AISignal`` per entry in ``self.models``. The
        prediction is chosen at random and the confidence is drawn between
        0.6 and the model's configured ``confidence``. ``market_data`` is not
        read by the current mock implementation.
        """
        predictions = []
        for model_name, model_info in self.models.items():
            prediction = random.choice([SignalType.BUY, SignalType.SELL, SignalType.HOLD])
            confidence = random.uniform(0.6, model_info["confidence"])
            predictions.append(AISignal(
                model_name=model_name,
                prediction=prediction,
                confidence_score=confidence,
                feature_importance={
                    "price_momentum": random.uniform(0.1, 0.3),
                    "volume_profile": random.uniform(0.1, 0.3),
                    "market_correlation": random.uniform(0.1, 0.3),
                    "volatility": random.uniform(0.1, 0.3)
                },
                prediction_horizon="5-7 days"
            ))
        return predictions

    def _combine_analyses(
        self,
        technical: List[TechnicalIndicator],
        fundamental: List[FundamentalMetric],
        sentiment: List[SentimentIndicator],
        ai_predictions: List[AISignal]
    ) -> Optional[Dict[str, Any]]:
        """Combine all analyses to generate final signal.

        Each analysis family contributes up to its weight to a buy, sell or
        hold score. The highest score decides the signal; ties resolve in the
        order buy, sell, hold.

        Returns:
            A dict with ``signal_type``, ``strength`` and ``confidence`` (the
            winning score), or ``None`` when the winning score is below 0.4.
        """
        # Weight different analysis types
        weights = {
            "technical": 0.4,
            "fundamental": 0.3,
            "sentiment": 0.1,
            "ai": 0.2
        }

        # Calculate weighted scores for each signal type
        buy_score = 0
        sell_score = 0
        hold_score = 0

        # Technical indicators: each one carries an equal share of the
        # technical weight, scaled by its strength.
        for indicator in technical:
            if indicator.strength == SignalStrength.STRONG:
                score = 1.0
            elif indicator.strength == SignalStrength.MODERATE:
                score = 0.7
            else:
                score = 0.3
            contribution = score * weights["technical"] / len(technical)
            if indicator.signal == SignalType.BUY:
                buy_score += contribution
            elif indicator.signal == SignalType.SELL:
                sell_score += contribution
            else:
                hold_score += contribution

        # AI predictions: equal share of the AI weight, scaled by confidence.
        for prediction in ai_predictions:
            contribution = prediction.confidence_score * weights["ai"] / len(ai_predictions)
            if prediction.prediction == SignalType.BUY:
                buy_score += contribution
            elif prediction.prediction == SignalType.SELL:
                sell_score += contribution
            else:
                hold_score += contribution

        # Sentiment analysis: the whole sentiment weight goes to one side
        # depending on the average score.
        avg_sentiment = sum(s.score for s in sentiment) / len(sentiment) if sentiment else 0
        if avg_sentiment > 0.2:
            buy_score += weights["sentiment"]
        elif avg_sentiment < -0.2:
            sell_score += weights["sentiment"]
        else:
            hold_score += weights["sentiment"]

        # Fundamental contribution (simplified): the `fundamental` argument
        # is not inspected; half of its weight is always credited to buy.
        buy_score += weights["fundamental"] * 0.5  # Mock positive fundamentals

        # Determine final signal
        max_score = max(buy_score, sell_score, hold_score)
        if max_score < 0.4:  # Threshold for signal generation
            return None

        if buy_score == max_score:
            signal_type = SignalType.STRONG_BUY if max_score > 0.8 else SignalType.BUY
        elif sell_score == max_score:
            signal_type = SignalType.STRONG_SELL if max_score > 0.8 else SignalType.SELL
        else:
            signal_type = SignalType.HOLD

        if max_score > 0.8:
            strength = SignalStrength.STRONG
        elif max_score > 0.6:
            strength = SignalStrength.MODERATE
        else:
            strength = SignalStrength.WEAK

        return {
            "signal_type": signal_type,
            "strength": strength,
            "confidence": max_score
        }

    def _calculate_price_targets(self, current_price: float, signal_type: SignalType, strength: SignalStrength) -> tuple:
        """Calculate target price and stop loss.

        Buy signals target +8% / stop -5% when strong, otherwise +5% / -3%.
        Sell signals mirror this: target -8% / stop +5% when strong,
        otherwise -5% / +3%.

        Returns:
            ``(target_price, stop_loss)``; both are ``None`` for any signal
            type that is neither a buy nor a sell.
        """
        is_strong = strength == SignalStrength.STRONG
        if signal_type in [SignalType.BUY, SignalType.STRONG_BUY]:
            target_multiplier = 1.08 if is_strong else 1.05
            stop_multiplier = 0.95 if is_strong else 0.97
        elif signal_type in [SignalType.SELL, SignalType.STRONG_SELL]:
            target_multiplier = 0.92 if is_strong else 0.95
            stop_multiplier = 1.05 if is_strong else 1.03
        else:
            return None, None
        return current_price * target_multiplier, current_price * stop_multiplier

    def _calculate_risk_rating(self, confidence: float) -> str:
        """Calculate risk rating based on confidence.

        Returns ``"Low"`` above 0.8, ``"Medium"`` above 0.6, else ``"High"``.
        """
        if confidence > 0.8:
            return "Low"
        if confidence > 0.6:
            return "Medium"
        return "High"

    def _calculate_expected_return(self, current_price: float, target_price: Optional[float]) -> Optional[float]:
        """Calculate expected return percentage.

        Returns the percentage move from ``current_price`` to
        ``target_price`` rounded to two decimals, or ``None`` when either
        price is missing or zero.
        """
        if target_price and current_price:
            return round(((target_price - current_price) / current_price) * 100, 2)
        return None

    def _generate_signal_summary(self, signal: Dict[str, Any], symbol: str) -> str:
        """Generate human-readable signal summary.

        ``signal`` is the dict produced by ``_combine_analyses``; its
        ``signal_type`` and ``strength`` entries are read.
        """
        signal_type = signal["signal_type"]
        strength = signal["strength"]
        if signal_type in [SignalType.BUY, SignalType.STRONG_BUY]:
            return f"{strength.value.title()} BUY signal for {symbol} based on positive technical and AI analysis"
        if signal_type in [SignalType.SELL, SignalType.STRONG_SELL]:
            return f"{strength.value.title()} SELL signal for {symbol} due to bearish indicators"
        return f"HOLD recommendation for {symbol} with mixed signals"

    def _generate_reasoning(self, technical: List[TechnicalIndicator], fundamental: List[FundamentalMetric]) -> str:
        """Generate detailed reasoning.

        Collects one sentence per non-HOLD technical indicator and per
        fundamental metric with a positive impact, and joins the first three.
        When nothing qualifies, a fixed fallback sentence is returned instead
        of a lone ".".
        """
        # Technical reasons
        reasons = [
            f"{indicator.name} shows {indicator.signal.value.lower()} signal"
            for indicator in technical
            if indicator.signal != SignalType.HOLD
        ]
        # Fundamental reasons
        reasons.extend(
            f"{metric.metric_name} is favorable"
            for metric in fundamental
            if metric.impact == "positive"
        )
        if not reasons:
            return "No decisive technical or fundamental factors identified."
        return ". ".join(reasons[:3]) + "."

    def _identify_key_catalysts(self, symbol: str) -> List[str]:
        """Identify key catalysts for the signal.

        Returns a random sample of two to four entries from a fixed list.
        ``symbol`` is not read by the current mock implementation.
        """
        catalysts = [
            "Strong quarterly earnings",
            "Positive sector outlook",
            "Technical breakout pattern",
            "Improved fundamentals",
            "Market momentum"
        ]
        return random.sample(catalysts, random.randint(2, 4))

    async def generate_market_analysis(self) -> MarketAnalysisResponse:
        """Generate comprehensive market analysis.

        Returns a fixed mock response stamped with the current UTC time.

        Raises:
            Exception: Any error building the response is logged and
                re-raised.
        """
        try:
            # This would typically involve complex market analysis
            # For now, returning mock data
            return MarketAnalysisResponse(
                overall_market_sentiment="Bullish",
                market_trend={
                    "trend_direction": "Upward",
                    "trend_strength": SignalStrength.MODERATE,
                    "support_levels": [21800, 21600, 21400],
                    "resistance_levels": [22200, 22400, 22600],
                    "trend_duration": 15
                },
                volatility_index=18.5,
                fear_greed_index=65,
                sector_analysis=[
                    {
                        "sector_name": "Information Technology",
                        "performance": 2.1,
                        "outlook": "Positive",
                        "key_drivers": ["Strong earnings", "Export growth"],
                        "top_stocks": ["TCS", "INFY", "WIPRO"],
                        "recommendation": SignalType.BUY
                    }
                ],
                macro_factors=["GDP growth", "Inflation control", "FII inflows"],
                market_events=["Q4 earnings season", "Budget announcements"],
                generated_at=datetime.utcnow()
            )
        except Exception as e:
            logger.error("Error generating market analysis: %s", e)
            raise

    async def run_backtesting(self, symbols: List[str], period: str, strategy_config: Dict[str, Any]) -> BacktestResult:
        """Run backtesting on trading strategies.

        Returns randomly generated mock statistics over a window ending now
        and starting 365 days earlier. ``symbols``, ``period`` and
        ``strategy_config`` are not read by the current mock implementation.

        Raises:
            Exception: Any error building the result is logged and re-raised.
        """
        try:
            # Mock backtesting results
            total_signals = random.randint(50, 200)
            successful_signals = int(total_signals * random.uniform(0.6, 0.8))
            # Read the clock once so the window is exactly 365 days long.
            end_date = datetime.utcnow()
            return BacktestResult(
                start_date=end_date - timedelta(days=365),
                end_date=end_date,
                total_signals=total_signals,
                successful_signals=successful_signals,
                accuracy_rate=successful_signals / total_signals,
                total_return=random.uniform(8, 25),
                annualized_return=random.uniform(12, 18),
                max_drawdown=random.uniform(8, 15),
                sharpe_ratio=random.uniform(1.2, 2.1),
                win_rate=random.uniform(60, 75),
                average_win=random.uniform(3, 8),
                average_loss=random.uniform(-2, -5),
                profit_factor=random.uniform(1.5, 2.5)
            )
        except Exception as e:
            logger.error("Error running backtesting: %s", e)
            raise

    async def analyze_stock_sentiment(self, symbol: str) -> Dict[str, Any]:
        """Analyze sentiment for a specific stock.

        Returns a dict with a random mock ``score``, the matching ``label``,
        a random ``confidence``, fixed ``factors`` and a ``news_summary``
        sentence mentioning ``symbol``.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            # Mock sentiment analysis
            sentiment_score = random.uniform(-0.5, 0.8)
            if sentiment_score > 0.5:
                label = "Very Positive"
            elif sentiment_score > 0.2:
                label = "Positive"
            elif sentiment_score > -0.2:
                label = "Neutral"
            elif sentiment_score > -0.5:
                label = "Negative"
            else:
                label = "Very Negative"
            return {
                "score": sentiment_score,
                "label": label,
                "confidence": random.uniform(0.7, 0.95),
                "factors": ["earnings report", "analyst upgrades", "sector performance"],
                "news_summary": f"Recent news about {symbol} has been generally {label.lower()}"
            }
        except Exception as e:
            logger.error("Error analyzing sentiment for %s: %s", symbol, e)
            raise

    async def get_available_strategies(self) -> List[Dict[str, Any]]:
        """Get list of available AI trading strategies.

        Returns a fixed list of three strategy descriptors.
        """
        return [
            {
                "id": "momentum_strategy",
                "name": "Momentum Trading",
                "description": "Identifies stocks with strong price momentum",
                "accuracy": 72.5,
                "risk_level": "Medium"
            },
            {
                "id": "mean_reversion",
                "name": "Mean Reversion",
                "description": "Identifies oversold/overbought conditions",
                "accuracy": 68.3,
                "risk_level": "Low"
            },
            {
                "id": "breakout_strategy",
                "name": "Breakout Detection",
                "description": "Identifies technical breakout patterns",
                "accuracy": 75.1,
                "risk_level": "Medium-High"
            }
        ]

    async def create_custom_strategy(self, user_id: str, config: Dict[str, Any]) -> str:
        """Create a custom AI trading strategy.

        Only an identifier of the form ``custom_<user_id>_<epoch seconds>``
        is generated and logged; ``config`` is not read and nothing is
        stored by the current mock implementation.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            # In a real implementation, would train/configure the strategy
            strategy_id = f"custom_{user_id}_{self._epoch_seconds(datetime.utcnow())}"
            # Mock strategy creation
            logger.info("Created custom strategy %s for user %s", strategy_id, user_id)
            return strategy_id
        except Exception as e:
            logger.error("Error creating custom strategy: %s", e)
            raise