# Source: AnalyzrAI / apps/copilot/multi_agent_service.py
# Uploaded by thejagstudio ("Upload 92 files", revision 0310410, verified).
"""Multi-Agent Financial Analysis Pipeline
Subagents:
1. Orchestrator - Plans and delegates tasks
2. Data Quant - Fetches raw financial data
3. Analyst - Runs quantitative analysis (sandbox code execution)
4. Publisher - Generates artifacts (PPTX, XLSX)
5. UI Mapper - Transforms data for React charts
"""
import contextlib
import io
import json
import logging
import re
from datetime import datetime
from typing import Any, Generator
from .claude_agent_service import call_claude_simple as _call_llm
from .tools import (
fetch_breaking_news_data,
fetch_market_benchmarks_data,
fetch_stock_news_data,
)
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------ #
# SSE Helper #
# ------------------------------------------------------------------ #
def sse_event(event_type: str, data: dict) -> str:
    """Render one Server-Sent Events frame.

    Args:
        event_type: Value placed on the ``event:`` line.
        data: Payload serialised to JSON for the ``data:`` line; values that
            JSON cannot encode natively are passed through ``str``.

    Returns:
        The frame text, terminated by the blank line that ends an SSE event.
    """
    encoded = json.dumps(data, default=str)
    return "event: {}\ndata: {}\n\n".format(event_type, encoded)
def _parse_json(text: str) -> dict | None:
body = (text or "").strip()
if not body:
return None
try:
parsed = json.loads(body)
if isinstance(parsed, dict):
return parsed
except json.JSONDecodeError:
pass
start, end = body.find("{"), body.rfind("}")
if start != -1 and end > start:
try:
parsed = json.loads(body[start : end + 1])
if isinstance(parsed, dict):
return parsed
except json.JSONDecodeError:
pass
return None
# ------------------------------------------------------------------ #
# Tool Functions #
# ------------------------------------------------------------------ #
def get_market_data(ticker: str, period: str = "3mo", interval: str = "1d") -> dict:
try:
import yfinance as yf
except ImportError:
return {"error": "yfinance not installed", "ticker": ticker}
try:
stock = yf.Ticker(ticker)
hist = stock.history(period=period, interval=interval)
if hist.empty:
return {"error": f"No data for {ticker}", "ticker": ticker}
records = []
for date, row in hist.iterrows():
records.append({
"date": date.strftime("%Y-%m-%d"),
"open": round(row["Open"], 2),
"high": round(row["High"], 2),
"low": round(row["Low"], 2),
"close": round(row["Close"], 2),
"volume": int(row["Volume"]),
})
info = stock.info
return {
"ticker": ticker,
"period": period,
"records": records[-60:],
"company_name": info.get("shortName", ticker),
"sector": info.get("sector", ""),
"industry": info.get("industry", ""),
"market_cap": info.get("marketCap"),
"current_price": info.get("currentPrice") or info.get("regularMarketPrice"),
"pe_ratio": info.get("trailingPE"),
"forward_pe": info.get("forwardPE"),
"dividend_yield": info.get("dividendYield"),
"52w_high": info.get("fiftyTwoWeekHigh"),
"52w_low": info.get("fiftyTwoWeekLow"),
"beta": info.get("beta"),
"revenue": info.get("totalRevenue"),
"profit_margin": info.get("profitMargins"),
}
except Exception as exc:
return {"error": str(exc), "ticker": ticker}
def get_financial_statements(ticker: str, statement_type: str = "income") -> dict:
    """Fetch financial statements using yfinance (income, balance sheet, cash flow).

    Args:
        ticker: Symbol passed to ``yfinance.Ticker``.
        statement_type: One of ``"income"``, ``"balance"`` or ``"cashflow"``.

    Returns:
        On success, ``{"ticker", "statement_type", "statements"}`` where
        ``statements`` maps up to four period labels to ``{line item: value}``
        dicts (missing values are ``None``). On failure or for an unknown
        statement type, ``{"error": <message>, "ticker": ticker}``.
    """
    try:
        import yfinance as yf
    except ImportError:
        return {"error": "yfinance not installed", "ticker": ticker}

    # Map to attribute *names* so only the requested statement is fetched;
    # reading all three properties up front triggers needless lookups and lets
    # a failure in an unrelated statement break the requested one.
    attr_by_type = {"income": "financials", "balance": "balance_sheet", "cashflow": "cashflow"}
    try:
        attr_name = attr_by_type.get(statement_type)
        df = getattr(yf.Ticker(ticker), attr_name) if attr_name else None
        if df is None or df.empty:
            return {"error": f"No {statement_type} data", "ticker": ticker}

        records = {}
        for col in list(df.columns)[:4]:
            period_key = col.strftime("%Y-%m-%d") if hasattr(col, "strftime") else str(col)
            line_items = {}
            for idx in df.index:
                cell = df.loc[idx, col]
                # NaN never equals itself, so this maps missing cells to None.
                line_items[str(idx)] = float(cell) if cell == cell else None
            records[period_key] = line_items
        return {"ticker": ticker, "statement_type": statement_type, "statements": records}
    except Exception as exc:
        return {"error": str(exc), "ticker": ticker}
