"""Shared SAST engine — stage 1 (detect) and stage 2 (adversarial refute).
Extracted verbatim from the snippet path so both the single-snippet audit and the
whole-repo audit reuse the *same* prompts, schemas and refutation logic. Pure functions:
they take an Outlines `model` + tokenizer, no GPU decorator (callers wrap as needed).
"""
import json
import re
from schemas import Report, Verdict
# System prompt for stage 1 (detection); sent by both detect() and detect_hf().
# It is deliberately inclusive: the prompt itself tells the model that an adversarial
# pass verifies each candidate afterwards, so recall is favoured over precision here.
DETECT_SYS = (
"You are a security code reviewer. List candidate SECURITY vulnerabilities with a real "
"attack surface (command/SQL injection, path traversal, XSS, SSRF, deserialization, auth "
"bypass...). Be inclusive about security issues — an adversarial pass verifies each — but "
"ignore pure style, quality or error-handling nits. Use 1-based line numbers."
)
# System prompt for stage 2 (refutation); sent by both refute() and refute_hf().
# It asks for a verdict on exactly ONE claimed vulnerability, spells out what counts as
# adequate sanitization (false positive) and names two traps the model must not accept as
# protection: existence/prefix/suffix checks on file paths, and isset/empty/strlen checks.
REFUTE_SYS = (
"You are an exploit analyst verifying ONE claimed vulnerability in the given code. Decide if it "
"is GENUINELY exploitable. Judge ONLY what the code shows.\n"
"- EXPLOITABLE (exploitable=true): attacker-controlled input reaches the dangerous sink with NO "
"adequate sanitization in between; give a concrete proof-of-concept input.\n"
"- FALSE POSITIVE (exploitable=false): adequate protection on the value before the sink — a "
"prepared/parameterized query, an allow-list of permitted values, a cast to a safe type, proper "
"escaping (escapeshellarg, htmlspecialchars), or strict value validation (is_numeric on the whole "
"value). Also FP if the sink is unreachable/dead code, or the input is not attacker-controlled. "
"If a proper sanitization like that is visible, it IS a false positive — say so.\n"
"Two traps, do NOT fall for them:\n"
"1. An EXISTENCE/presence check is NOT sanitization. If attacker input is used to build a path "
"passed to include/require/file_get_contents/fopen/readfile — EVEN with a fixed directory prefix, "
"a forced extension (.php), or a file_exists()/is_file() guard — it IS EXPLOITABLE (path traversal "
"/ LFI): the attacker still controls which file loads, and such prefix/suffix/existence constraints "
"are routinely bypassed (../, encoded traversal, existing sensitive files).\n"
"2. isset/empty/strlen do not stop injection.\n"
"If exploitable, set exploitable=true and give the PoC input."
)
# Severity label -> emoji marker; looked up by render_snippet() when formatting findings.
SEV = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "⚪"}
# Accepted severity labels; _clean() replaces anything outside this set with "medium".
_SEV_SET = {"low", "medium", "high", "critical"}
# Flat JSON schema mirroring schemas.Report (Candidate fields) — used as response_format for the
# HF Inference detector, which does NOT do Outlines-style constrained decoding (unlike ZeroGPU).
# detect_hf() passes it as the "schema" of a strict json_schema response_format named "Report".
# Every finding must carry all four fields; severity is restricted to the four known labels.
REPORT_JSON_SCHEMA = {
"type": "object",
"properties": {
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"vuln_type": {"type": "string"},
"line": {"type": "integer"},
"severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
"rationale": {"type": "string"},
},
"required": ["vuln_type", "line", "severity", "rationale"],
},
}
},
"required": ["findings"],
}
# Flat JSON schema mirroring schemas.Verdict — response_format for the HF Inference refuter.
# refute_hf() passes it as the "schema" of a strict json_schema response_format named "Verdict".
# "poc" is optional here (not listed in "required"); _clean_verdict() fills it in as "".
VERDICT_JSON_SCHEMA = {
"type": "object",
"properties": {
"exploitable": {"type": "boolean"},
"reasoning": {"type": "string"},
"poc": {"type": "string"},
},
"required": ["exploitable", "reasoning"],
}
def number_lines(code):
    """Return `code` with each line prefixed by `N| `, N being its 1-based line number.

    Lines are split with `str.splitlines()`, so a trailing newline does not produce an
    extra numbered line and an empty input yields an empty string.
    """
    numbered = []
    for idx, text in enumerate(code.splitlines(), start=1):
        numbered.append(f"{idx}| {text}")
    return "\n".join(numbered)
