File size: 4,637 Bytes
dcee3d3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | import json
import re
from typing import Any
from models import PRAction
# Category labels that normalize_issue_category returns unchanged when the
# caller supplies one of them explicitly (after lower-casing and "-" -> "_").
ALLOWED_CATEGORIES = {"logic", "security", "correctness", "performance", "none"}
def _strip_markdown_fences(text: str) -> str:
text = text.strip()
fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
if fenced:
return fenced.group(1).strip()
return text
def _extract_json_object(text: str) -> dict[str, Any] | None:
    """Return the first JSON object that can be decoded from ``text``.

    The input is first passed through ``_strip_markdown_fences``. If the
    remainder is a complete JSON document, it is returned when it is an
    object; any other JSON value (list, string, number, ...) yields ``None``.

    Otherwise the text is searched for an embedded object: decoding is
    attempted at every ``{`` from left to right, and the first position that
    decodes successfully wins. Text following the decoded object is ignored,
    and a brace in surrounding prose that does not start valid JSON is
    skipped rather than ending the search.

    Returns:
        The decoded ``dict``, or ``None`` when no JSON object is found.
    """
    candidate = _strip_markdown_fences(text)
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        pass
    else:
        return parsed if isinstance(parsed, dict) else None

    decoder = json.JSONDecoder()
    start = candidate.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value at the head of the string and
            # tolerates trailing characters, so no manual brace/quote
            # balancing is needed to locate the end of the object.
            parsed, _end = decoder.raw_decode(candidate[start:])
        except json.JSONDecodeError:
            # This brace did not open valid JSON; try the next one.
            start = candidate.find("{", start + 1)
            continue
        return parsed if isinstance(parsed, dict) else None
    return None
def _coerce_dict(payload: Any) -> tuple[dict[str, Any], str | None]:
if isinstance(payload, dict):
if "action" in payload:
return _coerce_dict(payload["action"])
for key in ("content", "message", "raw", "text", "response"):
value = payload.get(key)
if isinstance(value, str):
parsed = _extract_json_object(value)
if parsed:
return parsed, value
return payload, None
if isinstance(payload, str):
parsed = _extract_json_object(payload)
if parsed:
return parsed, payload
return {"comment": payload}, payload
return {}, None
def normalize_decision(value: Any, fallback_text: str = "") -> str:
text = f"{value or ''} {fallback_text}".lower().replace("-", "_").strip()
compact = re.sub(r"[\s_]+", "_", text)
if "escalate" in compact or "security_team" in compact:
return "escalate"
if "request_changes" in compact or "changes_requested" in compact:
return "request_changes"
if "do_not_approve" in compact or "cannot_approve" in compact or "not_approve" in compact:
return "request_changes"
if any(token in compact for token in ("reject", "needs_changes", "require_changes")):
return "request_changes"
if any(token in compact for token in ("approve", "approved", "accept", "lgtm", "merge")):
return "approve"
return "request_changes"
def normalize_issue_category(value: Any, fallback_text: str = "") -> str:
    """Map a free-form category onto one of the ``ALLOWED_CATEGORIES`` labels.

    Args:
        value: The explicit category, if any. ``None`` is treated as absent;
            anything else is stringified, lower-cased and has ``-`` replaced
            by ``_``. If the result is an allowed label it is returned as is.
        fallback_text: Extra text searched for keywords, together with the
            normalized ``value``, when the explicit label is not recognised.

    Returns:
        The explicit label when allowed; otherwise the first of
        ``security``, ``logic``, ``correctness``, ``performance`` whose
        keywords occur in the combined lower-cased text; otherwise ``"none"``.
    """
    declared = "" if value is None else str(value).lower().replace("-", "_")
    if declared in ALLOWED_CATEGORIES:
        return declared

    haystack = f"{declared} {fallback_text}".lower()
    # Order matters: earlier rows take precedence over later ones.
    keyword_table = (
        ("security", ("security", "injection", "secret", "auth")),
        (
            "logic",
            (
                "logic",
                "off_by_one",
                "off-by-one",
                "pagination",
                "offset",
                "page 1",
                "skips first",
            ),
        ),
        ("correctness", ("correct", "bug", "valid")),
        ("performance", ("performance", "latency", "slow")),
    )
    for category, keywords in keyword_table:
        if any(keyword in haystack for keyword in keywords):
            return category
    return "none"
def normalize_action_payload(payload: Any) -> PRAction:
    """Build a ``PRAction`` from a loosely structured payload.

    The payload is reduced to a field mapping plus optional raw text via
    ``_coerce_dict``. The comment is taken from the first truthy value among
    the ``comment``, ``review`` and ``feedback`` fields; when that lookup
    yields ``None`` the raw text is used, and failing that a fixed
    placeholder. Non-string comments are serialised with ``json.dumps``.
    The decision (``decision`` / ``verdict``) and the category
    (``issue_category`` / ``category`` / ``issue_type``) are normalised with
    the comment supplied as fallback text.
    """
    default_comment = "No detailed comment provided."
    fields, source_text = _coerce_dict(payload)

    comment = fields.get("comment") or fields.get("review") or fields.get("feedback")
    if comment is None:
        comment = source_text or default_comment
    if not isinstance(comment, str):
        # Structured comments (lists, dicts, numbers) are kept as JSON text.
        comment = json.dumps(comment, ensure_ascii=False)

    raw_decision = fields.get("decision") or fields.get("verdict")
    raw_category = (
        fields.get("issue_category")
        or fields.get("category")
        or fields.get("issue_type")
    )

    return PRAction(
        decision=normalize_decision(raw_decision, comment),
        # A whitespace-only comment collapses to the placeholder.
        comment=comment.strip() or default_comment,
        issue_category=normalize_issue_category(raw_category, comment),
    )
|