| from collections import defaultdict |
| from app.repository import list_high_risk_pairs, latest_runs |
| def shell_analytics(db, limit:int=10): |
| rows=list_high_risk_pairs(db, limit=500); buckets=defaultdict(lambda: {"shell_key":"","pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_or_critical":0}) |
| for r in rows: |
| key=r.shell_key or "unknown"; b=buckets[key]; b["shell_key"]=key; b["pair_count"]+=1; b["avg_final_score"]+=r.final_score; b["max_final_score"]=max(b["max_final_score"], r.final_score) |
| if r.risk_label in ("high","critical"): b["high_or_critical"]+=1 |
| out=[]; |
| for b in buckets.values(): |
| b["avg_final_score"]=b["avg_final_score"]/max(1,b["pair_count"]); out.append(b) |
| out.sort(key=lambda x:x["avg_final_score"], reverse=True); return out[:limit] |
| def diagnostics_summary(db): |
| rows=list_high_risk_pairs(db, limit=500); runs=latest_runs(db, limit=10); labels={"low":0,"medium":0,"high":0,"critical":0} |
| if not rows: return {"pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_plus_critical":0,"label_distribution":labels,"recent_runs":len(runs)} |
| scores=[r.final_score for r in rows] |
| for r in rows: labels[r.risk_label]=labels.get(r.risk_label,0)+1 |
| return {"pair_count":len(rows),"avg_final_score":sum(scores)/len(scores),"max_final_score":max(scores),"high_plus_critical":labels["high"]+labels["critical"],"label_distribution":labels,"recent_runs":len(runs)} |
|
|