# Modules that code run by ``execute_sandbox_code`` may import. Extend this set
# deliberately; anything not listed is rejected with ImportError.
_SANDBOX_IMPORTABLE = frozenset({
    "math", "statistics", "json", "datetime", "decimal", "fractions",
    "itertools", "functools", "collections", "random", "re",
})


def _sandbox_import(name, globals_=None, locals_=None, fromlist=(), level=0):
    """Stand-in for ``__import__`` that only resolves allow-listed modules."""
    root = name.partition(".")[0]
    if level != 0 or root not in _SANDBOX_IMPORTABLE:
        raise ImportError(f"import of '{name}' is not allowed in the sandbox")
    return __import__(name, globals_, locals_, fromlist, level)


def execute_sandbox_code(python_code: str) -> dict:
    """Run ``python_code`` with a reduced set of builtins and capture its output.

    The code sees ``math``, ``statistics`` and a ``_result`` dict it can fill
    in; anything it prints is captured.

    SECURITY NOTE: ``exec`` with trimmed builtins is NOT a security boundary.
    Exposing the real ``__import__`` let executed code import ``os``,
    ``subprocess`` and so on; imports now go through an allow-list, but
    determined code can still escape through object introspection. Untrusted
    code must additionally be isolated at the process or container level.

    Args:
        python_code: Source text to execute.

    Returns:
        ``{"success": True, "stdout": <first 5000 chars>, "result": <_result or None>}``
        or ``{"success": False, "error": <message>, "stdout": <first 2000 chars>}``.
    """
    import math
    import statistics

    allowed_builtins = {
        "abs": abs, "round": round, "min": min, "max": max, "sum": sum,
        "len": len, "range": range, "enumerate": enumerate, "zip": zip,
        "sorted": sorted, "list": list, "dict": dict, "tuple": tuple,
        "set": set, "str": str, "int": int, "float": float, "bool": bool,
        "print": print, "isinstance": isinstance, "map": map, "filter": filter,
        "any": any, "all": all, "__import__": _sandbox_import,
    }
    result_store: dict[str, Any] = {}
    safe_globals: dict[str, Any] = {
        "__builtins__": allowed_builtins,
        "statistics": statistics,
        "math": math,
        "_result": result_store,
    }

    output_buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(output_buffer):
            exec(python_code, safe_globals)  # noqa: S102
        return {"success": True, "stdout": output_buffer.getvalue()[:5000], "result": result_store or None}
    except Exception as exc:
        return {"success": False, "error": str(exc), "stdout": output_buffer.getvalue()[:2000]}
