| import json |
| import re |
| from typing import Any |
|
|
| from models import PRAction |
|
|
|
|
# Issue categories that are accepted verbatim when supplied explicitly.
ALLOWED_CATEGORIES = {
    "logic",
    "security",
    "correctness",
    "performance",
    "none",
}
|
|
|
|
def _strip_markdown_fences(text: str) -> str:
    """Return the contents of the first triple-backtick fence in ``text``.

    The input is trimmed first. If a fenced section is found (optionally
    tagged ``json``, case-insensitively), its trimmed contents are returned;
    otherwise the trimmed input is returned unchanged.
    """
    trimmed = text.strip()
    fence_match = re.search(
        r"```(?:json)?\s*(.*?)\s*```",
        trimmed,
        re.DOTALL | re.IGNORECASE,
    )
    return fence_match.group(1).strip() if fence_match else trimmed
|
|
|
|
def _find_closing_brace(text: str, start: int) -> int | None:
    """Return the index of the ``}`` that balances the ``{`` at ``start``.

    Braces inside double-quoted strings (with backslash escapes) are ignored.
    Returns ``None`` when the text ends before the braces balance.
    """
    depth = 0
    in_string = False
    escaped = False
    for idx in range(start, len(text)):
        char = text[idx]
        if in_string:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            in_string = True
        elif char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                return idx
    return None


def _extract_json_object(text: str) -> dict[str, Any] | None:
    """Best-effort extraction of a JSON object from free-form text.

    Markdown fences are stripped first. If the remaining text is valid JSON,
    it is returned when it decodes to a dict (``None`` for any other JSON
    value). Otherwise the text is scanned for balanced ``{...}`` spans, and
    the first span that parses as JSON is returned.

    Unlike a single-shot scan, a span that fails to parse (for example a
    ``{placeholder}`` in surrounding prose) does not end the search: scanning
    resumes after that span, so a later well-formed object is still found.

    Returns ``None`` when no parseable object is found.
    """
    candidate = _strip_markdown_fences(text)
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        pass
    else:
        return parsed if isinstance(parsed, dict) else None

    start = candidate.find("{")
    while start != -1:
        end = _find_closing_brace(candidate, start)
        if end is None:
            # Braces never balance from here on; nothing left to try.
            return None
        try:
            parsed = json.loads(candidate[start : end + 1])
        except json.JSONDecodeError:
            # Skip the whole failed span so nested fragments of a malformed
            # object are not mistaken for the payload.
            start = candidate.find("{", end + 1)
            continue
        return parsed if isinstance(parsed, dict) else None
    return None
|
|
|
|
| def _coerce_dict(payload: Any) -> tuple[dict[str, Any], str | None]: |
| if isinstance(payload, dict): |
| if "action" in payload: |
| return _coerce_dict(payload["action"]) |
| for key in ("content", "message", "raw", "text", "response"): |
| value = payload.get(key) |
| if isinstance(value, str): |
| parsed = _extract_json_object(value) |
| if parsed: |
| return parsed, value |
| return payload, None |
|
|
| if isinstance(payload, str): |
| parsed = _extract_json_object(payload) |
| if parsed: |
| return parsed, payload |
| return {"comment": payload}, payload |
|
|
| return {}, None |
|
|
|
|
| def normalize_decision(value: Any, fallback_text: str = "") -> str: |
| text = f"{value or ''} {fallback_text}".lower().replace("-", "_").strip() |
| compact = re.sub(r"[\s_]+", "_", text) |
|
|
| if "escalate" in compact or "security_team" in compact: |
| return "escalate" |
| if "request_changes" in compact or "changes_requested" in compact: |
| return "request_changes" |
| if "do_not_approve" in compact or "cannot_approve" in compact or "not_approve" in compact: |
| return "request_changes" |
| if any(token in compact for token in ("reject", "needs_changes", "require_changes")): |
| return "request_changes" |
| if any(token in compact for token in ("approve", "approved", "accept", "lgtm", "merge")): |
| return "approve" |
| return "request_changes" |
|
|
|
|
def normalize_issue_category(value: Any, fallback_text: str = "") -> str:
    """Map an issue category hint to one of the allowed categories.

    An explicit ``value`` that (lower-cased, with ``-`` replaced by ``_``)
    is already in ``ALLOWED_CATEGORIES`` is returned directly. Otherwise the
    explicit value and ``fallback_text`` are searched together for keywords,
    with categories checked in the order security, logic, correctness,
    performance. Returns ``"none"`` when no keyword matches.
    """
    declared = "" if value is None else str(value).lower().replace("-", "_")
    if declared in ALLOWED_CATEGORIES:
        return declared

    haystack = f"{declared} {fallback_text}".lower()
    # Order matters: the first category with any matching keyword wins.
    keyword_table = (
        ("security", ("security", "injection", "secret", "auth")),
        (
            "logic",
            (
                "logic",
                "off_by_one",
                "off-by-one",
                "pagination",
                "offset",
                "page 1",
                "skips first",
            ),
        ),
        ("correctness", ("correct", "bug", "valid")),
        ("performance", ("performance", "latency", "slow")),
    )
    for category, keywords in keyword_table:
        if any(keyword in haystack for keyword in keywords):
            return category
    return "none"
|
|
|
|
def normalize_action_payload(payload: Any) -> PRAction:
    """Build a ``PRAction`` from a loosely structured payload.

    The payload is first reduced to a field dict plus optional raw text.
    The comment is taken from ``comment``, ``review`` or ``feedback`` (first
    truthy one), falling back to the raw text or a default sentence when the
    result is ``None``; non-string comments are JSON-encoded. The decision
    and issue category are normalized from their respective fields, with the
    comment supplied as fallback text.
    """
    fields, source_text = _coerce_dict(payload)

    # Equivalent to an ``or`` chain: stop at the first truthy field, else
    # keep whatever the last lookup produced.
    remark = None
    for comment_key in ("comment", "review", "feedback"):
        remark = fields.get(comment_key)
        if remark:
            break

    if remark is None:
        remark = source_text or "No detailed comment provided."
    if not isinstance(remark, str):
        remark = json.dumps(remark, ensure_ascii=False)

    decision_hint = fields.get("decision") or fields.get("verdict")
    category_hint = (
        fields.get("issue_category")
        or fields.get("category")
        or fields.get("issue_type")
    )

    resolved_decision = normalize_decision(decision_hint, remark)
    resolved_category = normalize_issue_category(category_hint, remark)
    final_comment = remark.strip() or "No detailed comment provided."

    return PRAction(
        decision=resolved_decision,
        comment=final_comment,
        issue_category=resolved_category,
    )
|
|