def _usage_tokens(resp):
    """Return `resp.usage.total_tokens`, or 0 when the usage info is missing or empty."""
    usage = getattr(resp, "usage", None)
    if not usage:
        return 0
    # A present-but-None (or zero) total collapses to 0 as well.
    return getattr(usage, "total_tokens", 0) or 0
def _parse_json(txt):
    """Leniently parse a model reply into a JSON object.

    Strategy: strip surrounding whitespace and a leading/trailing ``` fence (with an
    optional language tag), then try to parse the whole text. If that fails, fall back
    to the outermost `{...}` span found anywhere in the text.

    Returns:
        The parsed `dict`, or `None` when the text is empty, cannot be parsed, or parses
        to something that is not a JSON object (list, string, number, `null`). Callers
        call `.get(...)` on the result, so only a dict is a usable answer.
    """
    if not txt:
        return None
    txt = txt.strip()
    if txt.startswith("```"):
        txt = re.sub(r"^```[a-zA-Z]*\n?", "", txt)
        txt = re.sub(r"\n?```$", "", txt).strip()
    try:
        parsed = json.loads(txt)
    except json.JSONDecodeError:
        # Prose around the payload: greedily grab from the first "{" to the last "}".
        match = re.search(r"\{.*\}", txt, re.S)
        if not match:
            return None
        try:
            parsed = json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    # A well-formed document of the wrong shape is treated like an unparseable one.
    return parsed if isinstance(parsed, dict) else None
def _clean(findings, max_findings=None):
    """Normalize HF-Inference findings to the Candidate shape (vuln_type/line/severity/rationale).

    Entries are dropped when they are not dicts, when `vuln_type` is missing, null or
    blank, or when `line` is missing or cannot be converted to an int. An unknown, null
    or missing severity becomes "medium"; a null or missing rationale becomes "".

    Args:
        findings: Iterable of raw finding dicts (or None).
        max_findings: Optional cap on the number of returned findings; a falsy value
            (None or 0) means no cap.

    Returns:
        A list of dicts with exactly the keys vuln_type, line, severity, rationale.
    """
    out = []
    for f in (findings or []):
        if not isinstance(f, dict):
            continue
        raw_type = f.get("vuln_type")
        if raw_type is None:
            # A JSON null must not turn into a finding literally named "None".
            continue
        try:
            ln = int(f["line"])
        except (KeyError, TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts Infinity, which int() cannot convert.
            continue
        vt = str(raw_type).strip()
        if not vt:
            continue
        sev = str(f.get("severity") or "medium").strip().lower()
        if sev not in _SEV_SET:
            sev = "medium"
        out.append({"vuln_type": vt, "line": ln, "severity": sev,
                    "rationale": str(f.get("rationale") or "").strip()})
    return out[:max_findings] if max_findings else out
def _clean_verdict(data):
    """Normalize a raw verdict dict to {"exploitable": bool, "reasoning": str, "poc": str}.

    `data` may be None (treated as an empty dict). Without schema-constrained decoding the
    model may emit the boolean as a string, so "true"/"yes"/"1" (any case) count as True
    and every other string — notably "false" — as False; plain `bool("false")` would be
    True and raise a false alarm. Null or missing `reasoning` and `poc` become "".
    """
    data = data or {}
    flag = data.get("exploitable")
    if isinstance(flag, str):
        flag = flag.strip().lower() in {"true", "yes", "1"}
    return {"exploitable": bool(flag),
            "reasoning": str(data.get("reasoning") or "").strip(),
            "poc": str(data.get("poc") or "").strip()}
def chat(tok, system, user):
    """Render a system + user exchange through the tokenizer's chat template.

    Returns whatever `tok.apply_chat_template` produces with `tokenize=False` and
    `add_generation_prompt=True`.
    """
    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    return tok.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)
