Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import json | |
| from typing import Literal, TypeAlias | |
| from pydantic import BaseModel, Field | |
| from dotenv import load_dotenv | |
| from beeai_framework.workflows import Workflow, WorkflowReservedStepName | |
| from src.orchestration.schemas import UserContext, AnalystThesis, FinalReport | |
| from src.agents.bull_agent import create_bull_agent | |
| from src.agents.bear_agent import create_bear_agent | |
| from src.agents.cio_agent import create_cio_agent | |
| # Load environment variables | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
| # Define the explicit steps in our Agentic Reflection Loop | |
| WorkflowStep: TypeAlias = Literal["drafting", "critique", "revision", "judgment"] | |
| class TradingState(BaseModel): | |
| """The global state object passed between agents during the workflow.""" | |
| context: UserContext | |
| simulated_date: str | None = Field(default=None, description="The point-in-time cutoff (YYYY-MM-DD) for backtesting.") | |
| # Iteration 0 | |
| bull_draft: str | None = Field(default=None, description="Draft 1 from the Bull") | |
| bear_draft: str | None = Field(default=None, description="Draft 1 from the Bear") | |
| # Iteration 1 | |
| cio_critique: str | None = Field(default=None, description="CIO's critique of the drafts") | |
| # Iteration 2 | |
| bull_revision: str | None = Field(default=None, description="Revised Bull Thesis") | |
| bear_revision: str | None = Field(default=None, description="Revised Bear Thesis") | |
| # Final Output | |
| final_report: str | None = Field(default=None, description="The final synthesized report from the CIO") | |
| from src.tools.openbb_fetcher import ( | |
| get_momentum_indicators, | |
| get_volatility_indicators, | |
| get_growth_metrics, | |
| get_risk_metrics, | |
| get_insider_activity, | |
| get_short_interest_data, | |
| get_analyst_upgrades | |
| ) | |
| async def step_drafting(state: TradingState) -> WorkflowStep: | |
| """Step 1: Bull and Bear independently draft their initial theses with pre-fetched data.""" | |
| logger.info(f"--- Phase: Drafting (Iteration 0) for {state.context.ticker} ---") | |
| ticker = state.context.ticker | |
| # Sync simulated_date to context for agents | |
| state.context.simulated_date = state.simulated_date | |
| end_date = state.simulated_date | |
| # Pre-fetch all data with simulated end_date if provided | |
| logger.info(f"Pre-fetching data for {ticker} (End Date: {end_date})...") | |
| data_packet = { | |
| "momentum": get_momentum_indicators(ticker, end_date=end_date), | |
| "volatility": get_volatility_indicators(ticker, end_date=end_date), | |
| "growth": get_growth_metrics(ticker, end_date=end_date), | |
| "risk": get_risk_metrics(ticker, end_date=end_date), | |
| "insider": get_insider_activity(ticker, end_date=end_date), | |
| "short_interest": get_short_interest_data(ticker, end_date=end_date), | |
| "analyst_recs": get_analyst_upgrades(ticker, end_date=end_date) | |
| } | |
| bull_agent = create_bull_agent(state.context) | |
| bear_agent = create_bear_agent(state.context) | |
| # Update chart with end_date | |
| from src.tools.chart_generator import generate_multimodal_chart | |
| generate_multimodal_chart(ticker, timeframe="daily", end_date=end_date) | |
| data_str = json.dumps(data_packet, indent=2) | |
| prompt = f""" | |
| Analyze the following pre-fetched data and chart for {ticker}. | |
| Current Simulated Date: {end_date if end_date else "Present Day"} | |
| Generate your initial AnalystThesis based on your persona instructions. | |
| [MARKET DATA] | |
| {data_str} | |
| Note: A candlestick chart image has been generated in data/{ticker}_daily_chart.png. | |
| """ | |
| logger.info("Triggering Bull and Bear analysts in parallel...") | |
| # Parallel execution using expected_output for schema enforcement | |
| bull_task = bull_agent.run(prompt, max_iterations=10, expected_output=AnalystThesis) | |
| bear_task = bear_agent.run(prompt, max_iterations=10, expected_output=AnalystThesis) | |
| bull_resp, bear_resp = await asyncio.gather(bull_task, bear_task) | |
| state.bull_draft = bull_resp.last_message.text | |
| state.bear_draft = bear_resp.last_message.text | |
| logger.info(f"Bull Draft: {state.bull_draft[:500]}...") | |
| logger.info(f"Bear Draft: {state.bear_draft[:500]}...") | |
| logger.info("Drafting complete. Moving to Critique.") | |
| return "critique" | |
| async def step_critique(state: TradingState) -> WorkflowStep: | |
| """Step 2: CIO reviews the drafts and acts as the 'Red Team'.""" | |
| logger.info("--- Phase: Critique (Iteration 1) ---") | |
| cio_agent = create_cio_agent(state.context) | |
| prompt = f""" | |
| Review the following initial drafts from your analysts. | |
| [BULL DRAFT] | |
| {state.bull_draft} | |
| [BEAR DRAFT] | |
| {state.bear_draft} | |
| Act as the Red Team. Attack their key vulnerabilities, flag unconfirmed volume, | |
| and provide explicit feedback for their required revisions. Do NOT output the Final Report yet. | |
| """ | |
| logger.info("Triggering CIO for critique...") | |
| resp = await cio_agent.run(prompt, max_iterations=15) | |
| state.cio_critique = resp.last_message.text | |
| logger.info("CIO critique generated. Moving to Revision.") | |
| return "revision" | |
| async def step_revision(state: TradingState) -> WorkflowStep: | |
| """Step 3: Analysts revise their theses based on CIO feedback.""" | |
| logger.info("Starting Phase: Revision (Iteration 2)") | |
| bull_agent = create_bull_agent(state.context) | |
| bear_agent = create_bear_agent(state.context) | |
| bull_prompt = f"Your original thesis:\n{state.bull_draft}\n\nCIO Feedback:\n{state.cio_critique}\n\nPlease revise your AnalystThesis." | |
| bear_prompt = f"Your original thesis:\n{state.bear_draft}\n\nCIO Feedback:\n{state.cio_critique}\n\nPlease revise your AnalystThesis." | |
| logger.info("Triggering Analyst revisions in parallel...") | |
| bull_task = bull_agent.run(bull_prompt, max_iterations=10, expected_output=AnalystThesis) | |
| bear_task = bear_agent.run(bear_prompt, max_iterations=10, expected_output=AnalystThesis) | |
| bull_resp, bear_resp = await asyncio.gather(bull_task, bear_task) | |
| state.bull_revision = bull_resp.last_message.text | |
| state.bear_revision = bear_resp.last_message.text | |
| logger.info("Revisions complete. Moving to Final Judgment.") | |
| return "judgment" | |
| async def step_judgment(state: TradingState) -> WorkflowReservedStepName: | |
| """Step 4: CIO enforces rules, calculates exact targets, and outputs Final Report.""" | |
| logger.info("Starting Phase: Final Judgment") | |
| cio_agent = create_cio_agent(state.context) | |
| prompt = f""" | |
| Here are the revised theses from your analysts: | |
| [REVISED BULL THESIS] | |
| {state.bull_revision} | |
| [REVISED BEAR THESIS] | |
| {state.bear_revision} | |
| You must now execute your quantitative guardrails: | |
| 1. Check Market Regime (SPY 200 SMA). | |
| 2. Check Liquidity Gate. | |
| 3. Run Calculate Risk/Reward math. | |
| Output the final, structured FinalReport. | |
| """ | |
| resp = await cio_agent.run(prompt, max_iterations=15, expected_output=FinalReport) | |
| state.final_report = resp.last_message.text | |
| logger.info("Workflow Complete.") | |
| return Workflow.END | |
| def create_trading_workflow() -> Workflow: | |
| """Instantiates the stateful routing graph for the Multi-Agent Council.""" | |
| workflow = Workflow[TradingState, WorkflowStep](name="TradingCouncilWorkflow", schema=TradingState) | |
| # Register the steps | |
| workflow.add_step("drafting", step_drafting) | |
| workflow.add_step("critique", step_critique) | |
| workflow.add_step("revision", step_revision) | |
| workflow.add_step("judgment", step_judgment) | |
| return workflow | |