Spaces:
Sleeping
Sleeping
| """ | |
| Conditional Comprehensive Stock Analysis Workflow | |
| This workflow extends ComprehensiveWorkflow with conditional phase execution. | |
| Users can selectively enable/disable 6 analysis phases: | |
| - Technical: Indicator, Pattern, Trend, Decision agents | |
| - Fundamental: Fundamentals, Sentiment analysis | |
| - Sentiment: News analysis and sentiment tracking | |
| - Research Synthesis: Technical Analyst (bridges technical + fundamental) | |
| - Risk: Risk Manager assessment | |
| - Decision: Portfolio Manager final decision | |
| The workflow uses PhaseConfiguration to control which phases execute, | |
| and tracks progress through EnhancedWorkflowState. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| import traceback | |
| from typing import Any, Dict, List, Literal, Optional | |
| import pandas as pd | |
| from langgraph.graph import END, StateGraph | |
| from config.models import AnalysisPhase, InvestmentStyle, PhaseConfiguration | |
| from graph.state.trading_state import ( | |
| EnhancedWorkflowState, | |
| create_initial_enhanced_state, | |
| ) | |
| from graph.workflows.comprehensive_workflow import ComprehensiveWorkflow | |
| from utils.callbacks.cost_tracking_callback import WorkflowCostTracker | |
| from utils.markdown_validator import validate_agent_response | |
| from utils.phase_scoring import ( | |
| calculate_fundamental_phase_score, | |
| calculate_research_synthesis_phase_score, | |
| calculate_risk_phase_score, | |
| calculate_sentiment_phase_score, | |
| calculate_technical_phase_score, | |
| ) | |
| from utils.report_models import AgentOutput, AnalysisReport, PhaseOutput | |
| logger = logging.getLogger(__name__) | |
| class ConditionalComprehensiveWorkflow(ComprehensiveWorkflow): | |
| """ | |
| Conditional comprehensive workflow with phase-based execution. | |
| Extends ComprehensiveWorkflow to support selective phase execution | |
| based on user configuration. | |
| """ | |
| def __init__(self, config: Optional[Dict[str, Any]] = None): | |
| """ | |
| Initialize the conditional comprehensive workflow. | |
| Args: | |
| config: Optional configuration override | |
| """ | |
| super().__init__(config) | |
| budget_config = config.get("budget_config") if config else None | |
| self.cost_tracker = WorkflowCostTracker(budget_config=budget_config) | |
| self.workflow = self._build_conditional_workflow() | |
| def _validate_agent_output(self, content: str, agent_name: str) -> None: | |
| """ | |
| Validate agent output markdown structure and log results. | |
| Args: | |
| content: Agent response markdown content | |
| agent_name: Name of the agent for logging context | |
| """ | |
| try: | |
| result = validate_agent_response(content, agent_name, strict=False) | |
| if not result.is_valid: | |
| logger.warning( | |
| f"Agent {agent_name} response quality below threshold: " | |
| f"score={result.score:.1f}%, issues={len(result.issues)}" | |
| ) | |
| for issue in result.issues[:3]: # Log first 3 issues | |
| logger.warning(f" - {issue}") | |
| else: | |
| logger.debug( | |
| f"Agent {agent_name} response validated: score={result.score:.1f}%" | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Failed to validate {agent_name} response: {e}") | |
| def _build_conditional_workflow(self) -> StateGraph: | |
| """Build the conditional LangGraph workflow with phase routing.""" | |
| workflow = StateGraph(EnhancedWorkflowState) | |
| # Data fetching node (always runs) | |
| workflow.add_node("fetch_data", self._fetch_all_data) | |
| # Phase-specific nodes | |
| workflow.add_node("technical_phase", self._run_technical_phase) | |
| workflow.add_node("fundamental_phase", self._run_fundamental_phase) | |
| workflow.add_node("sentiment_phase", self._run_sentiment_phase) | |
| workflow.add_node( | |
| "research_synthesis_phase", self._run_research_synthesis_phase | |
| ) | |
| workflow.add_node("risk_phase", self._run_risk_phase) | |
| workflow.add_node("decision_phase", self._run_decision_phase) | |
| # Chart generation node (always runs) | |
| workflow.add_node("generate_chart", self._generate_chart_enhanced) | |
| # Finalization node (always runs) | |
| workflow.add_node("finalize_report", self._finalize_report) | |
| # Define workflow edges with conditional routing | |
| workflow.set_entry_point("fetch_data") | |
| # After data fetch, route to appropriate phases | |
| workflow.add_conditional_edges( | |
| "fetch_data", | |
| self._phase_router, | |
| { | |
| "technical": "technical_phase", | |
| "fundamental": "fundamental_phase", | |
| "sentiment": "sentiment_phase", | |
| "research_synthesis": "research_synthesis_phase", | |
| "risk": "risk_phase", | |
| "decision": "decision_phase", | |
| "finalize": "generate_chart", | |
| }, | |
| ) | |
| # After each phase, route to next phase or generate chart | |
| for phase_node in [ | |
| "technical_phase", | |
| "fundamental_phase", | |
| "sentiment_phase", | |
| "research_synthesis_phase", | |
| "risk_phase", | |
| "decision_phase", | |
| ]: | |
| workflow.add_conditional_edges( | |
| phase_node, | |
| self._phase_router, | |
| { | |
| "technical": "technical_phase", | |
| "fundamental": "fundamental_phase", | |
| "sentiment": "sentiment_phase", | |
| "research_synthesis": "research_synthesis_phase", | |
| "risk": "risk_phase", | |
| "decision": "decision_phase", | |
| "finalize": "generate_chart", | |
| }, | |
| ) | |
| # After chart generation, go to finalization | |
| workflow.add_edge("generate_chart", "finalize_report") | |
| # End after finalization | |
| workflow.add_edge("finalize_report", END) | |
| return workflow.compile() | |
| def _check_budget_threshold( | |
| self, phase_name: str, state: EnhancedWorkflowState | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Check if budget threshold has been exceeded after a phase. | |
| Args: | |
| phase_name: Name of the completed phase | |
| state: Current workflow state | |
| Returns: | |
| None if budget OK, or dict with budget alert info if threshold exceeded | |
| """ | |
| threshold_exceeded, alert_message, threshold_percent = ( | |
| self.cost_tracker.cost_tracker.check_budget_threshold() | |
| ) | |
| if threshold_exceeded: | |
| logger.warning( | |
| json.dumps( | |
| { | |
| "event": "budget_threshold_exceeded", | |
| "phase": phase_name, | |
| "threshold": threshold_percent, | |
| "total_cost": self.cost_tracker.cost_tracker.total_cost, | |
| "message": alert_message, | |
| } | |
| ) | |
| ) | |
| # Add alert to state messages | |
| return { | |
| "budget_alert": { | |
| "threshold": threshold_percent, | |
| "message": alert_message, | |
| "phase": phase_name, | |
| "exceeded": self.cost_tracker.cost_tracker.budget_exceeded, | |
| } | |
| } | |
| return None | |
| def _phase_router( | |
| self, state: EnhancedWorkflowState | |
| ) -> Literal[ | |
| "technical", | |
| "fundamental", | |
| "sentiment", | |
| "research_synthesis", | |
| "risk", | |
| "decision", | |
| "finalize", | |
| ]: | |
| """ | |
| Route to the next enabled phase that hasn't been completed yet. | |
| Args: | |
| state: Current workflow state | |
| Returns: | |
| Next phase to execute or "finalize" if all phases are complete | |
| """ | |
| enabled_phases = state.get("enabled_phases", []) | |
| completed_phases = state.get("completed_phases", []) | |
| # Check phase dependencies and determine next phase | |
| for phase in enabled_phases: | |
| if phase in completed_phases: | |
| continue | |
| # Check dependencies | |
| if phase == AnalysisPhase.RESEARCH_SYNTHESIS.value: | |
| # Research synthesis requires at least one of technical/fundamental/sentiment | |
| required = [ | |
| AnalysisPhase.TECHNICAL.value, | |
| AnalysisPhase.FUNDAMENTAL.value, | |
| AnalysisPhase.SENTIMENT.value, | |
| ] | |
| if not any(p in completed_phases for p in required): | |
| continue | |
| elif phase == AnalysisPhase.DECISION.value: | |
| # Decision requires research synthesis | |
| if AnalysisPhase.RESEARCH_SYNTHESIS.value not in completed_phases: | |
| # If research synthesis is not enabled, skip decision dependency check | |
| if AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases: | |
| continue | |
| # Return the phase to execute | |
| return phase # type: ignore | |
| # All phases complete, finalize | |
| return "finalize" | |
| def run( | |
| self, | |
| ticker: str, | |
| timeframe: str = "1w", | |
| phase_config: Optional[PhaseConfiguration] = None, | |
| user_query: Optional[str] = None, | |
| ) -> EnhancedWorkflowState: | |
| """ | |
| Run conditional comprehensive analysis workflow. | |
| Args: | |
| ticker: Stock ticker symbol | |
| timeframe: Analysis timeframe (default: 1w) | |
| phase_config: Phase configuration (default: all phases enabled) | |
| user_query: Optional user query for context | |
| Returns: | |
| Final state with all analysis results | |
| """ | |
| start_time = time.time() | |
| # Use default configuration if not provided | |
| if phase_config is None: | |
| phase_config = PhaseConfiguration() | |
| # Validate configuration | |
| validation_errors = phase_config.validate() | |
| if validation_errors: | |
| raise ValueError( | |
| f"Invalid phase configuration: {', '.join(validation_errors)}" | |
| ) | |
| logger.info( | |
| json.dumps( | |
| { | |
| "workflow": "conditional_comprehensive", | |
| "action": "start", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "investment_style": phase_config.investment_style.value, | |
| "enabled_phases": [p.value for p in phase_config.enabled_phases], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Convert PhaseConfiguration to dict for state | |
| config_dict = { | |
| "investment_style": phase_config.investment_style.value, | |
| "enabled_phases": [p.value for p in phase_config.enabled_phases], | |
| "timeframe": phase_config.timeframe, | |
| "chart_period_days": phase_config.chart_period_days, | |
| "educational_mode": phase_config.educational_mode, | |
| } | |
| initial_state = create_initial_enhanced_state( | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| config=config_dict, | |
| user_query=user_query or "", | |
| ) | |
| # Add cost tracker to state for agents to use | |
| initial_state["_cost_tracker"] = self.cost_tracker | |
| final_state = self.workflow.invoke(initial_state) | |
| # Add cost tracking summary to final state | |
| cost_summary = self.cost_tracker.get_summary() | |
| final_state["cost_summary"] = cost_summary | |
| execution_time = time.time() - start_time | |
| logger.info( | |
| json.dumps( | |
| { | |
| "workflow": "conditional_comprehensive", | |
| "action": "complete", | |
| "execution_time": execution_time, | |
| "ticker": ticker, | |
| "completed_phases": final_state.get("completed_phases", []), | |
| "total_cost": cost_summary.get("total_cost", 0.0), | |
| "total_tokens": cost_summary.get("total_tokens", 0), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return final_state | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "workflow": "conditional_comprehensive", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| raise | |
| def _fetch_all_data(self, state: EnhancedWorkflowState) -> dict: | |
| """ | |
| Fetch all required data (market, fundamental, news). | |
| This node always runs regardless of phase configuration. | |
| """ | |
| ticker = state["ticker"] | |
| timeframe = state.get("timeframe", "1w") | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": "data_fetch", | |
| "action": "start", | |
| "ticker": ticker, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Fetch market data (always needed) | |
| market_data_result = self._fetch_market_data( | |
| {"ticker": ticker, "timeframe": timeframe} | |
| ) | |
| # Fetch fundamental data if fundamental or research synthesis is enabled | |
| enabled_phases = state.get("enabled_phases", []) | |
| fundamental_data = {} | |
| if ( | |
| AnalysisPhase.FUNDAMENTAL.value in enabled_phases | |
| or AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases | |
| ): | |
| fundamental_result = self._fetch_fundamental_data({"ticker": ticker}) | |
| fundamental_data = fundamental_result.get("fundamental_data", {}) | |
| # Fetch news data if sentiment or research synthesis is enabled | |
| news_data = [] | |
| if ( | |
| AnalysisPhase.SENTIMENT.value in enabled_phases | |
| or AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases | |
| ): | |
| news_result = self._fetch_news_data({"ticker": ticker}) | |
| news_data = news_result.get("news_data", []) | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": "data_fetch", | |
| "action": "complete", | |
| "ticker": ticker, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return { | |
| "market_data": market_data_result.get("market_data", pd.DataFrame()), | |
| "fundamental_data": fundamental_data, | |
| "news_data": news_data, | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": "data_fetch", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return { | |
| "market_data": pd.DataFrame(), | |
| "fundamental_data": {}, | |
| "news_data": [], | |
| } | |
    def _run_technical_phase(self, state: EnhancedWorkflowState) -> dict:
        """Run technical analysis phase (Indicator, Pattern, Trend, Decision agents).

        Runs indicator, pattern and trend agents first, merges their results
        into the state, then runs the decision agent on that merged view.
        Each agent's last LLM message is packaged into an AgentOutput, the
        phase is scored, and completion is recorded in the state. On any
        failure an error PhaseOutput is returned instead of raising, so the
        overall workflow keeps going.
        """
        phase_start = time.time()
        phase_name = AnalysisPhase.TECHNICAL.value
        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "start",
                    "ticker": state["ticker"],
                    "timestamp": time.time(),
                }
            )
        )
        try:
            # Run technical agents in sequence (respecting dependencies)
            indicator_result = self._run_indicator_agent(state)
            pattern_result = self._run_pattern_agent(state)
            trend_result = self._run_trend_agent(state)
            # Update state with technical results so the decision agent sees them
            updated_state = {
                **state,
                "indicator_analysis": indicator_result.get("indicator_analysis", {}),
                "pattern_analysis": pattern_result.get("pattern_analysis", {}),
                "trend_analysis": trend_result.get("trend_analysis", {}),
            }
            # Run decision agent with all technical results
            decision_result = self._run_decision_agent(updated_state)

            # Create phase output - extract LLM-generated messages from agent results
            # Get the last message from each agent (the LLM response)
            def get_agent_report(result: dict, agent_name: str) -> str:
                """Extract the last agent message content as the report.

                Scans messages newest-first for one tagged with agent_name;
                falls back to the final message when no tag matches.
                """
                messages = result.get("messages", [])
                if messages:
                    # Get the last message (most recent agent output)
                    for msg in reversed(messages):
                        if (
                            isinstance(msg, dict)
                            and msg.get("agent_name") == agent_name
                        ):
                            content = msg.get("content", "No report generated")
                            # Validate agent output (logs quality warnings only)
                            self._validate_agent_output(content, agent_name)
                            return content
                    # If no matching agent message, return the last message content
                    last_msg = messages[-1]
                    if isinstance(last_msg, dict):
                        content = last_msg.get("content", str(last_msg))
                        self._validate_agent_output(content, agent_name)
                        return content
                    return str(last_msg)
                return "No report generated"

            def get_agent_charts(result: dict, agent_name: str) -> list:
                """Extract chart paths from agent metadata and convert to base64.

                Only the newest message with non-empty chart metadata for this
                agent is used; unreadable chart files are skipped with a warning.
                """
                import base64
                from pathlib import Path

                messages = result.get("messages", [])
                if not messages:
                    return []
                for msg in reversed(messages):
                    if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                        metadata = msg.get("metadata", {})
                        if not metadata:
                            continue
                        # Get chart paths (either metadata key may be used)
                        chart_paths = metadata.get("charts") or metadata.get(
                            "chart_paths", []
                        )
                        if not chart_paths:
                            continue
                        # Convert file paths to base64-encoded images
                        base64_charts = []
                        for chart_path in chart_paths:
                            try:
                                chart_file = Path(chart_path)
                                if chart_file.exists():
                                    with open(chart_file, "rb") as f:
                                        chart_data = f.read()
                                    base64_str = base64.b64encode(chart_data).decode(
                                        "utf-8"
                                    )
                                    base64_charts.append(base64_str)
                            except Exception as e:
                                # Best-effort: a bad chart must not sink the phase
                                logger.warning(
                                    f"Failed to encode chart {chart_path}: {str(e)}"
                                )
                                continue
                        return base64_charts
                return []

            def get_agent_metadata(result: dict, agent_name: str) -> dict:
                """Extract metadata from the newest message tagged with agent_name."""
                messages = result.get("messages", [])
                if not messages:
                    return {}
                for msg in reversed(messages):
                    if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                        return msg.get("metadata", {})
                return {}

            agent_outputs = [
                AgentOutput(
                    agent_name="indicator_agent",
                    report=get_agent_report(indicator_result, "indicator_agent"),
                    charts=get_agent_charts(indicator_result, "indicator_agent"),
                    execution_time_seconds=0.0,  # per-agent timing not tracked here
                    metadata=get_agent_metadata(indicator_result, "indicator_agent"),
                ),
                AgentOutput(
                    agent_name="pattern_agent",
                    report=get_agent_report(pattern_result, "pattern_agent"),
                    charts=get_agent_charts(pattern_result, "pattern_agent"),
                    execution_time_seconds=0.0,
                    metadata=get_agent_metadata(pattern_result, "pattern_agent"),
                ),
                AgentOutput(
                    agent_name="trend_agent",
                    report=get_agent_report(trend_result, "trend_agent"),
                    charts=get_agent_charts(trend_result, "trend_agent"),
                    execution_time_seconds=0.0,
                    metadata=get_agent_metadata(trend_result, "trend_agent"),
                ),
                AgentOutput(
                    agent_name="decision_agent",
                    report=get_agent_report(decision_result, "decision_agent"),
                    charts=get_agent_charts(decision_result, "decision_agent"),
                    execution_time_seconds=0.0,
                    metadata=get_agent_metadata(decision_result, "decision_agent"),
                ),
            ]
            # Calculate phase score
            phase_score = calculate_technical_phase_score(updated_state)
            phase_output = PhaseOutput(
                phase=AnalysisPhase.TECHNICAL,
                agents=agent_outputs,
                phase_summary="Technical analysis phase completed",
                execution_time_seconds=time.time() - phase_start,
                score=phase_score,
            )
            # Update completed phases and phase outputs
            completed_phases = state.get("completed_phases", []) + [phase_name]
            # NOTE(review): mutates the dict held in state in place —
            # presumably fine for this graph's state merging; confirm.
            phase_outputs = state.get("phase_outputs", {})
            phase_outputs[phase_name] = phase_output
            # Performance logging with detailed metrics
            phase_execution_time = time.time() - phase_start
            logger.info(
                json.dumps(
                    {
                        "phase": phase_name,
                        "action": "complete",
                        "ticker": state["ticker"],
                        "execution_time": phase_execution_time,
                        "agent_count": 4,  # Indicator, Pattern, Trend, Decision
                        "phase_score": phase_score,
                        "timestamp": time.time(),
                    }
                )
            )
            # Log individual agent performance
            for agent_output in agent_outputs:
                logger.debug(
                    json.dumps(
                        {
                            "phase": phase_name,
                            "agent": agent_output.agent_name,
                            "execution_time": agent_output.execution_time_seconds,
                            "timestamp": time.time(),
                        }
                    )
                )
            # Check budget threshold after phase completion
            result = {
                "indicator_analysis": indicator_result.get("indicator_analysis", {}),
                "pattern_analysis": pattern_result.get("pattern_analysis", {}),
                "trend_analysis": trend_result.get("trend_analysis", {}),
                "decision_analysis": decision_result.get("decision_analysis", {}),
                "completed_phases": completed_phases,
                "phase_outputs": phase_outputs,
                "current_phase": None,
            }
            budget_alert = self._check_budget_threshold(phase_name, state)
            if budget_alert:
                result.update(budget_alert)
            return result
        except Exception as e:
            logger.error(
                json.dumps(
                    {
                        "phase": phase_name,
                        "action": "error",
                        "ticker": state["ticker"],
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "timestamp": time.time(),
                    }
                )
            )
            # Create error phase output instead of failing completely
            error_output = PhaseOutput(
                phase=AnalysisPhase.TECHNICAL,
                agents=[
                    AgentOutput(
                        agent_name="TechnicalPhaseError",
                        report=f"⚠️ Technical phase encountered an error: {str(e)}",
                        execution_time_seconds=time.time() - phase_start,
                    )
                ],
                score=0.0,
                execution_time_seconds=time.time() - phase_start,
            )
            return {"current_phase": None, "phase_outputs": {phase_name: error_output}}
| def _run_fundamental_phase(self, state: EnhancedWorkflowState) -> dict: | |
| """Run fundamental analysis phase (Fundamentals agent only).""" | |
| phase_start = time.time() | |
| phase_name = AnalysisPhase.FUNDAMENTAL.value | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Run fundamentals agent only (sentiment is in separate phase) | |
| fundamentals_result = self._run_fundamentals_agent(state) | |
| # Create phase output - extract LLM-generated messages | |
| def get_agent_report(result: dict, agent_name: str) -> str: | |
| """Extract the last agent message content as the report.""" | |
| messages = result.get("messages", []) | |
| if messages: | |
| for msg in reversed(messages): | |
| if ( | |
| isinstance(msg, dict) | |
| and msg.get("agent_name") == agent_name | |
| ): | |
| return msg.get("content", "No report generated") | |
| last_msg = messages[-1] | |
| if isinstance(last_msg, dict): | |
| return last_msg.get("content", str(last_msg)) | |
| return str(last_msg) | |
| return "No report generated" | |
| def get_agent_charts(result: dict, agent_name: str) -> list: | |
| """Extract chart paths from agent metadata and convert to base64.""" | |
| import base64 | |
| from pathlib import Path | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| metadata = msg.get("metadata", {}) | |
| if not metadata: | |
| continue | |
| # Get chart paths | |
| chart_paths = metadata.get("charts") or metadata.get( | |
| "chart_paths", [] | |
| ) | |
| if not chart_paths: | |
| continue | |
| # Convert file paths to base64-encoded images | |
| base64_charts = [] | |
| for chart_path in chart_paths: | |
| try: | |
| chart_file = Path(chart_path) | |
| if chart_file.exists(): | |
| with open(chart_file, "rb") as f: | |
| chart_data = f.read() | |
| base64_str = base64.b64encode(chart_data).decode( | |
| "utf-8" | |
| ) | |
| base64_charts.append(base64_str) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to encode chart {chart_path}: {str(e)}" | |
| ) | |
| continue | |
| return base64_charts | |
| return [] | |
| def get_agent_metadata(result: dict, agent_name: str) -> dict: | |
| """Extract metadata from agent messages.""" | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return {} | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| return msg.get("metadata", {}) | |
| return {} | |
| # Update state with fundamental results for scoring | |
| updated_state = { | |
| **state, | |
| "fundamentals_analysis": fundamentals_result.get( | |
| "fundamentals_analysis", {} | |
| ), | |
| } | |
| agent_outputs = [ | |
| AgentOutput( | |
| agent_name="fundamentals_agent", | |
| report=get_agent_report(fundamentals_result, "fundamentals_agent"), | |
| charts=get_agent_charts(fundamentals_result, "fundamentals_agent"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata( | |
| fundamentals_result, "fundamentals_agent" | |
| ), | |
| ), | |
| ] | |
| # Calculate phase score | |
| phase_score = calculate_fundamental_phase_score(updated_state) | |
| phase_output = PhaseOutput( | |
| phase=AnalysisPhase.FUNDAMENTAL, | |
| agents=agent_outputs, | |
| phase_summary="Fundamental analysis phase completed", | |
| execution_time_seconds=time.time() - phase_start, | |
| score=phase_score, | |
| ) | |
| # Update completed phases and phase outputs | |
| completed_phases = state.get("completed_phases", []) + [phase_name] | |
| phase_outputs = state.get("phase_outputs", {}) | |
| phase_outputs[phase_name] = phase_output | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "execution_time": time.time() - phase_start, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Check budget threshold after phase completion | |
| result = { | |
| "fundamentals_analysis": fundamentals_result.get( | |
| "fundamentals_analysis", {} | |
| ), | |
| "completed_phases": completed_phases, | |
| "phase_outputs": phase_outputs, | |
| "current_phase": None, | |
| } | |
| budget_alert = self._check_budget_threshold(phase_name, state) | |
| if budget_alert: | |
| result.update(budget_alert) | |
| return result | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Create error phase output instead of failing completely | |
| error_output = PhaseOutput( | |
| phase=AnalysisPhase.FUNDAMENTAL, | |
| agents=[ | |
| AgentOutput( | |
| agent_name="FundamentalPhaseError", | |
| report=f"⚠️ Fundamental phase encountered an error: {str(e)}", | |
| execution_time_seconds=time.time() - phase_start, | |
| ) | |
| ], | |
| score=0.0, | |
| execution_time_seconds=time.time() - phase_start, | |
| ) | |
| return {"current_phase": None, "phase_outputs": {phase_name: error_output}} | |
| def _run_sentiment_phase(self, state: EnhancedWorkflowState) -> dict: | |
| """Run sentiment analysis phase (Sentiment and News agents).""" | |
| phase_start = time.time() | |
| phase_name = AnalysisPhase.SENTIMENT.value | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Run sentiment and news agents in parallel | |
| sentiment_result = self._run_sentiment_agent(state) | |
| news_result = self._run_news_agent(state) | |
| # Create phase output - extract LLM-generated messages | |
| def get_agent_report(result: dict, agent_name: str) -> str: | |
| """Extract the last agent message content as the report.""" | |
| messages = result.get("messages", []) | |
| if messages: | |
| for msg in reversed(messages): | |
| if ( | |
| isinstance(msg, dict) | |
| and msg.get("agent_name") == agent_name | |
| ): | |
| return msg.get("content", "No report generated") | |
| last_msg = messages[-1] | |
| if isinstance(last_msg, dict): | |
| return last_msg.get("content", str(last_msg)) | |
| return str(last_msg) | |
| return "No report generated" | |
| def get_agent_charts(result: dict, agent_name: str) -> list: | |
| """Extract chart paths from agent metadata and convert to base64.""" | |
| import base64 | |
| from pathlib import Path | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| metadata = msg.get("metadata", {}) | |
| if not metadata: | |
| continue | |
| # Get chart paths | |
| chart_paths = metadata.get("charts") or metadata.get( | |
| "chart_paths", [] | |
| ) | |
| if not chart_paths: | |
| continue | |
| # Convert file paths to base64-encoded images | |
| base64_charts = [] | |
| for chart_path in chart_paths: | |
| try: | |
| chart_file = Path(chart_path) | |
| if chart_file.exists(): | |
| with open(chart_file, "rb") as f: | |
| chart_data = f.read() | |
| base64_str = base64.b64encode(chart_data).decode( | |
| "utf-8" | |
| ) | |
| base64_charts.append(base64_str) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to encode chart {chart_path}: {str(e)}" | |
| ) | |
| continue | |
| return base64_charts | |
| return [] | |
| def get_agent_metadata(result: dict, agent_name: str) -> dict: | |
| """Extract metadata from agent messages.""" | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return {} | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| return msg.get("metadata", {}) | |
| return {} | |
| # Update state with sentiment and news results for scoring | |
| updated_state = { | |
| **state, | |
| "sentiment_analysis": sentiment_result.get("sentiment_analysis", {}), | |
| "news_analysis": news_result.get("news_analysis", {}), | |
| } | |
| agent_outputs = [ | |
| AgentOutput( | |
| agent_name="sentiment_agent", | |
| report=get_agent_report(sentiment_result, "sentiment_agent"), | |
| charts=get_agent_charts(sentiment_result, "sentiment_agent"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata(sentiment_result, "sentiment_agent"), | |
| ), | |
| AgentOutput( | |
| agent_name="news_agent", | |
| report=get_agent_report(news_result, "news_agent"), | |
| charts=get_agent_charts(news_result, "news_agent"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata(news_result, "news_agent"), | |
| ), | |
| ] | |
| # Calculate phase score | |
| phase_score = calculate_sentiment_phase_score(updated_state) | |
| phase_output = PhaseOutput( | |
| phase=AnalysisPhase.SENTIMENT, | |
| agents=agent_outputs, | |
| phase_summary="Sentiment analysis phase completed", | |
| execution_time_seconds=time.time() - phase_start, | |
| score=phase_score, | |
| ) | |
| # Update completed phases and phase outputs | |
| completed_phases = state.get("completed_phases", []) + [phase_name] | |
| phase_outputs = state.get("phase_outputs", {}) | |
| phase_outputs[phase_name] = phase_output | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "execution_time": time.time() - phase_start, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Check budget threshold after phase completion | |
| result = { | |
| "sentiment_analysis": sentiment_result.get("sentiment_analysis", {}), | |
| "news_analysis": news_result.get("news_analysis", {}), | |
| "completed_phases": completed_phases, | |
| "phase_outputs": phase_outputs, | |
| "current_phase": None, | |
| } | |
| budget_alert = self._check_budget_threshold(phase_name, state) | |
| if budget_alert: | |
| result.update(budget_alert) | |
| return result | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"current_phase": None} | |
| def _run_research_synthesis_phase(self, state: EnhancedWorkflowState) -> dict: | |
| """Run research synthesis phase (Technical Analyst, Researcher Team).""" | |
| phase_start = time.time() | |
| phase_name = AnalysisPhase.RESEARCH_SYNTHESIS.value | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Run technical analyst | |
| technical_analyst_result = self._run_technical_analyst(state) | |
| # Run researcher team | |
| researcher_result = self._run_researcher_team( | |
| {**state, **technical_analyst_result} | |
| ) | |
| # Create phase output - extract LLM-generated messages | |
| def get_agent_report(result: dict, agent_name: str) -> str: | |
| """Extract the last agent message content as the report.""" | |
| messages = result.get("messages", []) | |
| if messages: | |
| for msg in reversed(messages): | |
| if ( | |
| isinstance(msg, dict) | |
| and msg.get("agent_name") == agent_name | |
| ): | |
| return msg.get("content", "No report generated") | |
| last_msg = messages[-1] | |
| if isinstance(last_msg, dict): | |
| return last_msg.get("content", str(last_msg)) | |
| return str(last_msg) | |
| return "No report generated" | |
| def get_agent_charts(result: dict, agent_name: str) -> list: | |
| """Extract chart paths from agent metadata and convert to base64.""" | |
| import base64 | |
| from pathlib import Path | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| metadata = msg.get("metadata", {}) | |
| if not metadata: | |
| continue | |
| # Get chart paths | |
| chart_paths = metadata.get("charts") or metadata.get( | |
| "chart_paths", [] | |
| ) | |
| if not chart_paths: | |
| continue | |
| # Convert file paths to base64-encoded images | |
| base64_charts = [] | |
| for chart_path in chart_paths: | |
| try: | |
| chart_file = Path(chart_path) | |
| if chart_file.exists(): | |
| with open(chart_file, "rb") as f: | |
| chart_data = f.read() | |
| base64_str = base64.b64encode(chart_data).decode( | |
| "utf-8" | |
| ) | |
| base64_charts.append(base64_str) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to encode chart {chart_path}: {str(e)}" | |
| ) | |
| continue | |
| return base64_charts | |
| return [] | |
| def get_agent_metadata(result: dict, agent_name: str) -> dict: | |
| """Extract metadata from agent messages.""" | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return {} | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| return msg.get("metadata", {}) | |
| return {} | |
| # Update state with synthesis results for scoring | |
| updated_state = { | |
| **state, | |
| "technical_analyst": technical_analyst_result.get( | |
| "technical_analyst", {} | |
| ), | |
| "researcher_synthesis": researcher_result.get( | |
| "researcher_synthesis", {} | |
| ), | |
| } | |
| agent_outputs = [ | |
| AgentOutput( | |
| agent_name="technical_analyst", | |
| report=get_agent_report( | |
| technical_analyst_result, "technical_analyst" | |
| ), | |
| charts=get_agent_charts( | |
| technical_analyst_result, "technical_analyst" | |
| ), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata( | |
| technical_analyst_result, "technical_analyst" | |
| ), | |
| ), | |
| AgentOutput( | |
| agent_name="researcher_team", | |
| report=get_agent_report(researcher_result, "researcher_team"), | |
| charts=get_agent_charts(researcher_result, "researcher_team"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata(researcher_result, "researcher_team"), | |
| ), | |
| ] | |
| # Calculate phase score | |
| phase_score = calculate_research_synthesis_phase_score(updated_state) | |
| phase_output = PhaseOutput( | |
| phase=AnalysisPhase.RESEARCH_SYNTHESIS, | |
| agents=agent_outputs, | |
| phase_summary="Research synthesis phase completed", | |
| execution_time_seconds=time.time() - phase_start, | |
| score=phase_score, | |
| ) | |
| # Update completed phases and phase outputs | |
| completed_phases = state.get("completed_phases", []) + [phase_name] | |
| phase_outputs = state.get("phase_outputs", {}) | |
| phase_outputs[phase_name] = phase_output | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "execution_time": time.time() - phase_start, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Check budget threshold after phase completion | |
| result = { | |
| "technical_analyst": technical_analyst_result.get( | |
| "technical_analyst", {} | |
| ), | |
| "researcher_synthesis": researcher_result.get( | |
| "researcher_synthesis", {} | |
| ), | |
| "completed_phases": completed_phases, | |
| "phase_outputs": phase_outputs, | |
| "current_phase": None, | |
| } | |
| budget_alert = self._check_budget_threshold(phase_name, state) | |
| if budget_alert: | |
| result.update(budget_alert) | |
| return result | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"current_phase": None} | |
| def _run_risk_phase(self, state: EnhancedWorkflowState) -> dict: | |
| """Run risk assessment phase (Risk Manager).""" | |
| phase_start = time.time() | |
| phase_name = AnalysisPhase.RISK.value | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Run risk manager | |
| risk_result = self._run_risk_manager(state) | |
| # Create phase output - extract LLM-generated messages | |
| def get_agent_report(result: dict, agent_name: str) -> str: | |
| """Extract the last agent message content as the report.""" | |
| messages = result.get("messages", []) | |
| if messages: | |
| for msg in reversed(messages): | |
| if ( | |
| isinstance(msg, dict) | |
| and msg.get("agent_name") == agent_name | |
| ): | |
| return msg.get("content", "No report generated") | |
| last_msg = messages[-1] | |
| if isinstance(last_msg, dict): | |
| return last_msg.get("content", str(last_msg)) | |
| return str(last_msg) | |
| return "No report generated" | |
| def get_agent_charts(result: dict, agent_name: str) -> list: | |
| """Extract chart paths from agent metadata and convert to base64.""" | |
| import base64 | |
| from pathlib import Path | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| metadata = msg.get("metadata", {}) | |
| if not metadata: | |
| continue | |
| # Get chart paths | |
| chart_paths = metadata.get("charts") or metadata.get( | |
| "chart_paths", [] | |
| ) | |
| if not chart_paths: | |
| continue | |
| # Convert file paths to base64-encoded images | |
| base64_charts = [] | |
| for chart_path in chart_paths: | |
| try: | |
| chart_file = Path(chart_path) | |
| if chart_file.exists(): | |
| with open(chart_file, "rb") as f: | |
| chart_data = f.read() | |
| base64_str = base64.b64encode(chart_data).decode( | |
| "utf-8" | |
| ) | |
| base64_charts.append(base64_str) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to encode chart {chart_path}: {str(e)}" | |
| ) | |
| continue | |
| return base64_charts | |
| return [] | |
| def get_agent_metadata(result: dict, agent_name: str) -> dict: | |
| """Extract metadata from agent messages.""" | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return {} | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| return msg.get("metadata", {}) | |
| return {} | |
| # Update state with risk results for scoring | |
| updated_state = { | |
| **state, | |
| "risk_assessment": risk_result.get("risk_assessment", {}), | |
| } | |
| agent_outputs = [ | |
| AgentOutput( | |
| agent_name="risk_manager", | |
| report=get_agent_report(risk_result, "risk_manager"), | |
| charts=get_agent_charts(risk_result, "risk_manager"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata(risk_result, "risk_manager"), | |
| ), | |
| ] | |
| # Calculate phase score | |
| phase_score = calculate_risk_phase_score(updated_state) | |
| phase_output = PhaseOutput( | |
| phase=AnalysisPhase.RISK, | |
| agents=agent_outputs, | |
| phase_summary="Risk assessment phase completed", | |
| execution_time_seconds=time.time() - phase_start, | |
| score=phase_score, | |
| ) | |
| # Update completed phases and phase outputs | |
| completed_phases = state.get("completed_phases", []) + [phase_name] | |
| phase_outputs = state.get("phase_outputs", {}) | |
| phase_outputs[phase_name] = phase_output | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "execution_time": time.time() - phase_start, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Check budget threshold after phase completion | |
| result = { | |
| "risk_assessment": risk_result.get("risk_assessment", {}), | |
| "completed_phases": completed_phases, | |
| "phase_outputs": phase_outputs, | |
| "current_phase": None, | |
| } | |
| budget_alert = self._check_budget_threshold(phase_name, state) | |
| if budget_alert: | |
| result.update(budget_alert) | |
| return result | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"current_phase": None} | |
| def _run_decision_phase(self, state: EnhancedWorkflowState) -> dict: | |
| """Run final decision phase (Portfolio Manager).""" | |
| phase_start = time.time() | |
| phase_name = AnalysisPhase.DECISION.value | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Run portfolio manager | |
| portfolio_result = self._run_portfolio_manager(state) | |
| # Create phase output - extract LLM-generated messages | |
| def get_agent_report(result: dict, agent_name: str) -> str: | |
| """Extract the last agent message content as the report.""" | |
| messages = result.get("messages", []) | |
| if messages: | |
| for msg in reversed(messages): | |
| if ( | |
| isinstance(msg, dict) | |
| and msg.get("agent_name") == agent_name | |
| ): | |
| return msg.get("content", "No report generated") | |
| last_msg = messages[-1] | |
| if isinstance(last_msg, dict): | |
| return last_msg.get("content", str(last_msg)) | |
| return str(last_msg) | |
| return "No report generated" | |
| def get_agent_charts(result: dict, agent_name: str) -> list: | |
| """Extract chart paths from agent metadata and convert to base64.""" | |
| import base64 | |
| from pathlib import Path | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| metadata = msg.get("metadata", {}) | |
| if not metadata: | |
| continue | |
| # Get chart paths | |
| chart_paths = metadata.get("charts") or metadata.get( | |
| "chart_paths", [] | |
| ) | |
| if not chart_paths: | |
| continue | |
| # Convert file paths to base64-encoded images | |
| base64_charts = [] | |
| for chart_path in chart_paths: | |
| try: | |
| chart_file = Path(chart_path) | |
| if chart_file.exists(): | |
| with open(chart_file, "rb") as f: | |
| chart_data = f.read() | |
| base64_str = base64.b64encode(chart_data).decode( | |
| "utf-8" | |
| ) | |
| base64_charts.append(base64_str) | |
| except Exception as e: | |
| logger.warning( | |
| f"Failed to encode chart {chart_path}: {str(e)}" | |
| ) | |
| continue | |
| return base64_charts | |
| return [] | |
| def get_agent_metadata(result: dict, agent_name: str) -> dict: | |
| """Extract metadata from agent messages.""" | |
| messages = result.get("messages", []) | |
| if not messages: | |
| return {} | |
| for msg in reversed(messages): | |
| if isinstance(msg, dict) and msg.get("agent_name") == agent_name: | |
| return msg.get("metadata", {}) | |
| return {} | |
| agent_outputs = [ | |
| AgentOutput( | |
| agent_name="portfolio_manager", | |
| report=get_agent_report(portfolio_result, "portfolio_manager"), | |
| charts=get_agent_charts(portfolio_result, "portfolio_manager"), | |
| execution_time_seconds=0.0, | |
| metadata=get_agent_metadata(portfolio_result, "portfolio_manager"), | |
| ), | |
| ] | |
| phase_output = PhaseOutput( | |
| phase=AnalysisPhase.DECISION, | |
| agents=agent_outputs, | |
| phase_summary="Decision phase completed", | |
| execution_time_seconds=time.time() - phase_start, | |
| ) | |
| # Update completed phases and phase outputs | |
| completed_phases = state.get("completed_phases", []) + [phase_name] | |
| phase_outputs = state.get("phase_outputs", {}) | |
| phase_outputs[phase_name] = phase_output | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "execution_time": time.time() - phase_start, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Check budget threshold after phase completion | |
| result = { | |
| "portfolio_decision": portfolio_result.get("portfolio_decision", {}), | |
| "final_decision": portfolio_result.get("portfolio_decision", {}) | |
| .get("decision", {}) | |
| .get("recommendation", "HOLD"), | |
| "completed_phases": completed_phases, | |
| "phase_outputs": phase_outputs, | |
| "current_phase": None, | |
| } | |
| budget_alert = self._check_budget_threshold(phase_name, state) | |
| if budget_alert: | |
| result.update(budget_alert) | |
| return result | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": phase_name, | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"current_phase": None} | |
| def _generate_chart_enhanced(self, state: EnhancedWorkflowState) -> dict: | |
| """ | |
| Generate candlestick chart for the analysis. | |
| Args: | |
| state: Current enhanced workflow state | |
| Returns: | |
| Dict with chart_path | |
| """ | |
| try: | |
| market_data = state.get("market_data") | |
| if market_data is None or market_data.empty: | |
| logger.warning("No market data available for chart generation") | |
| return {"chart_path": ""} | |
| # Generate candlestick chart | |
| fig, chart_path = self.chart_generator.generate_candlestick_chart( | |
| df=market_data, | |
| ticker=state["ticker"], | |
| timeframe=state.get("timeframe", "1d"), | |
| title=f"{state['ticker']} - {state.get('timeframe', '1d')} Analysis", | |
| ) | |
| logger.info(f"Chart generated: {chart_path}") | |
| return {"chart_path": chart_path or ""} | |
| except Exception as e: | |
| logger.error(f"Chart generation failed: {str(e)}") | |
| return {"chart_path": ""} | |
| def _finalize_report(self, state: EnhancedWorkflowState) -> dict: | |
| """ | |
| Finalize the analysis report. | |
| Collects all phase outputs and creates the final AnalysisReport. | |
| """ | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": "finalization", | |
| "action": "start", | |
| "ticker": state["ticker"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Create AnalysisReport from phase outputs | |
| phase_outputs_dict = state.get("phase_outputs", {}) | |
| phase_outputs_list = [ | |
| phase_outputs_dict[phase] | |
| for phase in state.get("completed_phases", []) | |
| if phase in phase_outputs_dict | |
| ] | |
| config_dict = state.get("config", {}) | |
| investment_style = InvestmentStyle( | |
| config_dict.get("investment_style", InvestmentStyle.LONG_TERM.value) | |
| ) | |
| # Get cost summary from cost tracker if available | |
| cost_summary = None | |
| if self.cost_tracker: | |
| cost_summary = self.cost_tracker.get_summary() | |
| analysis_report = AnalysisReport( | |
| ticker=state["ticker"], | |
| timeframe=state.get("timeframe", "1w"), | |
| investment_style=investment_style, | |
| phases=phase_outputs_list, | |
| cost_summary=cost_summary, | |
| ) | |
| logger.info( | |
| json.dumps( | |
| { | |
| "phase": "finalization", | |
| "action": "complete", | |
| "ticker": state["ticker"], | |
| "phases_completed": len(phase_outputs_list), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return { | |
| "analysis_report": analysis_report.to_dict(), | |
| "executive_summary": self._generate_executive_summary( | |
| state, analysis_report | |
| ), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "phase": "finalization", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {} | |
| def _generate_executive_summary( | |
| self, state: EnhancedWorkflowState, report: AnalysisReport | |
| ) -> str: | |
| """ | |
| Generate executive summary from analysis report. | |
| Args: | |
| state: Current workflow state | |
| report: Completed analysis report | |
| Returns: | |
| Executive summary text | |
| """ | |
| summary_parts = [ | |
| f"# Executive Summary: {report.ticker}", | |
| f"**Timeframe**: {report.timeframe}", | |
| f"**Investment Style**: {report.investment_style.value.replace('_', ' ').title()}", | |
| f"**Phases Completed**: {len(report.phases)}", | |
| "", | |
| ] | |
| # Add final decision if available | |
| final_decision = state.get("final_decision") | |
| if final_decision: | |
| summary_parts.append(f"**Final Recommendation**: {final_decision}") | |
| summary_parts.append("") | |
| # Add phase summaries | |
| summary_parts.append("## Phase Summaries") | |
| for phase_output in report.phases: | |
| summary_parts.append( | |
| f"- **{phase_output.phase.value.replace('_', ' ').title()}**: " | |
| f"{phase_output.phase_summary} ({len(phase_output.agents)} agents)" | |
| ) | |
| return "\n".join(summary_parts) | |