Aarya003's picture
Upload folder using huggingface_hub
4464b01 verified
import asyncio
import logging
import json
from typing import Literal, TypeAlias
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from beeai_framework.workflows import Workflow, WorkflowReservedStepName
from src.orchestration.schemas import UserContext, AnalystThesis, FinalReport
from src.agents.bull_agent import create_bull_agent
from src.agents.bear_agent import create_bear_agent
from src.agents.cio_agent import create_cio_agent
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
# Define the explicit steps in our Agentic Reflection Loop
WorkflowStep: TypeAlias = Literal["drafting", "critique", "revision", "judgment"]
class TradingState(BaseModel):
"""The global state object passed between agents during the workflow."""
context: UserContext
simulated_date: str | None = Field(default=None, description="The point-in-time cutoff (YYYY-MM-DD) for backtesting.")
# Iteration 0
bull_draft: str | None = Field(default=None, description="Draft 1 from the Bull")
bear_draft: str | None = Field(default=None, description="Draft 1 from the Bear")
# Iteration 1
cio_critique: str | None = Field(default=None, description="CIO's critique of the drafts")
# Iteration 2
bull_revision: str | None = Field(default=None, description="Revised Bull Thesis")
bear_revision: str | None = Field(default=None, description="Revised Bear Thesis")
# Final Output
final_report: str | None = Field(default=None, description="The final synthesized report from the CIO")
from src.tools.openbb_fetcher import (
get_momentum_indicators,
get_volatility_indicators,
get_growth_metrics,
get_risk_metrics,
get_insider_activity,
get_short_interest_data,
get_analyst_upgrades
)
async def step_drafting(state: TradingState) -> WorkflowStep:
"""Step 1: Bull and Bear independently draft their initial theses with pre-fetched data."""
logger.info(f"--- Phase: Drafting (Iteration 0) for {state.context.ticker} ---")
ticker = state.context.ticker
# Sync simulated_date to context for agents
state.context.simulated_date = state.simulated_date
end_date = state.simulated_date
# Pre-fetch all data with simulated end_date if provided
logger.info(f"Pre-fetching data for {ticker} (End Date: {end_date})...")
data_packet = {
"momentum": get_momentum_indicators(ticker, end_date=end_date),
"volatility": get_volatility_indicators(ticker, end_date=end_date),
"growth": get_growth_metrics(ticker, end_date=end_date),
"risk": get_risk_metrics(ticker, end_date=end_date),
"insider": get_insider_activity(ticker, end_date=end_date),
"short_interest": get_short_interest_data(ticker, end_date=end_date),
"analyst_recs": get_analyst_upgrades(ticker, end_date=end_date)
}
bull_agent = create_bull_agent(state.context)
bear_agent = create_bear_agent(state.context)
# Update chart with end_date
from src.tools.chart_generator import generate_multimodal_chart
generate_multimodal_chart(ticker, timeframe="daily", end_date=end_date)
data_str = json.dumps(data_packet, indent=2)
prompt = f"""
Analyze the following pre-fetched data and chart for {ticker}.
Current Simulated Date: {end_date if end_date else "Present Day"}
Generate your initial AnalystThesis based on your persona instructions.
[MARKET DATA]
{data_str}
Note: A candlestick chart image has been generated in data/{ticker}_daily_chart.png.
"""
logger.info("Triggering Bull and Bear analysts in parallel...")
# Parallel execution using expected_output for schema enforcement
bull_task = bull_agent.run(prompt, max_iterations=10, expected_output=AnalystThesis)
bear_task = bear_agent.run(prompt, max_iterations=10, expected_output=AnalystThesis)
bull_resp, bear_resp = await asyncio.gather(bull_task, bear_task)
state.bull_draft = bull_resp.last_message.text
state.bear_draft = bear_resp.last_message.text
logger.info(f"Bull Draft: {state.bull_draft[:500]}...")
logger.info(f"Bear Draft: {state.bear_draft[:500]}...")
logger.info("Drafting complete. Moving to Critique.")
return "critique"
async def step_critique(state: TradingState) -> WorkflowStep:
"""Step 2: CIO reviews the drafts and acts as the 'Red Team'."""
logger.info("--- Phase: Critique (Iteration 1) ---")
cio_agent = create_cio_agent(state.context)
prompt = f"""
Review the following initial drafts from your analysts.
[BULL DRAFT]
{state.bull_draft}
[BEAR DRAFT]
{state.bear_draft}
Act as the Red Team. Attack their key vulnerabilities, flag unconfirmed volume,
and provide explicit feedback for their required revisions. Do NOT output the Final Report yet.
"""
logger.info("Triggering CIO for critique...")
resp = await cio_agent.run(prompt, max_iterations=15)
state.cio_critique = resp.last_message.text
logger.info("CIO critique generated. Moving to Revision.")
return "revision"
async def step_revision(state: TradingState) -> WorkflowStep:
"""Step 3: Analysts revise their theses based on CIO feedback."""
logger.info("Starting Phase: Revision (Iteration 2)")
bull_agent = create_bull_agent(state.context)
bear_agent = create_bear_agent(state.context)
bull_prompt = f"Your original thesis:\n{state.bull_draft}\n\nCIO Feedback:\n{state.cio_critique}\n\nPlease revise your AnalystThesis."
bear_prompt = f"Your original thesis:\n{state.bear_draft}\n\nCIO Feedback:\n{state.cio_critique}\n\nPlease revise your AnalystThesis."
logger.info("Triggering Analyst revisions in parallel...")
bull_task = bull_agent.run(bull_prompt, max_iterations=10, expected_output=AnalystThesis)
bear_task = bear_agent.run(bear_prompt, max_iterations=10, expected_output=AnalystThesis)
bull_resp, bear_resp = await asyncio.gather(bull_task, bear_task)
state.bull_revision = bull_resp.last_message.text
state.bear_revision = bear_resp.last_message.text
logger.info("Revisions complete. Moving to Final Judgment.")
return "judgment"
async def step_judgment(state: TradingState) -> WorkflowReservedStepName:
"""Step 4: CIO enforces rules, calculates exact targets, and outputs Final Report."""
logger.info("Starting Phase: Final Judgment")
cio_agent = create_cio_agent(state.context)
prompt = f"""
Here are the revised theses from your analysts:
[REVISED BULL THESIS]
{state.bull_revision}
[REVISED BEAR THESIS]
{state.bear_revision}
You must now execute your quantitative guardrails:
1. Check Market Regime (SPY 200 SMA).
2. Check Liquidity Gate.
3. Run Calculate Risk/Reward math.
Output the final, structured FinalReport.
"""
resp = await cio_agent.run(prompt, max_iterations=15, expected_output=FinalReport)
state.final_report = resp.last_message.text
logger.info("Workflow Complete.")
return Workflow.END
def create_trading_workflow() -> Workflow:
"""Instantiates the stateful routing graph for the Multi-Agent Council."""
workflow = Workflow[TradingState, WorkflowStep](name="TradingCouncilWorkflow", schema=TradingState)
# Register the steps
workflow.add_step("drafting", step_drafting)
workflow.add_step("critique", step_critique)
workflow.add_step("revision", step_revision)
workflow.add_step("judgment", step_judgment)
return workflow