""" Conditional Comprehensive Stock Analysis Workflow This workflow extends ComprehensiveWorkflow with conditional phase execution. Users can selectively enable/disable 6 analysis phases: - Technical: Indicator, Pattern, Trend, Decision agents - Fundamental: Fundamentals, Sentiment analysis - Sentiment: News analysis and sentiment tracking - Research Synthesis: Technical Analyst (bridges technical + fundamental) - Risk: Risk Manager assessment - Decision: Portfolio Manager final decision The workflow uses PhaseConfiguration to control which phases execute, and tracks progress through EnhancedWorkflowState. """ import json import logging import time import traceback from typing import Any, Dict, List, Literal, Optional import pandas as pd from langgraph.graph import END, StateGraph from config.models import AnalysisPhase, InvestmentStyle, PhaseConfiguration from graph.state.trading_state import ( EnhancedWorkflowState, create_initial_enhanced_state, ) from graph.workflows.comprehensive_workflow import ComprehensiveWorkflow from utils.callbacks.cost_tracking_callback import WorkflowCostTracker from utils.markdown_validator import validate_agent_response from utils.phase_scoring import ( calculate_fundamental_phase_score, calculate_research_synthesis_phase_score, calculate_risk_phase_score, calculate_sentiment_phase_score, calculate_technical_phase_score, ) from utils.report_models import AgentOutput, AnalysisReport, PhaseOutput logger = logging.getLogger(__name__) class ConditionalComprehensiveWorkflow(ComprehensiveWorkflow): """ Conditional comprehensive workflow with phase-based execution. Extends ComprehensiveWorkflow to support selective phase execution based on user configuration. """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the conditional comprehensive workflow. Args: config: Optional configuration override """ super().__init__(config) budget_config = config.get("budget_config") if config else None self.cost_tracker = WorkflowCostTracker(budget_config=budget_config) self.workflow = self._build_conditional_workflow() def _validate_agent_output(self, content: str, agent_name: str) -> None: """ Validate agent output markdown structure and log results. Args: content: Agent response markdown content agent_name: Name of the agent for logging context """ try: result = validate_agent_response(content, agent_name, strict=False) if not result.is_valid: logger.warning( f"Agent {agent_name} response quality below threshold: " f"score={result.score:.1f}%, issues={len(result.issues)}" ) for issue in result.issues[:3]: # Log first 3 issues logger.warning(f" - {issue}") else: logger.debug( f"Agent {agent_name} response validated: score={result.score:.1f}%" ) except Exception as e: logger.warning(f"Failed to validate {agent_name} response: {e}") def _build_conditional_workflow(self) -> StateGraph: """Build the conditional LangGraph workflow with phase routing.""" workflow = StateGraph(EnhancedWorkflowState) # Data fetching node (always runs) workflow.add_node("fetch_data", self._fetch_all_data) # Phase-specific nodes workflow.add_node("technical_phase", self._run_technical_phase) workflow.add_node("fundamental_phase", self._run_fundamental_phase) workflow.add_node("sentiment_phase", self._run_sentiment_phase) workflow.add_node( "research_synthesis_phase", self._run_research_synthesis_phase ) workflow.add_node("risk_phase", self._run_risk_phase) workflow.add_node("decision_phase", self._run_decision_phase) # Chart generation node (always runs) workflow.add_node("generate_chart", self._generate_chart_enhanced) # Finalization node (always runs) workflow.add_node("finalize_report", self._finalize_report) # Define workflow edges with conditional routing workflow.set_entry_point("fetch_data") # After data fetch, route to appropriate phases workflow.add_conditional_edges( "fetch_data", self._phase_router, { "technical": "technical_phase", "fundamental": "fundamental_phase", "sentiment": "sentiment_phase", "research_synthesis": "research_synthesis_phase", "risk": "risk_phase", "decision": "decision_phase", "finalize": "generate_chart", }, ) # After each phase, route to next phase or generate chart for phase_node in [ "technical_phase", "fundamental_phase", "sentiment_phase", "research_synthesis_phase", "risk_phase", "decision_phase", ]: workflow.add_conditional_edges( phase_node, self._phase_router, { "technical": "technical_phase", "fundamental": "fundamental_phase", "sentiment": "sentiment_phase", "research_synthesis": "research_synthesis_phase", "risk": "risk_phase", "decision": "decision_phase", "finalize": "generate_chart", }, ) # After chart generation, go to finalization workflow.add_edge("generate_chart", "finalize_report") # End after finalization workflow.add_edge("finalize_report", END) return workflow.compile() def _check_budget_threshold( self, phase_name: str, state: EnhancedWorkflowState ) -> Optional[Dict[str, Any]]: """ Check if budget threshold has been exceeded after a phase. Args: phase_name: Name of the completed phase state: Current workflow state Returns: None if budget OK, or dict with budget alert info if threshold exceeded """ threshold_exceeded, alert_message, threshold_percent = ( self.cost_tracker.cost_tracker.check_budget_threshold() ) if threshold_exceeded: logger.warning( json.dumps( { "event": "budget_threshold_exceeded", "phase": phase_name, "threshold": threshold_percent, "total_cost": self.cost_tracker.cost_tracker.total_cost, "message": alert_message, } ) ) # Add alert to state messages return { "budget_alert": { "threshold": threshold_percent, "message": alert_message, "phase": phase_name, "exceeded": self.cost_tracker.cost_tracker.budget_exceeded, } } return None def _phase_router( self, state: EnhancedWorkflowState ) -> Literal[ "technical", "fundamental", "sentiment", "research_synthesis", "risk", "decision", "finalize", ]: """ Route to the next enabled phase that hasn't been completed yet. Args: state: Current workflow state Returns: Next phase to execute or "finalize" if all phases are complete """ enabled_phases = state.get("enabled_phases", []) completed_phases = state.get("completed_phases", []) # Check phase dependencies and determine next phase for phase in enabled_phases: if phase in completed_phases: continue # Check dependencies if phase == AnalysisPhase.RESEARCH_SYNTHESIS.value: # Research synthesis requires at least one of technical/fundamental/sentiment required = [ AnalysisPhase.TECHNICAL.value, AnalysisPhase.FUNDAMENTAL.value, AnalysisPhase.SENTIMENT.value, ] if not any(p in completed_phases for p in required): continue elif phase == AnalysisPhase.DECISION.value: # Decision requires research synthesis if AnalysisPhase.RESEARCH_SYNTHESIS.value not in completed_phases: # If research synthesis is not enabled, skip decision dependency check if AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases: continue # Return the phase to execute return phase # type: ignore # All phases complete, finalize return "finalize" def run( self, ticker: str, timeframe: str = "1w", phase_config: Optional[PhaseConfiguration] = None, user_query: Optional[str] = None, ) -> EnhancedWorkflowState: """ Run conditional comprehensive analysis workflow. Args: ticker: Stock ticker symbol timeframe: Analysis timeframe (default: 1w) phase_config: Phase configuration (default: all phases enabled) user_query: Optional user query for context Returns: Final state with all analysis results """ start_time = time.time() # Use default configuration if not provided if phase_config is None: phase_config = PhaseConfiguration() # Validate configuration validation_errors = phase_config.validate() if validation_errors: raise ValueError( f"Invalid phase configuration: {', '.join(validation_errors)}" ) logger.info( json.dumps( { "workflow": "conditional_comprehensive", "action": "start", "ticker": ticker, "timeframe": timeframe, "investment_style": phase_config.investment_style.value, "enabled_phases": [p.value for p in phase_config.enabled_phases], "timestamp": time.time(), } ) ) try: # Convert PhaseConfiguration to dict for state config_dict = { "investment_style": phase_config.investment_style.value, "enabled_phases": [p.value for p in phase_config.enabled_phases], "timeframe": phase_config.timeframe, "chart_period_days": phase_config.chart_period_days, "educational_mode": phase_config.educational_mode, } initial_state = create_initial_enhanced_state( ticker=ticker, timeframe=timeframe, config=config_dict, user_query=user_query or "", ) # Add cost tracker to state for agents to use initial_state["_cost_tracker"] = self.cost_tracker final_state = self.workflow.invoke(initial_state) # Add cost tracking summary to final state cost_summary = self.cost_tracker.get_summary() final_state["cost_summary"] = cost_summary execution_time = time.time() - start_time logger.info( json.dumps( { "workflow": "conditional_comprehensive", "action": "complete", "execution_time": execution_time, "ticker": ticker, "completed_phases": final_state.get("completed_phases", []), "total_cost": cost_summary.get("total_cost", 0.0), "total_tokens": cost_summary.get("total_tokens", 0), "timestamp": time.time(), } ) ) return final_state except Exception as e: logger.error( json.dumps( { "workflow": "conditional_comprehensive", "action": "error", "ticker": ticker, "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) raise def _fetch_all_data(self, state: EnhancedWorkflowState) -> dict: """ Fetch all required data (market, fundamental, news). This node always runs regardless of phase configuration. """ ticker = state["ticker"] timeframe = state.get("timeframe", "1w") logger.info( json.dumps( { "phase": "data_fetch", "action": "start", "ticker": ticker, "timestamp": time.time(), } ) ) try: # Fetch market data (always needed) market_data_result = self._fetch_market_data( {"ticker": ticker, "timeframe": timeframe} ) # Fetch fundamental data if fundamental or research synthesis is enabled enabled_phases = state.get("enabled_phases", []) fundamental_data = {} if ( AnalysisPhase.FUNDAMENTAL.value in enabled_phases or AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases ): fundamental_result = self._fetch_fundamental_data({"ticker": ticker}) fundamental_data = fundamental_result.get("fundamental_data", {}) # Fetch news data if sentiment or research synthesis is enabled news_data = [] if ( AnalysisPhase.SENTIMENT.value in enabled_phases or AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases ): news_result = self._fetch_news_data({"ticker": ticker}) news_data = news_result.get("news_data", []) logger.info( json.dumps( { "phase": "data_fetch", "action": "complete", "ticker": ticker, "timestamp": time.time(), } ) ) return { "market_data": market_data_result.get("market_data", pd.DataFrame()), "fundamental_data": fundamental_data, "news_data": news_data, } except Exception as e: logger.error( json.dumps( { "phase": "data_fetch", "action": "error", "ticker": ticker, "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return { "market_data": pd.DataFrame(), "fundamental_data": {}, "news_data": [], } def _run_technical_phase(self, state: EnhancedWorkflowState) -> dict: """Run technical analysis phase (Indicator, Pattern, Trend, Decision agents).""" phase_start = time.time() phase_name = AnalysisPhase.TECHNICAL.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run technical agents in sequence (respecting dependencies) indicator_result = self._run_indicator_agent(state) pattern_result = self._run_pattern_agent(state) trend_result = self._run_trend_agent(state) # Update state with technical results updated_state = { **state, "indicator_analysis": indicator_result.get("indicator_analysis", {}), "pattern_analysis": pattern_result.get("pattern_analysis", {}), "trend_analysis": trend_result.get("trend_analysis", {}), } # Run decision agent with all technical results decision_result = self._run_decision_agent(updated_state) # Create phase output - extract LLM-generated messages from agent results # Get the last message from each agent (the LLM response) def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: # Get the last message (most recent agent output) for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): content = msg.get("content", "No report generated") # Validate agent output self._validate_agent_output(content, agent_name) return content # If no matching agent message, return the last message content last_msg = messages[-1] if isinstance(last_msg, dict): content = last_msg.get("content", str(last_msg)) self._validate_agent_output(content, agent_name) return content return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} agent_outputs = [ AgentOutput( agent_name="indicator_agent", report=get_agent_report(indicator_result, "indicator_agent"), charts=get_agent_charts(indicator_result, "indicator_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(indicator_result, "indicator_agent"), ), AgentOutput( agent_name="pattern_agent", report=get_agent_report(pattern_result, "pattern_agent"), charts=get_agent_charts(pattern_result, "pattern_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(pattern_result, "pattern_agent"), ), AgentOutput( agent_name="trend_agent", report=get_agent_report(trend_result, "trend_agent"), charts=get_agent_charts(trend_result, "trend_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(trend_result, "trend_agent"), ), AgentOutput( agent_name="decision_agent", report=get_agent_report(decision_result, "decision_agent"), charts=get_agent_charts(decision_result, "decision_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(decision_result, "decision_agent"), ), ] # Calculate phase score phase_score = calculate_technical_phase_score(updated_state) phase_output = PhaseOutput( phase=AnalysisPhase.TECHNICAL, agents=agent_outputs, phase_summary="Technical analysis phase completed", execution_time_seconds=time.time() - phase_start, score=phase_score, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output # Performance logging with detailed metrics phase_execution_time = time.time() - phase_start logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": phase_execution_time, "agent_count": 4, # Indicator, Pattern, Trend, Decision "phase_score": phase_score, "timestamp": time.time(), } ) ) # Log individual agent performance for agent_output in agent_outputs: logger.debug( json.dumps( { "phase": phase_name, "agent": agent_output.agent_name, "execution_time": agent_output.execution_time_seconds, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "indicator_analysis": indicator_result.get("indicator_analysis", {}), "pattern_analysis": pattern_result.get("pattern_analysis", {}), "trend_analysis": trend_result.get("trend_analysis", {}), "decision_analysis": decision_result.get("decision_analysis", {}), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) # Create error phase output instead of failing completely error_output = PhaseOutput( phase=AnalysisPhase.TECHNICAL, agents=[ AgentOutput( agent_name="TechnicalPhaseError", report=f"⚠️ Technical phase encountered an error: {str(e)}", execution_time_seconds=time.time() - phase_start, ) ], score=0.0, execution_time_seconds=time.time() - phase_start, ) return {"current_phase": None, "phase_outputs": {phase_name: error_output}} def _run_fundamental_phase(self, state: EnhancedWorkflowState) -> dict: """Run fundamental analysis phase (Fundamentals agent only).""" phase_start = time.time() phase_name = AnalysisPhase.FUNDAMENTAL.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run fundamentals agent only (sentiment is in separate phase) fundamentals_result = self._run_fundamentals_agent(state) # Create phase output - extract LLM-generated messages def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): return msg.get("content", "No report generated") last_msg = messages[-1] if isinstance(last_msg, dict): return last_msg.get("content", str(last_msg)) return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} # Update state with fundamental results for scoring updated_state = { **state, "fundamentals_analysis": fundamentals_result.get( "fundamentals_analysis", {} ), } agent_outputs = [ AgentOutput( agent_name="fundamentals_agent", report=get_agent_report(fundamentals_result, "fundamentals_agent"), charts=get_agent_charts(fundamentals_result, "fundamentals_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata( fundamentals_result, "fundamentals_agent" ), ), ] # Calculate phase score phase_score = calculate_fundamental_phase_score(updated_state) phase_output = PhaseOutput( phase=AnalysisPhase.FUNDAMENTAL, agents=agent_outputs, phase_summary="Fundamental analysis phase completed", execution_time_seconds=time.time() - phase_start, score=phase_score, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": time.time() - phase_start, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "fundamentals_analysis": fundamentals_result.get( "fundamentals_analysis", {} ), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) # Create error phase output instead of failing completely error_output = PhaseOutput( phase=AnalysisPhase.FUNDAMENTAL, agents=[ AgentOutput( agent_name="FundamentalPhaseError", report=f"⚠️ Fundamental phase encountered an error: {str(e)}", execution_time_seconds=time.time() - phase_start, ) ], score=0.0, execution_time_seconds=time.time() - phase_start, ) return {"current_phase": None, "phase_outputs": {phase_name: error_output}} def _run_sentiment_phase(self, state: EnhancedWorkflowState) -> dict: """Run sentiment analysis phase (Sentiment and News agents).""" phase_start = time.time() phase_name = AnalysisPhase.SENTIMENT.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run sentiment and news agents in parallel sentiment_result = self._run_sentiment_agent(state) news_result = self._run_news_agent(state) # Create phase output - extract LLM-generated messages def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): return msg.get("content", "No report generated") last_msg = messages[-1] if isinstance(last_msg, dict): return last_msg.get("content", str(last_msg)) return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} # Update state with sentiment and news results for scoring updated_state = { **state, "sentiment_analysis": sentiment_result.get("sentiment_analysis", {}), "news_analysis": news_result.get("news_analysis", {}), } agent_outputs = [ AgentOutput( agent_name="sentiment_agent", report=get_agent_report(sentiment_result, "sentiment_agent"), charts=get_agent_charts(sentiment_result, "sentiment_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(sentiment_result, "sentiment_agent"), ), AgentOutput( agent_name="news_agent", report=get_agent_report(news_result, "news_agent"), charts=get_agent_charts(news_result, "news_agent"), execution_time_seconds=0.0, metadata=get_agent_metadata(news_result, "news_agent"), ), ] # Calculate phase score phase_score = calculate_sentiment_phase_score(updated_state) phase_output = PhaseOutput( phase=AnalysisPhase.SENTIMENT, agents=agent_outputs, phase_summary="Sentiment analysis phase completed", execution_time_seconds=time.time() - phase_start, score=phase_score, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": time.time() - phase_start, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "sentiment_analysis": sentiment_result.get("sentiment_analysis", {}), "news_analysis": news_result.get("news_analysis", {}), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return {"current_phase": None} def _run_research_synthesis_phase(self, state: EnhancedWorkflowState) -> dict: """Run research synthesis phase (Technical Analyst, Researcher Team).""" phase_start = time.time() phase_name = AnalysisPhase.RESEARCH_SYNTHESIS.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run technical analyst technical_analyst_result = self._run_technical_analyst(state) # Run researcher team researcher_result = self._run_researcher_team( {**state, **technical_analyst_result} ) # Create phase output - extract LLM-generated messages def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): return msg.get("content", "No report generated") last_msg = messages[-1] if isinstance(last_msg, dict): return last_msg.get("content", str(last_msg)) return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} # Update state with synthesis results for scoring updated_state = { **state, "technical_analyst": technical_analyst_result.get( "technical_analyst", {} ), "researcher_synthesis": researcher_result.get( "researcher_synthesis", {} ), } agent_outputs = [ AgentOutput( agent_name="technical_analyst", report=get_agent_report( technical_analyst_result, "technical_analyst" ), charts=get_agent_charts( technical_analyst_result, "technical_analyst" ), execution_time_seconds=0.0, metadata=get_agent_metadata( technical_analyst_result, "technical_analyst" ), ), AgentOutput( agent_name="researcher_team", report=get_agent_report(researcher_result, "researcher_team"), charts=get_agent_charts(researcher_result, "researcher_team"), execution_time_seconds=0.0, metadata=get_agent_metadata(researcher_result, "researcher_team"), ), ] # Calculate phase score phase_score = calculate_research_synthesis_phase_score(updated_state) phase_output = PhaseOutput( phase=AnalysisPhase.RESEARCH_SYNTHESIS, agents=agent_outputs, phase_summary="Research synthesis phase completed", execution_time_seconds=time.time() - phase_start, score=phase_score, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": time.time() - phase_start, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "technical_analyst": technical_analyst_result.get( "technical_analyst", {} ), "researcher_synthesis": researcher_result.get( "researcher_synthesis", {} ), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return {"current_phase": None} def _run_risk_phase(self, state: EnhancedWorkflowState) -> dict: """Run risk assessment phase (Risk Manager).""" phase_start = time.time() phase_name = AnalysisPhase.RISK.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run risk manager risk_result = self._run_risk_manager(state) # Create phase output - extract LLM-generated messages def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): return msg.get("content", "No report generated") last_msg = messages[-1] if isinstance(last_msg, dict): return last_msg.get("content", str(last_msg)) return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} # Update state with risk results for scoring updated_state = { **state, "risk_assessment": risk_result.get("risk_assessment", {}), } agent_outputs = [ AgentOutput( agent_name="risk_manager", report=get_agent_report(risk_result, "risk_manager"), charts=get_agent_charts(risk_result, "risk_manager"), execution_time_seconds=0.0, metadata=get_agent_metadata(risk_result, "risk_manager"), ), ] # Calculate phase score phase_score = calculate_risk_phase_score(updated_state) phase_output = PhaseOutput( phase=AnalysisPhase.RISK, agents=agent_outputs, phase_summary="Risk assessment phase completed", execution_time_seconds=time.time() - phase_start, score=phase_score, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": time.time() - phase_start, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "risk_assessment": risk_result.get("risk_assessment", {}), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return {"current_phase": None} def _run_decision_phase(self, state: EnhancedWorkflowState) -> dict: """Run final decision phase (Portfolio Manager).""" phase_start = time.time() phase_name = AnalysisPhase.DECISION.value logger.info( json.dumps( { "phase": phase_name, "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Run portfolio manager portfolio_result = self._run_portfolio_manager(state) # Create phase output - extract LLM-generated messages def get_agent_report(result: dict, agent_name: str) -> str: """Extract the last agent message content as the report.""" messages = result.get("messages", []) if messages: for msg in reversed(messages): if ( isinstance(msg, dict) and msg.get("agent_name") == agent_name ): return msg.get("content", "No report generated") last_msg = messages[-1] if isinstance(last_msg, dict): return last_msg.get("content", str(last_msg)) return str(last_msg) return "No report generated" def get_agent_charts(result: dict, agent_name: str) -> list: """Extract chart paths from agent metadata and convert to base64.""" import base64 from pathlib import Path messages = result.get("messages", []) if not messages: return [] for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: metadata = msg.get("metadata", {}) if not metadata: continue # Get chart paths chart_paths = metadata.get("charts") or metadata.get( "chart_paths", [] ) if not chart_paths: continue # Convert file paths to base64-encoded images base64_charts = [] for chart_path in chart_paths: try: chart_file = Path(chart_path) if chart_file.exists(): with open(chart_file, "rb") as f: chart_data = f.read() base64_str = base64.b64encode(chart_data).decode( "utf-8" ) base64_charts.append(base64_str) except Exception as e: logger.warning( f"Failed to encode chart {chart_path}: {str(e)}" ) continue return base64_charts return [] def get_agent_metadata(result: dict, agent_name: str) -> dict: """Extract metadata from agent messages.""" messages = result.get("messages", []) if not messages: return {} for msg in reversed(messages): if isinstance(msg, dict) and msg.get("agent_name") == agent_name: return msg.get("metadata", {}) return {} agent_outputs = [ AgentOutput( agent_name="portfolio_manager", report=get_agent_report(portfolio_result, "portfolio_manager"), charts=get_agent_charts(portfolio_result, "portfolio_manager"), execution_time_seconds=0.0, metadata=get_agent_metadata(portfolio_result, "portfolio_manager"), ), ] phase_output = PhaseOutput( phase=AnalysisPhase.DECISION, agents=agent_outputs, phase_summary="Decision phase completed", execution_time_seconds=time.time() - phase_start, ) # Update completed phases and phase outputs completed_phases = state.get("completed_phases", []) + [phase_name] phase_outputs = state.get("phase_outputs", {}) phase_outputs[phase_name] = phase_output logger.info( json.dumps( { "phase": phase_name, "action": "complete", "ticker": state["ticker"], "execution_time": time.time() - phase_start, "timestamp": time.time(), } ) ) # Check budget threshold after phase completion result = { "portfolio_decision": portfolio_result.get("portfolio_decision", {}), "final_decision": portfolio_result.get("portfolio_decision", {}) .get("decision", {}) .get("recommendation", "HOLD"), "completed_phases": completed_phases, "phase_outputs": phase_outputs, "current_phase": None, } budget_alert = self._check_budget_threshold(phase_name, state) if budget_alert: result.update(budget_alert) return result except Exception as e: logger.error( json.dumps( { "phase": phase_name, "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return {"current_phase": None} def _generate_chart_enhanced(self, state: EnhancedWorkflowState) -> dict: """ Generate candlestick chart for the analysis. Args: state: Current enhanced workflow state Returns: Dict with chart_path """ try: market_data = state.get("market_data") if market_data is None or market_data.empty: logger.warning("No market data available for chart generation") return {"chart_path": ""} # Generate candlestick chart fig, chart_path = self.chart_generator.generate_candlestick_chart( df=market_data, ticker=state["ticker"], timeframe=state.get("timeframe", "1d"), title=f"{state['ticker']} - {state.get('timeframe', '1d')} Analysis", ) logger.info(f"Chart generated: {chart_path}") return {"chart_path": chart_path or ""} except Exception as e: logger.error(f"Chart generation failed: {str(e)}") return {"chart_path": ""} def _finalize_report(self, state: EnhancedWorkflowState) -> dict: """ Finalize the analysis report. Collects all phase outputs and creates the final AnalysisReport. """ logger.info( json.dumps( { "phase": "finalization", "action": "start", "ticker": state["ticker"], "timestamp": time.time(), } ) ) try: # Create AnalysisReport from phase outputs phase_outputs_dict = state.get("phase_outputs", {}) phase_outputs_list = [ phase_outputs_dict[phase] for phase in state.get("completed_phases", []) if phase in phase_outputs_dict ] config_dict = state.get("config", {}) investment_style = InvestmentStyle( config_dict.get("investment_style", InvestmentStyle.LONG_TERM.value) ) # Get cost summary from cost tracker if available cost_summary = None if self.cost_tracker: cost_summary = self.cost_tracker.get_summary() analysis_report = AnalysisReport( ticker=state["ticker"], timeframe=state.get("timeframe", "1w"), investment_style=investment_style, phases=phase_outputs_list, cost_summary=cost_summary, ) logger.info( json.dumps( { "phase": "finalization", "action": "complete", "ticker": state["ticker"], "phases_completed": len(phase_outputs_list), "timestamp": time.time(), } ) ) return { "analysis_report": analysis_report.to_dict(), "executive_summary": self._generate_executive_summary( state, analysis_report ), } except Exception as e: logger.error( json.dumps( { "phase": "finalization", "action": "error", "ticker": state["ticker"], "error": str(e), "traceback": traceback.format_exc(), "timestamp": time.time(), } ) ) return {} def _generate_executive_summary( self, state: EnhancedWorkflowState, report: AnalysisReport ) -> str: """ Generate executive summary from analysis report. Args: state: Current workflow state report: Completed analysis report Returns: Executive summary text """ summary_parts = [ f"# Executive Summary: {report.ticker}", f"**Timeframe**: {report.timeframe}", f"**Investment Style**: {report.investment_style.value.replace('_', ' ').title()}", f"**Phases Completed**: {len(report.phases)}", "", ] # Add final decision if available final_decision = state.get("final_decision") if final_decision: summary_parts.append(f"**Final Recommendation**: {final_decision}") summary_parts.append("") # Add phase summaries summary_parts.append("## Phase Summaries") for phase_output in report.phases: summary_parts.append( f"- **{phase_output.phase.value.replace('_', ' ').title()}**: " f"{phase_output.phase_summary} ({len(phase_output.agents)} agents)" ) return "\n".join(summary_parts)