trading-tools / graph /workflows /comprehensive_workflow.py
Deploy Bot
Deploy Trading Analysis Platform to HuggingFace Spaces
a1bf219
"""
Comprehensive Stock Analysis Workflow
This workflow orchestrates all 11 agents for comprehensive stock research:
- Technical Analysis: Indicator, Pattern, Trend, Decision agents
- Fundamental Analysis: Fundamentals, Sentiment, News agents
- Synthesis: Technical Analyst (bridges technical + fundamental)
- Research Debate: Researcher Team (bull vs bear)
- Risk Assessment: Risk Manager
- Final Decision: Portfolio Manager
The workflow uses LangGraph for parallel execution and state management.
"""
import json
import logging
import time
import traceback
from typing import Annotated, Any, Dict, Optional, TypedDict
import pandas as pd
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from agents.fundamental.fundamentals_agent import FundamentalsAgent
from agents.fundamental.news_agent import NewsAgent
from agents.fundamental.sentiment_agent import SentimentAgent
from agents.fundamental.technical_analyst import TechnicalAnalystAgent
from agents.management.portfolio_manager import PortfolioManagerAgent
from agents.management.researcher_team import ResearcherTeamAgent
from agents.management.risk_manager import RiskManagerAgent
from agents.technical.decision_agent import DecisionAgent
from agents.technical.indicator_agent import IndicatorAgent
from agents.technical.pattern_agent import PatternAgent
from agents.technical.trend_agent import TrendAgent
from config.default_config import DEFAULT_CONFIG
from data.providers.base import DataProvider
from data.providers.yahoo_finance import YahooFinanceProvider
from data.schemas.market_data import validate_ohlc
from graph.state.trading_state import TechnicalWorkflowState
from utils.charts.chart_generator import ChartGenerator
logger = logging.getLogger(__name__)
class ComprehensiveState(TypedDict, total=False):
"""State for comprehensive workflow."""
# Input
ticker: str
timeframe: str
# Market data
market_data: pd.DataFrame
fundamental_data: Dict[str, Any]
news_data: list
# Technical analysis results (4 agents)
indicator_analysis: Dict[str, Any]
pattern_analysis: Dict[str, Any]
trend_analysis: Dict[str, Any]
decision_analysis: Dict[str, Any]
# Fundamental analysis results (3 agents)
fundamentals_analysis: Dict[str, Any]
sentiment_analysis: Dict[str, Any]
news_analysis: Dict[str, Any]
# Synthesis results
technical_analyst: Dict[str, Any] # Bridges technical + fundamental
# Research debate results
researcher_synthesis: Dict[str, Any] # Bull vs bear debate
# Risk assessment results
risk_assessment: Dict[str, Any]
# Final decision
portfolio_decision: Dict[str, Any]
# Cross-validation
contradictions: list
# Output artifacts
chart_path: str
messages: Annotated[list, add_messages] # Use reducer to handle concurrent updates
error: str | None
class ComprehensiveWorkflow:
"""Comprehensive stock analysis workflow with 11 agents."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the comprehensive workflow.
Args:
config: Optional configuration override
"""
self.config = config or DEFAULT_CONFIG
self.data_provider = YahooFinanceProvider()
self.chart_generator = ChartGenerator()
# Technical agents - pass config for indicator parameters
self.indicator_agent = IndicatorAgent(config=self.config)
self.pattern_agent = PatternAgent(config=self.config)
self.trend_agent = TrendAgent(config=self.config)
self.decision_agent = DecisionAgent(config=self.config)
# Fundamental agents - pass config for LLM settings
self.fundamentals_agent = FundamentalsAgent(config=self.config)
self.sentiment_agent = SentimentAgent(config=self.config)
self.news_agent = NewsAgent(config=self.config)
# Synthesis agent - pass config for runtime LLM overrides
self.technical_analyst = TechnicalAnalystAgent(config=self.config)
# Management agents - pass config for runtime LLM overrides
self.researcher_team = ResearcherTeamAgent(config=self.config)
self.risk_manager = RiskManagerAgent(config=self.config)
self.portfolio_manager = PortfolioManagerAgent(config=self.config)
self.workflow = self._build_workflow()
def _build_workflow(self) -> StateGraph:
"""Build the LangGraph workflow."""
workflow = StateGraph(ComprehensiveState)
# Data fetching nodes (parallel)
workflow.add_node("fetch_market_data", self._fetch_market_data)
workflow.add_node("fetch_fundamental_data", self._fetch_fundamental_data)
workflow.add_node("fetch_news_data", self._fetch_news_data)
# Technical analysis nodes (parallel)
workflow.add_node("indicator_analysis", self._run_indicator_agent)
workflow.add_node("pattern_analysis", self._run_pattern_agent)
workflow.add_node("trend_analysis", self._run_trend_agent)
workflow.add_node("decision_analysis", self._run_decision_agent)
# Fundamental analysis nodes (parallel)
workflow.add_node("fundamentals_analysis", self._run_fundamentals_agent)
workflow.add_node("sentiment_analysis", self._run_sentiment_agent)
workflow.add_node("news_analysis", self._run_news_agent)
# Synthesis node (waits for all 7 analysis agents)
workflow.add_node("technical_analyst", self._run_technical_analyst)
# Research debate node
workflow.add_node("researcher_debate", self._run_researcher_team)
# Risk assessment node
workflow.add_node("risk_assessment", self._run_risk_manager)
# Final decision node
workflow.add_node("portfolio_decision", self._run_portfolio_manager)
# Cross-validation node
workflow.add_node("cross_validation", self._detect_contradictions)
# Chart generation node
workflow.add_node("generate_chart", self._generate_chart)
# Define workflow edges
workflow.set_entry_point("fetch_market_data")
# After market data, fetch fundamental and news in parallel
workflow.add_edge("fetch_market_data", "fetch_fundamental_data")
workflow.add_edge("fetch_market_data", "fetch_news_data")
# After fundamental data, run fundamental agents in parallel
workflow.add_edge("fetch_fundamental_data", "fundamentals_analysis")
workflow.add_edge("fetch_fundamental_data", "sentiment_analysis")
# After news data, run news agent
workflow.add_edge("fetch_news_data", "news_analysis")
# After market data, run technical agents in parallel
workflow.add_edge("fetch_market_data", "indicator_analysis")
workflow.add_edge("fetch_market_data", "pattern_analysis")
workflow.add_edge("fetch_market_data", "trend_analysis")
# Decision agent waits for indicator, pattern, trend
workflow.add_edge("indicator_analysis", "decision_analysis")
workflow.add_edge("pattern_analysis", "decision_analysis")
workflow.add_edge("trend_analysis", "decision_analysis")
# Technical analyst waits for all 7 analysis agents
workflow.add_edge("decision_analysis", "technical_analyst")
workflow.add_edge("fundamentals_analysis", "technical_analyst")
workflow.add_edge("sentiment_analysis", "technical_analyst")
workflow.add_edge("news_analysis", "technical_analyst")
# Researcher debate waits for technical analyst
workflow.add_edge("technical_analyst", "researcher_debate")
# Risk manager waits for researcher debate
workflow.add_edge("researcher_debate", "risk_assessment")
# Portfolio manager waits for risk manager
workflow.add_edge("risk_assessment", "portfolio_decision")
# Cross-validation waits for portfolio decision
workflow.add_edge("portfolio_decision", "cross_validation")
# Chart generation waits for cross-validation
workflow.add_edge("cross_validation", "generate_chart")
# End after chart generation
workflow.add_edge("generate_chart", END)
return workflow.compile()
def run(self, ticker: str, timeframe: str = "1y") -> ComprehensiveState:
"""
Run comprehensive analysis workflow.
Args:
ticker: Stock ticker symbol
timeframe: Analysis timeframe (default: 1y)
Returns:
Final state with all analysis results
"""
start_time = time.time()
logger.info(
json.dumps(
{
"workflow": "comprehensive",
"action": "start",
"ticker": ticker,
"timeframe": timeframe,
"timestamp": time.time(),
}
)
)
try:
initial_state: ComprehensiveState = {
"ticker": ticker,
"timeframe": timeframe,
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
"contradictions": [],
}
final_state = self.workflow.invoke(initial_state)
execution_time = time.time() - start_time
logger.info(
json.dumps(
{
"workflow": "comprehensive",
"action": "complete",
"execution_time": execution_time,
"ticker": ticker,
"recommendation": final_state.get("portfolio_decision", {})
.get("decision", {})
.get("recommendation", "N/A"),
"timestamp": time.time(),
}
)
)
return final_state
except Exception as e:
logger.error(
json.dumps(
{
"workflow": "comprehensive",
"action": "error",
"ticker": ticker,
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
raise
# Data fetching nodes
def _fetch_market_data(self, state: ComprehensiveState) -> dict:
"""Fetch market data from Yahoo Finance."""
ticker = state["ticker"]
timeframe = state["timeframe"]
try:
# Calculate date range based on timeframe
from datetime import datetime, timedelta
end_date = datetime.now()
# Map timeframe to appropriate lookback period
if timeframe in ["1m", "5m", "15m", "30m"]:
start_date = end_date - timedelta(days=7) # 1 week for intraday
elif timeframe in ["1h", "4h"]:
start_date = end_date - timedelta(days=60) # 2 months for hourly
elif timeframe == "1d":
start_date = end_date - timedelta(days=365) # 1 year for daily
elif timeframe == "1w":
start_date = end_date - timedelta(days=730) # 2 years for weekly
elif timeframe == "1mo":
start_date = end_date - timedelta(
days=1825
) # 5 years for monthly (60 data points)
elif timeframe == "3mo":
start_date = end_date - timedelta(
days=3650
) # 10 years for quarterly (40 points)
elif timeframe == "1y":
start_date = end_date - timedelta(days=3650) # 10 years for yearly
elif timeframe == "5y":
start_date = end_date - timedelta(days=7300) # 20 years for 5-year
else:
start_date = end_date - timedelta(days=730) # Default 2 years
df = self.data_provider.fetch_ohlc(
ticker=ticker,
timeframe=timeframe,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
)
df = validate_ohlc(df)
return {"market_data": df}
except Exception as e:
logger.error(
json.dumps(
{
"node": "fetch_market_data",
"action": "error",
"ticker": ticker,
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {
"error": f"Failed to fetch market data: {str(e)}",
"market_data": pd.DataFrame(),
}
def _fetch_fundamental_data(self, state: ComprehensiveState) -> dict:
"""Fetch fundamental data from Yahoo Finance with asset-type awareness."""
ticker = state["ticker"]
try:
# Detect asset type
asset_type = DataProvider.detect_asset_type(ticker)
asset_characteristics = DataProvider.get_asset_characteristics(asset_type)
logger.info(
json.dumps(
{
"node": "fetch_fundamental_data",
"action": "asset_type_detected",
"ticker": ticker,
"asset_type": asset_type,
"has_fundamentals": asset_characteristics["has_fundamentals"],
"timestamp": time.time(),
}
)
)
# Only fetch traditional fundamentals for stocks
if asset_type == "stock" and asset_characteristics["has_fundamentals"]:
fundamental_data = self.data_provider.fetch_fundamentals(ticker)
logger.info(
json.dumps(
{
"node": "fetch_fundamental_data",
"action": "fetched_stock_fundamentals",
"ticker": ticker,
"timestamp": time.time(),
}
)
)
else:
# For non-stock assets, create placeholder with asset type info
fundamental_data = {
"asset_type": asset_type,
"has_traditional_fundamentals": False,
"note": f"Traditional fundamental data not applicable for {asset_type} assets. "
f"Analysis will focus on {', '.join(asset_characteristics['analysis_focus'][:3])}.",
}
logger.info(
json.dumps(
{
"node": "fetch_fundamental_data",
"action": "skipped_fundamentals",
"ticker": ticker,
"asset_type": asset_type,
"reason": "Not applicable for this asset type",
"timestamp": time.time(),
}
)
)
return {"fundamental_data": fundamental_data}
except Exception as e:
logger.error(
json.dumps(
{
"node": "fetch_fundamental_data",
"action": "error",
"ticker": ticker,
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"fundamental_data": {}}
def _fetch_news_data(self, state: ComprehensiveState) -> dict:
"""Fetch news data from Yahoo Finance."""
ticker = state["ticker"]
try:
news_data = self.data_provider.fetch_news(ticker)
return {"news_data": news_data}
except Exception as e:
logger.error(
json.dumps(
{
"node": "fetch_news_data",
"action": "error",
"ticker": ticker,
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"news_data": []}
# Technical analysis nodes
def _run_indicator_agent(self, state: ComprehensiveState) -> dict:
"""Run indicator analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"market_data": {
"ohlc_data": state["market_data"].to_dict(orient="records")
},
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.indicator_agent.run(technical_state)
result = result_state.get("indicators", {})
return {
"indicator_analysis": result,
"messages": result_state.get(
"messages", []
), # Pass through agent messages
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "indicator_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"indicator_analysis": {}}
def _run_pattern_agent(self, state: ComprehensiveState) -> dict:
"""Run pattern analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"market_data": {
"ohlc_data": state["market_data"].to_dict(orient="records")
},
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.pattern_agent.run(technical_state)
result = result_state.get("patterns", {})
return {
"pattern_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "pattern_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"pattern_analysis": {}}
def _run_trend_agent(self, state: ComprehensiveState) -> dict:
"""Run trend analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"market_data": {
"ohlc_data": state["market_data"].to_dict(orient="records")
},
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.trend_agent.run(technical_state)
result = result_state.get("trends", {})
return {
"trend_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "trend_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"trend_analysis": {}}
def _run_decision_agent(self, state: ComprehensiveState) -> dict:
"""Run decision analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"market_data": {
"ohlc_data": state["market_data"].to_dict(orient="records")
},
"indicators": state.get("indicator_analysis", {}),
"patterns": state.get("pattern_analysis", {}),
"trends": state.get("trend_analysis", {}),
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.decision_agent.run(technical_state)
result = result_state.get("decision", {})
return {
"decision_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "decision_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"decision_analysis": {}}
# Fundamental analysis nodes
def _run_fundamentals_agent(self, state: ComprehensiveState) -> dict:
"""Run fundamentals analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"fundamental_data": state.get("fundamental_data", {}),
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.fundamentals_agent.run(technical_state)
result = result_state.get("fundamentals", {})
return {
"fundamentals_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "fundamentals_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"fundamentals_analysis": {}}
def _run_sentiment_agent(self, state: ComprehensiveState) -> dict:
"""Run sentiment analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"news_data": state.get("news_data", []),
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.sentiment_agent.run(technical_state)
result = result_state.get("sentiment", {})
return {
"sentiment_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "sentiment_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"sentiment_analysis": {}}
def _run_news_agent(self, state: ComprehensiveState) -> dict:
"""Run news analysis."""
try:
# Create technical workflow state for the agent
technical_state: TechnicalWorkflowState = {
"ticker": state["ticker"],
"timeframe": state["timeframe"],
"news_data": state.get("news_data", []),
"messages": [],
"_cost_tracker": state.get(
"_cost_tracker"
), # Pass cost tracker to agents
}
result_state = self.news_agent.run(technical_state)
result = result_state.get("news", {})
return {
"news_analysis": result,
"messages": result_state.get("messages", []),
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "news_analysis",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"news_analysis": {}}
# Synthesis node
def _run_technical_analyst(self, state: ComprehensiveState) -> dict:
"""Run technical analyst synthesis."""
try:
# Extract investment style if available
investment_style = None
if "config" in state:
config = state["config"]
if isinstance(config, dict):
investment_style = config.get("investment_style")
if not investment_style:
investment_style = state.get("investment_style")
result = self.technical_analyst.analyze(
state["ticker"],
state["timeframe"],
state.get("indicator_analysis", {}),
state.get("pattern_analysis", {}),
state.get("trend_analysis", {}),
state.get("fundamentals_analysis", {}),
investment_style=investment_style,
)
return {
"technical_analyst": result,
"messages": [result.get("assessment", "")],
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "technical_analyst",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"technical_analyst": {}}
# Research debate node
def _run_researcher_team(self, state: ComprehensiveState) -> dict:
"""Run researcher team debate."""
try:
# Extract investment style if available
investment_style = None
if "config" in state:
config = state["config"]
if isinstance(config, dict):
investment_style = config.get("investment_style")
if not investment_style:
investment_style = state.get("investment_style")
# Combine all analysis for debate
all_analysis = {
"indicator_analysis": state.get("indicator_analysis", {}),
"pattern_analysis": state.get("pattern_analysis", {}),
"trend_analysis": state.get("trend_analysis", {}),
"decision_analysis": state.get("decision_analysis", {}),
"fundamentals_analysis": state.get("fundamentals_analysis", {}),
"sentiment_analysis": state.get("sentiment_analysis", {}),
"news_analysis": state.get("news_analysis", {}),
"technical_analyst": state.get("technical_analyst", {}),
}
result = self.researcher_team.analyze(
state["ticker"],
state["timeframe"],
all_analysis,
investment_style=investment_style,
)
# Format the complete debate output as markdown
debate_output = "# Research Team Investment Debate\n\n"
# Bull Case
bullish_case = result.get("bullish_case", {})
debate_output += "## 🐂 Bull Researcher\n\n"
debate_output += (
bullish_case.get("argument", "No bullish case generated") + "\n\n"
)
# Bear Case
bearish_case = result.get("bearish_case", {})
debate_output += "## 🐻 Bear Researcher\n\n"
debate_output += (
bearish_case.get("argument", "No bearish case generated") + "\n\n"
)
# Synthesis
synthesis = result.get("synthesis", {})
debate_output += "## ⚖️ Neutral Moderator - Balanced Assessment\n\n"
debate_output += (
synthesis.get("synthesis", "No synthesis generated") + "\n\n"
)
# Signal summary
debate_output += "### Signal Summary\n\n"
debate_output += (
f"- **Bullish Signals**: {bullish_case.get('signal_count', 0)}\n"
)
debate_output += (
f"- **Bearish Signals**: {bearish_case.get('signal_count', 0)}\n"
)
debate_output += f"- **Overall Lean**: {synthesis.get('overall_lean', 'neutral').upper()}\n"
return {
"researcher_synthesis": result,
"messages": state.get("messages", [])
+ [
{
"agent_name": "researcher_team",
"content": debate_output,
"metadata": {
"bullish_case": bullish_case,
"bearish_case": bearish_case,
"synthesis": synthesis,
},
}
],
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "researcher_debate",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"researcher_synthesis": {}}
# Risk assessment node
def _run_risk_manager(self, state: ComprehensiveState) -> dict:
"""Run risk manager assessment."""
try:
# Extract investment style if available
investment_style = None
if "config" in state:
config = state["config"]
if isinstance(config, dict):
investment_style = config.get("investment_style")
if not investment_style:
investment_style = state.get("investment_style")
all_analysis = {
"indicator_analysis": state.get("indicator_analysis", {}),
"pattern_analysis": state.get("pattern_analysis", {}),
"trend_analysis": state.get("trend_analysis", {}),
"decision_analysis": state.get("decision_analysis", {}),
"fundamentals_analysis": state.get("fundamentals_analysis", {}),
"sentiment_analysis": state.get("sentiment_analysis", {}),
"news_analysis": state.get("news_analysis", {}),
"technical_analyst": state.get("technical_analyst", {}),
}
result = self.risk_manager.analyze(
state["ticker"],
state["timeframe"],
state.get("market_data", pd.DataFrame()),
all_analysis,
state.get("researcher_synthesis", {}),
investment_style=investment_style,
)
return {
"risk_assessment": result,
"messages": [result.get("assessment", "")],
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "risk_assessment",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"risk_assessment": {}}
# Final decision node
def _run_portfolio_manager(self, state: ComprehensiveState) -> dict:
"""Run portfolio manager final decision."""
try:
# Extract investment style if available
investment_style = None
if "config" in state:
config = state["config"]
if isinstance(config, dict):
investment_style = config.get("investment_style")
if not investment_style:
investment_style = state.get("investment_style")
all_analysis = {
"indicator_analysis": state.get("indicator_analysis", {}),
"pattern_analysis": state.get("pattern_analysis", {}),
"trend_analysis": state.get("trend_analysis", {}),
"decision_analysis": state.get("decision_analysis", {}),
"fundamentals_analysis": state.get("fundamentals_analysis", {}),
"sentiment_analysis": state.get("sentiment_analysis", {}),
"news_analysis": state.get("news_analysis", {}),
"technical_analyst": state.get("technical_analyst", {}),
}
result = self.portfolio_manager.analyze(
state["ticker"],
state["timeframe"],
all_analysis,
state.get("researcher_synthesis", {}),
state.get("risk_assessment", {}),
investment_style=investment_style,
)
return {
"portfolio_decision": result,
"messages": [result.get("rationale", "")],
}
except Exception as e:
logger.error(
json.dumps(
{
"node": "portfolio_decision",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"portfolio_decision": {}}
# Cross-validation node
def _detect_contradictions(self, state: ComprehensiveState) -> dict:
"""Detect contradictions between agents."""
contradictions = []
# Check technical vs fundamental alignment
technical_analyst = state.get("technical_analyst", {})
alignment = technical_analyst.get("alignment", {})
alignment_score = alignment.get("alignment_score", 1.0)
if alignment_score < 0.5:
contradictions.append(
{
"type": "technical_fundamental_divergence",
"severity": "high" if alignment_score == 0 else "moderate",
"description": f"Technical and fundamental analysis diverge (alignment: {alignment_score:.2f})",
}
)
# Check research debate balance
researcher_synthesis = state.get("researcher_synthesis", {})
synthesis = researcher_synthesis.get("synthesis", {})
signal_ratio = synthesis.get("signal_ratio", 0.5)
if 0.4 < signal_ratio < 0.6:
contradictions.append(
{
"type": "mixed_signals",
"severity": "moderate",
"description": f"Mixed signals from research team (ratio: {signal_ratio:.2f})",
}
)
# Check recommendation vs risk
portfolio_decision = state.get("portfolio_decision", {})
decision = portfolio_decision.get("decision", {})
recommendation = decision.get("recommendation", "hold")
risk_assessment = state.get("risk_assessment", {})
risk_score = risk_assessment.get("risk_score", 50)
if recommendation == "buy" and risk_score > 70:
contradictions.append(
{
"type": "recommendation_risk_mismatch",
"severity": "high",
"description": f"Buy recommendation despite high risk score ({risk_score:.1f}/100)",
}
)
return {"contradictions": contradictions}
# Chart generation node
def _generate_chart(self, state: ComprehensiveState) -> dict:
"""Generate analysis chart."""
try:
market_data = state.get("market_data", pd.DataFrame())
# Skip chart generation if market data is empty or missing
if market_data.empty:
logger.warning(
json.dumps(
{
"node": "generate_chart",
"action": "skipped",
"ticker": state["ticker"],
"reason": "market_data is empty",
"timestamp": time.time(),
}
)
)
return {"chart_path": ""}
# generate_candlestick_chart returns (Figure, filepath) tuple
fig, chart_path = self.chart_generator.generate_candlestick_chart(
df=market_data,
ticker=state["ticker"],
timeframe=state["timeframe"],
title=f"{state['ticker']} - {state['timeframe']} Analysis",
)
return {"chart_path": chart_path or ""}
except Exception as e:
logger.error(
json.dumps(
{
"node": "generate_chart",
"action": "error",
"ticker": state["ticker"],
"error": str(e),
"traceback": traceback.format_exc(),
"timestamp": time.time(),
}
)
)
return {"chart_path": ""}