from collections import defaultdict from app.repository import list_high_risk_pairs, latest_runs def shell_analytics(db, limit:int=10): rows=list_high_risk_pairs(db, limit=500); buckets=defaultdict(lambda: {"shell_key":"","pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_or_critical":0}) for r in rows: key=r.shell_key or "unknown"; b=buckets[key]; b["shell_key"]=key; b["pair_count"]+=1; b["avg_final_score"]+=r.final_score; b["max_final_score"]=max(b["max_final_score"], r.final_score) if r.risk_label in ("high","critical"): b["high_or_critical"]+=1 out=[]; for b in buckets.values(): b["avg_final_score"]=b["avg_final_score"]/max(1,b["pair_count"]); out.append(b) out.sort(key=lambda x:x["avg_final_score"], reverse=True); return out[:limit] def diagnostics_summary(db): rows=list_high_risk_pairs(db, limit=500); runs=latest_runs(db, limit=10); labels={"low":0,"medium":0,"high":0,"critical":0} if not rows: return {"pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_plus_critical":0,"label_distribution":labels,"recent_runs":len(runs)} scores=[r.final_score for r in rows] for r in rows: labels[r.risk_label]=labels.get(r.risk_label,0)+1 return {"pair_count":len(rows),"avg_final_score":sum(scores)/len(scores),"max_final_score":max(scores),"high_plus_critical":labels["high"]+labels["critical"],"label_distribution":labels,"recent_runs":len(runs)}