File size: 4,637 Bytes
dcee3d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import json
import re
from typing import Any

from models import PRAction


# Category labels that normalize_issue_category() accepts verbatim from an
# explicit value; any other value is classified by keyword heuristics instead.
ALLOWED_CATEGORIES = {"logic", "security", "correctness", "performance", "none"}


def _strip_markdown_fences(text: str) -> str:
    text = text.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()
    return text


def _find_object_end(text: str, start: int) -> int:
    """Return the index of the ``}`` that closes the ``{`` at *start*, or -1.

    Braces inside double-quoted strings are ignored, and a backslash inside a
    string escapes the following character.
    """
    depth = 0
    in_string = False
    escaped = False
    for idx in range(start, len(text)):
        char = text[idx]
        if in_string:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            in_string = True
        elif char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                return idx
    return -1


def _extract_json_object(text: str) -> dict[str, Any] | None:
    """Return the first JSON object that can be decoded from *text*.

    The text is first passed through ``_strip_markdown_fences``. If the whole
    result is valid JSON it is returned when it is a dict, and ``None`` is
    returned for any other JSON value. Otherwise the text is scanned left to
    right for top-level balanced ``{...}`` groups and the first one that
    decodes is returned.

    Returns:
        The decoded dict, or ``None`` when no group decodes or a ``{`` is
        never closed.
    """
    candidate = _strip_markdown_fences(text)
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        pass
    else:
        return parsed if isinstance(parsed, dict) else None

    start = candidate.find("{")
    while start != -1:
        end = _find_object_end(candidate, start)
        if end == -1:
            # Unbalanced braces: there is no complete object to decode.
            return None
        try:
            parsed = json.loads(candidate[start : end + 1])
        except json.JSONDecodeError:
            # Prose such as "use {braces}" can precede the real object, so
            # keep looking after this group instead of giving up. Resuming
            # past `end` avoids returning a fragment nested in a bad group.
            start = candidate.find("{", end + 1)
            continue
        return parsed if isinstance(parsed, dict) else None
    return None


def _coerce_dict(payload: Any) -> tuple[dict[str, Any], str | None]:
    """Reduce *payload* to a dict plus the raw string it was parsed from.

    * A string is parsed with ``_extract_json_object``; if that yields nothing
      the string itself becomes ``{"comment": payload}``. The string is
      returned as the raw text in both cases.
    * A dict with an ``"action"`` key is unwrapped recursively on that value.
    * Any other dict is searched for a string under ``content``, ``message``,
      ``raw``, ``text`` or ``response`` (in that order) that contains a
      non-empty JSON object; failing that, the dict is returned as-is with no
      raw text.
    * Every other type produces ``({}, None)``.
    """
    if isinstance(payload, str):
        decoded = _extract_json_object(payload)
        if decoded:
            return decoded, payload
        return {"comment": payload}, payload

    if not isinstance(payload, dict):
        return {}, None

    if "action" in payload:
        return _coerce_dict(payload["action"])

    for wrapper_key in ("content", "message", "raw", "text", "response"):
        inner = payload.get(wrapper_key)
        if not isinstance(inner, str):
            continue
        decoded = _extract_json_object(inner)
        if decoded:
            return decoded, inner

    return payload, None


def normalize_decision(value: Any, fallback_text: str = "") -> str:
    text = f"{value or ''} {fallback_text}".lower().replace("-", "_").strip()
    compact = re.sub(r"[\s_]+", "_", text)

    if "escalate" in compact or "security_team" in compact:
        return "escalate"
    if "request_changes" in compact or "changes_requested" in compact:
        return "request_changes"
    if "do_not_approve" in compact or "cannot_approve" in compact or "not_approve" in compact:
        return "request_changes"
    if any(token in compact for token in ("reject", "needs_changes", "require_changes")):
        return "request_changes"
    if any(token in compact for token in ("approve", "approved", "accept", "lgtm", "merge")):
        return "approve"
    return "request_changes"


def normalize_issue_category(value: Any, fallback_text: str = "") -> str:
    """Map a free-form category onto one of the ``ALLOWED_CATEGORIES`` labels.

    An explicit *value* that (lower-cased, hyphens turned into underscores)
    is already an allowed label is returned directly. Otherwise the explicit
    value and *fallback_text* are searched for keywords, checking security
    first, then logic, correctness and performance. ``"none"`` is returned
    when no keyword is present.
    """
    explicit = "" if value is None else str(value).lower().replace("-", "_")
    if explicit in ALLOWED_CATEGORIES:
        return explicit

    haystack = f"{explicit} {fallback_text}".lower()

    # Order matters: the first category with a matching keyword wins.
    keyword_table = (
        ("security", ("security", "injection", "secret", "auth")),
        (
            "logic",
            (
                "logic",
                "off_by_one",
                "off-by-one",
                "pagination",
                "offset",
                "page 1",
                "skips first",
            ),
        ),
        ("correctness", ("correct", "bug", "valid")),
        ("performance", ("performance", "latency", "slow")),
    )
    for category, keywords in keyword_table:
        if any(keyword in haystack for keyword in keywords):
            return category
    return "none"


def normalize_action_payload(payload: Any) -> PRAction:
    """Build a ``PRAction`` from a loosely structured *payload*.

    The payload is reduced to a dict via ``_coerce_dict``. The comment comes
    from the ``comment``, ``review`` or ``feedback`` key, falling back to the
    raw text and then to a placeholder sentence; non-string comments are
    serialised as JSON. The decision and issue category are read from their
    respective keys and normalised, with the comment as fallback text.
    """

    def first_truthy(*keys: str) -> Any:
        # Mirrors `a or b or c`: first truthy value, else the last lookup.
        found = None
        for key in keys:
            found = fields.get(key)
            if found:
                break
        return found

    fields, raw_text = _coerce_dict(payload)

    comment = first_truthy("comment", "review", "feedback")
    if comment is None:
        comment = raw_text or "No detailed comment provided."
    if not isinstance(comment, str):
        comment = json.dumps(comment, ensure_ascii=False)

    raw_decision = first_truthy("decision", "verdict")
    raw_category = first_truthy("issue_category", "category", "issue_type")

    final_comment = comment.strip() or "No detailed comment provided."
    return PRAction(
        decision=normalize_decision(raw_decision, comment),
        comment=final_comment,
        issue_category=normalize_issue_category(raw_category, comment),
    )