"""Multi-Agent Financial Analysis Pipeline.

Subagents:
    1. Orchestrator - Plans and delegates tasks
    2. Data Quant   - Fetches raw financial data
    3. Analyst      - Runs quantitative analysis (sandbox code execution)
    4. Publisher    - Generates artifacts (PPTX, XLSX)
    5. UI Mapper    - Transforms data for React charts

The entry point is :func:`run_multi_agent_pipeline`, a generator that yields
Server-Sent-Event (SSE) strings as each subagent starts, progresses, and
completes, ending with a single ``pipeline_complete`` event.
"""

import contextlib
import io
import json
import logging
import re
from datetime import datetime
from typing import Any, Generator

from .claude_agent_service import call_claude_simple as _call_llm
from .tools import (
    fetch_breaking_news_data,
    fetch_market_benchmarks_data,
    fetch_stock_news_data,
)

logger = logging.getLogger(__name__)


# ------------------------------------------------------------------ #
# SSE Helper
# ------------------------------------------------------------------ #
def sse_event(event_type: str, data: dict) -> str:
    """Serialize *data* as a single SSE frame with the given event type.

    ``default=str`` ensures non-JSON-native values (datetimes, numpy
    scalars, ...) are stringified rather than raising.
    """
    payload = json.dumps(data, default=str)
    return f"event: {event_type}\ndata: {payload}\n\n"


def _parse_json(text: str) -> dict | None:
    """Best-effort extraction of a JSON object from LLM output.

    Tries the whole string first; if that fails (e.g. the model wrapped
    the JSON in prose or markdown fences), falls back to the substring
    between the first ``{`` and the last ``}``. Returns ``None`` when no
    dict can be recovered.
    """
    body = (text or "").strip()
    if not body:
        return None
    try:
        parsed = json.loads(body)
        if isinstance(parsed, dict):
            return parsed
    except json.JSONDecodeError:
        pass
    start, end = body.find("{"), body.rfind("}")
    if start != -1 and end > start:
        try:
            parsed = json.loads(body[start : end + 1])
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass
    return None


# ------------------------------------------------------------------ #
# Tool Functions
# ------------------------------------------------------------------ #
def get_market_data(ticker: str, period: str = "3mo", interval: str = "1d") -> dict:
    """Fetch OHLCV history plus summary fundamentals for *ticker* via yfinance.

    Returns a dict with up to the last 60 ``records`` (date/open/high/low/
    close/volume) and key ``info`` fields, or ``{"error": ..., "ticker": ...}``
    on any failure. Never raises — callers treat errors as data.
    """
    try:
        import yfinance as yf
    except ImportError:
        return {"error": "yfinance not installed", "ticker": ticker}
    try:
        stock = yf.Ticker(ticker)
        hist = stock.history(period=period, interval=interval)
        if hist.empty:
            return {"error": f"No data for {ticker}", "ticker": ticker}
        records = []
        for date, row in hist.iterrows():
            records.append({
                "date": date.strftime("%Y-%m-%d"),
                "open": round(row["Open"], 2),
                "high": round(row["High"], 2),
                "low": round(row["Low"], 2),
                "close": round(row["Close"], 2),
                "volume": int(row["Volume"]),
            })
        info = stock.info
        return {
            "ticker": ticker,
            "period": period,
            # Cap payload size: only the most recent 60 bars are returned.
            "records": records[-60:],
            "company_name": info.get("shortName", ticker),
            "sector": info.get("sector", ""),
            "industry": info.get("industry", ""),
            "market_cap": info.get("marketCap"),
            "current_price": info.get("currentPrice") or info.get("regularMarketPrice"),
            "pe_ratio": info.get("trailingPE"),
            "forward_pe": info.get("forwardPE"),
            "dividend_yield": info.get("dividendYield"),
            "52w_high": info.get("fiftyTwoWeekHigh"),
            "52w_low": info.get("fiftyTwoWeekLow"),
            "beta": info.get("beta"),
            "revenue": info.get("totalRevenue"),
            "profit_margin": info.get("profitMargins"),
        }
    except Exception as exc:
        return {"error": str(exc), "ticker": ticker}


