"""
Comprehensive Stock Analysis Workflow

This workflow orchestrates all 11 agents for comprehensive stock research:
- Technical Analysis: Indicator, Pattern, Trend, Decision agents
- Fundamental Analysis: Fundamentals, Sentiment, News agents
- Synthesis: Technical Analyst (bridges technical + fundamental)
- Research Debate: Researcher Team (bull vs bear)
- Risk Assessment: Risk Manager
- Final Decision: Portfolio Manager

The workflow uses LangGraph for parallel execution and state management.
"""

import json
import logging
import time
import traceback
from datetime import datetime, timedelta
from typing import Annotated, Any, Dict, Optional, TypedDict

import pandas as pd
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages

from agents.fundamental.fundamentals_agent import FundamentalsAgent
from agents.fundamental.news_agent import NewsAgent
from agents.fundamental.sentiment_agent import SentimentAgent
from agents.fundamental.technical_analyst import TechnicalAnalystAgent
from agents.management.portfolio_manager import PortfolioManagerAgent
from agents.management.researcher_team import ResearcherTeamAgent
from agents.management.risk_manager import RiskManagerAgent
from agents.technical.decision_agent import DecisionAgent
from agents.technical.indicator_agent import IndicatorAgent
from agents.technical.pattern_agent import PatternAgent
from agents.technical.trend_agent import TrendAgent
from config.default_config import DEFAULT_CONFIG
from data.providers.base import DataProvider
from data.providers.yahoo_finance import YahooFinanceProvider
from data.schemas.market_data import validate_ohlc
from graph.state.trading_state import TechnicalWorkflowState
from utils.charts.chart_generator import ChartGenerator

logger = logging.getLogger(__name__)

# Calendar days of history requested for each supported timeframe value.
_LOOKBACK_DAYS: Dict[str, int] = {
    # 1 week for intraday
    "1m": 7,
    "5m": 7,
    "15m": 7,
    "30m": 7,
    # 2 months for hourly
    "1h": 60,
    "4h": 60,
    "1d": 365,  # 1 year for daily
    "1w": 730,  # 2 years for weekly
    "1mo": 1825,  # 5 years for monthly (60 data points)
    "3mo": 3650,  # 10 years for quarterly (40 points)
    "1y": 3650,  # 10 years for yearly
    "5y": 7300,  # 20 years for 5-year
}
# Used for any timeframe not listed above (2 years).
_DEFAULT_LOOKBACK_DAYS = 730


class ComprehensiveState(TypedDict, total=False):
    """State for comprehensive workflow."""

    # Input
    ticker: str
    timeframe: str

    # Optional run-level inputs read by the nodes. They are declared here so
    # they are part of the graph schema.
    # NOTE(review): nodes also look for a "config" key in the state, which is
    # not declared here and is never set by run() — confirm whether it should
    # be added.
    investment_style: str
    _cost_tracker: Any

    # Market data
    market_data: pd.DataFrame
    fundamental_data: Dict[str, Any]
    news_data: list

    # Technical analysis results (4 agents)
    indicator_analysis: Dict[str, Any]
    pattern_analysis: Dict[str, Any]
    trend_analysis: Dict[str, Any]
    decision_analysis: Dict[str, Any]

    # Fundamental analysis results (3 agents)
    fundamentals_analysis: Dict[str, Any]
    sentiment_analysis: Dict[str, Any]
    news_analysis: Dict[str, Any]

    # Synthesis results
    technical_analyst: Dict[str, Any]  # Bridges technical + fundamental

    # Research debate results
    researcher_synthesis: Dict[str, Any]  # Bull vs bear debate

    # Risk assessment results
    risk_assessment: Dict[str, Any]

    # Final decision
    portfolio_decision: Dict[str, Any]

    # Cross-validation
    contradictions: list

    # Output artifacts
    chart_path: str
    messages: Annotated[list, add_messages]  # Use reducer to handle concurrent updates
    error: str | None


