PR_Review_Negotiation_Environment / server /action_normalizer.py
3v324v23's picture
feat: refresh dashboard UX and backend integration
dcee3d3
import json
import re
from typing import Any
from models import PRAction
# Category names that normalize_issue_category() returns unchanged when they
# are supplied explicitly; "none" is also what it returns when no keyword
# heuristic matches.
ALLOWED_CATEGORIES = {"logic", "security", "correctness", "performance", "none"}
def _strip_markdown_fences(text: str) -> str:
text = text.strip()
fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
if fenced:
return fenced.group(1).strip()
return text
def _extract_json_object(text: str) -> dict[str, Any] | None:
    """Extract a JSON object from *text*, tolerating surrounding prose.

    The text is first unwrapped from any Markdown code fence.  If everything
    that remains is valid JSON, it is returned when it is an object; any other
    top-level JSON value (list, string, number, ...) yields ``None`` without
    further searching.

    If the remainder is not valid JSON as a whole, every ``{`` is tried in
    order as the start of an embedded object and the first one that decodes
    is returned.  A brace-delimited span that is not JSON (for example
    ``{placeholder}`` in prose) therefore does not abort the search for a
    valid object that appears later in the text.

    Args:
        text: Raw text that may contain a JSON object.

    Returns:
        The decoded ``dict``, or ``None`` when no JSON object can be decoded.
    """
    candidate = _strip_markdown_fences(text)
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        pass
    else:
        return parsed if isinstance(parsed, dict) else None

    # raw_decode parses one JSON value starting at the given index and ignores
    # whatever follows it, which is exactly what is needed for JSON embedded
    # in free text.  It also handles braces and escaped quotes inside strings.
    decoder = json.JSONDecoder()
    start = candidate.find("{")
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(candidate, start)
        except json.JSONDecodeError:
            # Not an object at this brace; move on to the next candidate.
            start = candidate.find("{", start + 1)
        else:
            return parsed if isinstance(parsed, dict) else None
    return None
def _coerce_dict(payload: Any) -> tuple[dict[str, Any], str | None]:
    """Reduce an arbitrary payload to ``(fields, raw_text)``.

    Args:
        payload: A dict, a string, or anything else.

    Returns:
        A pair ``(fields, raw_text)``:

        * ``str`` payload: the JSON object embedded in it (if any) or
          ``{"comment": payload}``, together with the original string.
        * ``dict`` payload with an ``"action"`` key: the result of coercing
          that nested value.
        * other ``dict`` payload: the first non-empty JSON object found in a
          string stored under ``content``/``message``/``raw``/``text``/
          ``response`` together with that string; otherwise the payload
          itself, together with the first non-blank string stored under one
          of those keys (``None`` if there is none).
        * anything else: ``({}, None)``.
    """
    if isinstance(payload, str):
        parsed = _extract_json_object(payload)
        if parsed:
            return parsed, payload
        return {"comment": payload}, payload

    if not isinstance(payload, dict):
        return {}, None

    if "action" in payload:
        return _coerce_dict(payload["action"])

    fallback_text: str | None = None
    for key in ("content", "message", "raw", "text", "response"):
        value = payload.get(key)
        if not isinstance(value, str):
            continue
        parsed = _extract_json_object(value)
        if parsed:
            return parsed, value
        # Keep plain (non-JSON) text so it is not silently dropped: the string
        # branch above preserves the raw text as well, and callers can use it
        # as a fallback comment.
        if fallback_text is None and value.strip():
            fallback_text = value
    return payload, fallback_text
# A negator followed by at most two plain words and then an approval stem,
# e.g. "do_not_merge", "can't_be_approved", "not_ready_to_merge".
_NEGATED_APPROVAL_RE = re.compile(
    r"(?<![a-z])(?:not|never|cannot|cant|dont|wont|[a-z]+n['\u2019]t)"
    r"_(?:[a-z]+_){0,2}(?:approv|accept|merg|lgtm)"
)

# An approval keyword that starts a word, so that "disapprove",
# "unacceptable" or "emerge" are not mistaken for approval.
_APPROVAL_RE = re.compile(r"(?<![a-z])(?:approve|accept|lgtm|merge)")