# ------------------------------------------------------------------ #
# Subagent System Prompts #
# ------------------------------------------------------------------ #
# System prompt for the Orchestrator step. `run_multi_agent_pipeline` sends it
# through `_call_llm` together with the user's request and decodes the reply
# with `_parse_json`; the keys shown in the example object (`tickers`,
# `data_needs`, `artifacts_needed`, `models_to_run`, `period`, ...) are the
# ones the pipeline reads from the resulting plan.
_ORCHESTRATOR_PROMPT = """\
You are a lead portfolio manager. Break the user's request into a step-by-step plan \
and identify what data, analysis, and artifacts are needed.
Available statistical models (pick the most relevant):
moving_averages, rsi, macd, bollinger_bands, monte_carlo,
dcf, beta_capm, risk_metrics, correlation, regression
Return ONLY valid JSON (no markdown fences):
{
"plan_summary": "1-2 sentence overview",
"tickers": ["AAPL"],
"data_needs": {
"market_data": true,
"financial_statements": false,
"statement_types": ["income"],
"news": true,
"benchmarks": true
},
"artifacts_needed": {"slides": true, "excel": true},
"models_to_run": ["moving_averages", "rsi", "macd", "risk_metrics"],
"analysis_requirements": ["moving averages", "volatility", "returns"],
"period": "3mo"
}"""
# System prompt describing the chart/metric JSON shape for an LLM-driven UI
# mapper. NOTE(review): nothing in the visible part of this file references
# this constant (the UI Mapper stage builds its charts in code) — confirm
# whether it is still used elsewhere before removing it.
_UI_MAPPER_PROMPT = """\
You are a frontend data engineer. Transform analysis results into React chart configs.
Return ONLY valid JSON (no markdown fences):
{
"charts": [{"id": "str", "title": "str", "chartType": "line|bar|pie",
"data": [{"name": "x", "value": 0}], "xKey": "name",
"yKeys": ["value"], "colors": ["#E8440D"]}],
"metrics": [{"label": "str", "value": "str", "change": "str", "trend": "up|down|neutral"}],
"summary": "Brief dashboard description"
}"""
# ------------------------------------------------------------------ #
# Chart / Artifact Helpers #
# ------------------------------------------------------------------ #
def _build_base_charts(ticker: str, records: list[dict]) -> list[dict]:
    """Build the two default chart configs from OHLCV records.

    Args:
        ticker: Symbol used in the chart titles.
        records: Bars carrying at least ``date``, ``close`` and ``volume``.

    Returns:
        An empty list when there are no records; otherwise a line chart of the
        last 30 closes followed by a bar chart of the last 20 volumes.
    """
    if not records:
        return []

    recent = records[-30:]
    price_points = [{"date": bar["date"], "close": bar["close"]} for bar in recent]
    volume_points = [{"date": bar["date"], "volume": bar["volume"]} for bar in recent[-20:]]

    price_chart = {
        "id": "price_trend",
        "title": f"{ticker} Price Trend",
        "chartType": "line",
        "data": price_points,
        "xKey": "date",
        "yKeys": ["close"],
        "colors": ["#E8440D"],
    }
    volume_chart = {
        "id": "volume",
        "title": f"{ticker} Trading Volume",
        "chartType": "bar",
        "data": volume_points,
        "xKey": "date",
        "yKeys": ["volume"],
        "colors": ["#2563EB"],
    }
    return [price_chart, volume_chart]
def _build_base_metrics(data: dict) -> list[dict]:
    """Turn headline fundamentals into dashboard metric cards.

    Reads ``current_price``, ``pe_ratio``, ``market_cap``, ``52w_high`` and
    ``52w_low`` from ``data``; a field is skipped when it is falsy or not an
    int/float. Cards are emitted in that order, all with a neutral trend.
    """

    def usable(value) -> bool:
        # Truthiness is checked first, then the numeric type.
        if not value:
            return False
        return isinstance(value, (int, float))

    def card(label: str, text: str) -> dict:
        return {"label": label, "value": text, "change": "", "trend": "neutral"}

    cards = []

    current = data.get("current_price")
    if usable(current):
        cards.append(card("Current Price", f"${current:,.2f}"))

    ratio = data.get("pe_ratio")
    if usable(ratio):
        cards.append(card("P/E Ratio", f"{ratio:.1f}"))

    cap = data.get("market_cap")
    if usable(cap):
        if cap >= 1e12:
            cap_text = f"${cap / 1e12:.2f}T"
        elif cap >= 1e9:
            cap_text = f"${cap / 1e9:.2f}B"
        else:
            cap_text = f"${cap / 1e6:.0f}M"
        cards.append(card("Market Cap", cap_text))

    for key, label in (("52w_high", "52-Week High"), ("52w_low", "52-Week Low")):
        level = data.get(key)
        if usable(level):
            cards.append(card(label, f"${level:,.2f}"))

    return cards