class ComprehensiveWorkflow:
    """Comprehensive stock analysis workflow with 11 agents."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the comprehensive workflow.

        Args:
            config: Optional configuration override; DEFAULT_CONFIG is used
                when it is None or empty.
        """
        self.config = config or DEFAULT_CONFIG
        self.data_provider = YahooFinanceProvider()
        self.chart_generator = ChartGenerator()

        # Technical agents - pass config for indicator parameters
        self.indicator_agent = IndicatorAgent(config=self.config)
        self.pattern_agent = PatternAgent(config=self.config)
        self.trend_agent = TrendAgent(config=self.config)
        self.decision_agent = DecisionAgent(config=self.config)

        # Fundamental agents - pass config for LLM settings
        self.fundamentals_agent = FundamentalsAgent(config=self.config)
        self.sentiment_agent = SentimentAgent(config=self.config)
        self.news_agent = NewsAgent(config=self.config)

        # Synthesis agent - pass config for runtime LLM overrides
        self.technical_analyst = TechnicalAnalystAgent(config=self.config)

        # Management agents - pass config for runtime LLM overrides
        self.researcher_team = ResearcherTeamAgent(config=self.config)
        self.risk_manager = RiskManagerAgent(config=self.config)
        self.portfolio_manager = PortfolioManagerAgent(config=self.config)

        self.workflow = self._build_workflow()

    def _build_workflow(self) -> StateGraph:
        """
        Build the LangGraph workflow.

        Returns:
            The result of ``StateGraph.compile()`` for the graph wired below.
        """
        workflow = StateGraph(ComprehensiveState)

        nodes = [
            # Data fetching nodes
            ("fetch_market_data", self._fetch_market_data),
            ("fetch_fundamental_data", self._fetch_fundamental_data),
            ("fetch_news_data", self._fetch_news_data),
            # Technical analysis nodes
            ("indicator_analysis", self._run_indicator_agent),
            ("pattern_analysis", self._run_pattern_agent),
            ("trend_analysis", self._run_trend_agent),
            ("decision_analysis", self._run_decision_agent),
            # Fundamental analysis nodes
            ("fundamentals_analysis", self._run_fundamentals_agent),
            ("sentiment_analysis", self._run_sentiment_agent),
            ("news_analysis", self._run_news_agent),
            # Synthesis node (fed by the analysis agents)
            ("technical_analyst", self._run_technical_analyst),
            # Research debate node
            ("researcher_debate", self._run_researcher_team),
            # Risk assessment node
            ("risk_assessment", self._run_risk_manager),
            # Final decision node
            ("portfolio_decision", self._run_portfolio_manager),
            # Cross-validation node
            ("cross_validation", self._detect_contradictions),
            # Chart generation node
            ("generate_chart", self._generate_chart),
        ]
        for node_name, handler in nodes:
            workflow.add_node(node_name, handler)

        workflow.set_entry_point("fetch_market_data")

        # NOTE(review): every fan-in below (decision_analysis,
        # technical_analyst) is declared as separate single-source edges.
        # This presumably relies on all incoming branches completing in the
        # same graph step; LangGraph's list form add_edge([a, b], target)
        # states the wait-for-all join explicitly — confirm against the
        # installed version before switching.
        edges = [
            # After market data, fetch fundamental and news data
            ("fetch_market_data", "fetch_fundamental_data"),
            ("fetch_market_data", "fetch_news_data"),
            # After fundamental data, run fundamental agents
            ("fetch_fundamental_data", "fundamentals_analysis"),
            ("fetch_fundamental_data", "sentiment_analysis"),
            # After news data, run news agent
            ("fetch_news_data", "news_analysis"),
            # After market data, run technical agents
            ("fetch_market_data", "indicator_analysis"),
            ("fetch_market_data", "pattern_analysis"),
            ("fetch_market_data", "trend_analysis"),
            # Decision agent follows indicator, pattern, trend
            ("indicator_analysis", "decision_analysis"),
            ("pattern_analysis", "decision_analysis"),
            ("trend_analysis", "decision_analysis"),
            # Technical analyst follows the analysis agents
            ("decision_analysis", "technical_analyst"),
            ("fundamentals_analysis", "technical_analyst"),
            ("sentiment_analysis", "technical_analyst"),
            ("news_analysis", "technical_analyst"),
            # Sequential tail: debate -> risk -> decision -> validation -> chart
            ("technical_analyst", "researcher_debate"),
            ("researcher_debate", "risk_assessment"),
            ("risk_assessment", "portfolio_decision"),
            ("portfolio_decision", "cross_validation"),
            ("cross_validation", "generate_chart"),
            # End after chart generation
            ("generate_chart", END),
        ]
        for source, target in edges:
            workflow.add_edge(source, target)

        return workflow.compile()

    def run(
        self,
        ticker: str,
        timeframe: str = "1y",
        cost_tracker: Optional[Any] = None,
    ) -> ComprehensiveState:
        """
        Run comprehensive analysis workflow.

        Args:
            ticker: Stock ticker symbol
            timeframe: Analysis timeframe (default: 1y)
            cost_tracker: Optional object stored in the state under
                ``_cost_tracker`` and handed to the agents by the node
                methods. Defaults to None.

        Returns:
            Final state with all analysis results

        Raises:
            Exception: Any error raised while invoking the graph is logged
                and re-raised unchanged.
        """
        start_time = time.time()

        logger.info(
            json.dumps(
                {
                    "workflow": "comprehensive",
                    "action": "start",
                    "ticker": ticker,
                    "timeframe": timeframe,
                    "timestamp": time.time(),
                }
            )
        )

        try:
            # The previous code read the tracker from an undefined name
            # ("state"), which raised NameError on every call; it is now an
            # explicit, optional argument.
            initial_state: ComprehensiveState = {
                "ticker": ticker,
                "timeframe": timeframe,
                "messages": [],
                "_cost_tracker": cost_tracker,  # Pass cost tracker to agents
                "contradictions": [],
            }

            final_state = self.workflow.invoke(initial_state)

            execution_time = time.time() - start_time
            recommendation = (
                final_state.get("portfolio_decision", {})
                .get("decision", {})
                .get("recommendation", "N/A")
            )
            logger.info(
                json.dumps(
                    {
                        "workflow": "comprehensive",
                        "action": "complete",
                        "execution_time": execution_time,
                        "ticker": ticker,
                        "recommendation": recommendation,
                        "timestamp": time.time(),
                    }
                )
            )

            return final_state

        except Exception as e:
            logger.error(
                json.dumps(
                    {
                        "workflow": "comprehensive",
                        "action": "error",
                        "ticker": ticker,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "timestamp": time.time(),
                    }
                )
            )
            raise

    # Shared helpers

    @staticmethod
    def _log_node_error(node: str, ticker: str, exc: Exception) -> None:
        """Log a node failure as one JSON line.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` captures the exception being handled.
        """
        logger.error(
            json.dumps(
                {
                    "node": node,
                    "action": "error",
                    "ticker": ticker,
                    "error": str(exc),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )

    @staticmethod
    def _agent_state(
        state: ComprehensiveState, **payload: Any
    ) -> TechnicalWorkflowState:
        """Build the per-agent input state: ticker, timeframe, the given
        payload entries, an empty message list and the cost tracker."""
        agent_state: TechnicalWorkflowState = {
            "ticker": state["ticker"],
            "timeframe": state["timeframe"],
            **payload,
            "messages": [],
            "_cost_tracker": state.get("_cost_tracker"),  # Pass cost tracker to agents
        }
        return agent_state

    @staticmethod
    def _ohlc_payload(state: ComprehensiveState) -> Dict[str, Any]:
        """Return the market data frame as a list of row dicts under "ohlc_data"."""
        return {"ohlc_data": state["market_data"].to_dict(orient="records")}

    @staticmethod
    def _investment_style(state: ComprehensiveState) -> Any:
        """Return the investment style from ``state["config"]`` when that is a
        dict with a truthy value, otherwise from ``state["investment_style"]``
        (None when neither is present)."""
        style = None
        if "config" in state:
            config = state["config"]
            if isinstance(config, dict):
                style = config.get("investment_style")
        if not style:
            style = state.get("investment_style")
        return style

    @staticmethod
    def _collect_analysis(state: ComprehensiveState) -> Dict[str, Any]:
        """Gather the eight upstream analysis results, defaulting each to {}."""
        keys = (
            "indicator_analysis",
            "pattern_analysis",
            "trend_analysis",
            "decision_analysis",
            "fundamentals_analysis",
            "sentiment_analysis",
            "news_analysis",
            "technical_analyst",
        )
        return {key: state.get(key, {}) for key in keys}

    @staticmethod
    def _format_debate(
        bullish_case: Dict[str, Any],
        bearish_case: Dict[str, Any],
        synthesis: Dict[str, Any],
    ) -> str:
        """Render the bull case, bear case and synthesis as one markdown string."""
        parts = [
            "# Research Team Investment Debate\n\n",
            # Bull Case
            "## 🐂 Bull Researcher\n\n",
            bullish_case.get("argument", "No bullish case generated") + "\n\n",
            # Bear Case
            "## 🐻 Bear Researcher\n\n",
            bearish_case.get("argument", "No bearish case generated") + "\n\n",
            # Synthesis
            "## ⚖️ Neutral Moderator - Balanced Assessment\n\n",
            synthesis.get("synthesis", "No synthesis generated") + "\n\n",
            # Signal summary
            "### Signal Summary\n\n",
            f"- **Bullish Signals**: {bullish_case.get('signal_count', 0)}\n",
            f"- **Bearish Signals**: {bearish_case.get('signal_count', 0)}\n",
            f"- **Overall Lean**: {synthesis.get('overall_lean', 'neutral').upper()}\n",
        ]
        return "".join(parts)

    # Data fetching nodes

    def _fetch_market_data(self, state: ComprehensiveState) -> dict:
        """
        Fetch and validate OHLC market data for the ticker.

        The lookback window is chosen from the timeframe (see
        ``_LOOKBACK_DAYS``). On any failure the error is logged and an empty
        DataFrame plus an ``error`` message are returned instead of raising.
        """
        ticker = state["ticker"]
        timeframe = state["timeframe"]

        try:
            # Calculate date range based on timeframe
            end_date = datetime.now()
            lookback_days = _LOOKBACK_DAYS.get(timeframe, _DEFAULT_LOOKBACK_DAYS)
            start_date = end_date - timedelta(days=lookback_days)

            df = self.data_provider.fetch_ohlc(
                ticker=ticker,
                timeframe=timeframe,
                start_date=start_date.strftime("%Y-%m-%d"),
                end_date=end_date.strftime("%Y-%m-%d"),
            )

            df = validate_ohlc(df)

            return {"market_data": df}

        except Exception as e:
            self._log_node_error("fetch_market_data", ticker, e)
            return {
                "error": f"Failed to fetch market data: {str(e)}",
                "market_data": pd.DataFrame(),
            }

    def _fetch_fundamental_data(self, state: ComprehensiveState) -> dict:
        """
        Fetch fundamental data with asset-type awareness.

        Fundamentals are fetched from the data provider only when the
        detected asset type is "stock" and its characteristics report
        ``has_fundamentals``; otherwise a placeholder dict describing the
        asset type is returned. Failures are logged and yield an empty dict.
        """
        ticker = state["ticker"]

        try:
            # Detect asset type
            asset_type = DataProvider.detect_asset_type(ticker)
            asset_characteristics = DataProvider.get_asset_characteristics(asset_type)

            logger.info(
                json.dumps(
                    {
                        "node": "fetch_fundamental_data",
                        "action": "asset_type_detected",
                        "ticker": ticker,
                        "asset_type": asset_type,
                        "has_fundamentals": asset_characteristics["has_fundamentals"],
                        "timestamp": time.time(),
                    }
                )
            )

            # Only fetch traditional fundamentals for stocks
            if asset_type == "stock" and asset_characteristics["has_fundamentals"]:
                fundamental_data = self.data_provider.fetch_fundamentals(ticker)
                logger.info(
                    json.dumps(
                        {
                            "node": "fetch_fundamental_data",
                            "action": "fetched_stock_fundamentals",
                            "ticker": ticker,
                            "timestamp": time.time(),
                        }
                    )
                )
            else:
                # For non-stock assets, create placeholder with asset type info
                fundamental_data = {
                    "asset_type": asset_type,
                    "has_traditional_fundamentals": False,
                    "note": f"Traditional fundamental data not applicable for {asset_type} assets. "
                    f"Analysis will focus on {', '.join(asset_characteristics['analysis_focus'][:3])}.",
                }
                logger.info(
                    json.dumps(
                        {
                            "node": "fetch_fundamental_data",
                            "action": "skipped_fundamentals",
                            "ticker": ticker,
                            "asset_type": asset_type,
                            "reason": "Not applicable for this asset type",
                            "timestamp": time.time(),
                        }
                    )
                )

            return {"fundamental_data": fundamental_data}

        except Exception as e:
            self._log_node_error("fetch_fundamental_data", ticker, e)
            return {"fundamental_data": {}}

    def _fetch_news_data(self, state: ComprehensiveState) -> dict:
        """Fetch news for the ticker; on failure log and return an empty list."""
        ticker = state["ticker"]

        try:
            news_data = self.data_provider.fetch_news(ticker)
            return {"news_data": news_data}

        except Exception as e:
            self._log_node_error("fetch_news_data", ticker, e)
            return {"news_data": []}

    # Technical analysis nodes

    def _run_indicator_agent(self, state: ComprehensiveState) -> dict:
        """Run indicator analysis; returns {} for the result on failure."""
        try:
            agent_state = self._agent_state(
                state, market_data=self._ohlc_payload(state)
            )
            result_state = self.indicator_agent.run(agent_state)
            return {
                "indicator_analysis": result_state.get("indicators", {}),
                # Pass through agent messages
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("indicator_analysis", state["ticker"], e)
            return {"indicator_analysis": {}}

    def _run_pattern_agent(self, state: ComprehensiveState) -> dict:
        """Run pattern analysis; returns {} for the result on failure."""
        try:
            agent_state = self._agent_state(
                state, market_data=self._ohlc_payload(state)
            )
            result_state = self.pattern_agent.run(agent_state)
            return {
                "pattern_analysis": result_state.get("patterns", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("pattern_analysis", state["ticker"], e)
            return {"pattern_analysis": {}}

    def _run_trend_agent(self, state: ComprehensiveState) -> dict:
        """Run trend analysis; returns {} for the result on failure."""
        try:
            agent_state = self._agent_state(
                state, market_data=self._ohlc_payload(state)
            )
            result_state = self.trend_agent.run(agent_state)
            return {
                "trend_analysis": result_state.get("trends", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("trend_analysis", state["ticker"], e)
            return {"trend_analysis": {}}

    def _run_decision_agent(self, state: ComprehensiveState) -> dict:
        """Run decision analysis over the market data and the indicator,
        pattern and trend results; returns {} for the result on failure."""
        try:
            agent_state = self._agent_state(
                state,
                market_data=self._ohlc_payload(state),
                indicators=state.get("indicator_analysis", {}),
                patterns=state.get("pattern_analysis", {}),
                trends=state.get("trend_analysis", {}),
            )
            result_state = self.decision_agent.run(agent_state)
            return {
                "decision_analysis": result_state.get("decision", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("decision_analysis", state["ticker"], e)
            return {"decision_analysis": {}}

    # Fundamental analysis nodes

    def _run_fundamentals_agent(self, state: ComprehensiveState) -> dict:
        """Run fundamentals analysis; returns {} for the result on failure."""
        try:
            agent_state = self._agent_state(
                state, fundamental_data=state.get("fundamental_data", {})
            )
            result_state = self.fundamentals_agent.run(agent_state)
            return {
                "fundamentals_analysis": result_state.get("fundamentals", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("fundamentals_analysis", state["ticker"], e)
            return {"fundamentals_analysis": {}}

    def _run_sentiment_agent(self, state: ComprehensiveState) -> dict:
        """Run sentiment analysis on the news data; returns {} on failure."""
        try:
            agent_state = self._agent_state(
                state, news_data=state.get("news_data", [])
            )
            result_state = self.sentiment_agent.run(agent_state)
            return {
                "sentiment_analysis": result_state.get("sentiment", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("sentiment_analysis", state["ticker"], e)
            return {"sentiment_analysis": {}}

    def _run_news_agent(self, state: ComprehensiveState) -> dict:
        """Run news analysis on the news data; returns {} on failure."""
        try:
            agent_state = self._agent_state(
                state, news_data=state.get("news_data", [])
            )
            result_state = self.news_agent.run(agent_state)
            return {
                "news_analysis": result_state.get("news", {}),
                "messages": result_state.get("messages", []),
            }
        except Exception as e:
            self._log_node_error("news_analysis", state["ticker"], e)
            return {"news_analysis": {}}

    # Synthesis node

    def _run_technical_analyst(self, state: ComprehensiveState) -> dict:
        """Run technical analyst synthesis over the indicator, pattern, trend
        and fundamentals results; returns {} for the result on failure."""
        try:
            result = self.technical_analyst.analyze(
                state["ticker"],
                state["timeframe"],
                state.get("indicator_analysis", {}),
                state.get("pattern_analysis", {}),
                state.get("trend_analysis", {}),
                state.get("fundamentals_analysis", {}),
                investment_style=self._investment_style(state),
            )
            return {
                "technical_analyst": result,
                "messages": [result.get("assessment", "")],
            }
        except Exception as e:
            self._log_node_error("technical_analyst", state["ticker"], e)
            return {"technical_analyst": {}}

    # Research debate node

    def _run_researcher_team(self, state: ComprehensiveState) -> dict:
        """Run the researcher team debate and publish it as a markdown message;
        returns {} for the result on failure."""
        try:
            investment_style = self._investment_style(state)

            # Combine all analysis for debate
            result = self.researcher_team.analyze(
                state["ticker"],
                state["timeframe"],
                self._collect_analysis(state),
                investment_style=investment_style,
            )

            bullish_case = result.get("bullish_case", {})
            bearish_case = result.get("bearish_case", {})
            synthesis = result.get("synthesis", {})

            # Format the complete debate output as markdown
            debate_output = self._format_debate(bullish_case, bearish_case, synthesis)

            # NOTE(review): "messages" is declared with the add_messages
            # reducer, yet this node returns the existing history plus a plain
            # dict without a role/type key — confirm the reducer accepts this
            # shape and that re-submitting the history is intended.
            debate_message = {
                "agent_name": "researcher_team",
                "content": debate_output,
                "metadata": {
                    "bullish_case": bullish_case,
                    "bearish_case": bearish_case,
                    "synthesis": synthesis,
                },
            }
            return {
                "researcher_synthesis": result,
                "messages": state.get("messages", []) + [debate_message],
            }
        except Exception as e:
            self._log_node_error("researcher_debate", state["ticker"], e)
            return {"researcher_synthesis": {}}

    # Risk assessment node

    def _run_risk_manager(self, state: ComprehensiveState) -> dict:
        """Run risk manager assessment over the market data, the collected
        analyses and the research synthesis; returns {} on failure."""
        try:
            result = self.risk_manager.analyze(
                state["ticker"],
                state["timeframe"],
                state.get("market_data", pd.DataFrame()),
                self._collect_analysis(state),
                state.get("researcher_synthesis", {}),
                investment_style=self._investment_style(state),
            )
            return {
                "risk_assessment": result,
                "messages": [result.get("assessment", "")],
            }
        except Exception as e:
            self._log_node_error("risk_assessment", state["ticker"], e)
            return {"risk_assessment": {}}

    # Final decision node

    def _run_portfolio_manager(self, state: ComprehensiveState) -> dict:
        """Run portfolio manager final decision over the collected analyses,
        the research synthesis and the risk assessment; returns {} on failure."""
        try:
            result = self.portfolio_manager.analyze(
                state["ticker"],
                state["timeframe"],
                self._collect_analysis(state),
                state.get("researcher_synthesis", {}),
                state.get("risk_assessment", {}),
                investment_style=self._investment_style(state),
            )
            return {
                "portfolio_decision": result,
                "messages": [result.get("rationale", "")],
            }
        except Exception as e:
            self._log_node_error("portfolio_decision", state["ticker"], e)
            return {"portfolio_decision": {}}

    # Cross-validation node

    def _detect_contradictions(self, state: ComprehensiveState) -> dict:
        """
        Detect contradictions between agents.

        Three checks are made, each appending one entry (type, severity,
        description) when it fires:
        - alignment score below 0.5 (technical vs fundamental divergence);
        - research signal ratio strictly between 0.4 and 0.6 (mixed signals);
        - a "buy" recommendation (any letter case) with risk score above 70.

        Missing values default to 1.0, 0.5, "hold" and 50 respectively.
        Note that the 0.5 default ratio falls inside the mixed-signals band,
        so a missing research synthesis is reported as mixed signals.
        """
        contradictions = []

        # Check technical vs fundamental alignment
        alignment = state.get("technical_analyst", {}).get("alignment", {})
        alignment_score = alignment.get("alignment_score", 1.0)
        if alignment_score < 0.5:
            contradictions.append(
                {
                    "type": "technical_fundamental_divergence",
                    "severity": "high" if alignment_score == 0 else "moderate",
                    "description": f"Technical and fundamental analysis diverge (alignment: {alignment_score:.2f})",
                }
            )

        # Check research debate balance
        synthesis = state.get("researcher_synthesis", {}).get("synthesis", {})
        signal_ratio = synthesis.get("signal_ratio", 0.5)
        if 0.4 < signal_ratio < 0.6:
            contradictions.append(
                {
                    "type": "mixed_signals",
                    "severity": "moderate",
                    "description": f"Mixed signals from research team (ratio: {signal_ratio:.2f})",
                }
            )

        # Check recommendation vs risk
        decision = state.get("portfolio_decision", {}).get("decision", {})
        recommendation = decision.get("recommendation", "hold")
        risk_score = state.get("risk_assessment", {}).get("risk_score", 50)
        # Compare case-insensitively so "BUY" / "Buy" are not missed.
        if str(recommendation).lower() == "buy" and risk_score > 70:
            contradictions.append(
                {
                    "type": "recommendation_risk_mismatch",
                    "severity": "high",
                    "description": f"Buy recommendation despite high risk score ({risk_score:.1f}/100)",
                }
            )

        return {"contradictions": contradictions}

    # Chart generation node

    def _generate_chart(self, state: ComprehensiveState) -> dict:
        """Generate the candlestick chart and return its path.

        Returns an empty ``chart_path`` when market data is empty or missing,
        or when chart generation fails (the failure is logged).
        """
        try:
            market_data = state.get("market_data", pd.DataFrame())

            # Skip chart generation if market data is empty or missing
            if market_data.empty:
                logger.warning(
                    json.dumps(
                        {
                            "node": "generate_chart",
                            "action": "skipped",
                            "ticker": state["ticker"],
                            "reason": "market_data is empty",
                            "timestamp": time.time(),
                        }
                    )
                )
                return {"chart_path": ""}

            # generate_candlestick_chart returns (Figure, filepath) tuple
            _fig, chart_path = self.chart_generator.generate_candlestick_chart(
                df=market_data,
                ticker=state["ticker"],
                timeframe=state["timeframe"],
                title=f"{state['ticker']} - {state['timeframe']} Analysis",
            )

            return {"chart_path": chart_path or ""}

        except Exception as e:
            self._log_node_error("generate_chart", state["ticker"], e)
            return {"chart_path": ""}