def get_financial_statements(ticker: str, statement_type: str = "income") -> dict:
    """Fetch financial statements using yfinance (income, balance sheet, cash flow).

    Returns ``{"ticker", "statement_type", "statements"}`` where
    ``statements`` maps period date -> {line item -> value}, covering at
    most the four most recent periods. Errors are returned, not raised.
    """
    try:
        import yfinance as yf

        stock = yf.Ticker(ticker)
        df_map = {
            "income": stock.financials,
            "balance": stock.balance_sheet,
            "cashflow": stock.cashflow,
        }
        df = df_map.get(statement_type)
        if df is None or df.empty:
            return {"error": f"No {statement_type} data", "ticker": ticker}
        records = {}
        for col in list(df.columns)[:4]:
            period_key = col.strftime("%Y-%m-%d") if hasattr(col, "strftime") else str(col)
            records[period_key] = {
                # `x == x` is False only for NaN — map missing values to None
                # so the result is JSON-serializable.
                str(idx): (float(df.loc[idx, col]) if df.loc[idx, col] == df.loc[idx, col] else None)
                for idx in df.index
            }
        return {"ticker": ticker, "statement_type": statement_type, "statements": records}
    except ImportError:
        return {"error": "yfinance not installed", "ticker": ticker}
    except Exception as exc:
        return {"error": str(exc), "ticker": ticker}


def execute_sandbox_code(python_code: str) -> dict:
    """Execute analyst-generated Python in a restricted namespace.

    The code runs with a curated set of builtins and pre-injected ``math``
    and ``statistics`` modules; anything it stores in the ``_result`` dict
    is returned to the caller. stdout is captured (truncated to 5000 chars).

    SECURITY: ``__import__`` is deliberately NOT exposed — with it, sandboxed
    code could ``__import__("os")``/``subprocess`` and escape the sandbox
    entirely. Note this namespace restriction is still not a hard security
    boundary (``exec`` on untrusted input remains risky); only run code from
    trusted model output.
    """
    allowed_builtins = {
        "abs": abs, "round": round, "min": min, "max": max, "sum": sum,
        "len": len, "range": range, "enumerate": enumerate, "zip": zip,
        "sorted": sorted, "list": list, "dict": dict, "tuple": tuple,
        "set": set, "str": str, "int": int, "float": float, "bool": bool,
        "print": print, "isinstance": isinstance, "map": map,
        "filter": filter, "any": any, "all": all,
    }
    safe_globals: dict[str, Any] = {"__builtins__": allowed_builtins}
    # Pre-inject the math helpers sandboxed code is expected to need, so it
    # never has a reason to import anything itself.
    try:
        import statistics
        safe_globals["statistics"] = statistics
    except ImportError:
        pass
    try:
        import math
        safe_globals["math"] = math
    except ImportError:
        pass
    result_store: dict[str, Any] = {}
    safe_globals["_result"] = result_store
    output_buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(output_buffer):
            exec(python_code, safe_globals)  # noqa: S102
        return {
            "success": True,
            "stdout": output_buffer.getvalue()[:5000],
            "result": result_store or None,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc), "stdout": output_buffer.getvalue()[:2000]}


# ------------------------------------------------------------------ #
# Subagent System Prompts
# ------------------------------------------------------------------ #
_ORCHESTRATOR_PROMPT = """\
You are a lead portfolio manager. Break the user's request into a step-by-step plan \
and identify what data, analysis, and artifacts are needed.

Available statistical models (pick the most relevant):
moving_averages, rsi, macd, bollinger_bands, monte_carlo, dcf, beta_capm, risk_metrics, correlation, regression

Return ONLY valid JSON (no markdown fences):
{
  "plan_summary": "1-2 sentence overview",
  "tickers": ["AAPL"],
  "data_needs": {
    "market_data": true,
    "financial_statements": false,
    "statement_types": ["income"],
    "news": true,
    "benchmarks": true
  },
  "artifacts_needed": {"slides": true, "excel": true},
  "models_to_run": ["moving_averages", "rsi", "macd", "risk_metrics"],
  "analysis_requirements": ["moving averages", "volatility", "returns"],
  "period": "3mo"
}"""