def detect(model, tok, code, lang, max_findings=5):
    """Stage 1 — ask the local model for candidate vulnerabilities in one piece of code.

    Builds the detection prompt with `chat`, calls `model` with `output_type=Report` and
    parses its output as JSON. Returns at most `max_findings` entries of the report's
    "findings" list, or an empty list when the output cannot be parsed.
    """
    request = (f"Language: {lang}\nCode:\n```\n{code}\n```\nList up to "
               f"{max_findings} candidate vulnerabilities.")
    prompt = chat(tok, DETECT_SYS, request)
    try:
        raw = model(prompt, output_type=Report, max_new_tokens=512)
        report = json.loads(raw)
    except (json.JSONDecodeError, ValueError):
        # Output cut off mid-JSON (large/complex file): nothing trustworthy to report.
        return []
    found = report.get("findings") or []
    return found[:max_findings]
def detect_hf(client, model_id, code, lang, max_findings=None):
    """Stage 1 via HF Inference — a network call through `client.chat.completions.create`.

    The code is sent line-numbered (see `number_lines`) and the model is asked for ALL
    candidates; there is no cap unless `max_findings` is given, since stage-2 refutation
    is what removes false positives.

    Two attempts are made:
      1. a request constrained by a strict `json_schema` response_format;
      2. if that raises or its reply cannot be parsed, one retry without response_format
         but with an extra "return only JSON" instruction and lenient parsing.

    Returns:
        `(findings, total_tokens)` — cleaned findings plus the token usage summed over
        the attempts made, for cost logging by the caller. Any failure of the retry
        yields `([], total_tokens)`.
    """
    listing = number_lines(code)
    question = (f"Language: {lang}\nCode (each line is prefixed with `N| ` where N is its 1-based line "
                f"number):\n```\n{listing}\n```\nList ALL candidate security vulnerabilities you find. "
                f"Cite the 1-based line number from the prefix. Do not limit the number of findings.")
    base_messages = [
        {"role": "system", "content": DETECT_SYS},
        {"role": "user", "content": question},
    ]
    spent = 0

    # Attempt 1: schema-constrained output (only works when the provider supports json_schema).
    schema_format = {
        "type": "json_schema",
        "json_schema": {"name": "Report", "schema": REPORT_JSON_SCHEMA, "strict": True},
    }
    try:
        reply = client.chat.completions.create(
            model=model_id, messages=base_messages, temperature=0, max_tokens=2048,
            response_format=schema_format,
        )
        spent += _usage_tokens(reply)
        parsed = _parse_json(reply.choices[0].message.content)
        if parsed is not None:
            return _clean(parsed.get("findings"), max_findings), spent
    except Exception:
        pass  # provider may reject response_format; continue with the plain-prompt retry

    # Attempt 2: plain request with an explicit JSON-only instruction, tried once.
    json_only = {"role": "system", "content":
                 'Return ONLY a JSON object {"findings":[{"vuln_type":string,"line":int,'
                 '"severity":"low|medium|high|critical","rationale":string}]}. No prose, no markdown.'}
    retry_messages = base_messages + [json_only]
    try:
        reply = client.chat.completions.create(
            model=model_id, messages=retry_messages, temperature=0, max_tokens=2048)
        spent += _usage_tokens(reply)
        parsed = _parse_json(reply.choices[0].message.content) or {}
        return _clean(parsed.get("findings"), max_findings), spent
    except Exception:
        return [], spent
def refute(model, tok, code, candidate):
    """Stage 2 — adversarially verify ONE candidate with the local model.

    `candidate` must provide the keys `vuln_type`, `line` and `rationale`. The model is
    called with `output_type=Verdict` and its output parsed as JSON. When parsing fails
    the candidate is conservatively reported as not exploitable, to avoid a false alarm.
    """
    claim = (f"Code:\n```\n{code}\n```\nClaimed vulnerability: {candidate['vuln_type']} "
             f"at line {candidate['line']} — {candidate['rationale']}\nIs it really exploitable?")
    prompt = chat(tok, REFUTE_SYS, claim)
    try:
        raw = model(prompt, output_type=Verdict, max_new_tokens=300)
        verdict = json.loads(raw)
    except (json.JSONDecodeError, ValueError):
        verdict = {"exploitable": False,
                   "reasoning": "verification inconclusive (parse error)",
                   "poc": ""}
    return verdict
