File size: 12,794 Bytes
c4002f1
 
 
 
 
 
 
 
e954dcc
c4002f1
 
 
 
 
 
 
 
 
 
3255f2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4002f1
 
 
e954dcc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2236bd7
 
 
 
 
 
 
 
 
 
 
e954dcc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4002f1
 
2236bd7
 
 
 
 
 
c4002f1
 
 
 
 
 
 
 
 
 
c6464b6
 
 
 
 
c4002f1
 
 
e954dcc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4002f1
 
 
 
 
c6464b6
 
 
 
 
c4002f1
 
2236bd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4002f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
"""Shared SAST engine — stage 1 (detect) and stage 2 (adversarial refute).

Extracted verbatim from the snippet path so both the single-snippet audit and the
whole-repo audit reuse the *same* prompts, schemas and refutation logic. Pure functions:
they take an Outlines `model` + tokenizer, no GPU decorator (callers wrap as needed).
"""

import json
import re

from schemas import Report, Verdict

DETECT_SYS = (
    "You are a security code reviewer. List candidate SECURITY vulnerabilities with a real "
    "attack surface (command/SQL injection, path traversal, XSS, SSRF, deserialization, auth "
    "bypass...). Be inclusive about security issues — an adversarial pass verifies each — but "
    "ignore pure style, quality or error-handling nits. Use 1-based line numbers."
)
REFUTE_SYS = (
    "You are an exploit analyst verifying ONE claimed vulnerability in the given code. Decide if it "
    "is GENUINELY exploitable. Judge ONLY what the code shows.\n"
    "- EXPLOITABLE (exploitable=true): attacker-controlled input reaches the dangerous sink with NO "
    "adequate sanitization in between; give a concrete proof-of-concept input.\n"
    "- FALSE POSITIVE (exploitable=false): adequate protection on the value before the sink — a "
    "prepared/parameterized query, an allow-list of permitted values, a cast to a safe type, proper "
    "escaping (escapeshellarg, htmlspecialchars), or strict value validation (is_numeric on the whole "
    "value). Also FP if the sink is unreachable/dead code, or the input is not attacker-controlled. "
    "If a proper sanitization like that is visible, it IS a false positive — say so.\n"
    "Two traps, do NOT fall for them:\n"
    "1. An EXISTENCE/presence check is NOT sanitization. If attacker input is used to build a path "
    "passed to include/require/file_get_contents/fopen/readfile — EVEN with a fixed directory prefix, "
    "a forced extension (.php), or a file_exists()/is_file() guard — it IS EXPLOITABLE (path traversal "
    "/ LFI): the attacker still controls which file loads, and such prefix/suffix/existence constraints "
    "are routinely bypassed (../, encoded traversal, existing sensitive files).\n"
    "2. isset/empty/strlen do not stop injection.\n"
    "If exploitable, set exploitable=true and give the PoC input."
)

SEV = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "⚪"}
_SEV_SET = {"low", "medium", "high", "critical"}

# Flat JSON schema mirroring schemas.Report (Candidate fields) — used as response_format for the
# HF Inference detector, which does NOT do Outlines-style constrained decoding (unlike ZeroGPU).
REPORT_JSON_SCHEMA = {
    "type": "object",
    "properties": {
        "findings": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "vuln_type": {"type": "string"},
                    "line": {"type": "integer"},
                    "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                    "rationale": {"type": "string"},
                },
                "required": ["vuln_type", "line", "severity", "rationale"],
            },
        }
    },
    "required": ["findings"],
}

# Flat JSON schema mirroring schemas.Verdict — response_format for the HF Inference refuter.
VERDICT_JSON_SCHEMA = {
    "type": "object",
    "properties": {
        "exploitable": {"type": "boolean"},
        "reasoning": {"type": "string"},
        "poc": {"type": "string"},
    },
    "required": ["exploitable", "reasoning"],
}


def number_lines(code):
    """Prefix each line with its 1-based number (`N| `) so the detector cites the right line."""
    return "\n".join(f"{i}| {ln}" for i, ln in enumerate(code.splitlines(), 1))


def _usage_tokens(resp):
    u = getattr(resp, "usage", None)
    return (getattr(u, "total_tokens", 0) or 0) if u else 0