_UI_MAPPER_PROMPT = """\
You are a frontend data engineer. Transform analysis results into React chart configs.

Return ONLY valid JSON (no markdown fences):
{
  "charts": [{"id": "str", "title": "str", "chartType": "line|bar|pie", "data": [{"name": "x", "value": 0}], "xKey": "name", "yKeys": ["value"], "colors": ["#E8440D"]}],
  "metrics": [{"label": "str", "value": "str", "change": "str", "trend": "up|down|neutral"}],
  "summary": "Brief dashboard description"
}"""


# ------------------------------------------------------------------ #
# Chart / Artifact Helpers
# ------------------------------------------------------------------ #
def _build_base_charts(ticker: str, records: list[dict]) -> list[dict]:
    """Build the two baseline dashboard charts (price line, volume bars).

    Price uses the last 30 records; volume the last 20 of those. Returns
    an empty list when there are no records.
    """
    charts = []
    if not records:
        return charts
    tail = records[-30:]
    charts.append({
        "id": "price_trend",
        "title": f"{ticker} Price Trend",
        "chartType": "line",
        "data": [{"date": r["date"], "close": r["close"]} for r in tail],
        "xKey": "date",
        "yKeys": ["close"],
        "colors": ["#E8440D"],
    })
    charts.append({
        "id": "volume",
        "title": f"{ticker} Trading Volume",
        "chartType": "bar",
        "data": [{"date": r["date"], "volume": r["volume"]} for r in tail[-20:]],
        "xKey": "date",
        "yKeys": ["volume"],
        "colors": ["#2563EB"],
    })
    return charts


def _build_base_metrics(data: dict) -> list[dict]:
    """Turn the fundamentals in a market-data dict into dashboard metric cards.

    Each present numeric field (price, P/E, market cap, 52-week high/low)
    becomes one ``{label, value, change, trend}`` entry; missing or
    non-numeric fields are skipped silently.
    """
    metrics = []
    price = data.get("current_price")
    if price and isinstance(price, (int, float)):
        metrics.append({"label": "Current Price", "value": f"${price:,.2f}", "change": "", "trend": "neutral"})
    pe = data.get("pe_ratio")
    if pe and isinstance(pe, (int, float)):
        metrics.append({"label": "P/E Ratio", "value": f"{pe:.1f}", "change": "", "trend": "neutral"})
    mc = data.get("market_cap")
    if mc and isinstance(mc, (int, float)):
        # Human-readable market cap: trillions, billions, then millions.
        if mc >= 1e12:
            ms = f"${mc / 1e12:.2f}T"
        elif mc >= 1e9:
            ms = f"${mc / 1e9:.2f}B"
        else:
            ms = f"${mc / 1e6:.0f}M"
        metrics.append({"label": "Market Cap", "value": ms, "change": "", "trend": "neutral"})
    hi = data.get("52w_high")
    lo = data.get("52w_low")
    if hi and isinstance(hi, (int, float)):
        metrics.append({"label": "52-Week High", "value": f"${hi:,.2f}", "change": "", "trend": "neutral"})
    if lo and isinstance(lo, (int, float)):
        metrics.append({"label": "52-Week Low", "value": f"${lo:,.2f}", "change": "", "trend": "neutral"})
    return metrics


def _build_excel_artifact(data: dict, analysis: dict, title: str) -> dict:
    """Assemble an Excel-artifact spec (title + sheets) from data and analysis.

    Sheets, in order of preference: "Market Data" (first 50 records),
    "Analysis" (scalar metrics only), and — only if both are empty — a
    "Summary" sheet of the scalar fields in *data*.
    """
    sheets = []
    records = data.get("records", [])
    if records:
        cols = list(records[0].keys())
        sheets.append({
            "name": "Market Data",
            "columns": cols,
            "rows": [[row.get(c, "") for c in cols] for row in records[:50]],
        })
    analysis_rows = [
        [str(k), str(v)]
        for k, v in analysis.items()
        if k != "_stdout" and not isinstance(v, (dict, list))
    ]
    if analysis_rows:
        sheets.append({"name": "Analysis", "columns": ["Metric", "Value"], "rows": analysis_rows})
    if not sheets:
        summary = {k: v for k, v in data.items() if not isinstance(v, (dict, list))}
        sheets.append({
            "name": "Summary",
            "columns": ["Field", "Value"],
            "rows": [[str(k), str(v)] for k, v in summary.items()][:20],
        })
    return {"title": title, "sheets": sheets, "formulas": [], "insights": []}