def _build_excel_artifact(data: dict, analysis: dict, title: str) -> dict:
    """Assemble the workbook description returned as the Excel artifact.

    Args:
        data: Market-data dict; its ``records`` (first 50) become the
            "Market Data" sheet, with columns taken from the first record.
        analysis: Flat analysis results; scalar entries other than
            ``_stdout`` become the "Analysis" sheet.
        title: Workbook title.

    Returns:
        ``{"title", "sheets", "formulas", "insights"}``. If neither sheet could
        be built, a "Summary" sheet with up to 20 scalar fields of ``data`` is
        used instead.
    """
    workbook_sheets = []

    bars = data.get("records", [])
    if bars:
        header = list(bars[0].keys())
        table = []
        for bar in bars[:50]:
            table.append([bar.get(name, "") for name in header])
        workbook_sheets.append({"name": "Market Data", "columns": header, "rows": table})

    metric_rows = []
    for key, value in analysis.items():
        if key == "_stdout" or isinstance(value, (dict, list)):
            continue
        metric_rows.append([str(key), str(value)])
    if metric_rows:
        workbook_sheets.append({"name": "Analysis", "columns": ["Metric", "Value"], "rows": metric_rows})

    if not workbook_sheets:
        scalar_rows = [
            [str(field), str(value)]
            for field, value in data.items()
            if not isinstance(value, (dict, list))
        ]
        workbook_sheets.append({
            "name": "Summary",
            "columns": ["Field", "Value"],
            "rows": scalar_rows[:20],
        })

    return {"title": title, "sheets": workbook_sheets, "formulas": [], "insights": []}
# ------------------------------------------------------------------ #
# Pipeline #
# ------------------------------------------------------------------ #
def _plan_dict(value: Any) -> dict:
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _plan_str_list(value: Any, fallback: list[str]) -> list[str]:
    """Coerce an LLM- or client-supplied value into a non-empty list of non-blank strings."""
    if isinstance(value, str):
        # A bare string would otherwise be sliced/iterated character by character.
        value = [value]
    if not isinstance(value, (list, tuple)):
        return list(fallback)
    cleaned = [item.strip() for item in value if isinstance(item, str) and item.strip()]
    return cleaned or list(fallback)