def _parse_json(txt):
    """Robust parse: strip ``` fences, else extract the outermost {...}. Returns dict or None."""
    if not txt:
        return None
    txt = txt.strip()
    if txt.startswith("```"):
        txt = re.sub(r"^```[a-zA-Z]*\n?", "", txt)
        txt = re.sub(r"\n?```$", "", txt).strip()
    try:
        return json.loads(txt)
    except json.JSONDecodeError:
        m = re.search(r"\{.*\}", txt, re.S)
        if not m:
            return None
        try:
            return json.loads(m.group(0))
        except json.JSONDecodeError:
            return None


def _clean(findings, max_findings=None):
    """Normalize HF-Inference findings to the Candidate shape (vuln_type/line/severity/rationale)."""
    out = []
    for f in (findings or []):
        if not isinstance(f, dict):
            continue
        try:
            vt = str(f["vuln_type"]).strip()
            ln = int(f["line"])
        except (KeyError, TypeError, ValueError):
            continue
        if not vt:
            continue
        sev = str(f.get("severity", "medium")).lower()
        if sev not in _SEV_SET:
            sev = "medium"
        out.append({"vuln_type": vt, "line": ln, "severity": sev,
                    "rationale": str(f.get("rationale", "")).strip()})
    return out[:max_findings] if max_findings else out


def _clean_verdict(data):
    return {"exploitable": bool((data or {}).get("exploitable")),
            "reasoning": str((data or {}).get("reasoning", "")).strip(),
            "poc": str((data or {}).get("poc") or "").strip()}


def chat(tok, system, user):
    return tok.apply_chat_template(
        [{"role": "system", "content": system}, {"role": "user", "content": user}],
        tokenize=False, add_generation_prompt=True)


def detect(model, tok, code, lang, max_findings=5):
    """Stage 1 — list candidate vulnerabilities for one piece of code."""
    prompt = chat(tok, DETECT_SYS, f"Language: {lang}\nCode:\n```\n{code}\n```\nList up to "
                                   f"{max_findings} candidate vulnerabilities.")
    try:
        rep = json.loads(model(prompt, output_type=Report, max_new_tokens=512))
    except (json.JSONDecodeError, ValueError):
        # output truncated mid-JSON (large/complex file) → nothing reliable to report
        return []
    return (rep.get("findings") or [])[:max_findings]


def detect_hf(client, model_id, code, lang, max_findings=None):
    """Stage 1 via HF Inference (a large model, e.g. Qwen3-Coder-480B) — an off-GPU network call.

    Sends line-numbered code and asks for ALL candidates (no cap by default: detector recall is
    what matters — the source→sink distance is what defeats the small model; stage-2 refutation
    kills false positives). Prefers a strict json_schema response_format; if the provider rejects
    it, retries once with a strict-JSON prompt + robust parse. Returns (findings, total_tokens),
    tokens being for cost logging by the caller.
    """
    numbered = number_lines(code)
    user = (f"Language: {lang}\nCode (each line is prefixed with `N| ` where N is its 1-based line "
            f"number):\n```\n{numbered}\n```\nList ALL candidate security vulnerabilities you find. "
            f"Cite the 1-based line number from the prefix. Do not limit the number of findings.")
    msgs = [{"role": "system", "content": DETECT_SYS}, {"role": "user", "content": user}]
    tokens = 0
    # 1) preferred: constrained to the schema (when the provider supports json_schema)
    try:
        r = client.chat.completions.create(
            model=model_id, messages=msgs, temperature=0, max_tokens=2048,
            response_format={"type": "json_schema",
                             "json_schema": {"name": "Report", "schema": REPORT_JSON_SCHEMA, "strict": True}},
        )
        tokens += _usage_tokens(r)
        data = _parse_json(r.choices[0].message.content)
        if data is not None:
            return _clean(data.get("findings"), max_findings), tokens
    except Exception:
        pass  # provider may not support response_format → fall through to the strict-JSON retry
    # 2) fallback: strict-JSON instruction, no response_format, one retry
    msgs2 = msgs + [{"role": "system", "content":
                     'Return ONLY a JSON object {"findings":[{"vuln_type":string,"line":int,'
                     '"severity":"low|medium|high|critical","rationale":string}]}. No prose, no markdown.'}]
    try:
        r = client.chat.completions.create(model=model_id, messages=msgs2, temperature=0, max_tokens=2048)
        tokens += _usage_tokens(r)
        data = _parse_json(r.choices[0].message.content) or {}
        return _clean(data.get("findings"), max_findings), tokens
    except Exception:
        return [], tokens


