File size: 1,449 Bytes
1ce499f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from collections import defaultdict
from app.repository import list_high_risk_pairs, latest_runs
def shell_analytics(db, limit:int=10):
rows=list_high_risk_pairs(db, limit=500); buckets=defaultdict(lambda: {"shell_key":"","pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_or_critical":0})
for r in rows:
key=r.shell_key or "unknown"; b=buckets[key]; b["shell_key"]=key; b["pair_count"]+=1; b["avg_final_score"]+=r.final_score; b["max_final_score"]=max(b["max_final_score"], r.final_score)
if r.risk_label in ("high","critical"): b["high_or_critical"]+=1
out=[];
for b in buckets.values():
b["avg_final_score"]=b["avg_final_score"]/max(1,b["pair_count"]); out.append(b)
out.sort(key=lambda x:x["avg_final_score"], reverse=True); return out[:limit]
def diagnostics_summary(db):
rows=list_high_risk_pairs(db, limit=500); runs=latest_runs(db, limit=10); labels={"low":0,"medium":0,"high":0,"critical":0}
if not rows: return {"pair_count":0,"avg_final_score":0.0,"max_final_score":0.0,"high_plus_critical":0,"label_distribution":labels,"recent_runs":len(runs)}
scores=[r.final_score for r in rows]
for r in rows: labels[r.risk_label]=labels.get(r.risk_label,0)+1
return {"pair_count":len(rows),"avg_final_score":sum(scores)/len(scores),"max_final_score":max(scores),"high_plus_critical":labels["high"]+labels["critical"],"label_distribution":labels,"recent_runs":len(runs)}
|