| """Review Grader System. |
| |
| Implements programmatic sub-scoring logic for evaluating agent |
| security actions against internal semantic criteria. |
| """ |
|
|
| from typing import Tuple, Dict, Any |
|
|
| SCORE_BUG_IDENTIFIED = 0.20 |
| SCORE_BUG_TYPE = 0.20 |
| SCORE_BUG_LOCATION = 0.10 |
| SCORE_DESC_QUALITY = 0.25 |
| SCORE_FIX_QUALITY = 0.15 |
| SCORE_SEV_EXACT = 0.10 |
| SCORE_SEV_PARTIAL = 0.05 |
|
|
| KEYWORD_HIT_TARGET = 3.0 |
| PENALTY_THRESHOLD = 0.5 |
| PENALTY_MULTIPLIER = 0.2 |
|
|
|
|
| def grade_action(action: Dict[str, Any], task: Dict[str, Any]) -> Tuple[float, Dict[str, float]]: |
| """Evaluate an action against the task definition. |
| |
| Args: |
| action: The structured payload proposed by the AI agent. |
| task: The dictionary blueprint detailing the expected vulnerability. |
| |
| Returns: |
| A tuple of the normalized aggregate reward and the individual component breakdown. |
| """ |
| reward = 0.0 |
| breakdown: Dict[str, float] = {} |
|
|
| try: |
| |
| if action.get("bug_identified"): |
| reward += SCORE_BUG_IDENTIFIED |
| breakdown["bug_identified"] = SCORE_BUG_IDENTIFIED |
| else: |
| breakdown["bug_identified"] = 0.00 |
| |
| return max(0.0, min(1.0, reward)), breakdown |
|
|
| |
| action_type = action.get("bug_type", "").lower().replace("-", " ").replace("_", " ") |
| task_type = task["bug_type"].lower().replace("-", " ").replace("_", " ") |
| if task_type in action_type or action_type in task_type: |
| reward += SCORE_BUG_TYPE |
| breakdown["bug_type"] = SCORE_BUG_TYPE |
| else: |
| breakdown["bug_type"] = 0.00 |
|
|
| |
| action_location = action.get("bug_location", "").lower() |
| location_keywords = [w for w in task["bug_location"].lower().split() if len(w) > 3] |
| if location_keywords: |
| matched = sum(1 for kw in location_keywords if kw in action_location) |
| loc_score = round(SCORE_BUG_LOCATION * (matched / len(location_keywords)), 4) |
| else: |
| loc_score = 0.0 |
| |
| reward += loc_score |
| breakdown["bug_location"] = loc_score |
|
|
| |
| description = action.get("bug_description", "").lower() |
| desc_score = 0.0 |
| if len(description) >= 20: |
| task_keywords = task["keywords"] |
| target = task.get("keyword_target_override", KEYWORD_HIT_TARGET) |
| matched_kw = [kw for kw in task_keywords if kw in description] |
| desc_score = round(min(SCORE_DESC_QUALITY, SCORE_DESC_QUALITY * (len(matched_kw) / target)), 4) |
| |
| breakdown["description_quality"] = desc_score |
| reward += desc_score |
|
|
| |
| fix = action.get("suggested_fix", "").lower() |
| fix_score = 0.0 |
| if len(fix) >= 10: |
| fix_patterns = task["fix_patterns"] |
| matched_fix = [p for p in fix_patterns if p.lower() in fix] |
| fix_score = round(min(SCORE_FIX_QUALITY, SCORE_FIX_QUALITY * len(matched_fix)), 4) |
| |
| breakdown["fix_quality"] = fix_score |
| reward += fix_score |
|
|
| |
| action_sev = action.get("severity", "").lower() |
| task_sev = task["severity"].lower() |
| if action_sev == task_sev: |
| sev_score = SCORE_SEV_EXACT |
| elif action_sev in ("high", "critical") and task_sev in ("high", "critical"): |
| sev_score = SCORE_SEV_PARTIAL |
| else: |
| sev_score = 0.00 |
| |
| breakdown["severity"] = sev_score |
| reward += sev_score |
|
|
| |
| words = description.split() |
| unique_ratio = len(set(words)) / len(words) if words else 1.0 |
| if unique_ratio < PENALTY_THRESHOLD: |
| reward *= PENALTY_MULTIPLIER |
| breakdown["stuffing_penalty_multiplier"] = PENALTY_MULTIPLIER |
| for k in list(breakdown.keys()): |
| if k != "stuffing_penalty_multiplier": |
| breakdown[k] = round(breakdown[k] * PENALTY_MULTIPLIER, 4) |
|
|
| return max(0.0, min(1.0, round(reward, 4))), breakdown |
|
|
| except KeyError as exc: |
| raise RuntimeError(f"Missing mandatory schema key in task definition: {exc}") from exc |
|
|