def refute(model, tok, code, candidate):
    """Stage 2 — adversarially verify ONE candidate (the calibrated, reused step)."""
    prompt = chat(tok, REFUTE_SYS,
                  f"Code:\n```\n{code}\n```\nClaimed vulnerability: {candidate['vuln_type']} "
                  f"at line {candidate['line']}{candidate['rationale']}\nIs it really exploitable?")
    try:
        return json.loads(model(prompt, output_type=Verdict, max_new_tokens=300))
    except (json.JSONDecodeError, ValueError):
        # couldn't parse a verdict → conservatively treat as not-confirmed (no false alarm)
        return {"exploitable": False, "reasoning": "verification inconclusive (parse error)", "poc": ""}


def refute_hf(client, model_id, code, candidate):
    """Stage 2 via HF Inference (small model) — an off-GPU network call, so a whole-repo scan needs
    no ZeroGPU reservation (no proxy-token expiry on long scans). Same REFUTE_SYS + targeted context
    as the ZeroGPU path. Returns (verdict, total_tokens). `code` is the caller's tight window."""
    user = (f"Code:\n```\n{code}\n```\nClaimed vulnerability: {candidate['vuln_type']} "
            f"at line {candidate['line']}{candidate['rationale']}\nIs it really exploitable?")
    msgs = [{"role": "system", "content": REFUTE_SYS}, {"role": "user", "content": user}]
    tokens = 0
    try:
        r = client.chat.completions.create(
            model=model_id, messages=msgs, temperature=0, max_tokens=512,
            response_format={"type": "json_schema",
                             "json_schema": {"name": "Verdict", "schema": VERDICT_JSON_SCHEMA, "strict": True}},
        )
        tokens += _usage_tokens(r)
        data = _parse_json(r.choices[0].message.content)
        if data is not None:
            return _clean_verdict(data), tokens
    except Exception:
        pass
    msgs2 = msgs + [{"role": "system", "content":
                     'Return ONLY {"exploitable":bool,"reasoning":string,"poc":string}. No prose, no markdown.'}]
    try:
        r = client.chat.completions.create(model=model_id, messages=msgs2, temperature=0, max_tokens=512)
        tokens += _usage_tokens(r)
        return _clean_verdict(_parse_json(r.choices[0].message.content)), tokens
    except Exception:
        return {"exploitable": False, "reasoning": "verification inconclusive (inference error)", "poc": ""}, tokens


def render_snippet(out, dt):
    """Markdown report for the single-snippet path (unchanged behaviour)."""
    if not out.get("verified"):
        cs = out.get("candidates", [])
        if not cs:
            return f"No candidate vulnerabilities found · {dt:.1f}s"
        lines = [f"### ⚠️ {len(cs)} candidates — **unverified** (raw detector) · {dt:.1f}s",
                 "_Raw guesses — some are false positives. Flip **Verify ON** to refute them._\n"]
        for c in cs:
            lines.append(f"- **{c['vuln_type']}** · line {c['line']} · "
                         f"{SEV.get(c['severity'], '')} {c['severity']}{c['rationale']}")
        return "\n".join(lines)

    res = out.get("results", [])
    real = [r for r in res if r["verdict"].get("exploitable")]
    fp = [r for r in res if not r["verdict"].get("exploitable")]
    lines = [f"### ✅ Verified · **{len(real)} confirmed**, {len(fp)} refuted · {dt:.1f}s\n"]
    if real:
        lines.append(f"#### ✗ Confirmed ({len(real)})")
        for r in real:
            c, v = r["candidate"], r["verdict"]
            lines.append(f"- **{c['vuln_type']}** · line {c['line']} · {SEV.get(c['severity'], '')} {c['severity']}")
            if v.get("poc"):
                lines.append(f"  - PoC: `{v['poc']}`")
            lines.append(f"  - {v.get('reasoning', '')}")
    if fp:
        lines.append(f"\n#### ✓ Refuted as false positive ({len(fp)})")
        for r in fp:
            c, v = r["candidate"], r["verdict"]
            lines.append(f"- ~~{c['vuln_type']} · line {c['line']}~~ — {v.get('reasoning', '')}")
    if not res:
        lines.append("_Nothing flagged._")
    return "\n".join(lines)