Spaces:
Sleeping
Sleeping
| """ | |
| Comprehensive Stock Analysis Workflow | |
| This workflow orchestrates all 11 agents for comprehensive stock research: | |
| - Technical Analysis: Indicator, Pattern, Trend, Decision agents | |
| - Fundamental Analysis: Fundamentals, Sentiment, News agents | |
| - Synthesis: Technical Analyst (bridges technical + fundamental) | |
| - Research Debate: Researcher Team (bull vs bear) | |
| - Risk Assessment: Risk Manager | |
| - Final Decision: Portfolio Manager | |
| The workflow uses LangGraph for parallel execution and state management. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| import traceback | |
| from typing import Annotated, Any, Dict, Optional, TypedDict | |
| import pandas as pd | |
| from langgraph.graph import END, StateGraph | |
| from langgraph.graph.message import add_messages | |
| from agents.fundamental.fundamentals_agent import FundamentalsAgent | |
| from agents.fundamental.news_agent import NewsAgent | |
| from agents.fundamental.sentiment_agent import SentimentAgent | |
| from agents.fundamental.technical_analyst import TechnicalAnalystAgent | |
| from agents.management.portfolio_manager import PortfolioManagerAgent | |
| from agents.management.researcher_team import ResearcherTeamAgent | |
| from agents.management.risk_manager import RiskManagerAgent | |
| from agents.technical.decision_agent import DecisionAgent | |
| from agents.technical.indicator_agent import IndicatorAgent | |
| from agents.technical.pattern_agent import PatternAgent | |
| from agents.technical.trend_agent import TrendAgent | |
| from config.default_config import DEFAULT_CONFIG | |
| from data.providers.base import DataProvider | |
| from data.providers.yahoo_finance import YahooFinanceProvider | |
| from data.schemas.market_data import validate_ohlc | |
| from graph.state.trading_state import TechnicalWorkflowState | |
| from utils.charts.chart_generator import ChartGenerator | |
| logger = logging.getLogger(__name__) | |
class ComprehensiveState(TypedDict, total=False):
    """Shared LangGraph state for the comprehensive workflow.

    ``total=False`` makes every key optional: each node writes only its own
    slice of the state and reads the rest defensively via ``.get()``.
    """

    # --- Inputs supplied by run() ---
    ticker: str
    timeframe: str
    # --- Raw data produced by the fetch nodes ---
    market_data: pd.DataFrame  # validated OHLC frame (empty on fetch failure)
    fundamental_data: Dict[str, Any]
    news_data: list
    # --- Technical analysis results (4 agents) ---
    indicator_analysis: Dict[str, Any]
    pattern_analysis: Dict[str, Any]
    trend_analysis: Dict[str, Any]
    decision_analysis: Dict[str, Any]
    # --- Fundamental analysis results (3 agents) ---
    fundamentals_analysis: Dict[str, Any]
    sentiment_analysis: Dict[str, Any]
    news_analysis: Dict[str, Any]
    # --- Synthesis results ---
    technical_analyst: Dict[str, Any]  # Bridges technical + fundamental views
    # --- Research debate results ---
    researcher_synthesis: Dict[str, Any]  # Bull vs bear debate outcome
    # --- Risk assessment results ---
    risk_assessment: Dict[str, Any]
    # --- Final decision ---
    portfolio_decision: Dict[str, Any]
    # --- Cross-validation ---
    contradictions: list
    # --- Output artifacts ---
    chart_path: str
    # add_messages reducer merges concurrent message updates from parallel nodes
    messages: Annotated[list, add_messages]
    error: str | None
class ComprehensiveWorkflow:
    """Comprehensive stock analysis workflow with 11 agents."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Wire up data providers, all eleven agents, and the compiled graph.

        Args:
            config: Optional configuration override; falls back to
                ``DEFAULT_CONFIG`` when omitted or empty.
        """
        cfg = config or DEFAULT_CONFIG
        self.config = cfg

        # Shared infrastructure.
        self.data_provider = YahooFinanceProvider()
        self.chart_generator = ChartGenerator()

        # Technical agents (config carries indicator parameters).
        self.indicator_agent = IndicatorAgent(config=cfg)
        self.pattern_agent = PatternAgent(config=cfg)
        self.trend_agent = TrendAgent(config=cfg)
        self.decision_agent = DecisionAgent(config=cfg)

        # Fundamental agents (config carries LLM settings).
        self.fundamentals_agent = FundamentalsAgent(config=cfg)
        self.sentiment_agent = SentimentAgent(config=cfg)
        self.news_agent = NewsAgent(config=cfg)

        # Synthesis agent (config allows runtime LLM overrides).
        self.technical_analyst = TechnicalAnalystAgent(config=cfg)

        # Management agents (config allows runtime LLM overrides).
        self.researcher_team = ResearcherTeamAgent(config=cfg)
        self.risk_manager = RiskManagerAgent(config=cfg)
        self.portfolio_manager = PortfolioManagerAgent(config=cfg)

        self.workflow = self._build_workflow()
| def _build_workflow(self) -> StateGraph: | |
| """Build the LangGraph workflow.""" | |
| workflow = StateGraph(ComprehensiveState) | |
| # Data fetching nodes (parallel) | |
| workflow.add_node("fetch_market_data", self._fetch_market_data) | |
| workflow.add_node("fetch_fundamental_data", self._fetch_fundamental_data) | |
| workflow.add_node("fetch_news_data", self._fetch_news_data) | |
| # Technical analysis nodes (parallel) | |
| workflow.add_node("indicator_analysis", self._run_indicator_agent) | |
| workflow.add_node("pattern_analysis", self._run_pattern_agent) | |
| workflow.add_node("trend_analysis", self._run_trend_agent) | |
| workflow.add_node("decision_analysis", self._run_decision_agent) | |
| # Fundamental analysis nodes (parallel) | |
| workflow.add_node("fundamentals_analysis", self._run_fundamentals_agent) | |
| workflow.add_node("sentiment_analysis", self._run_sentiment_agent) | |
| workflow.add_node("news_analysis", self._run_news_agent) | |
| # Synthesis node (waits for all 7 analysis agents) | |
| workflow.add_node("technical_analyst", self._run_technical_analyst) | |
| # Research debate node | |
| workflow.add_node("researcher_debate", self._run_researcher_team) | |
| # Risk assessment node | |
| workflow.add_node("risk_assessment", self._run_risk_manager) | |
| # Final decision node | |
| workflow.add_node("portfolio_decision", self._run_portfolio_manager) | |
| # Cross-validation node | |
| workflow.add_node("cross_validation", self._detect_contradictions) | |
| # Chart generation node | |
| workflow.add_node("generate_chart", self._generate_chart) | |
| # Define workflow edges | |
| workflow.set_entry_point("fetch_market_data") | |
| # After market data, fetch fundamental and news in parallel | |
| workflow.add_edge("fetch_market_data", "fetch_fundamental_data") | |
| workflow.add_edge("fetch_market_data", "fetch_news_data") | |
| # After fundamental data, run fundamental agents in parallel | |
| workflow.add_edge("fetch_fundamental_data", "fundamentals_analysis") | |
| workflow.add_edge("fetch_fundamental_data", "sentiment_analysis") | |
| # After news data, run news agent | |
| workflow.add_edge("fetch_news_data", "news_analysis") | |
| # After market data, run technical agents in parallel | |
| workflow.add_edge("fetch_market_data", "indicator_analysis") | |
| workflow.add_edge("fetch_market_data", "pattern_analysis") | |
| workflow.add_edge("fetch_market_data", "trend_analysis") | |
| # Decision agent waits for indicator, pattern, trend | |
| workflow.add_edge("indicator_analysis", "decision_analysis") | |
| workflow.add_edge("pattern_analysis", "decision_analysis") | |
| workflow.add_edge("trend_analysis", "decision_analysis") | |
| # Technical analyst waits for all 7 analysis agents | |
| workflow.add_edge("decision_analysis", "technical_analyst") | |
| workflow.add_edge("fundamentals_analysis", "technical_analyst") | |
| workflow.add_edge("sentiment_analysis", "technical_analyst") | |
| workflow.add_edge("news_analysis", "technical_analyst") | |
| # Researcher debate waits for technical analyst | |
| workflow.add_edge("technical_analyst", "researcher_debate") | |
| # Risk manager waits for researcher debate | |
| workflow.add_edge("researcher_debate", "risk_assessment") | |
| # Portfolio manager waits for risk manager | |
| workflow.add_edge("risk_assessment", "portfolio_decision") | |
| # Cross-validation waits for portfolio decision | |
| workflow.add_edge("portfolio_decision", "cross_validation") | |
| # Chart generation waits for cross-validation | |
| workflow.add_edge("cross_validation", "generate_chart") | |
| # End after chart generation | |
| workflow.add_edge("generate_chart", END) | |
| return workflow.compile() | |
| def run(self, ticker: str, timeframe: str = "1y") -> ComprehensiveState: | |
| """ | |
| Run comprehensive analysis workflow. | |
| Args: | |
| ticker: Stock ticker symbol | |
| timeframe: Analysis timeframe (default: 1y) | |
| Returns: | |
| Final state with all analysis results | |
| """ | |
| start_time = time.time() | |
| logger.info( | |
| json.dumps( | |
| { | |
| "workflow": "comprehensive", | |
| "action": "start", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| initial_state: ComprehensiveState = { | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| "contradictions": [], | |
| } | |
| final_state = self.workflow.invoke(initial_state) | |
| execution_time = time.time() - start_time | |
| logger.info( | |
| json.dumps( | |
| { | |
| "workflow": "comprehensive", | |
| "action": "complete", | |
| "execution_time": execution_time, | |
| "ticker": ticker, | |
| "recommendation": final_state.get("portfolio_decision", {}) | |
| .get("decision", {}) | |
| .get("recommendation", "N/A"), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return final_state | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "workflow": "comprehensive", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| raise | |
| # Data fetching nodes | |
| def _fetch_market_data(self, state: ComprehensiveState) -> dict: | |
| """Fetch market data from Yahoo Finance.""" | |
| ticker = state["ticker"] | |
| timeframe = state["timeframe"] | |
| try: | |
| # Calculate date range based on timeframe | |
| from datetime import datetime, timedelta | |
| end_date = datetime.now() | |
| # Map timeframe to appropriate lookback period | |
| if timeframe in ["1m", "5m", "15m", "30m"]: | |
| start_date = end_date - timedelta(days=7) # 1 week for intraday | |
| elif timeframe in ["1h", "4h"]: | |
| start_date = end_date - timedelta(days=60) # 2 months for hourly | |
| elif timeframe == "1d": | |
| start_date = end_date - timedelta(days=365) # 1 year for daily | |
| elif timeframe == "1w": | |
| start_date = end_date - timedelta(days=730) # 2 years for weekly | |
| elif timeframe == "1mo": | |
| start_date = end_date - timedelta( | |
| days=1825 | |
| ) # 5 years for monthly (60 data points) | |
| elif timeframe == "3mo": | |
| start_date = end_date - timedelta( | |
| days=3650 | |
| ) # 10 years for quarterly (40 points) | |
| elif timeframe == "1y": | |
| start_date = end_date - timedelta(days=3650) # 10 years for yearly | |
| elif timeframe == "5y": | |
| start_date = end_date - timedelta(days=7300) # 20 years for 5-year | |
| else: | |
| start_date = end_date - timedelta(days=730) # Default 2 years | |
| df = self.data_provider.fetch_ohlc( | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| start_date=start_date.strftime("%Y-%m-%d"), | |
| end_date=end_date.strftime("%Y-%m-%d"), | |
| ) | |
| df = validate_ohlc(df) | |
| return {"market_data": df} | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "fetch_market_data", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return { | |
| "error": f"Failed to fetch market data: {str(e)}", | |
| "market_data": pd.DataFrame(), | |
| } | |
| def _fetch_fundamental_data(self, state: ComprehensiveState) -> dict: | |
| """Fetch fundamental data from Yahoo Finance with asset-type awareness.""" | |
| ticker = state["ticker"] | |
| try: | |
| # Detect asset type | |
| asset_type = DataProvider.detect_asset_type(ticker) | |
| asset_characteristics = DataProvider.get_asset_characteristics(asset_type) | |
| logger.info( | |
| json.dumps( | |
| { | |
| "node": "fetch_fundamental_data", | |
| "action": "asset_type_detected", | |
| "ticker": ticker, | |
| "asset_type": asset_type, | |
| "has_fundamentals": asset_characteristics["has_fundamentals"], | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Only fetch traditional fundamentals for stocks | |
| if asset_type == "stock" and asset_characteristics["has_fundamentals"]: | |
| fundamental_data = self.data_provider.fetch_fundamentals(ticker) | |
| logger.info( | |
| json.dumps( | |
| { | |
| "node": "fetch_fundamental_data", | |
| "action": "fetched_stock_fundamentals", | |
| "ticker": ticker, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| else: | |
| # For non-stock assets, create placeholder with asset type info | |
| fundamental_data = { | |
| "asset_type": asset_type, | |
| "has_traditional_fundamentals": False, | |
| "note": f"Traditional fundamental data not applicable for {asset_type} assets. " | |
| f"Analysis will focus on {', '.join(asset_characteristics['analysis_focus'][:3])}.", | |
| } | |
| logger.info( | |
| json.dumps( | |
| { | |
| "node": "fetch_fundamental_data", | |
| "action": "skipped_fundamentals", | |
| "ticker": ticker, | |
| "asset_type": asset_type, | |
| "reason": "Not applicable for this asset type", | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"fundamental_data": fundamental_data} | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "fetch_fundamental_data", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"fundamental_data": {}} | |
| def _fetch_news_data(self, state: ComprehensiveState) -> dict: | |
| """Fetch news data from Yahoo Finance.""" | |
| ticker = state["ticker"] | |
| try: | |
| news_data = self.data_provider.fetch_news(ticker) | |
| return {"news_data": news_data} | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "fetch_news_data", | |
| "action": "error", | |
| "ticker": ticker, | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"news_data": []} | |
| # Technical analysis nodes | |
| def _run_indicator_agent(self, state: ComprehensiveState) -> dict: | |
| """Run indicator analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "market_data": { | |
| "ohlc_data": state["market_data"].to_dict(orient="records") | |
| }, | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.indicator_agent.run(technical_state) | |
| result = result_state.get("indicators", {}) | |
| return { | |
| "indicator_analysis": result, | |
| "messages": result_state.get( | |
| "messages", [] | |
| ), # Pass through agent messages | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "indicator_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"indicator_analysis": {}} | |
| def _run_pattern_agent(self, state: ComprehensiveState) -> dict: | |
| """Run pattern analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "market_data": { | |
| "ohlc_data": state["market_data"].to_dict(orient="records") | |
| }, | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.pattern_agent.run(technical_state) | |
| result = result_state.get("patterns", {}) | |
| return { | |
| "pattern_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "pattern_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"pattern_analysis": {}} | |
| def _run_trend_agent(self, state: ComprehensiveState) -> dict: | |
| """Run trend analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "market_data": { | |
| "ohlc_data": state["market_data"].to_dict(orient="records") | |
| }, | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.trend_agent.run(technical_state) | |
| result = result_state.get("trends", {}) | |
| return { | |
| "trend_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "trend_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"trend_analysis": {}} | |
| def _run_decision_agent(self, state: ComprehensiveState) -> dict: | |
| """Run decision analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "market_data": { | |
| "ohlc_data": state["market_data"].to_dict(orient="records") | |
| }, | |
| "indicators": state.get("indicator_analysis", {}), | |
| "patterns": state.get("pattern_analysis", {}), | |
| "trends": state.get("trend_analysis", {}), | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.decision_agent.run(technical_state) | |
| result = result_state.get("decision", {}) | |
| return { | |
| "decision_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "decision_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"decision_analysis": {}} | |
| # Fundamental analysis nodes | |
| def _run_fundamentals_agent(self, state: ComprehensiveState) -> dict: | |
| """Run fundamentals analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "fundamental_data": state.get("fundamental_data", {}), | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.fundamentals_agent.run(technical_state) | |
| result = result_state.get("fundamentals", {}) | |
| return { | |
| "fundamentals_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "fundamentals_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"fundamentals_analysis": {}} | |
| def _run_sentiment_agent(self, state: ComprehensiveState) -> dict: | |
| """Run sentiment analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "news_data": state.get("news_data", []), | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.sentiment_agent.run(technical_state) | |
| result = result_state.get("sentiment", {}) | |
| return { | |
| "sentiment_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "sentiment_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"sentiment_analysis": {}} | |
| def _run_news_agent(self, state: ComprehensiveState) -> dict: | |
| """Run news analysis.""" | |
| try: | |
| # Create technical workflow state for the agent | |
| technical_state: TechnicalWorkflowState = { | |
| "ticker": state["ticker"], | |
| "timeframe": state["timeframe"], | |
| "news_data": state.get("news_data", []), | |
| "messages": [], | |
| "_cost_tracker": state.get( | |
| "_cost_tracker" | |
| ), # Pass cost tracker to agents | |
| } | |
| result_state = self.news_agent.run(technical_state) | |
| result = result_state.get("news", {}) | |
| return { | |
| "news_analysis": result, | |
| "messages": result_state.get("messages", []), | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "news_analysis", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"news_analysis": {}} | |
| # Synthesis node | |
| def _run_technical_analyst(self, state: ComprehensiveState) -> dict: | |
| """Run technical analyst synthesis.""" | |
| try: | |
| # Extract investment style if available | |
| investment_style = None | |
| if "config" in state: | |
| config = state["config"] | |
| if isinstance(config, dict): | |
| investment_style = config.get("investment_style") | |
| if not investment_style: | |
| investment_style = state.get("investment_style") | |
| result = self.technical_analyst.analyze( | |
| state["ticker"], | |
| state["timeframe"], | |
| state.get("indicator_analysis", {}), | |
| state.get("pattern_analysis", {}), | |
| state.get("trend_analysis", {}), | |
| state.get("fundamentals_analysis", {}), | |
| investment_style=investment_style, | |
| ) | |
| return { | |
| "technical_analyst": result, | |
| "messages": [result.get("assessment", "")], | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "technical_analyst", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"technical_analyst": {}} | |
| # Research debate node | |
| def _run_researcher_team(self, state: ComprehensiveState) -> dict: | |
| """Run researcher team debate.""" | |
| try: | |
| # Extract investment style if available | |
| investment_style = None | |
| if "config" in state: | |
| config = state["config"] | |
| if isinstance(config, dict): | |
| investment_style = config.get("investment_style") | |
| if not investment_style: | |
| investment_style = state.get("investment_style") | |
| # Combine all analysis for debate | |
| all_analysis = { | |
| "indicator_analysis": state.get("indicator_analysis", {}), | |
| "pattern_analysis": state.get("pattern_analysis", {}), | |
| "trend_analysis": state.get("trend_analysis", {}), | |
| "decision_analysis": state.get("decision_analysis", {}), | |
| "fundamentals_analysis": state.get("fundamentals_analysis", {}), | |
| "sentiment_analysis": state.get("sentiment_analysis", {}), | |
| "news_analysis": state.get("news_analysis", {}), | |
| "technical_analyst": state.get("technical_analyst", {}), | |
| } | |
| result = self.researcher_team.analyze( | |
| state["ticker"], | |
| state["timeframe"], | |
| all_analysis, | |
| investment_style=investment_style, | |
| ) | |
| # Format the complete debate output as markdown | |
| debate_output = "# Research Team Investment Debate\n\n" | |
| # Bull Case | |
| bullish_case = result.get("bullish_case", {}) | |
| debate_output += "## 🐂 Bull Researcher\n\n" | |
| debate_output += ( | |
| bullish_case.get("argument", "No bullish case generated") + "\n\n" | |
| ) | |
| # Bear Case | |
| bearish_case = result.get("bearish_case", {}) | |
| debate_output += "## 🐻 Bear Researcher\n\n" | |
| debate_output += ( | |
| bearish_case.get("argument", "No bearish case generated") + "\n\n" | |
| ) | |
| # Synthesis | |
| synthesis = result.get("synthesis", {}) | |
| debate_output += "## ⚖️ Neutral Moderator - Balanced Assessment\n\n" | |
| debate_output += ( | |
| synthesis.get("synthesis", "No synthesis generated") + "\n\n" | |
| ) | |
| # Signal summary | |
| debate_output += "### Signal Summary\n\n" | |
| debate_output += ( | |
| f"- **Bullish Signals**: {bullish_case.get('signal_count', 0)}\n" | |
| ) | |
| debate_output += ( | |
| f"- **Bearish Signals**: {bearish_case.get('signal_count', 0)}\n" | |
| ) | |
| debate_output += f"- **Overall Lean**: {synthesis.get('overall_lean', 'neutral').upper()}\n" | |
| return { | |
| "researcher_synthesis": result, | |
| "messages": state.get("messages", []) | |
| + [ | |
| { | |
| "agent_name": "researcher_team", | |
| "content": debate_output, | |
| "metadata": { | |
| "bullish_case": bullish_case, | |
| "bearish_case": bearish_case, | |
| "synthesis": synthesis, | |
| }, | |
| } | |
| ], | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "researcher_debate", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"researcher_synthesis": {}} | |
| # Risk assessment node | |
| def _run_risk_manager(self, state: ComprehensiveState) -> dict: | |
| """Run risk manager assessment.""" | |
| try: | |
| # Extract investment style if available | |
| investment_style = None | |
| if "config" in state: | |
| config = state["config"] | |
| if isinstance(config, dict): | |
| investment_style = config.get("investment_style") | |
| if not investment_style: | |
| investment_style = state.get("investment_style") | |
| all_analysis = { | |
| "indicator_analysis": state.get("indicator_analysis", {}), | |
| "pattern_analysis": state.get("pattern_analysis", {}), | |
| "trend_analysis": state.get("trend_analysis", {}), | |
| "decision_analysis": state.get("decision_analysis", {}), | |
| "fundamentals_analysis": state.get("fundamentals_analysis", {}), | |
| "sentiment_analysis": state.get("sentiment_analysis", {}), | |
| "news_analysis": state.get("news_analysis", {}), | |
| "technical_analyst": state.get("technical_analyst", {}), | |
| } | |
| result = self.risk_manager.analyze( | |
| state["ticker"], | |
| state["timeframe"], | |
| state.get("market_data", pd.DataFrame()), | |
| all_analysis, | |
| state.get("researcher_synthesis", {}), | |
| investment_style=investment_style, | |
| ) | |
| return { | |
| "risk_assessment": result, | |
| "messages": [result.get("assessment", "")], | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "risk_assessment", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"risk_assessment": {}} | |
| # Final decision node | |
| def _run_portfolio_manager(self, state: ComprehensiveState) -> dict: | |
| """Run portfolio manager final decision.""" | |
| try: | |
| # Extract investment style if available | |
| investment_style = None | |
| if "config" in state: | |
| config = state["config"] | |
| if isinstance(config, dict): | |
| investment_style = config.get("investment_style") | |
| if not investment_style: | |
| investment_style = state.get("investment_style") | |
| all_analysis = { | |
| "indicator_analysis": state.get("indicator_analysis", {}), | |
| "pattern_analysis": state.get("pattern_analysis", {}), | |
| "trend_analysis": state.get("trend_analysis", {}), | |
| "decision_analysis": state.get("decision_analysis", {}), | |
| "fundamentals_analysis": state.get("fundamentals_analysis", {}), | |
| "sentiment_analysis": state.get("sentiment_analysis", {}), | |
| "news_analysis": state.get("news_analysis", {}), | |
| "technical_analyst": state.get("technical_analyst", {}), | |
| } | |
| result = self.portfolio_manager.analyze( | |
| state["ticker"], | |
| state["timeframe"], | |
| all_analysis, | |
| state.get("researcher_synthesis", {}), | |
| state.get("risk_assessment", {}), | |
| investment_style=investment_style, | |
| ) | |
| return { | |
| "portfolio_decision": result, | |
| "messages": [result.get("rationale", "")], | |
| } | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "portfolio_decision", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"portfolio_decision": {}} | |
| # Cross-validation node | |
| def _detect_contradictions(self, state: ComprehensiveState) -> dict: | |
| """Detect contradictions between agents.""" | |
| contradictions = [] | |
| # Check technical vs fundamental alignment | |
| technical_analyst = state.get("technical_analyst", {}) | |
| alignment = technical_analyst.get("alignment", {}) | |
| alignment_score = alignment.get("alignment_score", 1.0) | |
| if alignment_score < 0.5: | |
| contradictions.append( | |
| { | |
| "type": "technical_fundamental_divergence", | |
| "severity": "high" if alignment_score == 0 else "moderate", | |
| "description": f"Technical and fundamental analysis diverge (alignment: {alignment_score:.2f})", | |
| } | |
| ) | |
| # Check research debate balance | |
| researcher_synthesis = state.get("researcher_synthesis", {}) | |
| synthesis = researcher_synthesis.get("synthesis", {}) | |
| signal_ratio = synthesis.get("signal_ratio", 0.5) | |
| if 0.4 < signal_ratio < 0.6: | |
| contradictions.append( | |
| { | |
| "type": "mixed_signals", | |
| "severity": "moderate", | |
| "description": f"Mixed signals from research team (ratio: {signal_ratio:.2f})", | |
| } | |
| ) | |
| # Check recommendation vs risk | |
| portfolio_decision = state.get("portfolio_decision", {}) | |
| decision = portfolio_decision.get("decision", {}) | |
| recommendation = decision.get("recommendation", "hold") | |
| risk_assessment = state.get("risk_assessment", {}) | |
| risk_score = risk_assessment.get("risk_score", 50) | |
| if recommendation == "buy" and risk_score > 70: | |
| contradictions.append( | |
| { | |
| "type": "recommendation_risk_mismatch", | |
| "severity": "high", | |
| "description": f"Buy recommendation despite high risk score ({risk_score:.1f}/100)", | |
| } | |
| ) | |
| return {"contradictions": contradictions} | |
| # Chart generation node | |
| def _generate_chart(self, state: ComprehensiveState) -> dict: | |
| """Generate analysis chart.""" | |
| try: | |
| market_data = state.get("market_data", pd.DataFrame()) | |
| # Skip chart generation if market data is empty or missing | |
| if market_data.empty: | |
| logger.warning( | |
| json.dumps( | |
| { | |
| "node": "generate_chart", | |
| "action": "skipped", | |
| "ticker": state["ticker"], | |
| "reason": "market_data is empty", | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"chart_path": ""} | |
| # generate_candlestick_chart returns (Figure, filepath) tuple | |
| fig, chart_path = self.chart_generator.generate_candlestick_chart( | |
| df=market_data, | |
| ticker=state["ticker"], | |
| timeframe=state["timeframe"], | |
| title=f"{state['ticker']} - {state['timeframe']} Analysis", | |
| ) | |
| return {"chart_path": chart_path or ""} | |
| except Exception as e: | |
| logger.error( | |
| json.dumps( | |
| { | |
| "node": "generate_chart", | |
| "action": "error", | |
| "ticker": state["ticker"], | |
| "error": str(e), | |
| "traceback": traceback.format_exc(), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return {"chart_path": ""} | |