def run_multi_agent_pipeline(
    user_prompt: str,
    data_context: dict[str, Any] | None = None,
) -> Generator[str, None, None]:
    """Yield SSE events as the 5-agent pipeline executes.

    Stages run in order: Orchestrator (LLM plan), Data Quant (market data,
    statements, news, benchmarks), Analyst (pre-built models via ``run_model``),
    Publisher (slides via LLM, Excel description) and UI Mapper (charts and
    metric cards). Each stage emits ``agent_start`` and then either
    ``agent_complete`` or ``agent_error``; a failing stage does not stop the
    following ones. A final ``pipeline_complete`` event carries the summary,
    charts, metrics, artifacts and condensed data.

    The plan comes from an LLM, so its fields are normalised before use: a
    malformed ``tickers``, ``data_needs``, ``artifacts_needed``, ``period`` or
    ``models_to_run`` falls back to defaults instead of raising outside the
    per-stage error handling and cutting the stream short.

    Args:
        user_prompt: The user's request.
        data_context: Optional session context; ``ticker`` supplies the default
            symbol and ``requested_models`` overrides the plan's model list.

    Yields:
        SSE frames produced by ``sse_event``.
    """
    pipeline_id = f"p_{int(datetime.now().timestamp() * 1000)}"
    context = data_context if isinstance(data_context, dict) else {}
    context_ticker = context.get("ticker")
    default_ticker = (
        context_ticker.strip()
        if isinstance(context_ticker, str) and context_ticker.strip()
        else "AAPL"
    )
    collected_data: dict[str, Any] = {}
    analysis_results: dict[str, Any] = {}
    artifacts: dict[str, Any] = {}
    chart_configs: list[dict] = []
    metrics: list[dict] = []

    # -------------------------------------------------------------- #
    # 1. Orchestrator                                                  #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "orchestrator",
        "agent_label": "The Orchestrator", "status": "running",
        "message": "Breaking down your request into an execution plan...",
    })
    ctx_str = ""
    if data_context:
        ctx_str = f"\n\nSession context: {json.dumps(data_context, default=str)[:1500]}"
    try:
        plan_text = _call_llm(_ORCHESTRATOR_PROMPT, f"User request: {user_prompt}{ctx_str}")
        plan = _parse_json(plan_text)
    except Exception as exc:
        logger.error("Orchestrator LLM call failed: %s", exc)
        plan = None
    if not plan:
        plan = {
            "plan_summary": f"Analyze {default_ticker} based on user request",
            "tickers": [default_ticker],
            "data_needs": {"market_data": True, "financial_statements": True,
                           "statement_types": ["income"], "news": True, "benchmarks": True},
            "artifacts_needed": {"slides": True, "excel": True},
            "analysis_requirements": ["price trend", "moving averages", "volatility", "key ratios"],
            "period": "3mo",
        }

    # Normalise everything later stages read from the plan.
    tickers = _plan_str_list(plan.get("tickers"), [default_ticker])
    data_needs = _plan_dict(plan.get("data_needs"))
    artifacts_needed = _plan_dict(plan.get("artifacts_needed"))
    plan_period = plan.get("period")
    has_plan_period = isinstance(plan_period, str) and bool(plan_period.strip())
    # NOTE(review): the data fetch and the models have always used different
    # fallbacks ("3mo" vs "6mo") when the plan has no period; kept as is.
    period = plan_period if has_plan_period else "3mo"
    model_period = plan_period if has_plan_period else "6mo"

    yield sse_event("agent_complete", {
        "pipeline_id": pipeline_id, "agent": "orchestrator",
        "agent_label": "The Orchestrator", "status": "completed",
        "message": plan.get("plan_summary", "Plan created."),
        "result": {"plan": plan, "tickers": tickers},
    })

    # -------------------------------------------------------------- #
    # 2. Data Quant                                                    #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "data_quant",
        "agent_label": "The Data Quant", "status": "running",
        "message": f"Fetching market data for {', '.join(tickers[:3])}...",
    })
    try:
        statement_types = _plan_str_list(data_needs.get("statement_types"), ["income"])
        for tkr in tickers[:3]:
            if data_needs.get("market_data", True):
                yield sse_event("agent_progress", {
                    "pipeline_id": pipeline_id, "agent": "data_quant",
                    "message": f"Fetching OHLCV data for {tkr}...",
                })
                collected_data[f"{tkr}_market"] = get_market_data(tkr, period=period)
            if data_needs.get("financial_statements"):
                for st in statement_types:
                    yield sse_event("agent_progress", {
                        "pipeline_id": pipeline_id, "agent": "data_quant",
                        "message": f"Fetching {st} statement for {tkr}...",
                    })
                    collected_data[f"{tkr}_{st}"] = get_financial_statements(tkr, st)
        if data_needs.get("news", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "data_quant",
                "message": "Fetching market news...",
            })
            # News and benchmarks are best-effort: a failure is replaced by a
            # placeholder string so the remaining stages still run.
            try:
                collected_data["stock_news"] = fetch_stock_news_data(tickers[0])
            except Exception:
                collected_data["stock_news"] = "No news available"
            try:
                collected_data["breaking_news"] = fetch_breaking_news_data()
            except Exception:
                collected_data["breaking_news"] = "No breaking news"
        if data_needs.get("benchmarks", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "data_quant",
                "message": "Fetching benchmarks...",
            })
            try:
                collected_data["benchmarks"] = fetch_market_benchmarks_data()
            except Exception:
                collected_data["benchmarks"] = "No benchmarks"
        total_points = sum(
            len(v.get("records", [])) if isinstance(v, dict) else 0
            for v in collected_data.values()
        )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "data_quant",
            "agent_label": "The Data Quant", "status": "completed",
            "message": f"Retrieved {total_points} data points from {len(collected_data)} sources.",
            "result": {"data_points": total_points, "sources": len(collected_data)},
        })
    except Exception as exc:
        logger.error("Data Quant failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "data_quant",
            "message": f"Data retrieval error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 3. Analyst — run pre-built statistical models                    #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "analyst",
        "agent_label": "The Analyst", "status": "running",
        "message": "Running quantitative models...",
    })
    primary = tickers[0]
    primary_data = collected_data.get(f"{primary}_market", {})
    records = primary_data.get("records", [])
    from .financial_models import run_model, ALL_MODELS
    default_models = ["moving_averages", "rsi", "macd", "bollinger_bands", "risk_metrics", "regression"]
    # Models requested by the UI take precedence over the plan's choice.
    ui_requested = context.get("requested_models")
    requested_models = _plan_str_list(ui_requested or plan.get("models_to_run"), default_models)
    valid_models = [m for m in requested_models if m in ALL_MODELS]
    if not valid_models:
        valid_models = default_models
    model_results: dict[str, Any] = {}
    try:
        for model_name in valid_models:
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "analyst",
                "message": f"Running {model_name} on {primary}...",
            })
            kwargs = {"period": model_period}
            if model_name == "correlation" and len(tickers) >= 2:
                kwargs["tickers"] = tickers[:5]
            res = run_model(model_name, primary, **kwargs)
            model_results[model_name] = res

        # Flatten per-model metrics into "<model>_<metric>" keys and gather
        # signals/interpretations across models.
        analysis_results = {}
        all_signals: list[str] = []
        all_interpretations: list[str] = []
        for name, res in model_results.items():
            if "error" in res:
                analysis_results[f"{name}_error"] = res["error"]
                continue
            for k, v in (res.get("metrics") or {}).items():
                analysis_results[f"{name}_{k}"] = v
            all_signals.extend(res.get("signals", []))
            interp = res.get("interpretation", "")
            if interp:
                all_interpretations.append(interp)
        analysis_results["_signals"] = all_signals
        analysis_results["_interpretations"] = all_interpretations
        analysis_results["_model_results"] = model_results
        analysis_results["_models_run"] = list(model_results.keys())
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "analyst",
            "agent_label": "The Analyst", "status": "completed",
            "message": (
                f"Analysis complete — {len(valid_models)} models, "
                f"{len(analysis_results)} metrics, {len(all_signals)} signals."
            ),
            "result": {
                "models_run": list(model_results.keys()),
                "metrics_count": len(analysis_results),
                "signals": all_signals[:8],
            },
        })
    except Exception as exc:
        logger.error("Analyst failed: %s", exc)
        analysis_results = {"error": str(exc)}
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "analyst",
            "message": f"Analysis error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 4. Publisher                                                     #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "publisher",
        "agent_label": "The Publisher", "status": "running",
        "message": "Generating artifacts...",
    })
    try:
        if artifacts_needed.get("slides", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "publisher",
                "message": "Creating slide deck...",
            })
            from .agent_tools_service import SLIDES_ARTIFACT_SCHEMA, SLIDES_INSTRUCTION
            pub_context = {
                "ticker": primary,
                "company": primary_data.get("company_name", primary),
                "price": primary_data.get("current_price"),
                "pe": primary_data.get("pe_ratio"),
                "market_cap": primary_data.get("market_cap"),
                "analysis": {
                    k: v for k, v in analysis_results.items()
                    if k != "_stdout" and not isinstance(v, list)
                },
                "news": str(collected_data.get("stock_news", ""))[:800],
                "request": user_prompt,
            }
            slides_prompt = (
                f"{SLIDES_INSTRUCTION}\n\n"
                f"Data:\n{json.dumps(pub_context, default=str)[:3000]}\n\n"
                f"Return JSON matching: {json.dumps(SLIDES_ARTIFACT_SCHEMA, indent=2)}\n"
            )
            slides_text = _call_llm(
                "You are a presentation generator. Output ONLY valid JSON.",
                slides_prompt,
            )
            slides_artifact = _parse_json(slides_text)
            if slides_artifact:
                artifacts["slides"] = slides_artifact
        if artifacts_needed.get("excel", True):
            yield sse_event("agent_progress", {
                "pipeline_id": pipeline_id, "agent": "publisher",
                "message": "Building Excel model...",
            })
            artifacts["excel"] = _build_excel_artifact(
                primary_data, analysis_results, f"{primary} Financial Model",
            )
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "publisher",
            "agent_label": "The Publisher", "status": "completed",
            "message": f"Generated: {', '.join(artifacts.keys())}",
            "result": {
                "artifacts": list(artifacts.keys()),
                "slides_count": len(artifacts.get("slides", {}).get("slides", [])),
                "excel_sheets": len(artifacts.get("excel", {}).get("sheets", [])),
            },
        })
    except Exception as exc:
        logger.error("Publisher failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "publisher",
            "message": f"Artifact generation error: {exc}",
        })

    # -------------------------------------------------------------- #
    # 5. UI Mapper — build charts from model results                   #
    # -------------------------------------------------------------- #
    yield sse_event("agent_start", {
        "pipeline_id": pipeline_id, "agent": "ui_mapper",
        "agent_label": "The UI Mapper", "status": "running",
        "message": "Preparing dashboard visualizations...",
    })
    try:
        chart_configs = _build_base_charts(primary, records)
        metrics = _build_base_metrics(primary_data)
        model_chart_map = {
            "moving_averages": {"title": f"{primary} Moving Averages", "chartType": "line",
                                "yKeys": ["close", "SMA20", "SMA50"], "xKey": "date",
                                "colors": ["#111111", "#E8440D", "#2563EB"]},
            "rsi": {"title": f"{primary} RSI", "chartType": "line",
                    "yKeys": ["RSI"], "xKey": "date", "colors": ["#7C3AED"]},
            "macd": {"title": f"{primary} MACD", "chartType": "bar",
                     "yKeys": ["MACD", "Signal", "Histogram"], "xKey": "date",
                     "colors": ["#E8440D", "#2563EB", "#D1D5DB"]},
            "bollinger_bands": {"title": f"{primary} Bollinger Bands", "chartType": "line",
                                "yKeys": ["close", "upper", "mid", "lower"], "xKey": "date",
                                "colors": ["#111111", "#E8440D", "#888", "#E8440D"]},
            "monte_carlo": {"title": f"{primary} Monte Carlo Forecast", "chartType": "line",
                            "yKeys": ["p5", "p25", "median", "p75", "p95"], "xKey": "day",
                            "colors": ["#FCA5A5", "#F97316", "#E8440D", "#F97316", "#FCA5A5"]},
            "regression": {"title": f"{primary} Trend & Forecast", "chartType": "line",
                           "yKeys": ["close", "trend"], "xKey": "date",
                           "colors": ["#111111", "#E8440D"]},
            "risk_metrics": {"title": f"{primary} Drawdown", "chartType": "line",
                             "yKeys": ["cumulative_return", "drawdown"], "xKey": "date",
                             "colors": ["#16A34A", "#DC2626"]},
        }
        mr = analysis_results.get("_model_results", {})
        for model_name, chart_cfg in model_chart_map.items():
            res = mr.get(model_name, {})
            cdata = res.get("chart_data")
            if not cdata:
                continue
            chart_configs.append({
                "id": f"model_{model_name}",
                "title": chart_cfg["title"],
                "chartType": chart_cfg["chartType"],
                "data": cdata[:60],
                "xKey": chart_cfg["xKey"],
                "yKeys": chart_cfg["yKeys"],
                "colors": chart_cfg["colors"],
            })

        # Up to two signals per model become metric cards; the trend arrow is
        # a keyword match on the signal text, bullish words checked first.
        bullish_words = ("bullish", "buy", "above", "positive", "upward", "strong", "undervalued")
        bearish_words = ("bearish", "sell", "below", "negative", "downward", "overvalued", "overbought")
        for model_name, res in mr.items():
            if "error" in res:
                continue
            for sig in (res.get("signals") or [])[:2]:
                lowered = sig.lower()
                if any(w in lowered for w in bullish_words):
                    trend = "up"
                elif any(w in lowered for w in bearish_words):
                    trend = "down"
                else:
                    trend = "neutral"
                metrics.append({"label": model_name.replace("_", " ").title(), "value": sig[:50], "change": "", "trend": trend})
        dcf = mr.get("dcf", {})
        if dcf.get("metrics"):
            iv = dcf["metrics"].get("intrinsic_value_per_share")
            up = dcf["metrics"].get("upside_pct")
            if iv is not None:
                metrics.append({"label": "DCF Intrinsic Value", "value": f"${iv:,.2f}" if isinstance(iv, (int, float)) else str(iv),
                                "change": f"{up:+.1f}%" if isinstance(up, (int, float)) else "", "trend": "up" if (up or 0) > 0 else "down"})
        yield sse_event("agent_complete", {
            "pipeline_id": pipeline_id, "agent": "ui_mapper",
            "agent_label": "The UI Mapper", "status": "completed",
            "message": f"Dashboard ready — {len(chart_configs)} charts, {len(metrics)} metrics.",
            "result": {"charts_count": len(chart_configs), "metrics_count": len(metrics)},
        })
    except Exception as exc:
        logger.error("UI Mapper failed: %s", exc)
        yield sse_event("agent_error", {
            "pipeline_id": pipeline_id, "agent": "ui_mapper",
            "message": f"UI mapping error: {exc}",
        })

    # -------------------------------------------------------------- #
    # Final                                                            #
    # -------------------------------------------------------------- #
    try:
        interpretations = analysis_results.get("_interpretations", [])
        signals_text = "\n".join(analysis_results.get("_signals", [])[:10])
        interp_text = "\n".join(interpretations[:6])
        summary_prompt = (
            f"Summarize this analysis for {', '.join(tickers)}:\n"
            f"Request: {user_prompt}\n"
            f"Models run: {', '.join(analysis_results.get('_models_run', []))}\n"
            f"Key signals:\n{signals_text}\n"
            f"Model interpretations:\n{interp_text}\n"
            f"Artifacts: {', '.join(artifacts.keys())}\n\n"
            f"Write a 3-5 sentence executive summary synthesizing the findings."
        )
        final_summary = _call_llm(
            "You are a concise financial analyst. Write an executive summary.",
            summary_prompt,
        )
    except Exception:
        # tickers is a list of strings by now, so this fallback cannot fail.
        final_summary = f"Analysis pipeline completed for {', '.join(tickers)}."

    # Send a condensed view of the collected data rather than the raw payloads.
    data_summary = {}
    for k, v in collected_data.items():
        if isinstance(v, dict) and "records" in v:
            data_summary[k] = {
                "ticker": v.get("ticker", ""),
                "company_name": v.get("company_name", ""),
                "current_price": v.get("current_price"),
                "records_count": len(v.get("records", [])),
            }
        else:
            data_summary[k] = {"preview": str(v)[:200]}
    clean_analysis_final = {
        k: v for k, v in analysis_results.items()
        if k != "_stdout" and not (isinstance(v, list) and len(v) > 20)
    }
    yield sse_event("pipeline_complete", {
        "pipeline_id": pipeline_id,
        "status": "completed",
        "summary": final_summary,
        "tickers": tickers,
        "charts": chart_configs,
        "metrics": metrics,
        "artifacts": artifacts,
        "collected_data": data_summary,
        "analysis_results": clean_analysis_final,
    })