"""Manager console aggregations.""" from __future__ import annotations from collections import Counter, defaultdict from datetime import datetime from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse, Response from sqlalchemy import select from sqlalchemy.orm import Session from app.core.db import get_db from app.core.middleware import etag_for from app.utils.as_of import get_as_of from app.models.entities import ( Bond, ConfidenceScore, CreditSignal, EvaluatedPrice, InvestigationCase, Issuer, ) from app.schemas.manager import ( ActionTile, AgingBucket, AnalystLoadRow, ManagerDashboard, ManagerKpi, PriorityRow, SectorTile, ) router = APIRouter() def _aging_bucket(hours: float) -> str: if hours < 4: return "<4h" if hours < 24: return "4-24h" if hours < 72: return "1-3d" if hours < 168: return "3-7d" return ">7d" AGING_ORDER = ["<4h", "4-24h", "1-3d", "3-7d", ">7d"] @router.get("/kpis") def manager_kpis(request: Request, db: Session = Depends(get_db)) -> Response: as_of = get_as_of(db) # ---- Bond-level aggregates (one evaluated price per bond in this demo) bond_rows = db.execute( select(Bond, Issuer, EvaluatedPrice, ConfidenceScore) .join(Issuer, Issuer.id == Bond.issuer_id) .join(EvaluatedPrice, EvaluatedPrice.bond_id == Bond.id) .join(ConfidenceScore, ConfidenceScore.evaluated_price_id == EvaluatedPrice.id) ).all() total = len(bond_rows) low_conf = sum(1 for _, _, _, cs in bond_rows if cs.score < 4.5) avg_conf = round(sum(cs.score for _, _, _, cs in bond_rows) / total, 2) if total else 0.0 # ---- Case-level aggregates open_cases = ( db.query(InvestigationCase) .filter(InvestigationCase.status != "RESOLVED") .all() ) resolved_cases = ( db.query(InvestigationCase) .filter(InvestigationCase.status == "RESOLVED") .all() ) escalations = [c for c in open_cases if c.recommended_action == "ESCALATE"] auto_accepts = [c for c in resolved_cases if c.recommended_action == "ACCEPT"] # ---- Sector tiles neg_issuer_ids = { s.issuer_id for s in db.query(CreditSignal).filter(CreditSignal.score < 0).all() } sector_agg: dict[str, dict] = defaultdict( lambda: {"bonds": 0, "sum_conf": 0.0, "low": 0, "neg_issuers": set(), "open": 0} ) open_by_bond = {c.bond_id: c for c in open_cases} for b, iss, _ep, cs in bond_rows: s = sector_agg[iss.sector] s["bonds"] += 1 s["sum_conf"] += cs.score if cs.score < 4.5: s["low"] += 1 if iss.id in neg_issuer_ids: s["neg_issuers"].add(iss.id) if b.id in open_by_bond: s["open"] += 1 sector_tiles = sorted( [ SectorTile( sector=name, bonds=v["bonds"], avg_confidence=round(v["sum_conf"] / v["bonds"], 2) if v["bonds"] else 0.0, low_confidence_count=v["low"], low_confidence_pct=round(100.0 * v["low"] / v["bonds"], 1) if v["bonds"] else 0.0, negative_issuers=len(v["neg_issuers"]), open_cases=v["open"], ) for name, v in sector_agg.items() ], key=lambda t: (-t.low_confidence_pct, -t.bonds), ) # ---- Aging buckets now = datetime.utcnow() buckets = Counter( _aging_bucket((now - c.opened_at).total_seconds() / 3600.0) for c in open_cases ) aging = [AgingBucket(label=b, count=buckets.get(b, 0)) for b in AGING_ORDER] # ---- Action distribution over all cases (open + resolved) all_cases = open_cases + resolved_cases action_counter: Counter[str] = Counter() for c in all_cases: action_counter[c.recommended_action or "UNSET"] += 1 actions = [ ActionTile(action=a, count=action_counter.get(a, 0)) for a in ("ACCEPT", "REVIEW", "ESCALATE", "UNSET") ] # ---- Analyst load load_agg: dict[str, list[float]] = defaultdict(list) for c in open_cases: key = c.assigned_to or "Unassigned" load_agg[key].append(c.materiality_bps or 0.0) analyst_load = sorted( [ AnalystLoadRow( analyst=k, open_cases=len(v), avg_materiality_bps=round(sum(v) / len(v), 1) if v else None, ) for k, v in load_agg.items() ], key=lambda r: -r.open_cases, ) # ---- Priority queue: top open cases by materiality priority_queue = [] ranked = sorted( open_cases, key=lambda c: -(c.materiality_bps or 0.0), )[:8] issuer_by_bond = {b.id: iss for b, iss, _, _ in bond_rows} for c in ranked: iss = issuer_by_bond.get(c.bond_id) priority_queue.append( PriorityRow( case_id=c.id, bond_id=c.bond_id, issuer_name=iss.name if iss else c.bond_id, recommended_action=c.recommended_action, materiality_bps=c.materiality_bps, opened_at=c.opened_at.isoformat(), age_hours=round((now - c.opened_at).total_seconds() / 3600.0, 1), trigger=c.trigger, ) ) # ---- Top-line KPIs kpis = [ ManagerKpi(label="Bonds monitored", value=str(total)), ManagerKpi( label="Low-confidence %", value=f"{(100.0 * low_conf / total):.0f}%" if total else "—", hint=f"{low_conf} bonds below 4.5", tone="warning" if low_conf else "neutral", ), ManagerKpi(label="Avg. confidence", value=f"{avg_conf:.1f}", hint="weighted across book"), ManagerKpi( label="Open cases", value=str(len(open_cases)), hint=f"{len(escalations)} flagged ESCALATE", tone="negative" if escalations else "warning" if open_cases else "neutral", ), ManagerKpi( label="Auto-resolved", value=str(len(auto_accepts)), hint="accepted without manual review", tone="positive", ), ManagerKpi( label="Negative issuer signals", value=str(len(neg_issuer_ids)), hint="issuers with credit signal < 0", tone="negative" if neg_issuer_ids else "neutral", ), ] dashboard = ManagerDashboard( as_of=as_of.isoformat(), kpis=kpis, sectors=sector_tiles, aging=aging, actions=actions, analyst_load=analyst_load, priority_queue=priority_queue, ) payload = dashboard.model_dump(mode="json") tag = etag_for(payload) if request.headers.get("if-none-match") == tag: return Response(status_code=304, headers={"etag": tag}) return JSONResponse(content=payload, headers={"etag": tag})