AJAY KASU
Live as_of banner + real rates-driven bond price history
a5e13ab
"""Manager console aggregations."""
from __future__ import annotations
from collections import Counter, defaultdict
from datetime import datetime
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, Response
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.db import get_db
from app.core.middleware import etag_for
from app.utils.as_of import get_as_of
from app.models.entities import (
Bond,
ConfidenceScore,
CreditSignal,
EvaluatedPrice,
InvestigationCase,
Issuer,
)
from app.schemas.manager import (
ActionTile,
AgingBucket,
AnalystLoadRow,
ManagerDashboard,
ManagerKpi,
PriorityRow,
SectorTile,
)
router = APIRouter()
def _aging_bucket(hours: float) -> str:
if hours < 4:
return "<4h"
if hours < 24:
return "4-24h"
if hours < 72:
return "1-3d"
if hours < 168:
return "3-7d"
return ">7d"
AGING_ORDER = ["<4h", "4-24h", "1-3d", "3-7d", ">7d"]
@router.get("/kpis")
def manager_kpis(request: Request, db: Session = Depends(get_db)) -> Response:
as_of = get_as_of(db)
# ---- Bond-level aggregates (one evaluated price per bond in this demo)
bond_rows = db.execute(
select(Bond, Issuer, EvaluatedPrice, ConfidenceScore)
.join(Issuer, Issuer.id == Bond.issuer_id)
.join(EvaluatedPrice, EvaluatedPrice.bond_id == Bond.id)
.join(ConfidenceScore, ConfidenceScore.evaluated_price_id == EvaluatedPrice.id)
).all()
total = len(bond_rows)
low_conf = sum(1 for _, _, _, cs in bond_rows if cs.score < 4.5)
avg_conf = round(sum(cs.score for _, _, _, cs in bond_rows) / total, 2) if total else 0.0
# ---- Case-level aggregates
open_cases = (
db.query(InvestigationCase)
.filter(InvestigationCase.status != "RESOLVED")
.all()
)
resolved_cases = (
db.query(InvestigationCase)
.filter(InvestigationCase.status == "RESOLVED")
.all()
)
escalations = [c for c in open_cases if c.recommended_action == "ESCALATE"]
auto_accepts = [c for c in resolved_cases if c.recommended_action == "ACCEPT"]
# ---- Sector tiles
neg_issuer_ids = {
s.issuer_id
for s in db.query(CreditSignal).filter(CreditSignal.score < 0).all()
}
sector_agg: dict[str, dict] = defaultdict(
lambda: {"bonds": 0, "sum_conf": 0.0, "low": 0, "neg_issuers": set(), "open": 0}
)
open_by_bond = {c.bond_id: c for c in open_cases}
for b, iss, _ep, cs in bond_rows:
s = sector_agg[iss.sector]
s["bonds"] += 1
s["sum_conf"] += cs.score
if cs.score < 4.5:
s["low"] += 1
if iss.id in neg_issuer_ids:
s["neg_issuers"].add(iss.id)
if b.id in open_by_bond:
s["open"] += 1
sector_tiles = sorted(
[
SectorTile(
sector=name,
bonds=v["bonds"],
avg_confidence=round(v["sum_conf"] / v["bonds"], 2) if v["bonds"] else 0.0,
low_confidence_count=v["low"],
low_confidence_pct=round(100.0 * v["low"] / v["bonds"], 1) if v["bonds"] else 0.0,
negative_issuers=len(v["neg_issuers"]),
open_cases=v["open"],
)
for name, v in sector_agg.items()
],
key=lambda t: (-t.low_confidence_pct, -t.bonds),
)
# ---- Aging buckets
now = datetime.utcnow()
buckets = Counter(
_aging_bucket((now - c.opened_at).total_seconds() / 3600.0) for c in open_cases
)
aging = [AgingBucket(label=b, count=buckets.get(b, 0)) for b in AGING_ORDER]
# ---- Action distribution over all cases (open + resolved)
all_cases = open_cases + resolved_cases
action_counter: Counter[str] = Counter()
for c in all_cases:
action_counter[c.recommended_action or "UNSET"] += 1
actions = [
ActionTile(action=a, count=action_counter.get(a, 0))
for a in ("ACCEPT", "REVIEW", "ESCALATE", "UNSET")
]
# ---- Analyst load
load_agg: dict[str, list[float]] = defaultdict(list)
for c in open_cases:
key = c.assigned_to or "Unassigned"
load_agg[key].append(c.materiality_bps or 0.0)
analyst_load = sorted(
[
AnalystLoadRow(
analyst=k,
open_cases=len(v),
avg_materiality_bps=round(sum(v) / len(v), 1) if v else None,
)
for k, v in load_agg.items()
],
key=lambda r: -r.open_cases,
)
# ---- Priority queue: top open cases by materiality
priority_queue = []
ranked = sorted(
open_cases,
key=lambda c: -(c.materiality_bps or 0.0),
)[:8]
issuer_by_bond = {b.id: iss for b, iss, _, _ in bond_rows}
for c in ranked:
iss = issuer_by_bond.get(c.bond_id)
priority_queue.append(
PriorityRow(
case_id=c.id,
bond_id=c.bond_id,
issuer_name=iss.name if iss else c.bond_id,
recommended_action=c.recommended_action,
materiality_bps=c.materiality_bps,
opened_at=c.opened_at.isoformat(),
age_hours=round((now - c.opened_at).total_seconds() / 3600.0, 1),
trigger=c.trigger,
)
)
# ---- Top-line KPIs
kpis = [
ManagerKpi(label="Bonds monitored", value=str(total)),
ManagerKpi(
label="Low-confidence %",
value=f"{(100.0 * low_conf / total):.0f}%" if total else "—",
hint=f"{low_conf} bonds below 4.5",
tone="warning" if low_conf else "neutral",
),
ManagerKpi(label="Avg. confidence", value=f"{avg_conf:.1f}", hint="weighted across book"),
ManagerKpi(
label="Open cases",
value=str(len(open_cases)),
hint=f"{len(escalations)} flagged ESCALATE",
tone="negative" if escalations else "warning" if open_cases else "neutral",
),
ManagerKpi(
label="Auto-resolved",
value=str(len(auto_accepts)),
hint="accepted without manual review",
tone="positive",
),
ManagerKpi(
label="Negative issuer signals",
value=str(len(neg_issuer_ids)),
hint="issuers with credit signal < 0",
tone="negative" if neg_issuer_ids else "neutral",
),
]
dashboard = ManagerDashboard(
as_of=as_of.isoformat(),
kpis=kpis,
sectors=sector_tiles,
aging=aging,
actions=actions,
analyst_load=analyst_load,
priority_queue=priority_queue,
)
payload = dashboard.model_dump(mode="json")
tag = etag_for(payload)
if request.headers.get("if-none-match") == tag:
return Response(status_code=304, headers={"etag": tag})
return JSONResponse(content=payload, headers={"etag": tag})