| """Multi-Agent Financial Analysis Pipeline | |
| Subagents: | |
| 1. Orchestrator - Plans and delegates tasks | |
| 2. Data Quant - Fetches raw financial data | |
| 3. Analyst - Runs quantitative analysis (sandbox code execution) | |
| 4. Publisher - Generates artifacts (PPTX, XLSX) | |
| 5. UI Mapper - Transforms data for React charts | |
| """ | |
| import contextlib | |
| import io | |
| import json | |
| import logging | |
| import re | |
| from datetime import datetime | |
| from typing import Any, Generator | |
| from .claude_agent_service import call_claude_simple as _call_llm | |
| from .tools import ( | |
| fetch_breaking_news_data, | |
| fetch_market_benchmarks_data, | |
| fetch_stock_news_data, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # ------------------------------------------------------------------ # | |
| # SSE Helper # | |
| # ------------------------------------------------------------------ # | |
def sse_event(event_type: str, data: dict) -> str:
    """Format one server-sent-event frame: typed event plus JSON payload.

    Non-JSON-native values are stringified via ``default=str`` so the
    frame never fails to serialize.
    """
    serialized = json.dumps(data, default=str)
    return "event: {}\ndata: {}\n\n".format(event_type, serialized)
| def _parse_json(text: str) -> dict | None: | |
| body = (text or "").strip() | |
| if not body: | |
| return None | |
| try: | |
| parsed = json.loads(body) | |
| if isinstance(parsed, dict): | |
| return parsed | |
| except json.JSONDecodeError: | |
| pass | |
| start, end = body.find("{"), body.rfind("}") | |
| if start != -1 and end > start: | |
| try: | |
| parsed = json.loads(body[start : end + 1]) | |
| if isinstance(parsed, dict): | |
| return parsed | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
| # ------------------------------------------------------------------ # | |
| # Tool Functions # | |
| # ------------------------------------------------------------------ # | |
def get_market_data(ticker: str, period: str = "3mo", interval: str = "1d") -> dict:
    """Fetch OHLCV history plus headline fundamentals for *ticker* via yfinance.

    Returns a dict with at most the last 60 bars under "records" plus
    summary fields pulled from ``Ticker.info``, or an
    ``{"error": ..., "ticker": ...}`` dict on any failure (missing
    dependency, unknown symbol, network error).
    """
    try:
        import yfinance as yf
    except ImportError:
        return {"error": "yfinance not installed", "ticker": ticker}
    try:
        handle = yf.Ticker(ticker)
        history = handle.history(period=period, interval=interval)
        if history.empty:
            return {"error": f"No data for {ticker}", "ticker": ticker}
        rows = [
            {
                "date": stamp.strftime("%Y-%m-%d"),
                "open": round(bar["Open"], 2),
                "high": round(bar["High"], 2),
                "low": round(bar["Low"], 2),
                "close": round(bar["Close"], 2),
                "volume": int(bar["Volume"]),
            }
            for stamp, bar in history.iterrows()
        ]
        profile = handle.info
        return {
            "ticker": ticker,
            "period": period,
            "records": rows[-60:],
            "company_name": profile.get("shortName", ticker),
            "sector": profile.get("sector", ""),
            "industry": profile.get("industry", ""),
            "market_cap": profile.get("marketCap"),
            "current_price": profile.get("currentPrice") or profile.get("regularMarketPrice"),
            "pe_ratio": profile.get("trailingPE"),
            "forward_pe": profile.get("forwardPE"),
            "dividend_yield": profile.get("dividendYield"),
            "52w_high": profile.get("fiftyTwoWeekHigh"),
            "52w_low": profile.get("fiftyTwoWeekLow"),
            "beta": profile.get("beta"),
            "revenue": profile.get("totalRevenue"),
            "profit_margin": profile.get("profitMargins"),
        }
    except Exception as exc:
        return {"error": str(exc), "ticker": ticker}
def get_financial_statements(ticker: str, statement_type: str = "income") -> dict:
    """Fetch one financial statement for *ticker* via yfinance.

    Args:
        ticker: Stock symbol, e.g. "AAPL".
        statement_type: One of "income", "balance", "cashflow".

    Returns:
        {"ticker", "statement_type", "statements": {period: {line_item: value}}}
        for up to the four most recent reporting periods, or an
        ``{"error": ..., "ticker": ...}`` dict on failure.
    """
    try:
        import yfinance as yf

        stock = yf.Ticker(ticker)
        # Resolve the statement lazily: each of these Ticker properties
        # triggers its own data fetch, so only the requested one is touched
        # (the original dict literal fetched all three on every call).
        accessors = {
            "income": lambda: stock.financials,
            "balance": lambda: stock.balance_sheet,
            "cashflow": lambda: stock.cashflow,
        }
        accessor = accessors.get(statement_type)
        df = accessor() if accessor else None
        if df is None or df.empty:
            return {"error": f"No {statement_type} data", "ticker": ticker}
        records = {}
        for col in list(df.columns)[:4]:
            period_key = col.strftime("%Y-%m-%d") if hasattr(col, "strftime") else str(col)
            row: dict[str, float | None] = {}
            for idx in df.index:
                value = df.loc[idx, col]
                # NaN is the only value that is != itself; map it to None so
                # the result stays JSON-serializable.
                row[str(idx)] = None if value != value else float(value)
            records[period_key] = row
        return {"ticker": ticker, "statement_type": statement_type, "statements": records}
    except ImportError:
        return {"error": "yfinance not installed", "ticker": ticker}
    except Exception as exc:
        return {"error": str(exc), "ticker": ticker}
def execute_sandbox_code(python_code: str) -> dict:
    """Execute LLM-generated Python in a restricted namespace.

    Only a small set of builtins is exposed, and ``import`` is limited to a
    whitelist of safe stdlib modules (the original exposed raw ``__import__``,
    which let sandboxed code do ``__import__("os").system(...)`` and escape
    entirely). Results can be communicated via stdout (captured, truncated)
    or by writing into the pre-seeded ``_result`` dict.

    Returns:
        {"success": True, "stdout", "result"} on success, or
        {"success": False, "error", "stdout"} on any exception.

    NOTE: an exec-based sandbox is not a hard security boundary; do not run
    truly hostile code in-process.
    """
    safe_modules = {
        "math", "statistics", "json", "datetime", "random",
        "itertools", "functools", "collections", "re",
    }

    def _guarded_import(name, globals=None, locals=None, fromlist=(), level=0):
        # Only whitelisted stdlib modules (and their submodules) may be imported.
        if name.split(".")[0] not in safe_modules:
            raise ImportError(f"import of {name!r} is not allowed in the sandbox")
        return __import__(name, globals, locals, fromlist, level)

    allowed_builtins = {
        "abs": abs, "round": round, "min": min, "max": max, "sum": sum,
        "len": len, "range": range, "enumerate": enumerate, "zip": zip,
        "sorted": sorted, "list": list, "dict": dict, "tuple": tuple,
        "set": set, "str": str, "int": int, "float": float, "bool": bool,
        "print": print, "isinstance": isinstance, "map": map, "filter": filter,
        "any": any, "all": all, "__import__": _guarded_import,
    }
    safe_globals: dict[str, Any] = {"__builtins__": allowed_builtins}
    # Pre-seed the common analysis modules so code need not import them.
    try:
        import statistics
        safe_globals["statistics"] = statistics
    except ImportError:
        pass
    try:
        import math
        safe_globals["math"] = math
    except ImportError:
        pass
    # Sandboxed code writes structured output here.
    result_store: dict[str, Any] = {}
    safe_globals["_result"] = result_store
    output_buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(output_buffer):
            exec(python_code, safe_globals)  # noqa: S102
        return {"success": True, "stdout": output_buffer.getvalue()[:5000], "result": result_store or None}
    except Exception as exc:
        return {"success": False, "error": str(exc), "stdout": output_buffer.getvalue()[:2000]}
| # ------------------------------------------------------------------ # | |
| # Subagent System Prompts # | |
| # ------------------------------------------------------------------ # | |
# System prompt for the Orchestrator agent. The reply is fed straight into
# _parse_json(), hence the "no markdown fences" instruction; the model list
# must match the model names accepted by financial_models.run_model.
_ORCHESTRATOR_PROMPT = """\
You are a lead portfolio manager. Break the user's request into a step-by-step plan \
and identify what data, analysis, and artifacts are needed.
Available statistical models (pick the most relevant):
moving_averages, rsi, macd, bollinger_bands, monte_carlo,
dcf, beta_capm, risk_metrics, correlation, regression
Return ONLY valid JSON (no markdown fences):
{
  "plan_summary": "1-2 sentence overview",
  "tickers": ["AAPL"],
  "data_needs": {
    "market_data": true,
    "financial_statements": false,
    "statement_types": ["income"],
    "news": true,
    "benchmarks": true
  },
  "artifacts_needed": {"slides": true, "excel": true},
  "models_to_run": ["moving_averages", "rsi", "macd", "risk_metrics"],
  "analysis_requirements": ["moving averages", "volatility", "returns"],
  "period": "3mo"
}"""

# System prompt for the UI Mapper agent: describes the chart/metric JSON
# shape consumed by the React dashboard.
# NOTE(review): this constant is not referenced in the visible portion of the
# pipeline — confirm it is used elsewhere before removing.
_UI_MAPPER_PROMPT = """\
You are a frontend data engineer. Transform analysis results into React chart configs.
Return ONLY valid JSON (no markdown fences):
{
  "charts": [{"id": "str", "title": "str", "chartType": "line|bar|pie",
              "data": [{"name": "x", "value": 0}], "xKey": "name",
              "yKeys": ["value"], "colors": ["#E8440D"]}],
  "metrics": [{"label": "str", "value": "str", "change": "str", "trend": "up|down|neutral"}],
  "summary": "Brief dashboard description"
}"""
| # ------------------------------------------------------------------ # | |
| # Chart / Artifact Helpers # | |
| # ------------------------------------------------------------------ # | |
| def _build_base_charts(ticker: str, records: list[dict]) -> list[dict]: | |
| charts = [] | |
| if not records: | |
| return charts | |
| tail = records[-30:] | |
| charts.append({ | |
| "id": "price_trend", | |
| "title": f"{ticker} Price Trend", | |
| "chartType": "line", | |
| "data": [{"date": r["date"], "close": r["close"]} for r in tail], | |
| "xKey": "date", | |
| "yKeys": ["close"], | |
| "colors": ["#E8440D"], | |
| }) | |
| charts.append({ | |
| "id": "volume", | |
| "title": f"{ticker} Trading Volume", | |
| "chartType": "bar", | |
| "data": [{"date": r["date"], "volume": r["volume"]} for r in tail[-20:]], | |
| "xKey": "date", | |
| "yKeys": ["volume"], | |
| "colors": ["#2563EB"], | |
| }) | |
| return charts | |
| def _build_base_metrics(data: dict) -> list[dict]: | |
| metrics = [] | |
| price = data.get("current_price") | |
| if price and isinstance(price, (int, float)): | |
| metrics.append({"label": "Current Price", "value": f"${price:,.2f}", "change": "", "trend": "neutral"}) | |
| pe = data.get("pe_ratio") | |
| if pe and isinstance(pe, (int, float)): | |
| metrics.append({"label": "P/E Ratio", "value": f"{pe:.1f}", "change": "", "trend": "neutral"}) | |
| mc = data.get("market_cap") | |
| if mc and isinstance(mc, (int, float)): | |
| if mc >= 1e12: | |
| ms = f"${mc / 1e12:.2f}T" | |
| elif mc >= 1e9: | |
| ms = f"${mc / 1e9:.2f}B" | |
| else: | |
| ms = f"${mc / 1e6:.0f}M" | |
| metrics.append({"label": "Market Cap", "value": ms, "change": "", "trend": "neutral"}) | |
| hi = data.get("52w_high") | |
| lo = data.get("52w_low") | |
| if hi and isinstance(hi, (int, float)): | |
| metrics.append({"label": "52-Week High", "value": f"${hi:,.2f}", "change": "", "trend": "neutral"}) | |
| if lo and isinstance(lo, (int, float)): | |
| metrics.append({"label": "52-Week Low", "value": f"${lo:,.2f}", "change": "", "trend": "neutral"}) | |
| return metrics | |
| def _build_excel_artifact(data: dict, analysis: dict, title: str) -> dict: | |
| sheets = [] | |
| records = data.get("records", []) | |
| if records: | |
| cols = list(records[0].keys()) | |
| sheets.append({ | |
| "name": "Market Data", | |
| "columns": cols, | |
| "rows": [[row.get(c, "") for c in cols] for row in records[:50]], | |
| }) | |
| analysis_rows = [ | |
| [str(k), str(v)] | |
| for k, v in analysis.items() | |
| if k != "_stdout" and not isinstance(v, (dict, list)) | |
| ] | |
| if analysis_rows: | |
| sheets.append({"name": "Analysis", "columns": ["Metric", "Value"], "rows": analysis_rows}) | |
| if not sheets: | |
| summary = {k: v for k, v in data.items() if not isinstance(v, (dict, list))} | |
| sheets.append({ | |
| "name": "Summary", | |
| "columns": ["Field", "Value"], | |
| "rows": [[str(k), str(v)] for k, v in summary.items()][:20], | |
| }) | |
| return {"title": title, "sheets": sheets, "formulas": [], "insights": []} | |
| # ------------------------------------------------------------------ # | |
| # Pipeline # | |
| # ------------------------------------------------------------------ # | |
def run_multi_agent_pipeline(
    user_prompt: str,
    data_context: dict[str, Any] | None = None,
) -> Generator[str, None, None]:
    """Yield SSE event strings as the 5-agent pipeline executes.

    Stages: Orchestrator (LLM planning) -> Data Quant (market /
    fundamentals / news fetch) -> Analyst (statistical models) ->
    Publisher (slides + Excel artifacts) -> UI Mapper (chart/metric
    configs), followed by a final LLM-written executive summary.

    Args:
        user_prompt: The user's natural-language request.
        data_context: Optional session context; "ticker" and
            "requested_models" keys are read explicitly, everything else
            is only serialized into the orchestrator prompt.

    Yields:
        SSE-formatted strings (see sse_event): agent_start /
        agent_progress / agent_complete / agent_error per stage, then a
        terminal pipeline_complete event carrying charts, metrics,
        artifacts, and summaries.

    Each stage is wrapped in its own try/except so one failure degrades
    to an agent_error event instead of aborting the whole generator.
    """
    # Millisecond timestamp gives a unique-enough id for UI correlation.
    pipeline_id = f"p_{int(datetime.now().timestamp() * 1000)}"
    # Fallback ticker when the plan yields none.
    default_ticker = (data_context or {}).get("ticker", "AAPL") if isinstance(data_context, dict) else "AAPL"
    collected_data: dict[str, Any] = {}
    analysis_results: dict[str, Any] = {}
    artifacts: dict[str, Any] = {}
    chart_configs: list[dict] = []
    metrics: list[dict] = []
    # -------------------------------------------------------------- #
    # 1. Orchestrator                                                #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "orchestrator",
        "agent_label": "The Orchestrator", "status": "running",
        "message": "Breaking down your request into an execution plan...",
    })
    ctx_str = ""
    if data_context:
        # Truncate to keep the prompt bounded.
        ctx_str = f"\n\nSession context: {json.dumps(data_context, default=str)[:1500]}"
    try:
        plan_text = _call_llm(_ORCHESTRATOR_PROMPT, f"User request: {user_prompt}{ctx_str}")
        plan = _parse_json(plan_text)
    except Exception as exc:
        logger.error("Orchestrator LLM call failed: %s", exc)
        plan = None
    if not plan:
        # LLM failed or returned unparseable output: fall back to a
        # conservative default plan so the pipeline still runs.
        plan = {
            "plan_summary": f"Analyze {default_ticker} based on user request",
            "tickers": [default_ticker],
            "data_needs": {"market_data": True, "financial_statements": True,
                           "statement_types": ["income"], "news": True, "benchmarks": True},
            "artifacts_needed": {"slides": True, "excel": True},
            "analysis_requirements": ["price trend", "moving averages", "volatility", "key ratios"],
            "period": "3mo",
        }
    tickers = plan.get("tickers") or [default_ticker]
    if not tickers:
        tickers = [default_ticker]
    yield sse_event("agent_complete", {
        "pipeline_id": pipeline_id, "agent": "orchestrator",
        "agent_label": "The Orchestrator", "status": "completed",
        "message": plan.get("plan_summary", "Plan created."),
        "result": {"plan": plan, "tickers": tickers},
    })
    # -------------------------------------------------------------- #
    # 2. Data Quant                                                  #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "data_quant",
        "agent_label": "The Data Quant", "status": "running",
        "message": f"Fetching market data for {', '.join(tickers[:3])}...",
    })
    data_needs = plan.get("data_needs", {})
    period = plan.get("period", "3mo")
    try:
        # Cap at 3 tickers to bound fetch time.
        for tkr in tickers[:3]:
            if data_needs.get("market_data", True):
                yield sse_event("agent_progress", {
                    "pipeline_id": pipeline_id, "agent": "data_quant",
                    "message": f"Fetching OHLCV data for {tkr}...",
                })
                collected_data[f"{tkr}_market"] = get_market_data(tkr, period=period)
            if data_needs.get("financial_statements"):
                for st in data_needs.get("statement_types", ["income"]):
                    yield sse_event("agent_progress", {
                        "pipeline_id": pipeline_id, "agent": "data_quant",
                        "message": f"Fetching {st} statement for {tkr}...",
                    })
                    collected_data[f"{tkr}_{st}"] = get_financial_statements(tkr, st)
        # News/benchmarks are fetched once (primary ticker only), best-effort:
        # failures degrade to placeholder strings rather than aborting.
        if data_needs.get("news", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "data_quant",
                "message": "Fetching market news...",
            })
            try:
                collected_data["stock_news"] = fetch_stock_news_data(tickers[0])
            except Exception:
                collected_data["stock_news"] = "No news available"
            try:
                collected_data["breaking_news"] = fetch_breaking_news_data()
            except Exception:
                collected_data["breaking_news"] = "No breaking news"
        if data_needs.get("benchmarks", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "data_quant",
                "message": "Fetching benchmarks...",
            })
            try:
                collected_data["benchmarks"] = fetch_market_benchmarks_data()
            except Exception:
                collected_data["benchmarks"] = "No benchmarks"
        # Count only dict-shaped sources that carry OHLCV records.
        total_points = sum(
            len(v.get("records", [])) if isinstance(v, dict) else 0
            for v in collected_data.values()
        )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "data_quant",
            "agent_label": "The Data Quant", "status": "completed",
            "message": f"Retrieved {total_points} data points from {len(collected_data)} sources.",
            "result": {"data_points": total_points, "sources": len(collected_data)},
        })
    except Exception as exc:
        logger.error("Data Quant failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "data_quant",
            "message": f"Data retrieval error: {exc}",
        })
    # -------------------------------------------------------------- #
    # 3. Analyst — run pre-built statistical models                  #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "analyst",
        "agent_label": "The Analyst", "status": "running",
        "message": "Running quantitative models...",
    })
    # Analysis (and downstream stages) key off the first ticker only.
    primary = tickers[0]
    primary_data = collected_data.get(f"{primary}_market", {})
    records = primary_data.get("records", [])
    from .financial_models import run_model, ALL_MODELS
    default_models = ["moving_averages", "rsi", "macd", "bollinger_bands", "risk_metrics", "regression"]
    # Explicit UI selection overrides the orchestrator's plan.
    ui_requested = (data_context or {}).get("requested_models") if isinstance(data_context, dict) else None
    requested_models = ui_requested or plan.get("models_to_run", default_models)
    if not requested_models:
        requested_models = default_models
    valid_models = [m for m in requested_models if m in ALL_MODELS]
    if not valid_models:
        valid_models = default_models
    model_results: dict[str, Any] = {}
    model_period = plan.get("period", "6mo")
    try:
        for model_name in valid_models:
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "analyst",
                "message": f"Running {model_name} on {primary}...",
            })
            kwargs = {"period": model_period}
            # Correlation needs a basket; pass up to 5 tickers.
            if model_name == "correlation" and len(tickers) >= 2:
                kwargs["tickers"] = tickers[:5]
            res = run_model(model_name, primary, **kwargs)
            model_results[model_name] = res
        # Flatten per-model metrics into a single namespaced dict;
        # underscore-prefixed keys hold aggregates used downstream.
        analysis_results = {}
        all_signals: list[str] = []
        all_interpretations: list[str] = []
        for name, res in model_results.items():
            if "error" in res:
                analysis_results[f"{name}_error"] = res["error"]
                continue
            for k, v in (res.get("metrics") or {}).items():
                analysis_results[f"{name}_{k}"] = v
            all_signals.extend(res.get("signals", []))
            interp = res.get("interpretation", "")
            if interp:
                all_interpretations.append(interp)
        analysis_results["_signals"] = all_signals
        analysis_results["_interpretations"] = all_interpretations
        analysis_results["_model_results"] = model_results
        analysis_results["_models_run"] = list(model_results.keys())
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "analyst",
            "agent_label": "The Analyst", "status": "completed",
            "message": (
                f"Analysis complete — {len(valid_models)} models, "
                f"{len(analysis_results)} metrics, {len(all_signals)} signals."
            ),
            "result": {
                "models_run": list(model_results.keys()),
                "metrics_count": len(analysis_results),
                "signals": all_signals[:8],
            },
        })
    except Exception as exc:
        logger.error("Analyst failed: %s", exc)
        analysis_results = {"error": str(exc)}
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "analyst",
            "message": f"Analysis error: {exc}",
        })
    # -------------------------------------------------------------- #
    # 4. Publisher                                                   #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "publisher",
        "agent_label": "The Publisher", "status": "running",
        "message": "Generating artifacts...",
    })
    artifacts_needed = plan.get("artifacts_needed", {})
    try:
        if artifacts_needed.get("slides", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "publisher",
                "message": "Creating slide deck...",
            })
            from .agent_tools_service import SLIDES_ARTIFACT_SCHEMA, SLIDES_INSTRUCTION
            # Compact context for the slide-generation prompt: scalar
            # analysis values only (lists dropped), news truncated.
            pub_context = {
                "ticker": primary,
                "company": primary_data.get("company_name", primary),
                "price": primary_data.get("current_price"),
                "pe": primary_data.get("pe_ratio"),
                "market_cap": primary_data.get("market_cap"),
                "analysis": {
                    k: v for k, v in analysis_results.items()
                    if k != "_stdout" and not isinstance(v, list)
                },
                "news": str(collected_data.get("stock_news", ""))[:800],
                "request": user_prompt,
            }
            slides_prompt = (
                f"{SLIDES_INSTRUCTION}\n\n"
                f"Data:\n{json.dumps(pub_context, default=str)[:3000]}\n\n"
                f"Return JSON matching: {json.dumps(SLIDES_ARTIFACT_SCHEMA, indent=2)}\n"
            )
            slides_text = _call_llm(
                "You are a presentation generator. Output ONLY valid JSON.",
                slides_prompt,
            )
            slides_artifact = _parse_json(slides_text)
            # Unparseable output silently skips the slides artifact.
            if slides_artifact:
                artifacts["slides"] = slides_artifact
        if artifacts_needed.get("excel", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "publisher",
                "message": "Building Excel model...",
            })
            artifacts["excel"] = _build_excel_artifact(
                primary_data, analysis_results, f"{primary} Financial Model",
            )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "publisher",
            "agent_label": "The Publisher", "status": "completed",
            "message": f"Generated: {', '.join(artifacts.keys())}",
            "result": {
                "artifacts": list(artifacts.keys()),
                "slides_count": len(artifacts.get("slides", {}).get("slides", [])),
                "excel_sheets": len(artifacts.get("excel", {}).get("sheets", [])),
            },
        })
    except Exception as exc:
        logger.error("Publisher failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "publisher",
            "message": f"Artifact generation error: {exc}",
        })
    # -------------------------------------------------------------- #
    # 5. UI Mapper — build charts from model results                 #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "ui_mapper",
        "agent_label": "The UI Mapper", "status": "running",
        "message": "Preparing dashboard visualizations...",
    })
    try:
        chart_configs = _build_base_charts(primary, records)
        metrics = _build_base_metrics(primary_data)
        # Static chart templates per model; data comes from each model's
        # "chart_data" payload. yKeys are assumed to match the keys that
        # financial_models emits in chart_data — verify against that module.
        model_chart_map = {
            "moving_averages": {"title": f"{primary} Moving Averages", "chartType": "line",
                                "yKeys": ["close", "SMA20", "SMA50"], "xKey": "date",
                                "colors": ["#111111", "#E8440D", "#2563EB"]},
            "rsi": {"title": f"{primary} RSI", "chartType": "line",
                    "yKeys": ["RSI"], "xKey": "date", "colors": ["#7C3AED"]},
            "macd": {"title": f"{primary} MACD", "chartType": "bar",
                     "yKeys": ["MACD", "Signal", "Histogram"], "xKey": "date",
                     "colors": ["#E8440D", "#2563EB", "#D1D5DB"]},
            "bollinger_bands": {"title": f"{primary} Bollinger Bands", "chartType": "line",
                                "yKeys": ["close", "upper", "mid", "lower"], "xKey": "date",
                                "colors": ["#111111", "#E8440D", "#888", "#E8440D"]},
            "monte_carlo": {"title": f"{primary} Monte Carlo Forecast", "chartType": "line",
                            "yKeys": ["p5", "p25", "median", "p75", "p95"], "xKey": "day",
                            "colors": ["#FCA5A5", "#F97316", "#E8440D", "#F97316", "#FCA5A5"]},
            "regression": {"title": f"{primary} Trend & Forecast", "chartType": "line",
                           "yKeys": ["close", "trend"], "xKey": "date",
                           "colors": ["#111111", "#E8440D"]},
            "risk_metrics": {"title": f"{primary} Drawdown", "chartType": "line",
                             "yKeys": ["cumulative_return", "drawdown"], "xKey": "date",
                             "colors": ["#16A34A", "#DC2626"]},
        }
        mr = analysis_results.get("_model_results", {})
        for model_name, chart_cfg in model_chart_map.items():
            res = mr.get(model_name, {})
            cdata = res.get("chart_data")
            if not cdata:
                continue
            chart_configs.append({
                "id": f"model_{model_name}",
                "title": chart_cfg["title"],
                "chartType": chart_cfg["chartType"],
                "data": cdata[:60],
                "xKey": chart_cfg["xKey"],
                "yKeys": chart_cfg["yKeys"],
                "colors": chart_cfg["colors"],
            })
        # Turn up to 2 signals per model into metric cards; trend is a
        # keyword scan over the signal text.
        for model_name, res in mr.items():
            if "error" in res:
                continue
            for sig in (res.get("signals") or [])[:2]:
                trend = "up" if any(w in sig.lower() for w in ("bullish", "buy", "above", "positive", "upward", "strong", "undervalued")) else \
                    "down" if any(w in sig.lower() for w in ("bearish", "sell", "below", "negative", "downward", "overvalued", "overbought")) else "neutral"
                metrics.append({"label": model_name.replace("_", " ").title(), "value": sig[:50], "change": "", "trend": trend})
        # DCF gets a dedicated card with upside percentage.
        dcf = mr.get("dcf", {})
        if dcf.get("metrics"):
            iv = dcf["metrics"].get("intrinsic_value_per_share")
            up = dcf["metrics"].get("upside_pct")
            if iv is not None:
                metrics.append({"label": "DCF Intrinsic Value", "value": f"${iv:,.2f}" if isinstance(iv, (int, float)) else str(iv),
                                "change": f"{up:+.1f}%" if isinstance(up, (int, float)) else "", "trend": "up" if (up or 0) > 0 else "down"})
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "ui_mapper",
            "agent_label": "The UI Mapper", "status": "completed",
            "message": f"Dashboard ready — {len(chart_configs)} charts, {len(metrics)} metrics.",
            "result": {"charts_count": len(chart_configs), "metrics_count": len(metrics)},
        })
    except Exception as exc:
        logger.error("UI Mapper failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "ui_mapper",
            "message": f"UI mapping error: {exc}",
        })
    # -------------------------------------------------------------- #
    # Final                                                          #
    # -------------------------------------------------------------- #
    # Executive summary via LLM; any failure falls back to a stock line.
    try:
        interpretations = analysis_results.get("_interpretations", [])
        signals_text = "\n".join(analysis_results.get("_signals", [])[:10])
        interp_text = "\n".join(interpretations[:6])
        summary_prompt = (
            f"Summarize this analysis for {', '.join(tickers)}:\n"
            f"Request: {user_prompt}\n"
            f"Models run: {', '.join(analysis_results.get('_models_run', []))}\n"
            f"Key signals:\n{signals_text}\n"
            f"Model interpretations:\n{interp_text}\n"
            f"Artifacts: {', '.join(artifacts.keys())}\n\n"
            f"Write a 3-5 sentence executive summary synthesizing the findings."
        )
        final_summary = _call_llm(
            "You are a concise financial analyst. Write an executive summary.",
            summary_prompt,
        )
    except Exception:
        final_summary = f"Analysis pipeline completed for {', '.join(tickers)}."
    # Shrink collected data to previews/counts for the final payload.
    data_summary = {}
    for k, v in collected_data.items():
        if isinstance(v, dict) and "records" in v:
            data_summary[k] = {
                "ticker": v.get("ticker", ""),
                "company_name": v.get("company_name", ""),
                "current_price": v.get("current_price"),
                "records_count": len(v.get("records", [])),
            }
        else:
            data_summary[k] = {"preview": str(v)[:200]}
    # Drop captured stdout and oversized lists from the final payload.
    clean_analysis_final = {
        k: v for k, v in analysis_results.items()
        if k != "_stdout" and not (isinstance(v, list) and len(v) > 20)
    }
    yield sse_event("pipeline_complete", {
        "pipeline_id": pipeline_id,
        "status": "completed",
        "summary": final_summary,
        "tickers": tickers,
        "charts": chart_configs,
        "metrics": metrics,
        "artifacts": artifacts,
        "collected_data": data_summary,
        "analysis_results": clean_analysis_final,
    })