trading-tools / graph /workflows /conditional_workflow.py
Deploy Bot
Deploy Trading Analysis Platform to HuggingFace Spaces
a1bf219
"""
Conditional Comprehensive Stock Analysis Workflow
This workflow extends ComprehensiveWorkflow with conditional phase execution.
Users can selectively enable/disable 6 analysis phases:
- Technical: Indicator, Pattern, Trend, Decision agents
- Fundamental: Fundamentals agent (sentiment analysis runs in its own phase)
- Sentiment: News analysis and sentiment tracking
- Research Synthesis: Technical Analyst and Researcher Team (bridges technical + fundamental)
- Risk: Risk Manager assessment
- Decision: Portfolio Manager final decision
The workflow uses PhaseConfiguration to control which phases execute,
and tracks progress through EnhancedWorkflowState.
"""
import json
import logging
import time
import traceback
from typing import Any, Dict, List, Literal, Optional
import pandas as pd
from langgraph.graph import END, StateGraph
from config.models import AnalysisPhase, InvestmentStyle, PhaseConfiguration
from graph.state.trading_state import (
EnhancedWorkflowState,
create_initial_enhanced_state,
)
from graph.workflows.comprehensive_workflow import ComprehensiveWorkflow
from utils.callbacks.cost_tracking_callback import WorkflowCostTracker
from utils.markdown_validator import validate_agent_response
from utils.phase_scoring import (
calculate_fundamental_phase_score,
calculate_research_synthesis_phase_score,
calculate_risk_phase_score,
calculate_sentiment_phase_score,
calculate_technical_phase_score,
)
from utils.report_models import AgentOutput, AnalysisReport, PhaseOutput
logger = logging.getLogger(__name__)
class ConditionalComprehensiveWorkflow(ComprehensiveWorkflow):
"""
Conditional comprehensive workflow with phase-based execution.
Extends ComprehensiveWorkflow to support selective phase execution
based on user configuration.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
    """
    Set up the parent workflow, the cost tracker and the compiled graph.

    Args:
        config: Optional configuration override. It is forwarded to the
            parent initializer; its "budget_config" entry, when present,
            is handed to the cost tracker.
    """
    super().__init__(config)
    # A missing or empty config leaves the tracker without budget settings.
    budget_settings = None
    if config:
        budget_settings = config.get("budget_config")
    self.cost_tracker = WorkflowCostTracker(budget_config=budget_settings)
    self.workflow = self._build_conditional_workflow()
def _validate_agent_output(self, content: str, agent_name: str) -> None:
    """
    Run the markdown validator over an agent response and log the outcome.

    Validation is advisory: any failure, including an exception raised by
    the validator itself, is logged as a warning and never propagated.

    Args:
        content: Agent response markdown content
        agent_name: Name of the agent for logging context
    """
    try:
        outcome = validate_agent_response(content, agent_name, strict=False)
        if outcome.is_valid:
            logger.debug(
                f"Agent {agent_name} response validated: score={outcome.score:.1f}%"
            )
            return
        logger.warning(
            f"Agent {agent_name} response quality below threshold: "
            f"score={outcome.score:.1f}%, issues={len(outcome.issues)}"
        )
        # Only the first three issues are written to the log.
        for problem in outcome.issues[:3]:
            logger.warning(f" - {problem}")
    except Exception as e:
        logger.warning(f"Failed to validate {agent_name} response: {e}")
def _build_conditional_workflow(self) -> StateGraph:
    """
    Assemble and compile the LangGraph graph used by this workflow.

    The graph starts at "fetch_data". After the fetch node and after every
    phase node, ``_phase_router`` picks the next phase node, or
    "generate_chart" once it answers "finalize". Chart generation is
    followed by "finalize_report", which ends the graph.

    Returns:
        The result of ``StateGraph.compile()``.
    """
    graph = StateGraph(EnhancedWorkflowState)

    # Phase node name -> handler, in the order the nodes are registered.
    phase_handlers = {
        "technical_phase": self._run_technical_phase,
        "fundamental_phase": self._run_fundamental_phase,
        "sentiment_phase": self._run_sentiment_phase,
        "research_synthesis_phase": self._run_research_synthesis_phase,
        "risk_phase": self._run_risk_phase,
        "decision_phase": self._run_decision_phase,
    }

    graph.add_node("fetch_data", self._fetch_all_data)
    for node_name, handler in phase_handlers.items():
        graph.add_node(node_name, handler)
    graph.add_node("generate_chart", self._generate_chart_enhanced)
    graph.add_node("finalize_report", self._finalize_report)

    graph.set_entry_point("fetch_data")

    def routing_table() -> Dict[str, str]:
        """Build a fresh router-answer -> node-name mapping."""
        return {
            "technical": "technical_phase",
            "fundamental": "fundamental_phase",
            "sentiment": "sentiment_phase",
            "research_synthesis": "research_synthesis_phase",
            "risk": "risk_phase",
            "decision": "decision_phase",
            "finalize": "generate_chart",
        }

    # The data-fetch node and every phase node share the same routing rule.
    for source_node in ["fetch_data", *phase_handlers]:
        graph.add_conditional_edges(
            source_node, self._phase_router, routing_table()
        )

    graph.add_edge("generate_chart", "finalize_report")
    graph.add_edge("finalize_report", END)
    return graph.compile()
def _check_budget_threshold(
self, phase_name: str, state: EnhancedWorkflowState
) -> Optional[Dict[str, Any]]:
"""
Check if budget threshold has been exceeded after a phase.
Args:
phase_name: Name of the completed phase
state: Current workflow state
Returns:
None if budget OK, or dict with budget alert info if threshold exceeded
"""
threshold_exceeded, alert_message, threshold_percent = (
self.cost_tracker.cost_tracker.check_budget_threshold()
)
if threshold_exceeded:
logger.warning(
json.dumps(
{
"event": "budget_threshold_exceeded",
"phase": phase_name,
"threshold": threshold_percent,
"total_cost": self.cost_tracker.cost_tracker.total_cost,
"message": alert_message,
}
)
)
# Add alert to state messages
return {
"budget_alert": {
"threshold": threshold_percent,
"message": alert_message,
"phase": phase_name,
"exceeded": self.cost_tracker.cost_tracker.budget_exceeded,
}
}
return None
def _phase_router(
self, state: EnhancedWorkflowState
) -> Literal[
"technical",
"fundamental",
"sentiment",
"research_synthesis",
"risk",
"decision",
"finalize",
]:
"""
Route to the next enabled phase that hasn't been completed yet.
Args:
state: Current workflow state
Returns:
Next phase to execute or "finalize" if all phases are complete
"""
enabled_phases = state.get("enabled_phases", [])
completed_phases = state.get("completed_phases", [])
# Check phase dependencies and determine next phase
for phase in enabled_phases:
if phase in completed_phases:
continue
# Check dependencies
if phase == AnalysisPhase.RESEARCH_SYNTHESIS.value:
# Research synthesis requires at least one of technical/fundamental/sentiment
required = [
AnalysisPhase.TECHNICAL.value,
AnalysisPhase.FUNDAMENTAL.value,
AnalysisPhase.SENTIMENT.value,
]
if not any(p in completed_phases for p in required):
continue
elif phase == AnalysisPhase.DECISION.value:
# Decision requires research synthesis
if AnalysisPhase.RESEARCH_SYNTHESIS.value not in completed_phases:
# If research synthesis is not enabled, skip decision dependency check
if AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled_phases:
continue
# Return the phase to execute
return phase # type: ignore
# All phases complete, finalize
return "finalize"
def run(
    self,
    ticker: str,
    timeframe: str = "1w",
    phase_config: Optional[PhaseConfiguration] = None,
    user_query: Optional[str] = None,
) -> EnhancedWorkflowState:
    """
    Execute the conditional workflow for one ticker.

    Args:
        ticker: Stock ticker symbol
        timeframe: Analysis timeframe (default: 1w)
        phase_config: Phase configuration; a default ``PhaseConfiguration()``
            is used when omitted
        user_query: Optional user query for context

    Returns:
        Final state returned by the graph, with "cost_summary" added

    Raises:
        ValueError: If ``phase_config.validate()`` reports any errors.
        Exception: Any error raised while building the state or running the
            graph is logged and re-raised unchanged.
    """
    started_at = time.time()

    settings = PhaseConfiguration() if phase_config is None else phase_config

    problems = settings.validate()
    if problems:
        raise ValueError(f"Invalid phase configuration: {', '.join(problems)}")

    start_event = {
        "workflow": "conditional_comprehensive",
        "action": "start",
        "ticker": ticker,
        "timeframe": timeframe,
        "investment_style": settings.investment_style.value,
        "enabled_phases": [p.value for p in settings.enabled_phases],
        "timestamp": time.time(),
    }
    logger.info(json.dumps(start_event))

    try:
        # The state carries a plain-dict view of the phase configuration.
        state_config = dict(
            investment_style=settings.investment_style.value,
            enabled_phases=[p.value for p in settings.enabled_phases],
            timeframe=settings.timeframe,
            chart_period_days=settings.chart_period_days,
            educational_mode=settings.educational_mode,
        )
        seed_state = create_initial_enhanced_state(
            ticker=ticker,
            timeframe=timeframe,
            config=state_config,
            user_query=user_query or "",
        )
        # Stored on the state under "_cost_tracker".
        seed_state["_cost_tracker"] = self.cost_tracker

        outcome = self.workflow.invoke(seed_state)

        costs = self.cost_tracker.get_summary()
        outcome["cost_summary"] = costs

        elapsed = time.time() - started_at
        done_event = {
            "workflow": "conditional_comprehensive",
            "action": "complete",
            "execution_time": elapsed,
            "ticker": ticker,
            "completed_phases": outcome.get("completed_phases", []),
            "total_cost": costs.get("total_cost", 0.0),
            "total_tokens": costs.get("total_tokens", 0),
            "timestamp": time.time(),
        }
        logger.info(json.dumps(done_event))
        return outcome
    except Exception as e:
        failure_event = {
            "workflow": "conditional_comprehensive",
            "action": "error",
            "ticker": ticker,
            "error": str(e),
            "traceback": traceback.format_exc(),
            "timestamp": time.time(),
        }
        logger.error(json.dumps(failure_event))
        raise
def _fetch_all_data(self, state: EnhancedWorkflowState) -> dict:
    """
    Fetch market data plus whichever optional datasets the phases need.

    Market data is always requested. Fundamental data is requested when the
    fundamental or research synthesis phase is enabled; news data when the
    sentiment or research synthesis phase is enabled. Any exception is
    logged and answered with empty datasets for all three keys.

    Returns:
        Dict with "market_data", "fundamental_data" and "news_data".
    """
    symbol = state["ticker"]
    period = state.get("timeframe", "1w")

    start_event = {
        "phase": "data_fetch",
        "action": "start",
        "ticker": symbol,
        "timestamp": time.time(),
    }
    logger.info(json.dumps(start_event))

    try:
        market = self._fetch_market_data({"ticker": symbol, "timeframe": period})

        enabled = state.get("enabled_phases", [])
        # Research synthesis consumes both fundamental and news data.
        synthesis_on = AnalysisPhase.RESEARCH_SYNTHESIS.value in enabled

        fundamentals = {}
        if AnalysisPhase.FUNDAMENTAL.value in enabled or synthesis_on:
            fetched = self._fetch_fundamental_data({"ticker": symbol})
            fundamentals = fetched.get("fundamental_data", {})

        headlines = []
        if AnalysisPhase.SENTIMENT.value in enabled or synthesis_on:
            fetched_news = self._fetch_news_data({"ticker": symbol})
            headlines = fetched_news.get("news_data", [])

        done_event = {
            "phase": "data_fetch",
            "action": "complete",
            "ticker": symbol,
            "timestamp": time.time(),
        }
        logger.info(json.dumps(done_event))

        return {
            "market_data": market.get("market_data", pd.DataFrame()),
            "fundamental_data": fundamentals,
            "news_data": headlines,
        }
    except Exception as e:
        failure_event = {
            "phase": "data_fetch",
            "action": "error",
            "ticker": symbol,
            "error": str(e),
            "traceback": traceback.format_exc(),
            "timestamp": time.time(),
        }
        logger.error(json.dumps(failure_event))
        # Fall back to empty datasets so the graph can keep going.
        return {
            "market_data": pd.DataFrame(),
            "fundamental_data": {},
            "news_data": [],
        }
def _run_technical_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run technical analysis phase (Indicator, Pattern, Trend, Decision agents).

    The indicator, pattern and trend agents run one after another on the
    incoming state. The decision agent then runs on a copy of the state that
    also carries their three analyses.

    Args:
        state: Current workflow state.

    Returns:
        Partial state update. On success it holds the four analyses, the
        extended "completed_phases" list, the merged "phase_outputs" and,
        when a budget threshold was exceeded, a "budget_alert" entry.
        On failure it holds an error ``PhaseOutput`` (score 0.0) merged into
        "phase_outputs"; the phase is still appended to "completed_phases"
        so that ``_phase_router`` does not select the failed phase again.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.TECHNICAL.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def timed(agent_fn, agent_state):
        """Call an agent and return (result, elapsed wall-clock seconds)."""
        started = time.time()
        outcome = agent_fn(agent_state)
        return outcome, time.time() - started

    def agent_messages(result: dict, agent_name: str):
        """Yield the dict messages tagged with agent_name, newest first."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                yield msg

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the agent's latest message content, validated when possible."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        own = next(agent_messages(result, agent_name), None)
        if own is not None:
            content = own.get("content", "No report generated")
        else:
            # No message is tagged with this agent: fall back to the last one.
            last_msg = messages[-1]
            if not isinstance(last_msg, dict):
                return str(last_msg)
            content = last_msg.get("content", str(last_msg))
        self._validate_agent_output(content, agent_name)
        return content

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest message that lists any."""
        import base64
        from pathlib import Path

        for msg in agent_messages(result, agent_name):
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            encoded = []
            for chart_path in chart_paths:
                try:
                    chart_file = Path(chart_path)
                    # Paths that do not exist are skipped silently.
                    if chart_file.exists():
                        encoded.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return encoded
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the agent's latest message, or {}."""
        own = next(agent_messages(result, agent_name), None)
        return own.get("metadata", {}) if own is not None else {}

    def build_output(agent_name: str, result: dict, elapsed: float):
        """Bundle one agent's report, charts, timing and metadata."""
        return AgentOutput(
            agent_name=agent_name,
            report=get_agent_report(result, agent_name),
            charts=get_agent_charts(result, agent_name),
            execution_time_seconds=elapsed,
            metadata=get_agent_metadata(result, agent_name),
        )

    try:
        indicator_result, indicator_secs = timed(self._run_indicator_agent, state)
        pattern_result, pattern_secs = timed(self._run_pattern_agent, state)
        trend_result, trend_secs = timed(self._run_trend_agent, state)

        technical_analyses = {
            "indicator_analysis": indicator_result.get("indicator_analysis", {}),
            "pattern_analysis": pattern_result.get("pattern_analysis", {}),
            "trend_analysis": trend_result.get("trend_analysis", {}),
        }
        # The decision agent sees the three analyses produced above.
        updated_state = {**state, **technical_analyses}
        decision_result, decision_secs = timed(
            self._run_decision_agent, updated_state
        )

        agent_outputs = [
            build_output("indicator_agent", indicator_result, indicator_secs),
            build_output("pattern_agent", pattern_result, pattern_secs),
            build_output("trend_agent", trend_result, trend_secs),
            build_output("decision_agent", decision_result, decision_secs),
        ]

        phase_score = calculate_technical_phase_score(updated_state)
        phase_output = PhaseOutput(
            phase=AnalysisPhase.TECHNICAL,
            agents=agent_outputs,
            phase_summary="Technical analysis phase completed",
            execution_time_seconds=time.time() - phase_start,
            score=phase_score,
        )

        # Build new containers instead of mutating the ones held by the state.
        completed_phases = [*(state.get("completed_phases") or []), phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        phase_execution_time = time.time() - phase_start
        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": phase_execution_time,
                    "agent_count": len(agent_outputs),
                    "phase_score": phase_score,
                    "timestamp": time.time(),
                }
            )
        )
        for agent_output in agent_outputs:
            logger.debug(
                json.dumps(
                    {
                        "phase": phase_name,
                        "agent": agent_output.agent_name,
                        "execution_time": agent_output.execution_time_seconds,
                        "timestamp": time.time(),
                    }
                )
            )

        result = {
            **technical_analyses,
            "decision_analysis": decision_result.get("decision_analysis", {}),
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        # Record the failure as a phase output instead of aborting the workflow.
        error_output = PhaseOutput(
            phase=AnalysisPhase.TECHNICAL,
            agents=[
                AgentOutput(
                    agent_name="TechnicalPhaseError",
                    report=f"⚠️ Technical phase encountered an error: {str(e)}",
                    execution_time_seconds=time.time() - phase_start,
                )
            ],
            score=0.0,
            execution_time_seconds=time.time() - phase_start,
        )
        # Marking the phase as completed keeps the router from scheduling it
        # again; merging keeps the outputs of phases that already ran.
        return {
            "current_phase": None,
            "completed_phases": [
                *(state.get("completed_phases") or []),
                phase_name,
            ],
            "phase_outputs": {
                **(state.get("phase_outputs") or {}),
                phase_name: error_output,
            },
        }
def _run_fundamental_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run fundamental analysis phase (Fundamentals agent only).

    Args:
        state: Current workflow state.

    Returns:
        Partial state update. On success it holds "fundamentals_analysis",
        the extended "completed_phases" list, the merged "phase_outputs"
        and, when a budget threshold was exceeded, a "budget_alert" entry.
        On failure it holds an error ``PhaseOutput`` (score 0.0) merged into
        "phase_outputs"; the phase is still appended to "completed_phases"
        so that ``_phase_router`` does not select the failed phase again.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.FUNDAMENTAL.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def timed(agent_fn, agent_state):
        """Call an agent and return (result, elapsed wall-clock seconds)."""
        started = time.time()
        outcome = agent_fn(agent_state)
        return outcome, time.time() - started

    def agent_messages(result: dict, agent_name: str):
        """Yield the dict messages tagged with agent_name, newest first."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                yield msg

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the agent's latest message content as the report."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        own = next(agent_messages(result, agent_name), None)
        if own is not None:
            return own.get("content", "No report generated")
        # No message is tagged with this agent: fall back to the last one.
        last_msg = messages[-1]
        if isinstance(last_msg, dict):
            return last_msg.get("content", str(last_msg))
        return str(last_msg)

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest message that lists any."""
        import base64
        from pathlib import Path

        for msg in agent_messages(result, agent_name):
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            encoded = []
            for chart_path in chart_paths:
                try:
                    chart_file = Path(chart_path)
                    # Paths that do not exist are skipped silently.
                    if chart_file.exists():
                        encoded.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return encoded
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the agent's latest message, or {}."""
        own = next(agent_messages(result, agent_name), None)
        return own.get("metadata", {}) if own is not None else {}

    try:
        # Sentiment is handled by its own phase; only fundamentals run here.
        fundamentals_result, fundamentals_secs = timed(
            self._run_fundamentals_agent, state
        )
        fundamentals_analysis = fundamentals_result.get("fundamentals_analysis", {})

        # Scoring reads the analysis from a copy of the state.
        updated_state = {**state, "fundamentals_analysis": fundamentals_analysis}

        agent_outputs = [
            AgentOutput(
                agent_name="fundamentals_agent",
                report=get_agent_report(fundamentals_result, "fundamentals_agent"),
                charts=get_agent_charts(fundamentals_result, "fundamentals_agent"),
                execution_time_seconds=fundamentals_secs,
                metadata=get_agent_metadata(
                    fundamentals_result, "fundamentals_agent"
                ),
            ),
        ]

        phase_score = calculate_fundamental_phase_score(updated_state)
        phase_output = PhaseOutput(
            phase=AnalysisPhase.FUNDAMENTAL,
            agents=agent_outputs,
            phase_summary="Fundamental analysis phase completed",
            execution_time_seconds=time.time() - phase_start,
            score=phase_score,
        )

        # Build new containers instead of mutating the ones held by the state.
        completed_phases = [*(state.get("completed_phases") or []), phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": time.time() - phase_start,
                    "timestamp": time.time(),
                }
            )
        )

        result = {
            "fundamentals_analysis": fundamentals_analysis,
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        # Record the failure as a phase output instead of aborting the workflow.
        error_output = PhaseOutput(
            phase=AnalysisPhase.FUNDAMENTAL,
            agents=[
                AgentOutput(
                    agent_name="FundamentalPhaseError",
                    report=f"⚠️ Fundamental phase encountered an error: {str(e)}",
                    execution_time_seconds=time.time() - phase_start,
                )
            ],
            score=0.0,
            execution_time_seconds=time.time() - phase_start,
        )
        # Marking the phase as completed keeps the router from scheduling it
        # again; merging keeps the outputs of phases that already ran.
        return {
            "current_phase": None,
            "completed_phases": [
                *(state.get("completed_phases") or []),
                phase_name,
            ],
            "phase_outputs": {
                **(state.get("phase_outputs") or {}),
                phase_name: error_output,
            },
        }
def _run_sentiment_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run sentiment analysis phase (Sentiment and News agents).

    The sentiment agent runs first, then the news agent; both receive the
    incoming state.

    Args:
        state: Current workflow state.

    Returns:
        Partial state update. On success it holds "sentiment_analysis",
        "news_analysis", the extended "completed_phases" list, the merged
        "phase_outputs" and, when a budget threshold was exceeded, a
        "budget_alert" entry. On failure it holds an error ``PhaseOutput``
        (score 0.0) merged into "phase_outputs"; the phase is still appended
        to "completed_phases" so that ``_phase_router`` does not select the
        failed phase again.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.SENTIMENT.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def timed(agent_fn, agent_state):
        """Call an agent and return (result, elapsed wall-clock seconds)."""
        started = time.time()
        outcome = agent_fn(agent_state)
        return outcome, time.time() - started

    def agent_messages(result: dict, agent_name: str):
        """Yield the dict messages tagged with agent_name, newest first."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                yield msg

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the agent's latest message content as the report."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        own = next(agent_messages(result, agent_name), None)
        if own is not None:
            return own.get("content", "No report generated")
        # No message is tagged with this agent: fall back to the last one.
        last_msg = messages[-1]
        if isinstance(last_msg, dict):
            return last_msg.get("content", str(last_msg))
        return str(last_msg)

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest message that lists any."""
        import base64
        from pathlib import Path

        for msg in agent_messages(result, agent_name):
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            encoded = []
            for chart_path in chart_paths:
                try:
                    chart_file = Path(chart_path)
                    # Paths that do not exist are skipped silently.
                    if chart_file.exists():
                        encoded.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return encoded
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the agent's latest message, or {}."""
        own = next(agent_messages(result, agent_name), None)
        return own.get("metadata", {}) if own is not None else {}

    def build_output(agent_name: str, result: dict, elapsed: float):
        """Bundle one agent's report, charts, timing and metadata."""
        return AgentOutput(
            agent_name=agent_name,
            report=get_agent_report(result, agent_name),
            charts=get_agent_charts(result, agent_name),
            execution_time_seconds=elapsed,
            metadata=get_agent_metadata(result, agent_name),
        )

    try:
        # The two agents run sequentially, each on the incoming state.
        sentiment_result, sentiment_secs = timed(self._run_sentiment_agent, state)
        news_result, news_secs = timed(self._run_news_agent, state)

        sentiment_analyses = {
            "sentiment_analysis": sentiment_result.get("sentiment_analysis", {}),
            "news_analysis": news_result.get("news_analysis", {}),
        }
        # Scoring reads both analyses from a copy of the state.
        updated_state = {**state, **sentiment_analyses}

        agent_outputs = [
            build_output("sentiment_agent", sentiment_result, sentiment_secs),
            build_output("news_agent", news_result, news_secs),
        ]

        phase_score = calculate_sentiment_phase_score(updated_state)
        phase_output = PhaseOutput(
            phase=AnalysisPhase.SENTIMENT,
            agents=agent_outputs,
            phase_summary="Sentiment analysis phase completed",
            execution_time_seconds=time.time() - phase_start,
            score=phase_score,
        )

        # Build new containers instead of mutating the ones held by the state.
        completed_phases = [*(state.get("completed_phases") or []), phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": time.time() - phase_start,
                    "timestamp": time.time(),
                }
            )
        )

        result = {
            **sentiment_analyses,
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        # Record the failure as a phase output, mirroring the technical and
        # fundamental phases, instead of aborting the workflow.
        error_output = PhaseOutput(
            phase=AnalysisPhase.SENTIMENT,
            agents=[
                AgentOutput(
                    agent_name="SentimentPhaseError",
                    report=f"⚠️ Sentiment phase encountered an error: {str(e)}",
                    execution_time_seconds=time.time() - phase_start,
                )
            ],
            score=0.0,
            execution_time_seconds=time.time() - phase_start,
        )
        # Marking the phase as completed keeps the router from scheduling it
        # again; merging keeps the outputs of phases that already ran.
        return {
            "current_phase": None,
            "completed_phases": [
                *(state.get("completed_phases") or []),
                phase_name,
            ],
            "phase_outputs": {
                **(state.get("phase_outputs") or {}),
                phase_name: error_output,
            },
        }
def _run_research_synthesis_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run research synthesis phase (Technical Analyst, Researcher Team).

    The technical analyst runs on the incoming state. The researcher team
    then runs on the state overlaid with everything the analyst returned.

    Args:
        state: Current workflow state.

    Returns:
        Partial state update. On success it holds "technical_analyst",
        "researcher_synthesis", the extended "completed_phases" list, the
        merged "phase_outputs" and, when a budget threshold was exceeded, a
        "budget_alert" entry. On failure it holds an error ``PhaseOutput``
        (score 0.0) merged into "phase_outputs"; the phase is still appended
        to "completed_phases" so that ``_phase_router`` does not select the
        failed phase again.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.RESEARCH_SYNTHESIS.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def timed(agent_fn, agent_state):
        """Call an agent and return (result, elapsed wall-clock seconds)."""
        started = time.time()
        outcome = agent_fn(agent_state)
        return outcome, time.time() - started

    def agent_messages(result: dict, agent_name: str):
        """Yield the dict messages tagged with agent_name, newest first."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                yield msg

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the agent's latest message content as the report."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        own = next(agent_messages(result, agent_name), None)
        if own is not None:
            return own.get("content", "No report generated")
        # No message is tagged with this agent: fall back to the last one.
        last_msg = messages[-1]
        if isinstance(last_msg, dict):
            return last_msg.get("content", str(last_msg))
        return str(last_msg)

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest message that lists any."""
        import base64
        from pathlib import Path

        for msg in agent_messages(result, agent_name):
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            encoded = []
            for chart_path in chart_paths:
                try:
                    chart_file = Path(chart_path)
                    # Paths that do not exist are skipped silently.
                    if chart_file.exists():
                        encoded.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return encoded
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the agent's latest message, or {}."""
        own = next(agent_messages(result, agent_name), None)
        return own.get("metadata", {}) if own is not None else {}

    def build_output(agent_name: str, result: dict, elapsed: float):
        """Bundle one agent's report, charts, timing and metadata."""
        return AgentOutput(
            agent_name=agent_name,
            report=get_agent_report(result, agent_name),
            charts=get_agent_charts(result, agent_name),
            execution_time_seconds=elapsed,
            metadata=get_agent_metadata(result, agent_name),
        )

    try:
        technical_analyst_result, analyst_secs = timed(
            self._run_technical_analyst, state
        )
        # The researcher team sees the analyst's full result on top of the state.
        researcher_result, researcher_secs = timed(
            self._run_researcher_team, {**state, **technical_analyst_result}
        )

        synthesis_results = {
            "technical_analyst": technical_analyst_result.get(
                "technical_analyst", {}
            ),
            "researcher_synthesis": researcher_result.get(
                "researcher_synthesis", {}
            ),
        }
        # Scoring reads both results from a copy of the state.
        updated_state = {**state, **synthesis_results}

        agent_outputs = [
            build_output("technical_analyst", technical_analyst_result, analyst_secs),
            build_output("researcher_team", researcher_result, researcher_secs),
        ]

        phase_score = calculate_research_synthesis_phase_score(updated_state)
        phase_output = PhaseOutput(
            phase=AnalysisPhase.RESEARCH_SYNTHESIS,
            agents=agent_outputs,
            phase_summary="Research synthesis phase completed",
            execution_time_seconds=time.time() - phase_start,
            score=phase_score,
        )

        # Build new containers instead of mutating the ones held by the state.
        completed_phases = [*(state.get("completed_phases") or []), phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": time.time() - phase_start,
                    "timestamp": time.time(),
                }
            )
        )

        result = {
            **synthesis_results,
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        # Record the failure as a phase output, mirroring the technical and
        # fundamental phases, instead of aborting the workflow.
        error_output = PhaseOutput(
            phase=AnalysisPhase.RESEARCH_SYNTHESIS,
            agents=[
                AgentOutput(
                    agent_name="ResearchSynthesisPhaseError",
                    report=(
                        f"⚠️ Research synthesis phase encountered an error: {str(e)}"
                    ),
                    execution_time_seconds=time.time() - phase_start,
                )
            ],
            score=0.0,
            execution_time_seconds=time.time() - phase_start,
        )
        # Marking the phase as completed keeps the router from scheduling it
        # again; merging keeps the outputs of phases that already ran.
        return {
            "current_phase": None,
            "completed_phases": [
                *(state.get("completed_phases") or []),
                phase_name,
            ],
            "phase_outputs": {
                **(state.get("phase_outputs") or {}),
                phase_name: error_output,
            },
        }
def _run_risk_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run the risk assessment phase (Risk Manager).

    Runs the risk manager, wraps its output in an ``AgentOutput`` /
    ``PhaseOutput`` pair, scores the phase and returns the state update.

    Args:
        state: Current enhanced workflow state. ``state["ticker"]`` is
            required; ``completed_phases`` and ``phase_outputs`` are read
            but never modified in place.

    Returns:
        On success, a dict with ``risk_assessment``, ``completed_phases``,
        ``phase_outputs`` and ``current_phase`` (None), merged with whatever
        ``_check_budget_threshold`` returns when that is truthy.
        On any exception the error is logged and only
        ``{"current_phase": None}`` is returned, so the phase is not
        recorded as completed.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.RISK.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the newest message content for the agent as the report."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        for msg in reversed(messages):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                return msg.get("content", "No report generated")
        # No message is attributed to the agent: fall back to the last one.
        last_msg = messages[-1]
        if isinstance(last_msg, dict):
            return last_msg.get("content", str(last_msg))
        return str(last_msg)

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest agent message that has any."""
        import base64
        from pathlib import Path

        for msg in reversed(result.get("messages") or []):
            if not (isinstance(msg, dict) and msg.get("agent_name") == agent_name):
                continue
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            base64_charts = []
            for chart_path in chart_paths:
                # Best effort: an unreadable chart is logged and skipped,
                # a missing file is skipped silently.
                try:
                    chart_file = Path(chart_path)
                    if chart_file.exists():
                        base64_charts.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return base64_charts
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the newest message for the agent."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                # A present-but-null "metadata" value must not leak out as None.
                return msg.get("metadata") or {}
        return {}

    try:
        # Run risk manager
        risk_result = self._run_risk_manager(state)
        risk_assessment = risk_result.get("risk_assessment", {})

        # Scoring needs to see the fresh risk results alongside prior state.
        updated_state = {**state, "risk_assessment": risk_assessment}

        agent_outputs = [
            AgentOutput(
                agent_name="risk_manager",
                report=get_agent_report(risk_result, "risk_manager"),
                charts=get_agent_charts(risk_result, "risk_manager"),
                execution_time_seconds=0.0,
                metadata=get_agent_metadata(risk_result, "risk_manager"),
            ),
        ]

        # Calculate phase score
        phase_score = calculate_risk_phase_score(updated_state)
        phase_output = PhaseOutput(
            phase=AnalysisPhase.RISK,
            agents=agent_outputs,
            phase_summary="Risk assessment phase completed",
            execution_time_seconds=time.time() - phase_start,
            score=phase_score,
        )

        # Build new containers instead of mutating the incoming state, so a
        # failure below cannot leave state["phase_outputs"] holding a phase
        # that is absent from "completed_phases".
        completed_phases = list(state.get("completed_phases") or []) + [phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": time.time() - phase_start,
                    "timestamp": time.time(),
                }
            )
        )

        result = {
            "risk_assessment": risk_assessment,
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }

        # Check budget threshold after phase completion
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        return {"current_phase": None}
def _run_decision_phase(self, state: EnhancedWorkflowState) -> dict:
    """
    Run the final decision phase (Portfolio Manager).

    Runs the portfolio manager, wraps its output in an ``AgentOutput`` /
    ``PhaseOutput`` pair and returns the state update, including the final
    recommendation.

    Args:
        state: Current enhanced workflow state. ``state["ticker"]`` is
            required; ``completed_phases`` and ``phase_outputs`` are read
            but never modified in place.

    Returns:
        On success, a dict with ``portfolio_decision``, ``final_decision``
        (the ``decision.recommendation`` value, ``"HOLD"`` when it is
        missing), ``completed_phases``, ``phase_outputs`` and
        ``current_phase`` (None), merged with whatever
        ``_check_budget_threshold`` returns when that is truthy.
        On any exception the error is logged and only
        ``{"current_phase": None}`` is returned.
    """
    phase_start = time.time()
    phase_name = AnalysisPhase.DECISION.value
    logger.info(
        json.dumps(
            {
                "phase": phase_name,
                "action": "start",
                "ticker": state["ticker"],
                "timestamp": time.time(),
            }
        )
    )

    def get_agent_report(result: dict, agent_name: str) -> str:
        """Return the newest message content for the agent as the report."""
        messages = result.get("messages") or []
        if not messages:
            return "No report generated"
        for msg in reversed(messages):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                return msg.get("content", "No report generated")
        # No message is attributed to the agent: fall back to the last one.
        last_msg = messages[-1]
        if isinstance(last_msg, dict):
            return last_msg.get("content", str(last_msg))
        return str(last_msg)

    def get_agent_charts(result: dict, agent_name: str) -> list:
        """Return base64-encoded charts from the newest agent message that has any."""
        import base64
        from pathlib import Path

        for msg in reversed(result.get("messages") or []):
            if not (isinstance(msg, dict) and msg.get("agent_name") == agent_name):
                continue
            metadata = msg.get("metadata") or {}
            chart_paths = metadata.get("charts") or metadata.get("chart_paths", [])
            if not chart_paths:
                continue
            base64_charts = []
            for chart_path in chart_paths:
                # Best effort: an unreadable chart is logged and skipped,
                # a missing file is skipped silently.
                try:
                    chart_file = Path(chart_path)
                    if chart_file.exists():
                        base64_charts.append(
                            base64.b64encode(chart_file.read_bytes()).decode("utf-8")
                        )
                except Exception as e:
                    logger.warning(f"Failed to encode chart {chart_path}: {str(e)}")
            return base64_charts
        return []

    def get_agent_metadata(result: dict, agent_name: str) -> dict:
        """Return the metadata of the newest message for the agent."""
        for msg in reversed(result.get("messages") or []):
            if isinstance(msg, dict) and msg.get("agent_name") == agent_name:
                # A present-but-null "metadata" value must not leak out as None.
                return msg.get("metadata") or {}
        return {}

    try:
        # Run portfolio manager
        portfolio_result = self._run_portfolio_manager(state)

        agent_outputs = [
            AgentOutput(
                agent_name="portfolio_manager",
                report=get_agent_report(portfolio_result, "portfolio_manager"),
                charts=get_agent_charts(portfolio_result, "portfolio_manager"),
                execution_time_seconds=0.0,
                metadata=get_agent_metadata(portfolio_result, "portfolio_manager"),
            ),
        ]
        phase_output = PhaseOutput(
            phase=AnalysisPhase.DECISION,
            agents=agent_outputs,
            phase_summary="Decision phase completed",
            execution_time_seconds=time.time() - phase_start,
        )

        # Build new containers instead of mutating the incoming state, so a
        # failure below cannot leave state["phase_outputs"] holding a phase
        # that is absent from "completed_phases".
        completed_phases = list(state.get("completed_phases") or []) + [phase_name]
        phase_outputs = {
            **(state.get("phase_outputs") or {}),
            phase_name: phase_output,
        }

        logger.info(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "complete",
                    "ticker": state["ticker"],
                    "execution_time": time.time() - phase_start,
                    "timestamp": time.time(),
                }
            )
        )

        # A key that is present but None would break a chained .get(); treat
        # it like a missing key so the "HOLD" fallback still applies.
        portfolio_decision = portfolio_result.get("portfolio_decision") or {}
        decision = portfolio_decision.get("decision") or {}

        result = {
            "portfolio_decision": portfolio_decision,
            "final_decision": decision.get("recommendation", "HOLD"),
            "completed_phases": completed_phases,
            "phase_outputs": phase_outputs,
            "current_phase": None,
        }

        # Check budget threshold after phase completion
        budget_alert = self._check_budget_threshold(phase_name, state)
        if budget_alert:
            result.update(budget_alert)
        return result
    except Exception as e:
        logger.error(
            json.dumps(
                {
                    "phase": phase_name,
                    "action": "error",
                    "ticker": state["ticker"],
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": time.time(),
                }
            )
        )
        return {"current_phase": None}
def _generate_chart_enhanced(self, state: EnhancedWorkflowState) -> dict:
    """
    Render the candlestick chart that accompanies the analysis.

    Args:
        state: Current enhanced workflow state. Reads "market_data" and
            "ticker"; "timeframe" falls back to "1d" when absent.

    Returns:
        Dict with a single "chart_path" key. The value is an empty string
        when there is no market data, when the generator returns a falsy
        path, or when any exception is raised.
    """
    try:
        frame = state.get("market_data")
        has_data = frame is not None and not frame.empty
        if not has_data:
            logger.warning("No market data available for chart generation")
            return {"chart_path": ""}

        # Only the saved path is used; the figure object is discarded.
        _figure, saved_path = self.chart_generator.generate_candlestick_chart(
            df=frame,
            ticker=state["ticker"],
            timeframe=state.get("timeframe", "1d"),
            title=f"{state['ticker']} - {state.get('timeframe', '1d')} Analysis",
        )
        logger.info(f"Chart generated: {saved_path}")
        return {"chart_path": saved_path if saved_path else ""}
    except Exception as exc:
        logger.error(f"Chart generation failed: {str(exc)}")
        return {"chart_path": ""}
def _finalize_report(self, state: EnhancedWorkflowState) -> dict:
    """
    Assemble the final analysis report from the completed phases.

    Phase outputs are collected in ``completed_phases`` order, skipping any
    phase that has no entry in ``phase_outputs``, and wrapped in an
    ``AnalysisReport``.

    Args:
        state: Current enhanced workflow state. ``state["ticker"]`` is
            required; "timeframe" falls back to "1w" and the investment
            style to ``InvestmentStyle.LONG_TERM`` when absent.

    Returns:
        Dict with "analysis_report" (``AnalysisReport.to_dict()``) and
        "executive_summary". An empty dict is returned when anything
        raises; the error is logged.
    """
    start_event = {
        "phase": "finalization",
        "action": "start",
        "ticker": state["ticker"],
        "timestamp": time.time(),
    }
    logger.info(json.dumps(start_event))

    try:
        # Keep the order in which the phases completed.
        outputs_by_phase = state.get("phase_outputs", {})
        ordered_outputs = []
        for phase_key in state.get("completed_phases", []):
            if phase_key in outputs_by_phase:
                ordered_outputs.append(outputs_by_phase[phase_key])

        style_value = state.get("config", {}).get(
            "investment_style", InvestmentStyle.LONG_TERM.value
        )
        investment_style = InvestmentStyle(style_value)

        # Cost summary is only available when a cost tracker is attached.
        cost_summary = self.cost_tracker.get_summary() if self.cost_tracker else None

        analysis_report = AnalysisReport(
            ticker=state["ticker"],
            timeframe=state.get("timeframe", "1w"),
            investment_style=investment_style,
            phases=ordered_outputs,
            cost_summary=cost_summary,
        )

        done_event = {
            "phase": "finalization",
            "action": "complete",
            "ticker": state["ticker"],
            "phases_completed": len(ordered_outputs),
            "timestamp": time.time(),
        }
        logger.info(json.dumps(done_event))

        report_payload = analysis_report.to_dict()
        summary_text = self._generate_executive_summary(state, analysis_report)
        return {
            "analysis_report": report_payload,
            "executive_summary": summary_text,
        }
    except Exception as exc:
        failure_event = {
            "phase": "finalization",
            "action": "error",
            "ticker": state["ticker"],
            "error": str(exc),
            "traceback": traceback.format_exc(),
            "timestamp": time.time(),
        }
        logger.error(json.dumps(failure_event))
        return {}
def _generate_executive_summary(
    self, state: EnhancedWorkflowState, report: AnalysisReport
) -> str:
    """
    Build the Markdown executive summary for a finished report.

    Args:
        state: Current workflow state; only "final_decision" is read.
        report: Completed analysis report.

    Returns:
        Newline-joined Markdown: a header block, the final recommendation
        (only when "final_decision" is truthy), then one bullet per phase
        in ``report.phases``.
    """
    style_label = report.investment_style.value.replace("_", " ").title()
    lines = [
        f"# Executive Summary: {report.ticker}",
        f"**Timeframe**: {report.timeframe}",
        f"**Investment Style**: {style_label}",
        f"**Phases Completed**: {len(report.phases)}",
        "",
    ]

    # The recommendation only exists when the decision phase has run.
    recommendation = state.get("final_decision")
    if recommendation:
        lines += [f"**Final Recommendation**: {recommendation}", ""]

    # The section heading is emitted even when there are no phases.
    lines.append("## Phase Summaries")
    for item in report.phases:
        phase_label = item.phase.value.replace("_", " ").title()
        agent_count = len(item.agents)
        lines.append(
            f"- **{phase_label}**: {item.phase_summary} ({agent_count} agents)"
        )

    return "\n".join(lines)