Spaces:
Sleeping
Sleeping
| """ | |
| Decision Agent for synthesizing analysis into trading recommendations. | |
| This agent takes input from Indicator, Pattern, and Trend agents and produces | |
| a final trading decision with entry, target, and stop-loss levels. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from typing import Any, Dict, Literal, Optional | |
| import pandas as pd | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
| from config.default_config import DEFAULT_CONFIG | |
| from config.models import AGENT_MODELS | |
| from graph.state.agent_state import ( | |
| add_agent_message, | |
| get_agent_messages, | |
| update_analysis_result, | |
| ) | |
| from graph.state.trading_state import TechnicalWorkflowState | |
| from utils.llm.provider_factory import LLMProviderFactory | |
| class DecisionAgent: | |
| """ | |
| Trading Decision Agent. | |
| Responsibilities: | |
| - Synthesize indicator, pattern, and trend analysis | |
| - Generate trading recommendation (strong buy/buy/hold/sell/strong sell) | |
| - Calculate entry price, target price, and stop-loss levels | |
| - Assess confidence and risk level | |
| - Provide clear rationale for the decision | |
| """ | |
| AGENT_NAME = "decision_agent" | |
| def __init__(self, config: Optional[Dict[str, Any]] = None): | |
| """ | |
| Initialize Decision Agent. | |
| Args: | |
| config: Optional configuration override | |
| """ | |
| self.config = config or DEFAULT_CONFIG | |
| # Initialize LLM - use runtime provider override if available | |
| from config.models import DEFAULT_MODELS_BY_PROVIDER | |
| model_config = AGENT_MODELS[self.AGENT_NAME] | |
| runtime_provider = self.config.get("llm_provider", model_config["provider"]) | |
| # If provider is overridden but model is not, use default model for that provider | |
| if "llm_provider" in self.config and "llm_model" not in self.config: | |
| runtime_model = DEFAULT_MODELS_BY_PROVIDER.get( | |
| runtime_provider, model_config["model"] | |
| ) | |
| else: | |
| runtime_model = self.config.get("llm_model", model_config["model"]) | |
| self.llm = LLMProviderFactory.create( | |
| provider=runtime_provider, | |
| model=runtime_model, | |
| temperature=model_config["temperature"], | |
| ) | |
| def _get_timeframe_significance(self, timeframe: str) -> Dict[str, Any]: | |
| """ | |
| Get timeframe significance level for trading decisions. | |
| Args: | |
| timeframe: Timeframe string (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w) | |
| Returns: | |
| Dict with significance info | |
| """ | |
| timeframe_map = { | |
| "1m": { | |
| "weight": 0.3, | |
| "label": "1-minute", | |
| "scope": "scalping", | |
| "hold_duration": "seconds to minutes", | |
| }, | |
| "5m": { | |
| "weight": 0.4, | |
| "label": "5-minute", | |
| "scope": "scalping", | |
| "hold_duration": "minutes", | |
| }, | |
| "15m": { | |
| "weight": 0.5, | |
| "label": "15-minute", | |
| "scope": "day trading", | |
| "hold_duration": "minutes to hours", | |
| }, | |
| "30m": { | |
| "weight": 0.6, | |
| "label": "30-minute", | |
| "scope": "day trading", | |
| "hold_duration": "hours", | |
| }, | |
| "1h": { | |
| "weight": 0.7, | |
| "label": "1-hour", | |
| "scope": "swing trading", | |
| "hold_duration": "hours to days", | |
| }, | |
| "4h": { | |
| "weight": 0.8, | |
| "label": "4-hour", | |
| "scope": "swing trading", | |
| "hold_duration": "days", | |
| }, | |
| "1d": { | |
| "weight": 0.9, | |
| "label": "daily", | |
| "scope": "position trading", | |
| "hold_duration": "days to weeks", | |
| }, | |
| "1w": { | |
| "weight": 1.0, | |
| "label": "weekly", | |
| "scope": "long-term investment", | |
| "hold_duration": "quarters to years", | |
| }, | |
| } | |
| return timeframe_map.get( | |
| timeframe, | |
| { | |
| "weight": 0.5, | |
| "label": timeframe, | |
| "scope": "intraday", | |
| "hold_duration": "varies", | |
| }, | |
| ) | |
| def run(self, state: TechnicalWorkflowState) -> TechnicalWorkflowState: | |
| """ | |
| Execute trading decision synthesis. | |
| Args: | |
| state: Current workflow state with analysis from previous agents | |
| Returns: | |
| Updated state with trading decision | |
| """ | |
| start_time = time.time() | |
| ticker = state.get("ticker", "UNKNOWN") | |
| timeframe = state.get("timeframe", "UNKNOWN") | |
| logger.info( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "start", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Extract all analysis data | |
| indicators = state.get("indicators", {}) | |
| patterns = state.get("patterns", {}) | |
| trends = state.get("trends", {}) | |
| market_data = state.get("market_data", {}) | |
| if not market_data.get("ohlc_data"): | |
| raise ValueError("No market data available for decision making") | |
| df = self._deserialize_dataframe(market_data["ohlc_data"]) | |
| current_price = float(df["close"].iloc[-1]) | |
| # Get messages from previous agents for context | |
| agent_messages = get_agent_messages(state) | |
| # Get timeframe significance for execution context | |
| timeframe_info = self._get_timeframe_significance(timeframe) | |
| # Extract investment style from state config if available | |
| investment_style = None | |
| if "config" in state: | |
| config_dict = state["config"] | |
| if isinstance(config_dict, dict): | |
| investment_style = config_dict.get("investment_style") | |
| if not investment_style: | |
| investment_style = state.get("investment_style") | |
| # Extract cost tracker from state | |
| cost_tracker = state.get("_cost_tracker") | |
| # Generate decision using LLM with timeframe and investment style context | |
| decision_result = self._make_decision( | |
| ticker=state["ticker"], | |
| timeframe=state["timeframe"], | |
| timeframe_info=timeframe_info, | |
| current_price=current_price, | |
| indicators=indicators, | |
| patterns=patterns, | |
| trends=trends, | |
| agent_messages=agent_messages, | |
| investment_style=investment_style, | |
| cost_tracker=cost_tracker, | |
| ) | |
| # Calculate price levels | |
| price_levels = self._calculate_price_levels( | |
| current_price=current_price, | |
| recommendation=decision_result["recommendation"], | |
| trends=trends, | |
| patterns=patterns, | |
| ) | |
| decision_result.update(price_levels) | |
| # Generate detailed rationale | |
| rationale = self._generate_rationale( | |
| state["ticker"], | |
| state["timeframe"], | |
| decision_result, | |
| indicators, | |
| patterns, | |
| trends, | |
| cost_tracker, | |
| ) | |
| decision_result["rationale"] = rationale | |
| # Update state | |
| new_state = update_analysis_result(state, "decision", decision_result) | |
| new_state = add_agent_message( | |
| new_state, | |
| self.AGENT_NAME, | |
| rationale, | |
| metadata={"decision": decision_result}, | |
| ) | |
| execution_time = time.time() - start_time | |
| logger.info( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "complete", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "execution_time": execution_time, | |
| "recommendation": decision_result.get("recommendation"), | |
| "confidence": decision_result.get("confidence"), | |
| "risk_level": decision_result.get("risk_level"), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return new_state | |
| except Exception as e: | |
| execution_time = time.time() - start_time | |
| logger.error( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "error", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "execution_time": execution_time, | |
| "error": str(e), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Add error message to state | |
| error_state = add_agent_message( | |
| state, | |
| self.AGENT_NAME, | |
| f"Error making trading decision: {str(e)}", | |
| metadata={"error": True}, | |
| ) | |
| return error_state | |
| def _make_decision( | |
| self, | |
| ticker: str, | |
| timeframe: str, | |
| timeframe_info: Dict[str, Any], | |
| current_price: float, | |
| indicators: Dict[str, Any], | |
| patterns: Dict[str, Any], | |
| trends: Dict[str, Any], | |
| agent_messages: list, | |
| investment_style: Optional[str] = None, | |
| cost_tracker=None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Make trading decision using LLM synthesis with timeframe and investment style context. | |
| Args: | |
| ticker: Asset ticker | |
| timeframe: Analysis timeframe | |
| timeframe_info: Timeframe significance info | |
| current_price: Current price | |
| indicators: Indicator analysis | |
| patterns: Pattern analysis | |
| trends: Trend analysis | |
| agent_messages: Messages from previous agents | |
| investment_style: Investment style (long_term or swing_trading) | |
| Returns: | |
| Decision dict with recommendation, confidence, risk_level, and timeframe context | |
| """ | |
| # Build comprehensive summary | |
| summary_parts = [ | |
| f"TRADING DECISION ANALYSIS FOR {ticker}", | |
| f"Timeframe: {timeframe_info['label']} ({timeframe_info['scope']})", | |
| f"Expected Hold Duration: {timeframe_info['hold_duration']}", | |
| f"Timeframe Weight: {timeframe_info['weight']:.1f}", | |
| f"Current Price: ${current_price:.2f}", | |
| "", | |
| "=" * 50, | |
| "TECHNICAL INDICATORS", | |
| "=" * 50, | |
| ] | |
| # Indicators summary | |
| if indicators.get("rsi"): | |
| rsi = indicators["rsi"] | |
| if "value" in rsi: | |
| summary_parts.append( | |
| f"RSI: {rsi['value']:.2f} - {rsi.get('interpretation', 'N/A')}" | |
| ) | |
| if indicators.get("macd"): | |
| macd = indicators["macd"] | |
| if "macd" in macd: | |
| summary_parts.append( | |
| f"MACD: {macd['macd']:.4f} | Signal: {macd['signal']:.4f} | Histogram: {macd['histogram']:.4f}" | |
| ) | |
| summary_parts.append(f" {macd.get('interpretation', 'N/A')}") | |
| if indicators.get("stochastic"): | |
| stoch = indicators["stochastic"] | |
| if "k" in stoch: | |
| summary_parts.append( | |
| f"Stochastic: %K={stoch['k']:.2f}, %D={stoch['d']:.2f}" | |
| ) | |
| summary_parts.append(f" {stoch.get('interpretation', 'N/A')}") | |
| # Patterns summary | |
| summary_parts.extend( | |
| [ | |
| "", | |
| "=" * 50, | |
| "CHART PATTERNS", | |
| "=" * 50, | |
| ] | |
| ) | |
| if patterns.get("candlestick_patterns"): | |
| summary_parts.append("Candlestick Patterns:") | |
| for p in patterns["candlestick_patterns"][-3:]: # Last 3 patterns | |
| summary_parts.append( | |
| f" - {p['name']} ({p['signal']}, conf: {p['confidence']:.0%}): {p['description']}" | |
| ) | |
| if patterns.get("chart_patterns"): | |
| summary_parts.append("Chart Patterns:") | |
| for p in patterns["chart_patterns"]: | |
| summary_parts.append( | |
| f" - {p['type']} ({p['signal']}, conf: {p['confidence']:.0%}): {p['description']}" | |
| ) | |
| if patterns.get("support_levels") or patterns.get("resistance_levels"): | |
| summary_parts.append("") | |
| if patterns.get("support_levels"): | |
| summary_parts.append( | |
| f"Support Levels: {[f'${s:.2f}' for s in patterns['support_levels']]}" | |
| ) | |
| if patterns.get("resistance_levels"): | |
| summary_parts.append( | |
| f"Resistance Levels: {[f'${r:.2f}' for r in patterns['resistance_levels']]}" | |
| ) | |
| # Trend summary | |
| summary_parts.extend( | |
| [ | |
| "", | |
| "=" * 50, | |
| "TREND ANALYSIS", | |
| "=" * 50, | |
| ] | |
| ) | |
| if trends.get("overall_trend"): | |
| summary_parts.append(f"Overall Trend: {trends['overall_trend'].upper()}") | |
| summary_parts.append( | |
| f"Trend Strength: {trends.get('trend_strength', 0):.2f}/1.00" | |
| ) | |
| summary_parts.append( | |
| f"Trend Duration: {trends.get('trend_duration', 0)} periods" | |
| ) | |
| summary_parts.append( | |
| f"Momentum: {(trends.get('momentum') or 'unknown').upper()}" | |
| ) | |
| if trends.get("key_levels"): | |
| levels = trends["key_levels"] | |
| summary_parts.append("") | |
| if levels.get("sma_20"): | |
| summary_parts.append(f"SMA(20): ${levels['sma_20']:.2f}") | |
| if levels.get("sma_50"): | |
| summary_parts.append(f"SMA(50): ${levels['sma_50']:.2f}") | |
| analysis_summary = "\n".join(summary_parts) | |
| # Determine investment style context | |
| style_context = "" | |
| if investment_style == "long_term": | |
| style_context = """ | |
| INVESTMENT STYLE: Long-Term Investment | |
| - Focus: Fundamental strength, sustainable trends, quality over quick gains | |
| - Holding Period: Quarters to years | |
| - Priorities: Trend sustainability, support from fundamentals, lower risk tolerance | |
| - Avoid: Short-term noise, minor fluctuations, intraday volatility | |
| """ | |
| elif investment_style == "swing_trading": | |
| style_context = """ | |
| INVESTMENT STYLE: Swing Trading | |
| - Focus: Medium-term price swings, technical setups, momentum | |
| - Holding Period: Weeks to months | |
| - Priorities: Clear technical patterns, good risk/reward setups, momentum alignment | |
| - Consider: Both technical signals and fundamental catalysts | |
| """ | |
| else: | |
| style_context = """ | |
| INVESTMENT STYLE: General Analysis | |
| - Balanced approach considering both technical and fundamental factors | |
| """ | |
| # LLM prompt for decision | |
| system_prompt = f"""You are an expert trading decision maker. Your job is to synthesize technical analysis from multiple sources and make a clear trading recommendation. | |
| {style_context} | |
| You must output a JSON object with the following structure: | |
| {{ | |
| "recommendation": "strong_buy" | "buy" | "hold" | "sell" | "strong_sell", | |
| "confidence": 0.0 to 1.0, | |
| "risk_level": "low" | "medium" | "high" | |
| }} | |
| Consider: | |
| - Indicator signals and their agreement/disagreement | |
| - Pattern reliability and timeframe appropriateness | |
| - Trend strength and sustainability | |
| - Risk/reward potential | |
| - Timeframe scope (scalping vs swing vs position trading) | |
| - Expected hold duration for the timeframe | |
| - Investment style priorities and holding period | |
| Be decisive but realistic. A "hold" decision is valid when signals conflict. | |
| Adjust your recommendation to match both the timeframe scope AND the investment style - long-term investors need sustainable trends, swing traders need clear technical setups.""" | |
| user_prompt = f"""{analysis_summary} | |
| Based on this comprehensive technical analysis, provide your trading decision as a JSON object. | |
| IMPORTANT: Consider the timeframe scope ({timeframe_info["scope"]}) and expected hold duration ({timeframe_info["hold_duration"]}) when making your recommendation.""" | |
| # Call LLM | |
| messages = [ | |
| SystemMessage(content=system_prompt), | |
| HumanMessage(content=user_prompt), | |
| ] | |
| # Create callback if cost tracker is available | |
| if cost_tracker: | |
| callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) | |
| response = self.llm.invoke(messages, config={"callbacks": [callback]}) | |
| else: | |
| response = self.llm.invoke(messages) | |
| # Parse response (simplified - in production would use structured output) | |
| result = self._parse_decision_response(response.content) | |
| # Add timeframe context to the decision | |
| result["timeframe_context"] = { | |
| "timeframe": timeframe, | |
| "label": timeframe_info["label"], | |
| "scope": timeframe_info["scope"], | |
| "hold_duration": timeframe_info["hold_duration"], | |
| "weight": timeframe_info["weight"], | |
| } | |
| return result | |
| def _parse_decision_response(self, content: str) -> Dict[str, Any]: | |
| """ | |
| Parse LLM decision response. | |
| Args: | |
| content: LLM response content | |
| Returns: | |
| Parsed decision dict | |
| """ | |
| import json | |
| import re | |
| # Try to extract JSON from response | |
| json_match = re.search(r"\{[^}]+\}", content, re.DOTALL) | |
| if json_match: | |
| try: | |
| return json.loads(json_match.group()) | |
| except json.JSONDecodeError: | |
| pass | |
| # Fallback: parse text for keywords | |
| content_lower = content.lower() | |
| if "strong buy" in content_lower or "strong_buy" in content_lower: | |
| recommendation = "strong_buy" | |
| elif "strong sell" in content_lower or "strong_sell" in content_lower: | |
| recommendation = "strong_sell" | |
| elif "buy" in content_lower: | |
| recommendation = "buy" | |
| elif "sell" in content_lower: | |
| recommendation = "sell" | |
| else: | |
| recommendation = "hold" | |
| # Estimate confidence and risk | |
| if "high confidence" in content_lower: | |
| confidence = 0.8 | |
| elif "low confidence" in content_lower: | |
| confidence = 0.4 | |
| else: | |
| confidence = 0.6 | |
| if "high risk" in content_lower: | |
| risk_level = "high" | |
| elif "low risk" in content_lower: | |
| risk_level = "low" | |
| else: | |
| risk_level = "medium" | |
| return { | |
| "recommendation": recommendation, | |
| "confidence": confidence, | |
| "risk_level": risk_level, | |
| } | |
| def _calculate_price_levels( | |
| self, | |
| current_price: float, | |
| recommendation: str, | |
| trends: Dict[str, Any], | |
| patterns: Dict[str, Any], | |
| ) -> Dict[str, float]: | |
| """ | |
| Calculate entry, target, and stop-loss price levels. | |
| Args: | |
| current_price: Current market price | |
| recommendation: Trading recommendation | |
| trends: Trend analysis | |
| patterns: Pattern analysis | |
| Returns: | |
| Dict with entry_price, target_price, stop_loss, risk_reward_ratio | |
| """ | |
| # Entry price is typically current price for market orders | |
| entry_price = current_price | |
| # Get support and resistance levels | |
| support_levels = patterns.get("support_levels", []) | |
| resistance_levels = patterns.get("resistance_levels", []) | |
| # Calculate target and stop based on recommendation | |
| if recommendation in ["strong_buy", "buy"]: | |
| # Target: nearest resistance or 2-3% above entry | |
| if resistance_levels: | |
| target_price = min( | |
| [r for r in resistance_levels if r > current_price], | |
| default=current_price * 1.03, | |
| ) | |
| else: | |
| target_price = current_price * 1.03 | |
| # Stop loss: nearest support or 1-2% below entry | |
| if support_levels: | |
| stop_loss = max( | |
| [s for s in support_levels if s < current_price], | |
| default=current_price * 0.98, | |
| ) | |
| else: | |
| stop_loss = current_price * 0.98 | |
| elif recommendation in ["strong_sell", "sell"]: | |
| # Target: nearest support or 2-3% below entry | |
| if support_levels: | |
| target_price = max( | |
| [s for s in support_levels if s < current_price], | |
| default=current_price * 0.97, | |
| ) | |
| else: | |
| target_price = current_price * 0.97 | |
| # Stop loss: nearest resistance or 1-2% above entry | |
| if resistance_levels: | |
| stop_loss = min( | |
| [r for r in resistance_levels if r > current_price], | |
| default=current_price * 1.02, | |
| ) | |
| else: | |
| stop_loss = current_price * 1.02 | |
| else: # hold | |
| target_price = current_price | |
| stop_loss = current_price | |
| # Calculate risk/reward ratio | |
| if recommendation in ["strong_buy", "buy"]: | |
| risk = abs(entry_price - stop_loss) | |
| reward = abs(target_price - entry_price) | |
| elif recommendation in ["strong_sell", "sell"]: | |
| risk = abs(stop_loss - entry_price) | |
| reward = abs(entry_price - target_price) | |
| else: | |
| risk = 1 | |
| reward = 1 | |
| risk_reward_ratio = reward / risk if risk > 0 else 1.0 | |
| return { | |
| "entry_price": round(entry_price, 2), | |
| "target_price": round(target_price, 2), | |
| "stop_loss": round(stop_loss, 2), | |
| "risk_reward_ratio": round(risk_reward_ratio, 2), | |
| } | |
| def _generate_rationale( | |
| self, | |
| ticker: str, | |
| timeframe: str, | |
| decision: Dict[str, Any], | |
| indicators: Dict[str, Any], | |
| patterns: Dict[str, Any], | |
| trends: Dict[str, Any], | |
| cost_tracker=None, | |
| ) -> str: | |
| """ | |
| Generate detailed rationale for the trading decision. | |
| Args: | |
| ticker: Asset ticker | |
| timeframe: Analysis timeframe | |
| decision: Decision result | |
| indicators: Indicator analysis | |
| patterns: Pattern analysis | |
| trends: Trend analysis | |
| Returns: | |
| Detailed rationale string | |
| """ | |
| system_prompt = """You are an expert trading advisor. Provide a clear, concise rationale for a trading decision. | |
| Your rationale should: | |
| 1. Start with the recommendation and key price levels | |
| 2. Explain the main reasons supporting this decision (2-3 key points) | |
| 3. Note any contradicting signals or risks | |
| 4. End with a clear action statement | |
| Keep it under 200 words. Be professional and actionable.""" | |
| # Get timeframe context from decision | |
| timeframe_context = decision.get("timeframe_context", {}) | |
| timeframe_label = timeframe_context.get("label", timeframe) | |
| timeframe_scope = timeframe_context.get("scope", "trading") | |
| hold_duration = timeframe_context.get("hold_duration", "varies") | |
| user_prompt = f"""Provide a trading rationale for the following decision: | |
| Ticker: {ticker} | |
| Timeframe: {timeframe_label} ({timeframe_scope}) | |
| Expected Hold Duration: {hold_duration} | |
| Recommendation: {decision["recommendation"].replace("_", " ").upper()} | |
| Confidence: {decision["confidence"]:.0%} | |
| Risk Level: {decision["risk_level"].upper()} | |
| Price Levels: | |
| - Entry: ${decision.get("entry_price", 0):.2f} | |
| - Target: ${decision.get("target_price", 0):.2f} | |
| - Stop Loss: ${decision.get("stop_loss", 0):.2f} | |
| - Risk/Reward: {decision.get("risk_reward_ratio", 0):.2f} | |
| Key Technical Factors: | |
| - Trend: {(trends.get("overall_trend") or "unknown").upper()} (strength: {trends.get("trend_strength", 0):.2f}) | |
| - RSI: {indicators.get("rsi", {}).get("value", "N/A")} | |
| - MACD: {indicators.get("macd", {}).get("interpretation", "N/A")} | |
| - Patterns: {len(patterns.get("candlestick_patterns", []))} candlestick, {len(patterns.get("chart_patterns", []))} chart patterns | |
| Write a clear rationale for this decision appropriate for the {timeframe_scope} timeframe.""" | |
| messages = [ | |
| SystemMessage(content=system_prompt), | |
| HumanMessage(content=user_prompt), | |
| ] | |
| # Create callback if cost tracker is available | |
| if cost_tracker: | |
| callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) | |
| response = self.llm.invoke(messages, config={"callbacks": [callback]}) | |
| else: | |
| response = self.llm.invoke(messages) | |
| return response.content | |
| def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame: | |
| """ | |
| Convert serialized data back to DataFrame. | |
| Args: | |
| data: Serialized DataFrame data | |
| Returns: | |
| pandas DataFrame | |
| """ | |
| df = pd.DataFrame(data) | |
| if "Date" in df.columns: | |
| df["Date"] = pd.to_datetime(df["Date"]) | |
| df = df.set_index("Date") | |
| return df | |