""" Decision Agent for synthesizing analysis into trading recommendations. This agent takes input from Indicator, Pattern, and Trend agents and produces a final trading decision with entry, target, and stop-loss levels. """ import json import logging import time from typing import Any, Dict, Literal, Optional import pandas as pd from langchain_core.messages import HumanMessage, SystemMessage # Configure logger logger = logging.getLogger(__name__) from config.default_config import DEFAULT_CONFIG from config.models import AGENT_MODELS from graph.state.agent_state import ( add_agent_message, get_agent_messages, update_analysis_result, ) from graph.state.trading_state import TechnicalWorkflowState from utils.llm.provider_factory import LLMProviderFactory class DecisionAgent: """ Trading Decision Agent. Responsibilities: - Synthesize indicator, pattern, and trend analysis - Generate trading recommendation (strong buy/buy/hold/sell/strong sell) - Calculate entry price, target price, and stop-loss levels - Assess confidence and risk level - Provide clear rationale for the decision """ AGENT_NAME = "decision_agent" def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize Decision Agent. Args: config: Optional configuration override """ self.config = config or DEFAULT_CONFIG # Initialize LLM - use runtime provider override if available from config.models import DEFAULT_MODELS_BY_PROVIDER model_config = AGENT_MODELS[self.AGENT_NAME] runtime_provider = self.config.get("llm_provider", model_config["provider"]) # If provider is overridden but model is not, use default model for that provider if "llm_provider" in self.config and "llm_model" not in self.config: runtime_model = DEFAULT_MODELS_BY_PROVIDER.get( runtime_provider, model_config["model"] ) else: runtime_model = self.config.get("llm_model", model_config["model"]) self.llm = LLMProviderFactory.create( provider=runtime_provider, model=runtime_model, temperature=model_config["temperature"], ) def _get_timeframe_significance(self, timeframe: str) -> Dict[str, Any]: """ Get timeframe significance level for trading decisions. Args: timeframe: Timeframe string (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w) Returns: Dict with significance info """ timeframe_map = { "1m": { "weight": 0.3, "label": "1-minute", "scope": "scalping", "hold_duration": "seconds to minutes", }, "5m": { "weight": 0.4, "label": "5-minute", "scope": "scalping", "hold_duration": "minutes", }, "15m": { "weight": 0.5, "label": "15-minute", "scope": "day trading", "hold_duration": "minutes to hours", }, "30m": { "weight": 0.6, "label": "30-minute", "scope": "day trading", "hold_duration": "hours", }, "1h": { "weight": 0.7, "label": "1-hour", "scope": "swing trading", "hold_duration": "hours to days", }, "4h": { "weight": 0.8, "label": "4-hour", "scope": "swing trading", "hold_duration": "days", }, "1d": { "weight": 0.9, "label": "daily", "scope": "position trading", "hold_duration": "days to weeks", }, "1w": { "weight": 1.0, "label": "weekly", "scope": "long-term investment", "hold_duration": "quarters to years", }, } return timeframe_map.get( timeframe, { "weight": 0.5, "label": timeframe, "scope": "intraday", "hold_duration": "varies", }, ) def run(self, state: TechnicalWorkflowState) -> TechnicalWorkflowState: """ Execute trading decision synthesis. Args: state: Current workflow state with analysis from previous agents Returns: Updated state with trading decision """ start_time = time.time() ticker = state.get("ticker", "UNKNOWN") timeframe = state.get("timeframe", "UNKNOWN") logger.info( json.dumps( { "agent": self.AGENT_NAME, "action": "start", "ticker": ticker, "timeframe": timeframe, "timestamp": time.time(), } ) ) try: # Extract all analysis data indicators = state.get("indicators", {}) patterns = state.get("patterns", {}) trends = state.get("trends", {}) market_data = state.get("market_data", {}) if not market_data.get("ohlc_data"): raise ValueError("No market data available for decision making") df = self._deserialize_dataframe(market_data["ohlc_data"]) current_price = float(df["close"].iloc[-1]) # Get messages from previous agents for context agent_messages = get_agent_messages(state) # Get timeframe significance for execution context timeframe_info = self._get_timeframe_significance(timeframe) # Extract investment style from state config if available investment_style = None if "config" in state: config_dict = state["config"] if isinstance(config_dict, dict): investment_style = config_dict.get("investment_style") if not investment_style: investment_style = state.get("investment_style") # Extract cost tracker from state cost_tracker = state.get("_cost_tracker") # Generate decision using LLM with timeframe and investment style context decision_result = self._make_decision( ticker=state["ticker"], timeframe=state["timeframe"], timeframe_info=timeframe_info, current_price=current_price, indicators=indicators, patterns=patterns, trends=trends, agent_messages=agent_messages, investment_style=investment_style, cost_tracker=cost_tracker, ) # Calculate price levels price_levels = self._calculate_price_levels( current_price=current_price, recommendation=decision_result["recommendation"], trends=trends, patterns=patterns, ) decision_result.update(price_levels) # Generate detailed rationale rationale = self._generate_rationale( state["ticker"], state["timeframe"], decision_result, indicators, patterns, trends, cost_tracker, ) decision_result["rationale"] = rationale # Update state new_state = update_analysis_result(state, "decision", decision_result) new_state = add_agent_message( new_state, self.AGENT_NAME, rationale, metadata={"decision": decision_result}, ) execution_time = time.time() - start_time logger.info( json.dumps( { "agent": self.AGENT_NAME, "action": "complete", "ticker": ticker, "timeframe": timeframe, "execution_time": execution_time, "recommendation": decision_result.get("recommendation"), "confidence": decision_result.get("confidence"), "risk_level": decision_result.get("risk_level"), "timestamp": time.time(), } ) ) return new_state except Exception as e: execution_time = time.time() - start_time logger.error( json.dumps( { "agent": self.AGENT_NAME, "action": "error", "ticker": ticker, "timeframe": timeframe, "execution_time": execution_time, "error": str(e), "timestamp": time.time(), } ) ) # Add error message to state error_state = add_agent_message( state, self.AGENT_NAME, f"Error making trading decision: {str(e)}", metadata={"error": True}, ) return error_state def _make_decision( self, ticker: str, timeframe: str, timeframe_info: Dict[str, Any], current_price: float, indicators: Dict[str, Any], patterns: Dict[str, Any], trends: Dict[str, Any], agent_messages: list, investment_style: Optional[str] = None, cost_tracker=None, ) -> Dict[str, Any]: """ Make trading decision using LLM synthesis with timeframe and investment style context. Args: ticker: Asset ticker timeframe: Analysis timeframe timeframe_info: Timeframe significance info current_price: Current price indicators: Indicator analysis patterns: Pattern analysis trends: Trend analysis agent_messages: Messages from previous agents investment_style: Investment style (long_term or swing_trading) Returns: Decision dict with recommendation, confidence, risk_level, and timeframe context """ # Build comprehensive summary summary_parts = [ f"TRADING DECISION ANALYSIS FOR {ticker}", f"Timeframe: {timeframe_info['label']} ({timeframe_info['scope']})", f"Expected Hold Duration: {timeframe_info['hold_duration']}", f"Timeframe Weight: {timeframe_info['weight']:.1f}", f"Current Price: ${current_price:.2f}", "", "=" * 50, "TECHNICAL INDICATORS", "=" * 50, ] # Indicators summary if indicators.get("rsi"): rsi = indicators["rsi"] if "value" in rsi: summary_parts.append( f"RSI: {rsi['value']:.2f} - {rsi.get('interpretation', 'N/A')}" ) if indicators.get("macd"): macd = indicators["macd"] if "macd" in macd: summary_parts.append( f"MACD: {macd['macd']:.4f} | Signal: {macd['signal']:.4f} | Histogram: {macd['histogram']:.4f}" ) summary_parts.append(f" {macd.get('interpretation', 'N/A')}") if indicators.get("stochastic"): stoch = indicators["stochastic"] if "k" in stoch: summary_parts.append( f"Stochastic: %K={stoch['k']:.2f}, %D={stoch['d']:.2f}" ) summary_parts.append(f" {stoch.get('interpretation', 'N/A')}") # Patterns summary summary_parts.extend( [ "", "=" * 50, "CHART PATTERNS", "=" * 50, ] ) if patterns.get("candlestick_patterns"): summary_parts.append("Candlestick Patterns:") for p in patterns["candlestick_patterns"][-3:]: # Last 3 patterns summary_parts.append( f" - {p['name']} ({p['signal']}, conf: {p['confidence']:.0%}): {p['description']}" ) if patterns.get("chart_patterns"): summary_parts.append("Chart Patterns:") for p in patterns["chart_patterns"]: summary_parts.append( f" - {p['type']} ({p['signal']}, conf: {p['confidence']:.0%}): {p['description']}" ) if patterns.get("support_levels") or patterns.get("resistance_levels"): summary_parts.append("") if patterns.get("support_levels"): summary_parts.append( f"Support Levels: {[f'${s:.2f}' for s in patterns['support_levels']]}" ) if patterns.get("resistance_levels"): summary_parts.append( f"Resistance Levels: {[f'${r:.2f}' for r in patterns['resistance_levels']]}" ) # Trend summary summary_parts.extend( [ "", "=" * 50, "TREND ANALYSIS", "=" * 50, ] ) if trends.get("overall_trend"): summary_parts.append(f"Overall Trend: {trends['overall_trend'].upper()}") summary_parts.append( f"Trend Strength: {trends.get('trend_strength', 0):.2f}/1.00" ) summary_parts.append( f"Trend Duration: {trends.get('trend_duration', 0)} periods" ) summary_parts.append( f"Momentum: {(trends.get('momentum') or 'unknown').upper()}" ) if trends.get("key_levels"): levels = trends["key_levels"] summary_parts.append("") if levels.get("sma_20"): summary_parts.append(f"SMA(20): ${levels['sma_20']:.2f}") if levels.get("sma_50"): summary_parts.append(f"SMA(50): ${levels['sma_50']:.2f}") analysis_summary = "\n".join(summary_parts) # Determine investment style context style_context = "" if investment_style == "long_term": style_context = """ INVESTMENT STYLE: Long-Term Investment - Focus: Fundamental strength, sustainable trends, quality over quick gains - Holding Period: Quarters to years - Priorities: Trend sustainability, support from fundamentals, lower risk tolerance - Avoid: Short-term noise, minor fluctuations, intraday volatility """ elif investment_style == "swing_trading": style_context = """ INVESTMENT STYLE: Swing Trading - Focus: Medium-term price swings, technical setups, momentum - Holding Period: Weeks to months - Priorities: Clear technical patterns, good risk/reward setups, momentum alignment - Consider: Both technical signals and fundamental catalysts """ else: style_context = """ INVESTMENT STYLE: General Analysis - Balanced approach considering both technical and fundamental factors """ # LLM prompt for decision system_prompt = f"""You are an expert trading decision maker. Your job is to synthesize technical analysis from multiple sources and make a clear trading recommendation. {style_context} You must output a JSON object with the following structure: {{ "recommendation": "strong_buy" | "buy" | "hold" | "sell" | "strong_sell", "confidence": 0.0 to 1.0, "risk_level": "low" | "medium" | "high" }} Consider: - Indicator signals and their agreement/disagreement - Pattern reliability and timeframe appropriateness - Trend strength and sustainability - Risk/reward potential - Timeframe scope (scalping vs swing vs position trading) - Expected hold duration for the timeframe - Investment style priorities and holding period Be decisive but realistic. A "hold" decision is valid when signals conflict. Adjust your recommendation to match both the timeframe scope AND the investment style - long-term investors need sustainable trends, swing traders need clear technical setups.""" user_prompt = f"""{analysis_summary} Based on this comprehensive technical analysis, provide your trading decision as a JSON object. IMPORTANT: Consider the timeframe scope ({timeframe_info["scope"]}) and expected hold duration ({timeframe_info["hold_duration"]}) when making your recommendation.""" # Call LLM messages = [ SystemMessage(content=system_prompt), HumanMessage(content=user_prompt), ] # Create callback if cost tracker is available if cost_tracker: callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) response = self.llm.invoke(messages, config={"callbacks": [callback]}) else: response = self.llm.invoke(messages) # Parse response (simplified - in production would use structured output) result = self._parse_decision_response(response.content) # Add timeframe context to the decision result["timeframe_context"] = { "timeframe": timeframe, "label": timeframe_info["label"], "scope": timeframe_info["scope"], "hold_duration": timeframe_info["hold_duration"], "weight": timeframe_info["weight"], } return result def _parse_decision_response(self, content: str) -> Dict[str, Any]: """ Parse LLM decision response. Args: content: LLM response content Returns: Parsed decision dict """ import json import re # Try to extract JSON from response json_match = re.search(r"\{[^}]+\}", content, re.DOTALL) if json_match: try: return json.loads(json_match.group()) except json.JSONDecodeError: pass # Fallback: parse text for keywords content_lower = content.lower() if "strong buy" in content_lower or "strong_buy" in content_lower: recommendation = "strong_buy" elif "strong sell" in content_lower or "strong_sell" in content_lower: recommendation = "strong_sell" elif "buy" in content_lower: recommendation = "buy" elif "sell" in content_lower: recommendation = "sell" else: recommendation = "hold" # Estimate confidence and risk if "high confidence" in content_lower: confidence = 0.8 elif "low confidence" in content_lower: confidence = 0.4 else: confidence = 0.6 if "high risk" in content_lower: risk_level = "high" elif "low risk" in content_lower: risk_level = "low" else: risk_level = "medium" return { "recommendation": recommendation, "confidence": confidence, "risk_level": risk_level, } def _calculate_price_levels( self, current_price: float, recommendation: str, trends: Dict[str, Any], patterns: Dict[str, Any], ) -> Dict[str, float]: """ Calculate entry, target, and stop-loss price levels. Args: current_price: Current market price recommendation: Trading recommendation trends: Trend analysis patterns: Pattern analysis Returns: Dict with entry_price, target_price, stop_loss, risk_reward_ratio """ # Entry price is typically current price for market orders entry_price = current_price # Get support and resistance levels support_levels = patterns.get("support_levels", []) resistance_levels = patterns.get("resistance_levels", []) # Calculate target and stop based on recommendation if recommendation in ["strong_buy", "buy"]: # Target: nearest resistance or 2-3% above entry if resistance_levels: target_price = min( [r for r in resistance_levels if r > current_price], default=current_price * 1.03, ) else: target_price = current_price * 1.03 # Stop loss: nearest support or 1-2% below entry if support_levels: stop_loss = max( [s for s in support_levels if s < current_price], default=current_price * 0.98, ) else: stop_loss = current_price * 0.98 elif recommendation in ["strong_sell", "sell"]: # Target: nearest support or 2-3% below entry if support_levels: target_price = max( [s for s in support_levels if s < current_price], default=current_price * 0.97, ) else: target_price = current_price * 0.97 # Stop loss: nearest resistance or 1-2% above entry if resistance_levels: stop_loss = min( [r for r in resistance_levels if r > current_price], default=current_price * 1.02, ) else: stop_loss = current_price * 1.02 else: # hold target_price = current_price stop_loss = current_price # Calculate risk/reward ratio if recommendation in ["strong_buy", "buy"]: risk = abs(entry_price - stop_loss) reward = abs(target_price - entry_price) elif recommendation in ["strong_sell", "sell"]: risk = abs(stop_loss - entry_price) reward = abs(entry_price - target_price) else: risk = 1 reward = 1 risk_reward_ratio = reward / risk if risk > 0 else 1.0 return { "entry_price": round(entry_price, 2), "target_price": round(target_price, 2), "stop_loss": round(stop_loss, 2), "risk_reward_ratio": round(risk_reward_ratio, 2), } def _generate_rationale( self, ticker: str, timeframe: str, decision: Dict[str, Any], indicators: Dict[str, Any], patterns: Dict[str, Any], trends: Dict[str, Any], cost_tracker=None, ) -> str: """ Generate detailed rationale for the trading decision. Args: ticker: Asset ticker timeframe: Analysis timeframe decision: Decision result indicators: Indicator analysis patterns: Pattern analysis trends: Trend analysis Returns: Detailed rationale string """ system_prompt = """You are an expert trading advisor. Provide a clear, concise rationale for a trading decision. Your rationale should: 1. Start with the recommendation and key price levels 2. Explain the main reasons supporting this decision (2-3 key points) 3. Note any contradicting signals or risks 4. End with a clear action statement Keep it under 200 words. Be professional and actionable.""" # Get timeframe context from decision timeframe_context = decision.get("timeframe_context", {}) timeframe_label = timeframe_context.get("label", timeframe) timeframe_scope = timeframe_context.get("scope", "trading") hold_duration = timeframe_context.get("hold_duration", "varies") user_prompt = f"""Provide a trading rationale for the following decision: Ticker: {ticker} Timeframe: {timeframe_label} ({timeframe_scope}) Expected Hold Duration: {hold_duration} Recommendation: {decision["recommendation"].replace("_", " ").upper()} Confidence: {decision["confidence"]:.0%} Risk Level: {decision["risk_level"].upper()} Price Levels: - Entry: ${decision.get("entry_price", 0):.2f} - Target: ${decision.get("target_price", 0):.2f} - Stop Loss: ${decision.get("stop_loss", 0):.2f} - Risk/Reward: {decision.get("risk_reward_ratio", 0):.2f} Key Technical Factors: - Trend: {(trends.get("overall_trend") or "unknown").upper()} (strength: {trends.get("trend_strength", 0):.2f}) - RSI: {indicators.get("rsi", {}).get("value", "N/A")} - MACD: {indicators.get("macd", {}).get("interpretation", "N/A")} - Patterns: {len(patterns.get("candlestick_patterns", []))} candlestick, {len(patterns.get("chart_patterns", []))} chart patterns Write a clear rationale for this decision appropriate for the {timeframe_scope} timeframe.""" messages = [ SystemMessage(content=system_prompt), HumanMessage(content=user_prompt), ] # Create callback if cost tracker is available if cost_tracker: callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) response = self.llm.invoke(messages, config={"callbacks": [callback]}) else: response = self.llm.invoke(messages) return response.content def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame: """ Convert serialized data back to DataFrame. Args: data: Serialized DataFrame data Returns: pandas DataFrame """ df = pd.DataFrame(data) if "Date" in df.columns: df["Date"] = pd.to_datetime(df["Date"]) df = df.set_index("Date") return df