def refute_hf(client, model_id, code, candidate):
    """Stage 2 via HF Inference — a network call through `client.chat.completions.create`.

    Sends the same REFUTE_SYS prompt and claim text as `refute`. `code` is whatever
    context window the caller chose; `candidate` must provide `vuln_type`, `line` and
    `rationale`.

    Two attempts are made: first with a strict `json_schema` response_format, then —
    if that raises or its reply cannot be parsed — once more without response_format
    but with an explicit JSON-only instruction.

    Returns:
        `(verdict, total_tokens)`. If the retry raises, the verdict is a conservative
        "not exploitable / inconclusive" one.
    """
    claim = (f"Code:\n```\n{code}\n```\nClaimed vulnerability: {candidate['vuln_type']} "
             f"at line {candidate['line']} — {candidate['rationale']}\nIs it really exploitable?")
    base_messages = [
        {"role": "system", "content": REFUTE_SYS},
        {"role": "user", "content": claim},
    ]
    spent = 0

    # Attempt 1: schema-constrained verdict.
    schema_format = {
        "type": "json_schema",
        "json_schema": {"name": "Verdict", "schema": VERDICT_JSON_SCHEMA, "strict": True},
    }
    try:
        reply = client.chat.completions.create(
            model=model_id, messages=base_messages, temperature=0, max_tokens=512,
            response_format=schema_format,
        )
        spent += _usage_tokens(reply)
        parsed = _parse_json(reply.choices[0].message.content)
        if parsed is not None:
            return _clean_verdict(parsed), spent
    except Exception:
        pass  # fall through to the unconstrained retry

    # Attempt 2: plain request with an explicit JSON-only instruction.
    json_only = {"role": "system", "content":
                 'Return ONLY {"exploitable":bool,"reasoning":string,"poc":string}. No prose, no markdown.'}
    retry_messages = base_messages + [json_only]
    try:
        reply = client.chat.completions.create(
            model=model_id, messages=retry_messages, temperature=0, max_tokens=512)
        spent += _usage_tokens(reply)
        return _clean_verdict(_parse_json(reply.choices[0].message.content)), spent
    except Exception:
        inconclusive = {"exploitable": False,
                        "reasoning": "verification inconclusive (inference error)",
                        "poc": ""}
        return inconclusive, spent
def render_snippet(out, dt):
    """Build the Markdown report for the single-snippet path.

    Args:
        out: Result dict. When `out["verified"]` is falsy, `out["candidates"]` is listed
            as raw, unverified findings; otherwise `out["results"]` (items with a
            "candidate" and a "verdict") is split into confirmed and refuted findings.
        dt: Elapsed time in seconds, shown with one decimal.

    Returns:
        The report as a single Markdown string.
    """
    if not out.get("verified"):
        candidates = out.get("candidates", [])
        if not candidates:
            return f"No candidate vulnerabilities found · {dt:.1f}s"
        report = [
            f"### ⚠️ {len(candidates)} candidates — **unverified** (raw detector) · {dt:.1f}s",
            "_Raw guesses — some are false positives. Flip **Verify ON** to refute them._\n",
        ]
        report.extend(
            f"- **{c['vuln_type']}** · line {c['line']} · "
            f"{SEV.get(c['severity'], '')} {c['severity']} — {c['rationale']}"
            for c in candidates
        )
        return "\n".join(report)

    results = out.get("results", [])
    # Partition on the verdict, keeping the original order inside each group.
    confirmed, refuted = [], []
    for item in results:
        bucket = confirmed if item["verdict"].get("exploitable") else refuted
        bucket.append(item)

    report = [f"### ✅ Verified · **{len(confirmed)} confirmed**, {len(refuted)} refuted · {dt:.1f}s\n"]

    if confirmed:
        report.append(f"#### ✗ Confirmed ({len(confirmed)})")
        for item in confirmed:
            cand = item["candidate"]
            verdict = item["verdict"]
            report.append(f"- **{cand['vuln_type']}** · line {cand['line']} · "
                          f"{SEV.get(cand['severity'], '')} {cand['severity']}")
            if verdict.get("poc"):
                report.append(f" - PoC: `{verdict['poc']}`")
            report.append(f" - {verdict.get('reasoning', '')}")

    if refuted:
        report.append(f"\n#### ✓ Refuted as false positive ({len(refuted)})")
        for item in refuted:
            cand = item["candidate"]
            verdict = item["verdict"]
            report.append(f"- ~~{cand['vuln_type']} · line {cand['line']}~~ — {verdict.get('reasoning', '')}")

    if not results:
        report.append("_Nothing flagged._")
    return "\n".join(report)
|