def normalize_decision(value: Any, fallback_text: str = "") -> str:
    """Map a free-form decision to ``escalate``, ``request_changes`` or ``approve``.

    *value* and *fallback_text* are joined, lower-cased, and every run of
    whitespace, hyphens and underscores is collapsed to a single ``_`` before
    keyword matching.  Rules are applied in this order:

    1. ``escalate`` / ``security_team``             -> ``"escalate"``
    2. ``request_changes`` / ``changes_requested``  -> ``"request_changes"``
    3. a negated approval ("do not merge", "can't be approved", ...)
                                                    -> ``"request_changes"``
    4. ``reject`` / ``needs_changes`` / ``require_changes``
                                                    -> ``"request_changes"``
    5. a word starting with ``approve``, ``accept``, ``lgtm`` or ``merge``
                                                    -> ``"approve"``
    6. anything else                                -> ``"request_changes"``

    Args:
        value: Explicit decision value; falsy values are treated as empty.
        fallback_text: Additional text (e.g. the review comment) searched
            with the same rules.

    Returns:
        One of ``"escalate"``, ``"request_changes"`` or ``"approve"``.
    """
    text = f"{value or ''} {fallback_text}".lower().replace("-", "_").strip()
    compact = re.sub(r"[\s_]+", "_", text)

    if "escalate" in compact or "security_team" in compact:
        return "escalate"
    if "request_changes" in compact or "changes_requested" in compact:
        return "request_changes"

    # Negations must be checked before the approval keywords, otherwise
    # "do not merge" would be read as "merge".
    if any(
        phrase in compact
        for phrase in ("do_not_approve", "cannot_approve", "not_approve")
    ) or _NEGATED_APPROVAL_RE.search(compact):
        return "request_changes"

    if any(token in compact for token in ("reject", "needs_changes", "require_changes")):
        return "request_changes"
    if _APPROVAL_RE.search(compact):
        return "approve"

    # Default to the conservative outcome when nothing recognisable is found.
    return "request_changes"
def normalize_issue_category(value: Any, fallback_text: str = "") -> str:
    """Map a free-form issue category onto one of ``ALLOWED_CATEGORIES``.

    An explicit *value* that, after trimming, lower-casing and replacing
    hyphens with underscores, is already an allowed category is returned
    directly.  Otherwise *value* and *fallback_text* are searched for
    keywords, in this priority order: security, logic, correctness,
    performance.  ``"none"`` is returned when nothing matches.

    Args:
        value: Explicit category, or ``None`` when not supplied.
        fallback_text: Additional text (e.g. the review comment) to search.

    Returns:
        ``"security"``, ``"logic"``, ``"correctness"``, ``"performance"`` or
        ``"none"``.
    """
    # Trim so that an explicit " security " is honoured instead of being
    # overridden by keywords found in the fallback text.
    explicit = str(value).strip().lower().replace("-", "_") if value is not None else ""
    if explicit in ALLOWED_CATEGORIES:
        return explicit

    text = f"{explicit} {fallback_text}".lower()

    # "auth" covers auth/oauth/authentication/authorization, but must not
    # fire on "author", "authored", "authors", ...
    if any(word in text for word in ("security", "injection", "secret")) or re.search(
        r"auth(?!or(?!i[sz]))", text
    ):
        return "security"

    logic_markers = (
        "logic",
        "off_by_one",
        "off-by-one",
        "off by one",
        "pagination",
        "offset",
        "page 1",
        "skips first",
    )
    if any(marker in text for marker in logic_markers):
        return "logic"

    if any(marker in text for marker in ("correct", "bug", "valid")):
        return "correctness"
    if any(marker in text for marker in ("performance", "latency", "slow")):
        return "performance"
    return "none"
def normalize_action_payload(payload: Any) -> PRAction:
    """Build a ``PRAction`` from a loosely structured payload.

    The payload is reduced to a field mapping plus optional raw text by
    ``_coerce_dict``.  The comment is read from ``comment``, ``review`` or
    ``feedback``; if that yields ``None`` the raw text (or a placeholder) is
    used, and non-string comments are serialised with ``json.dumps``.  The
    decision (``decision``/``verdict``) and the issue category
    (``issue_category``/``category``/``issue_type``) are normalised with the
    comment as additional context.
    """
    placeholder = "No detailed comment provided."
    fields, source_text = _coerce_dict(payload)

    def first_truthy(*keys: str) -> Any:
        # Same result as ``fields.get(a) or fields.get(b) or ...``: the first
        # truthy value, otherwise the value looked up for the last key.
        picked = None
        for key in keys:
            picked = fields.get(key)
            if picked:
                break
        return picked

    comment = first_truthy("comment", "review", "feedback")
    if comment is None:
        comment = source_text or placeholder
    if not isinstance(comment, str):
        comment = json.dumps(comment, ensure_ascii=False)

    verdict = normalize_decision(first_truthy("decision", "verdict"), comment)
    category = normalize_issue_category(
        first_truthy("issue_category", "category", "issue_type"),
        comment,
    )

    cleaned = comment.strip()
    return PRAction(
        decision=verdict,
        comment=cleaned if cleaned else placeholder,
        issue_category=category,
    )