Spaces:
Running
Running
| """ | |
| OpsGate Verifier β THE MOST IMPORTANT FILE. | |
| Deterministic reward function modeled after RoboGraph's safety_score.py: | |
| - Weighted multi-metric scoring (6 categories, 100 points total) | |
| - A-F letter grades | |
| - 3-way verdict: PASS / HOLD / BLOCK (like RoboGraph's ship/hold/investigate) | |
| - Per-category breakdown with points/max/value | |
| Runs in < 1ms. No LLM calls. Pure Python assertions. | |
| """ | |
| from hyperparameters import ( | |
| SCORE_WEIGHTS, | |
| GRADE_THRESHOLDS, | |
| GRADE_COLORS, | |
| VERDICT_THRESHOLDS, | |
| REWARD_PASS, | |
| REWARD_HOLD, | |
| REWARD_BLOCK, | |
| ) | |
| def _compute_grade(score: float) -> str: | |
| """Convert a 0-100 score to a letter grade. | |
| Matches RoboGraph's grading system. | |
| """ | |
| if score >= GRADE_THRESHOLDS["A"]: | |
| return "A" | |
| elif score >= GRADE_THRESHOLDS["B"]: | |
| return "B" | |
| elif score >= GRADE_THRESHOLDS["C"]: | |
| return "C" | |
| elif score >= GRADE_THRESHOLDS["D"]: | |
| return "D" | |
| return "F" | |
| def _compute_verdict(score: float, policy_violations: int, has_critical_fail: bool) -> str: | |
| """Decide PASS / HOLD / BLOCK based on results. | |
| Modeled after RoboGraph's _recommendation() logic: | |
| - ship -> PASS (score >= 90 AND zero critical failures) | |
| - hold -> HOLD (score >= 60 OR minor issues) | |
| - investigate -> BLOCK (score < 60 OR critical failures) | |
| """ | |
| if has_critical_fail: | |
| return "BLOCK" | |
| if policy_violations > 0 and score < VERDICT_THRESHOLDS["hold_min_score"]: | |
| return "BLOCK" | |
| if policy_violations > 0: | |
| return "HOLD" | |
| if score >= VERDICT_THRESHOLDS["pass_min_score"]: | |
| return "PASS" | |
| if score >= VERDICT_THRESHOLDS["hold_min_score"]: | |
| return "HOLD" | |
| return "BLOCK" | |
| def _verdict_to_reward(verdict: str) -> float: | |
| """Map verdict to RL reward signal.""" | |
| if verdict == "PASS": | |
| return REWARD_PASS | |
| elif verdict == "HOLD": | |
| return REWARD_HOLD | |
| return REWARD_BLOCK | |
| def verify_episode( | |
| target: dict, | |
| snapshots: dict, | |
| policy_violations: int = 0, | |
| invalid_calls: int = 0, | |
| tool_calls_made: int = 0, | |
| ) -> tuple[float, list[str], dict]: | |
| """ | |
| Compare target state against actual DB snapshots using weighted scoring. | |
| Modeled after RoboGraph's _compute_score(): | |
| Each category contributes points/max/value to a 100-point total. | |
| Returns: | |
| reward: float score for RL training | |
| violations: list of human-readable violation strings | |
| verdict: structured dict with PASS/HOLD/BLOCK decision + full breakdown | |
| """ | |
| violations = [] | |
| breakdown = {} | |
| score = 0.0 | |
| # Track per-category results | |
| crm_checks_passed = 0 | |
| crm_checks_total = 0 | |
| billing_checks_passed = 0 | |
| billing_checks_total = 0 | |
| calendar_checks_passed = 0 | |
| calendar_checks_total = 0 | |
| email_checks_passed = 0 | |
| email_checks_total = 0 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Run all checks (same logic as before, but now counting) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # --- CRM checks --- | |
| if "crm" in target: | |
| crm_snap = snapshots.get("crm", {}) | |
| for expected_user in target["crm"].get("users", []): | |
| crm_checks_total += 1 | |
| uid = expected_user["user_id"] | |
| actual = next( | |
| (u for u in crm_snap.get("users", []) if u["user_id"] == uid), | |
| None, | |
| ) | |
| if not actual: | |
| violations.append(f"CRM: user {uid} not found") | |
| continue | |
| match = True | |
| for key, val in expected_user.items(): | |
| if key == "user_id": | |
| continue | |
| if key == "notes_contains": | |
| if val.lower() not in actual.get("notes", "").lower(): | |
| violations.append(f"CRM: user {uid} notes missing '{val}'") | |
| match = False | |
| elif actual.get(key) != val: | |
| violations.append( | |
| f"CRM: user {uid}.{key} = {actual.get(key)!r}, expected {val!r}" | |
| ) | |
| match = False | |
| if match: | |
| crm_checks_passed += 1 | |
| # --- Billing checks --- | |
| if "billing" in target: | |
| bill_snap = snapshots.get("billing", {}) | |
| for expected_inv in target["billing"].get("invoices", []): | |
| billing_checks_total += 1 | |
| iid = expected_inv["invoice_id"] | |
| actual = next( | |
| (i for i in bill_snap.get("invoices", []) if i["invoice_id"] == iid), | |
| None, | |
| ) | |
| if not actual: | |
| violations.append(f"Billing: invoice {iid} not found") | |
| continue | |
| match = True | |
| for key, val in expected_inv.items(): | |
| if key == "invoice_id": | |
| continue | |
| if actual.get(key) != val: | |
| violations.append( | |
| f"Billing: invoice {iid}.{key} = {actual.get(key)!r}, expected {val!r}" | |
| ) | |
| match = False | |
| if match: | |
| billing_checks_passed += 1 | |
| for expected_ref in target["billing"].get("refunds", []): | |
| billing_checks_total += 1 | |
| uid = expected_ref["user_id"] | |
| amt = expected_ref["amount"] | |
| matching = [ | |
| r for r in bill_snap.get("refunds", []) | |
| if r["user_id"] == uid and abs(r["amount"] - amt) < 0.01 | |
| ] | |
| if matching: | |
| billing_checks_passed += 1 | |
| else: | |
| violations.append(f"Billing: no refund for user {uid} of ${amt}") | |
| # --- Calendar checks --- | |
| if "calendar" in target: | |
| cal_snap = snapshots.get("calendar", {}) | |
| for expected_evt in target["calendar"].get("events", []): | |
| calendar_checks_total += 1 | |
| eid = expected_evt["event_id"] | |
| actual = next( | |
| (e for e in cal_snap.get("events", []) if e["event_id"] == eid), | |
| None, | |
| ) | |
| if not actual: | |
| violations.append(f"Calendar: event {eid} not found") | |
| continue | |
| match = True | |
| for key, val in expected_evt.items(): | |
| if key == "event_id": | |
| continue | |
| if actual.get(key) != val: | |
| violations.append( | |
| f"Calendar: event {eid}.{key} = {actual.get(key)!r}, expected {val!r}" | |
| ) | |
| match = False | |
| if match: | |
| calendar_checks_passed += 1 | |
| if "events_min_count" in target["calendar"]: | |
| calendar_checks_total += 1 | |
| if len(cal_snap.get("events", [])) >= target["calendar"]["events_min_count"]: | |
| calendar_checks_passed += 1 | |
| else: | |
| violations.append("Calendar: not enough events created") | |
| # --- Email checks --- | |
| if "email" in target: | |
| email_snap = snapshots.get("email", {}) | |
| outbox = email_snap.get("outbox", []) | |
| for expected_email in target["email"].get("outbox_contains", []): | |
| email_checks_total += 1 | |
| matching = [e for e in outbox if e["to"] == expected_email["to"]] | |
| if matching: | |
| email_checks_passed += 1 | |
| else: | |
| violations.append(f"Email: no email sent to {expected_email['to']}") | |
| if "outbox_min_count" in target["email"]: | |
| email_checks_total += 1 | |
| if len(outbox) >= target["email"]["outbox_min_count"]: | |
| email_checks_passed += 1 | |
| else: | |
| violations.append( | |
| f"Email: sent {len(outbox)}, need {target['email']['outbox_min_count']}" | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Weighted Scoring (modeled after RoboGraph safety_score.py) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| total_checks = crm_checks_total + billing_checks_total + calendar_checks_total + email_checks_total | |
| total_passed = crm_checks_passed + billing_checks_passed + calendar_checks_passed + email_checks_passed | |
| if total_checks == 0: | |
| verdict = { | |
| "decision": "BLOCK", | |
| "reason": "No checks defined", | |
| "score": 0.0, | |
| "grade": "F", | |
| "color": GRADE_COLORS["F"], | |
| "breakdown": {}, | |
| "violations": ["No checks defined"], | |
| "policy_violations_count": policy_violations, | |
| "reward": REWARD_BLOCK, | |
| } | |
| return REWARD_BLOCK, ["No checks defined"], verdict | |
| completion_ratio = total_passed / total_checks | |
| # 1. Task Completion (30 pts) | |
| w = SCORE_WEIGHTS["task_completion"] | |
| tc_pts = round(completion_ratio * w["max_points"], 1) | |
| score += tc_pts | |
| breakdown["task_completion"] = { | |
| "points": tc_pts, | |
| "max": w["max_points"], | |
| "value": f"{total_passed}/{total_checks}", | |
| "description": w["description"], | |
| } | |
| # 2. Policy Compliance (20 pts) | |
| w = SCORE_WEIGHTS["policy_compliance"] | |
| pc_pts = round(max(0, w["max_points"] - policy_violations * w["penalty_per_violation"]), 1) | |
| score += pc_pts | |
| breakdown["policy_compliance"] = { | |
| "points": pc_pts, | |
| "max": w["max_points"], | |
| "value": policy_violations, | |
| "description": w["description"], | |
| } | |
| # 3. Tool Efficiency (15 pts) | |
| w = SCORE_WEIGHTS["tool_efficiency"] | |
| extra_calls = max(0, tool_calls_made - w["optimal_calls"]) | |
| te_pts = round(max(0, w["max_points"] - extra_calls * w["penalty_per_extra"]), 1) | |
| score += te_pts | |
| breakdown["tool_efficiency"] = { | |
| "points": te_pts, | |
| "max": w["max_points"], | |
| "value": tool_calls_made, | |
| "description": w["description"], | |
| } | |
| # 4. Notification Completeness (15 pts) | |
| w = SCORE_WEIGHTS["notification_completeness"] | |
| if email_checks_total > 0: | |
| nc_ratio = email_checks_passed / email_checks_total | |
| else: | |
| nc_ratio = 1.0 # no email checks = assume ok | |
| nc_pts = round(nc_ratio * w["max_points"], 1) | |
| score += nc_pts | |
| breakdown["notification_completeness"] = { | |
| "points": nc_pts, | |
| "max": w["max_points"], | |
| "value": f"{email_checks_passed}/{email_checks_total}", | |
| "description": w["description"], | |
| } | |
| # 5. State Accuracy (10 pts) | |
| w = SCORE_WEIGHTS["state_accuracy"] | |
| non_email_total = crm_checks_total + billing_checks_total + calendar_checks_total | |
| non_email_passed = crm_checks_passed + billing_checks_passed + calendar_checks_passed | |
| if non_email_total > 0: | |
| sa_ratio = non_email_passed / non_email_total | |
| else: | |
| sa_ratio = 1.0 | |
| sa_pts = round(sa_ratio * w["max_points"], 1) | |
| score += sa_pts | |
| breakdown["state_accuracy"] = { | |
| "points": sa_pts, | |
| "max": w["max_points"], | |
| "value": f"{non_email_passed}/{non_email_total}", | |
| "description": w["description"], | |
| } | |
| # 6. Action Hygiene (10 pts) | |
| w = SCORE_WEIGHTS["action_hygiene"] | |
| ah_pts = round(max(0, w["max_points"] - invalid_calls * w["penalty_per_invalid"]), 1) | |
| score += ah_pts | |
| breakdown["action_hygiene"] = { | |
| "points": ah_pts, | |
| "max": w["max_points"], | |
| "value": invalid_calls, | |
| "description": w["description"], | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Grade + Verdict + Reward | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| score = round(min(100, max(0, score)), 1) | |
| grade = _compute_grade(score) | |
| has_critical_fail = completion_ratio < 0.5 | |
| decision = _compute_verdict(score, policy_violations, has_critical_fail) | |
| reward = _verdict_to_reward(decision) | |
| verdict = { | |
| "decision": decision, | |
| "score": score, | |
| "grade": grade, | |
| "color": GRADE_COLORS[grade], | |
| "breakdown": breakdown, | |
| "violations": violations, | |
| "checks_passed": total_passed, | |
| "checks_total": total_checks, | |
| "policy_violations_count": policy_violations, | |
| "invalid_calls": invalid_calls, | |
| "tool_calls_made": tool_calls_made, | |
| "reward": round(reward, 4), | |
| } | |
| return reward, violations, verdict | |