# ------------------------------------------------------------------ #
# Pipeline
# ------------------------------------------------------------------ #
def run_multi_agent_pipeline(
    user_prompt: str,
    data_context: dict[str, Any] | None = None,
) -> Generator[str, None, None]:
    """Yield SSE events as the 5-agent pipeline executes.

    Args:
        user_prompt: The raw user request to analyze.
        data_context: Optional session context; may carry ``ticker`` (the
            fallback ticker) and ``requested_models`` (overrides the plan's
            model selection).

    Yields:
        SSE-formatted strings: ``agent_start`` / ``agent_progress`` /
        ``agent_complete`` / ``agent_error`` per subagent, then one final
        ``pipeline_complete`` event with charts, metrics, artifacts, and a
        summarized view of the collected data. Each agent stage is wrapped
        in its own try/except so one failing stage does not stop the rest.
    """
    pipeline_id = f"p_{int(datetime.now().timestamp() * 1000)}"
    default_ticker = (
        (data_context or {}).get("ticker", "AAPL") if isinstance(data_context, dict) else "AAPL"
    )
    collected_data: dict[str, Any] = {}
    analysis_results: dict[str, Any] = {}
    artifacts: dict[str, Any] = {}
    chart_configs: list[dict] = []
    metrics: list[dict] = []

    # -------------------------------------------------------------- #
    # 1. Orchestrator
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id,
        "agent": "orchestrator",
        "agent_label": "The Orchestrator",
        "status": "running",
        "message": "Breaking down your request into an execution plan...",
    })
    ctx_str = ""
    if data_context:
        ctx_str = f"\n\nSession context: {json.dumps(data_context, default=str)[:1500]}"
    try:
        plan_text = _call_llm(_ORCHESTRATOR_PROMPT, f"User request: {user_prompt}{ctx_str}")
        plan = _parse_json(plan_text)
    except Exception as exc:
        logger.error("Orchestrator LLM call failed: %s", exc)
        plan = None
    if not plan:
        # LLM failed or returned unparseable output — fall back to a sane
        # default plan so the pipeline always produces something.
        plan = {
            "plan_summary": f"Analyze {default_ticker} based on user request",
            "tickers": [default_ticker],
            "data_needs": {
                "market_data": True,
                "financial_statements": True,
                "statement_types": ["income"],
                "news": True,
                "benchmarks": True,
            },
            "artifacts_needed": {"slides": True, "excel": True},
            "analysis_requirements": ["price trend", "moving averages", "volatility", "key ratios"],
            "period": "3mo",
        }
    # `or` already covers None/empty-list, so no second fallback is needed.
    tickers = plan.get("tickers") or [default_ticker]
    yield sse_event("agent_complete", {
        "pipeline_id": pipeline_id,
        "agent": "orchestrator",
        "agent_label": "The Orchestrator",
        "status": "completed",
        "message": plan.get("plan_summary", "Plan created."),
        "result": {"plan": plan, "tickers": tickers},
    })

    # -------------------------------------------------------------- #
    # 2. Data Quant
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id,
        "agent": "data_quant",
        "agent_label": "The Data Quant",
        "status": "running",
        "message": f"Fetching market data for {', '.join(tickers[:3])}...",
    })
    data_needs = plan.get("data_needs", {})
    period = plan.get("period", "3mo")
    try:
        # Cap at 3 tickers to bound fetch time.
        for tkr in tickers[:3]:
            if data_needs.get("market_data", True):
                yield sse_event("agent_progress", {
                    "pipeline_id": pipeline_id,
                    "agent": "data_quant",
                    "message": f"Fetching OHLCV data for {tkr}...",
                })
                collected_data[f"{tkr}_market"] = get_market_data(tkr, period=period)
            if data_needs.get("financial_statements"):
                for st in data_needs.get("statement_types", ["income"]):
                    yield sse_event("agent_progress", {
                        "pipeline_id": pipeline_id,
                        "agent": "data_quant",
                        "message": f"Fetching {st} statement for {tkr}...",
                    })
                    collected_data[f"{tkr}_{st}"] = get_financial_statements(tkr, st)
        # News and benchmarks are fetched once (for the primary ticker),
        # best-effort: failures degrade to placeholder strings.
        if data_needs.get("news", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id,
                "agent": "data_quant",
                "message": "Fetching market news...",
            })
            try:
                collected_data["stock_news"] = fetch_stock_news_data(tickers[0])
            except Exception:
                collected_data["stock_news"] = "No news available"
            try:
                collected_data["breaking_news"] = fetch_breaking_news_data()
            except Exception:
                collected_data["breaking_news"] = "No breaking news"
        if data_needs.get("benchmarks", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id,
                "agent": "data_quant",
                "message": "Fetching benchmarks...",
            })
            try:
                collected_data["benchmarks"] = fetch_market_benchmarks_data()
            except Exception:
                collected_data["benchmarks"] = "No benchmarks"
        total_points = sum(
            len(v.get("records", [])) if isinstance(v, dict) else 0
            for v in collected_data.values()
        )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id,
            "agent": "data_quant",
            "agent_label": "The Data Quant",
            "status": "completed",
            "message": f"Retrieved {total_points} data points from {len(collected_data)} sources.",
            "result": {"data_points": total_points, "sources": len(collected_data)},
        })
    except Exception as exc:
        logger.error("Data Quant failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id,
            "agent": "data_quant",
            "message": f"Data retrieval error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 3. Analyst — run pre-built statistical models
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id,
        "agent": "analyst",
        "agent_label": "The Analyst",
        "status": "running",
        "message": "Running quantitative models...",
    })
    primary = tickers[0]
    primary_data = collected_data.get(f"{primary}_market", {})
    records = primary_data.get("records", [])
    from .financial_models import run_model, ALL_MODELS

    default_models = ["moving_averages", "rsi", "macd", "bollinger_bands", "risk_metrics", "regression"]
    # UI-requested models (session context) take precedence over the plan.
    ui_requested = (
        (data_context or {}).get("requested_models") if isinstance(data_context, dict) else None
    )
    requested_models = ui_requested or plan.get("models_to_run", default_models)
    if not requested_models:
        requested_models = default_models
    valid_models = [m for m in requested_models if m in ALL_MODELS]
    if not valid_models:
        valid_models = default_models
    model_results: dict[str, Any] = {}
    # NOTE(review): models default to a 6mo window while data fetching above
    # defaults to 3mo — presumably intentional (models want more history).
    model_period = plan.get("period", "6mo")
    try:
        for model_name in valid_models:
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id,
                "agent": "analyst",
                "message": f"Running {model_name} on {primary}...",
            })
            kwargs = {"period": model_period}
            if model_name == "correlation" and len(tickers) >= 2:
                kwargs["tickers"] = tickers[:5]
            res = run_model(model_name, primary, **kwargs)
            model_results[model_name] = res
        # Flatten per-model metrics into a single namespaced dict; collect
        # signals and interpretations for the summary and dashboard steps.
        analysis_results = {}
        all_signals: list[str] = []
        all_interpretations: list[str] = []
        for name, res in model_results.items():
            if "error" in res:
                analysis_results[f"{name}_error"] = res["error"]
                continue
            for k, v in (res.get("metrics") or {}).items():
                analysis_results[f"{name}_{k}"] = v
            all_signals.extend(res.get("signals", []))
            interp = res.get("interpretation", "")
            if interp:
                all_interpretations.append(interp)
        analysis_results["_signals"] = all_signals
        analysis_results["_interpretations"] = all_interpretations
        analysis_results["_model_results"] = model_results
        analysis_results["_models_run"] = list(model_results.keys())
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id,
            "agent": "analyst",
            "agent_label": "The Analyst",
            "status": "completed",
            "message": (
                f"Analysis complete — {len(valid_models)} models, "
                f"{len(analysis_results)} metrics, {len(all_signals)} signals."
            ),
            "result": {
                "models_run": list(model_results.keys()),
                "metrics_count": len(analysis_results),
                "signals": all_signals[:8],
            },
        })
    except Exception as exc:
        logger.error("Analyst failed: %s", exc)
        analysis_results = {"error": str(exc)}
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id,
            "agent": "analyst",
            "message": f"Analysis error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 4. Publisher
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id,
        "agent": "publisher",
        "agent_label": "The Publisher",
        "status": "running",
        "message": "Generating artifacts...",
    })
    artifacts_needed = plan.get("artifacts_needed", {})
    try:
        if artifacts_needed.get("slides", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id,
                "agent": "publisher",
                "message": "Creating slide deck...",
            })
            from .agent_tools_service import SLIDES_ARTIFACT_SCHEMA, SLIDES_INSTRUCTION

            pub_context = {
                "ticker": primary,
                "company": primary_data.get("company_name", primary),
                "price": primary_data.get("current_price"),
                "pe": primary_data.get("pe_ratio"),
                "market_cap": primary_data.get("market_cap"),
                "analysis": {
                    k: v for k, v in analysis_results.items()
                    if k != "_stdout" and not isinstance(v, list)
                },
                "news": str(collected_data.get("stock_news", ""))[:800],
                "request": user_prompt,
            }
            slides_prompt = (
                f"{SLIDES_INSTRUCTION}\n\n"
                f"Data:\n{json.dumps(pub_context, default=str)[:3000]}\n\n"
                f"Return JSON matching: {json.dumps(SLIDES_ARTIFACT_SCHEMA, indent=2)}\n"
            )
            slides_text = _call_llm(
                "You are a presentation generator. Output ONLY valid JSON.",
                slides_prompt,
            )
            slides_artifact = _parse_json(slides_text)
            if slides_artifact:
                artifacts["slides"] = slides_artifact
        if artifacts_needed.get("excel", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id,
                "agent": "publisher",
                "message": "Building Excel model...",
            })
            artifacts["excel"] = _build_excel_artifact(
                primary_data,
                analysis_results,
                f"{primary} Financial Model",
            )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id,
            "agent": "publisher",
            "agent_label": "The Publisher",
            "status": "completed",
            "message": f"Generated: {', '.join(artifacts.keys())}",
            "result": {
                "artifacts": list(artifacts.keys()),
                "slides_count": len(artifacts.get("slides", {}).get("slides", [])),
                "excel_sheets": len(artifacts.get("excel", {}).get("sheets", [])),
            },
        })
    except Exception as exc:
        logger.error("Publisher failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id,
            "agent": "publisher",
            "message": f"Artifact generation error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 5. UI Mapper — build charts from model results
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id,
        "agent": "ui_mapper",
        "agent_label": "The UI Mapper",
        "status": "running",
        "message": "Preparing dashboard visualizations...",
    })
    try:
        chart_configs = _build_base_charts(primary, records)
        metrics = _build_base_metrics(primary_data)
        # Static chart config per model; a model without `chart_data` in
        # its result is simply skipped.
        model_chart_map = {
            "moving_averages": {"title": f"{primary} Moving Averages", "chartType": "line", "yKeys": ["close", "SMA20", "SMA50"], "xKey": "date", "colors": ["#111111", "#E8440D", "#2563EB"]},
            "rsi": {"title": f"{primary} RSI", "chartType": "line", "yKeys": ["RSI"], "xKey": "date", "colors": ["#7C3AED"]},
            "macd": {"title": f"{primary} MACD", "chartType": "bar", "yKeys": ["MACD", "Signal", "Histogram"], "xKey": "date", "colors": ["#E8440D", "#2563EB", "#D1D5DB"]},
            "bollinger_bands": {"title": f"{primary} Bollinger Bands", "chartType": "line", "yKeys": ["close", "upper", "mid", "lower"], "xKey": "date", "colors": ["#111111", "#E8440D", "#888", "#E8440D"]},
            "monte_carlo": {"title": f"{primary} Monte Carlo Forecast", "chartType": "line", "yKeys": ["p5", "p25", "median", "p75", "p95"], "xKey": "day", "colors": ["#FCA5A5", "#F97316", "#E8440D", "#F97316", "#FCA5A5"]},
            "regression": {"title": f"{primary} Trend & Forecast", "chartType": "line", "yKeys": ["close", "trend"], "xKey": "date", "colors": ["#111111", "#E8440D"]},
            "risk_metrics": {"title": f"{primary} Drawdown", "chartType": "line", "yKeys": ["cumulative_return", "drawdown"], "xKey": "date", "colors": ["#16A34A", "#DC2626"]},
        }
        mr = analysis_results.get("_model_results", {})
        for model_name, chart_cfg in model_chart_map.items():
            res = mr.get(model_name, {})
            cdata = res.get("chart_data")
            if not cdata:
                continue
            chart_configs.append({
                "id": f"model_{model_name}",
                "title": chart_cfg["title"],
                "chartType": chart_cfg["chartType"],
                "data": cdata[:60],
                "xKey": chart_cfg["xKey"],
                "yKeys": chart_cfg["yKeys"],
                "colors": chart_cfg["colors"],
            })
        # Turn the first couple of signals per model into metric cards,
        # classifying trend by keyword sentiment.
        for model_name, res in mr.items():
            if "error" in res:
                continue
            for sig in (res.get("signals") or [])[:2]:
                trend = (
                    "up" if any(w in sig.lower() for w in ("bullish", "buy", "above", "positive", "upward", "strong", "undervalued"))
                    else "down" if any(w in sig.lower() for w in ("bearish", "sell", "below", "negative", "downward", "overvalued", "overbought"))
                    else "neutral"
                )
                metrics.append({
                    "label": model_name.replace("_", " ").title(),
                    "value": sig[:50],
                    "change": "",
                    "trend": trend,
                })
        dcf = mr.get("dcf", {})
        if dcf.get("metrics"):
            iv = dcf["metrics"].get("intrinsic_value_per_share")
            up = dcf["metrics"].get("upside_pct")
            if iv is not None:
                metrics.append({
                    "label": "DCF Intrinsic Value",
                    "value": f"${iv:,.2f}" if isinstance(iv, (int, float)) else str(iv),
                    "change": f"{up:+.1f}%" if isinstance(up, (int, float)) else "",
                    "trend": "up" if (up or 0) > 0 else "down",
                })
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id,
            "agent": "ui_mapper",
            "agent_label": "The UI Mapper",
            "status": "completed",
            "message": f"Dashboard ready — {len(chart_configs)} charts, {len(metrics)} metrics.",
            "result": {"charts_count": len(chart_configs), "metrics_count": len(metrics)},
        })
    except Exception as exc:
        logger.error("UI Mapper failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id,
            "agent": "ui_mapper",
            "message": f"UI mapping error: {exc}",
        })

    # -------------------------------------------------------------- #
    # Final
    # -------------------------------------------------------------- #
    try:
        interpretations = analysis_results.get("_interpretations", [])
        signals_text = "\n".join(analysis_results.get("_signals", [])[:10])
        interp_text = "\n".join(interpretations[:6])
        summary_prompt = (
            f"Summarize this analysis for {', '.join(tickers)}:\n"
            f"Request: {user_prompt}\n"
            f"Models run: {', '.join(analysis_results.get('_models_run', []))}\n"
            f"Key signals:\n{signals_text}\n"
            f"Model interpretations:\n{interp_text}\n"
            f"Artifacts: {', '.join(artifacts.keys())}\n\n"
            f"Write a 3-5 sentence executive summary synthesizing the findings."
        )
        final_summary = _call_llm(
            "You are a concise financial analyst. Write an executive summary.",
            summary_prompt,
        )
    except Exception:
        final_summary = f"Analysis pipeline completed for {', '.join(tickers)}."
    # Shrink collected data for the final event: record counts instead of
    # full record lists, 200-char previews for everything else.
    data_summary = {}
    for k, v in collected_data.items():
        if isinstance(v, dict) and "records" in v:
            data_summary[k] = {
                "ticker": v.get("ticker", ""),
                "company_name": v.get("company_name", ""),
                "current_price": v.get("current_price"),
                "records_count": len(v.get("records", [])),
            }
        else:
            data_summary[k] = {"preview": str(v)[:200]}
    clean_analysis_final = {
        k: v for k, v in analysis_results.items()
        if k != "_stdout" and not (isinstance(v, list) and len(v) > 20)
    }
    yield sse_event("pipeline_complete", {
        "pipeline_id": pipeline_id,
        "status": "completed",
        "summary": final_summary,
        "tickers": tickers,
        "charts": chart_configs,
        "metrics": metrics,
        "artifacts": artifacts,
        "collected_data": data_summary,
        "analysis_results": clean_analysis_final,
    })