Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import html | |
| import json | |
| import base64 | |
| import mimetypes | |
| import os | |
| import re | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.parse import urlparse | |
| from urllib.request import Request, urlopen | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
def _env_path(variable: str, fallback: Path) -> Path:
    """Return the path stored in environment variable *variable*, or *fallback* when it is unset."""
    return Path(os.getenv(variable, fallback))


# Root of the data directory; local checkpoints default to a "models" folder below it.
DATA_ROOT = _env_path("DIFFSENSE_DATA_ROOT", Path("/data"))
LOCAL_MODEL_ROOT = _env_path("DIFFSENSE_LOCAL_MODEL_ROOT", DATA_ROOT / "models")

# Model identifiers, each overridable through its own environment variable.
MELLUM_MODEL = os.environ.get("DIFFSENSE_MELLUM_MODEL", "JetBrains/Mellum2-12B-A2.5B-Instruct")
NEMOTRON_MODEL = os.environ.get("DIFFSENSE_NEMOTRON_MODEL", "nvidia/NVIDIA-Nemotron-3-Nano-30B-A3B-BF16")
TINY_TITAN_MODEL = os.environ.get("DIFFSENSE_TINY_TITAN_MODEL", "nvidia/NVIDIA-Nemotron-3-Nano-4B-BF16")
MINICPM_MODEL = os.environ.get("DIFFSENSE_MINICPM_MODEL", "openbmb/MiniCPM-V-4.6")
MODAL_ENDPOINT = os.environ.get("DIFFSENSE_MODAL_ENDPOINT", "")

# Model alias -> directory where a local checkpoint for that model is looked up.
LOCAL_MODEL_DIRS = {
    alias: _env_path(variable, LOCAL_MODEL_ROOT / folder)
    for alias, variable, folder in (
        ("mellum", "DIFFSENSE_MELLUM_LOCAL_DIR", "mellum2-instruct"),
        ("nemotron", "DIFFSENSE_NEMOTRON_LOCAL_DIR", "nemotron-3-nano-30b-a3b"),
        ("tiny_titan", "DIFFSENSE_TINY_TITAN_LOCAL_DIR", "nemotron-3-nano-4b"),
        ("minicpm", "DIFFSENSE_MINICPM_LOCAL_DIR", "minicpm-v-4.6"),
    )
}

FETCH_TIMEOUT_SECONDS = 10  # timeout passed to urlopen when fetching a public diff
MAX_IMAGE_BYTES = 2_500_000  # uploaded images larger than this are skipped
def initialize_local_model_slots() -> None:
    """Create the per-model checkpoint directories when the data root is writable.

    Nothing happens if ``DATA_ROOT`` is not writable. A directory that cannot be
    created is skipped so the remaining slots are still attempted.
    """
    if os.access(DATA_ROOT, os.W_OK):
        for slot in LOCAL_MODEL_DIRS.values():
            try:
                slot.mkdir(parents=True, exist_ok=True)
            except OSError:
                # Best effort only: a missing slot is not an error for the app.
                continue


# Runs once at import time so the slots exist before any model lookup.
initialize_local_model_slots()
# Stylesheet for the review UI: hero header, score cards, diff lines and inline findings.
CSS = """
:root {
    --ink: #111827;
    --muted: #64748b;
    --paper: #f8fafc;
    --line: #d8dee9;
    --add-bg: #ecfdf3;
    --add-ink: #166534;
    --del-bg: #fff1f2;
    --del-ink: #9f1239;
    --warn: #b45309;
    --crit: #be123c;
    --nit: #475569;
}
.gradio-container {
    max-width: 1280px !important;
}
#hero {
    border-bottom: 1px solid var(--line);
    padding: 18px 0 14px;
    margin-bottom: 18px;
}
#hero h1 {
    color: var(--ink);
    font-size: 36px;
    line-height: 1.05;
    margin: 0;
    letter-spacing: 0;
}
#hero p {
    color: var(--muted);
    margin: 8px 0 0;
    font-size: 15px;
}
.score-grid {
    display: grid;
    grid-template-columns: repeat(4, minmax(0, 1fr));
    gap: 10px;
    margin: 12px 0 18px;
}
.score-card {
    background: #ffffff !important;
    border: 1px solid var(--line);
    border-radius: 8px;
    padding: 12px;
}
.score-label {
    color: #475569 !important;
    font-size: 12px;
    text-transform: uppercase;
}
.score-value {
    color: #111827 !important;
    font-size: 24px;
    font-weight: 700;
    margin-top: 2px;
}
.diff-wrap {
    background: #ffffff !important;
    border: 1px solid var(--line);
    border-radius: 8px;
    overflow: hidden;
}
.file-title {
    background: #0f172a;
    color: white;
    font: 700 13px ui-monospace, SFMono-Regular, Menlo, monospace;
    padding: 10px 12px;
}
.hunk-title {
    background: #e0f2fe;
    color: #075985;
    font: 700 12px ui-monospace, SFMono-Regular, Menlo, monospace;
    padding: 7px 12px;
    border-top: 1px solid var(--line);
}
.line {
    display: grid;
    grid-template-columns: 54px 1fr;
    min-height: 26px;
    border-top: 1px solid #eef2f7;
    font: 13px/1.55 ui-monospace, SFMono-Regular, Menlo, monospace;
}
.line-no {
    color: #94a3b8;
    background: #f8fafc;
    border-right: 1px solid #eef2f7;
    padding: 3px 8px;
    text-align: right;
    user-select: none;
}
.line-code {
    background: #ffffff;
    color: #111827;
    white-space: pre-wrap;
    overflow-wrap: anywhere;
    padding: 3px 10px;
}
.line.ctx .line-code {
    background: #ffffff !important;
    color: #334155 !important;
}
.line.add .line-code {
    background: var(--add-bg) !important;
    color: var(--add-ink) !important;
}
.line.del .line-code {
    background: var(--del-bg) !important;
    color: var(--del-ink) !important;
}
.finding {
    border-top: 1px solid var(--line);
    padding: 10px 12px 12px 66px;
    background: #fff7ed !important;
}
.finding.critical {
    background: #fff1f2 !important;
}
.finding.nitpick {
    background: #f8fafc !important;
}
.badge {
    border-radius: 999px;
    color: white;
    display: inline-block;
    font-size: 11px;
    font-weight: 700;
    margin-right: 6px;
    padding: 2px 8px;
    text-transform: uppercase;
}
.badge.critical { background: var(--crit); }
.badge.warning { background: var(--warn); }
.badge.nitpick { background: var(--nit); }
.category {
    color: var(--muted);
    font-size: 12px;
    font-weight: 700;
    text-transform: uppercase;
}
.finding-body {
    color: #111827 !important;
    margin-top: 6px;
}
.suggestion {
    color: #334155 !important;
    margin-top: 5px;
}
.empty-state {
    background: #ffffff !important;
    border: 1px dashed var(--line);
    border-radius: 8px;
    color: #475569 !important;
    padding: 18px;
}
@media (max-width: 760px) {
    .score-grid { grid-template-columns: repeat(2, minmax(0, 1fr)); }
    #hero h1 { font-size: 28px; }
    .line { grid-template-columns: 42px 1fr; font-size: 12px; }
    .finding { padding-left: 52px; }
}
"""
# Lines of a two-file example diff; it contains code that several RULES patterns match.
_SAMPLE_DIFF_LINES = (
    "diff --git a/src/auth.py b/src/auth.py",
    "index 54d88cd..b2a1772 100644",
    "--- a/src/auth.py",
    "+++ b/src/auth.py",
    "@@ -1,9 +1,13 @@",
    " import jwt",
    "+import pickle",
    " import requests",
    '+SECRET = "dev-secret-token"',
    " ",
    " def load_user(raw):",
    "+ user = pickle.loads(raw)",
    "+ return user",
    "+",
    " def verify(token):",
    '- return jwt.decode(token, SECRET, algorithms=["HS256"])',
    '+ return jwt.decode(token, SECRET, algorithms=["HS256"], options={"verify_signature": False})',
    " ",
    " def fetch_profile(url):",
    "- return requests.get(url).json()",
    "+ return requests.get(url, verify=False).json()",
    "diff --git a/src/report.py b/src/report.py",
    "index 7471fee..db2ab78 100644",
    "--- a/src/report.py",
    "+++ b/src/report.py",
    "@@ -8,8 +8,10 @@ def build_query(user_id):",
    '- return "select * from events where user_id = " + user_id',
    '+ return f"select * from events where user_id = {user_id}"',
    " ",
    " def summarize(items):",
    "+ if len(items) == 0:",
    "+ return None",
    " total = 0",
    " for item in items:",
    ' total += item["amount"]',
    " return total / len(items)",
)

# The example diff as one newline-joined string.
SAMPLE_DIFF = "\n".join(_SAMPLE_DIFF_LINES)
@dataclass
class DiffLine:
    """One line inside a diff hunk.

    The ``@dataclass`` decorator is required: the parser builds instances with
    ``DiffLine("add", text, new_no=...)``, which a plain class would reject.
    """

    kind: str  # "add", "del" or "ctx", as assigned by parse_unified_diff
    text: str  # line content without the leading diff marker
    old_no: int | None = None  # line number in the old file; None for added lines
    new_no: int | None = None  # line number in the new file; None for deleted lines
@dataclass
class Hunk:
    """One ``@@`` hunk of a file diff.

    The ``@dataclass`` decorator is required so that keyword construction works
    and ``field(default_factory=list)`` gives every hunk its own ``lines`` list.
    """

    header: str  # the raw "@@ ... @@" line
    old_start: int  # first old-file line number from the header
    new_start: int  # first new-file line number from the header
    lines: list[DiffLine] = field(default_factory=list)  # parsed lines, in diff order
@dataclass
class FileDiff:
    """All hunks of one file in a diff.

    The ``@dataclass`` decorator is required so that ``FileDiff(path=...)`` works
    and every instance gets its own ``hunks`` list.
    """

    path: str  # file path shown for this diff section
    hunks: list[Hunk] = field(default_factory=list)  # hunks in the order they appear
@dataclass
class Finding:
    """One review comment attached to a file, hunk and (optionally) a line.

    The ``@dataclass`` decorator is required: findings are created with keyword
    arguments, which a plain class without ``__init__`` would reject.
    """

    file: str  # path of the file the finding belongs to
    hunk: str  # header of the hunk the finding belongs to
    line: int | None  # new-file line number, or None when not tied to a line
    severity: str  # "critical", "warning" or "nitpick"
    category: str  # e.g. "security", "logic", "test", "maintainability"
    comment: str  # what was detected
    suggestion: str  # how to address it
    source: str = "deterministic"  # origin of the finding
def _rule(pattern: str, severity: str, category: str, comment: str, suggestion: str) -> dict[str, Any]:
    """Build one rule entry; *pattern* is compiled case-insensitively."""
    return {
        "pattern": re.compile(pattern, re.I),
        "severity": severity,
        "category": category,
        "comment": comment,
        "suggestion": suggestion,
    }


# Line-level rules: each pattern is searched in the text of every added diff line.
RULES: list[dict[str, Any]] = [
    _rule(
        r"(password|passwd|secret|token|api[_-]?key)\s*=\s*['\"][^'\"]{6,}",
        "critical",
        "security",
        "A credential-like value is being committed in the diff.",
        "Move the value to a secret manager or environment variable and rotate the exposed secret.",
    ),
    _rule(
        r"verify_signature['\"]?\s*:\s*False|verify\s*=\s*False",
        "critical",
        "security",
        "The change disables a verification check, which can turn a trusted boundary into a bypass.",
        "Keep verification enabled and add a narrowly scoped test fixture for local development.",
    ),
    _rule(
        r"\bpickle\.loads?\s*\(",
        "critical",
        "security",
        "Deserializing pickle data from an untrusted source can execute arbitrary code.",
        "Use a safe format such as JSON or validate and sign the payload before deserialization.",
    ),
    _rule(
        r"\beval\s*\(|\bexec\s*\(",
        "critical",
        "security",
        "Dynamic code execution appears in a changed line.",
        "Replace dynamic execution with an explicit parser or allowlisted dispatch table.",
    ),
    _rule(
        r"shell\s*=\s*True",
        "critical",
        "security",
        "Launching a shell with user-influenced input is command-injection prone.",
        "Pass arguments as a list with shell disabled and validate each user-controlled argument.",
    ),
    _rule(
        r"(f['\"].*(select|insert|update|delete)|(select|insert|update|delete).*(\+|format\s*\())",
        "warning",
        "security",
        "The SQL statement appears to be built with string interpolation.",
        "Use parameterized queries so the database driver handles escaping and typing.",
    ),
    _rule(
        r"except\s*:",
        "warning",
        "logic",
        "A bare except can hide interrupts and unrelated failures.",
        "Catch the specific exception type and preserve the original error context.",
    ),
    _rule(
        r"TODO|FIXME|HACK",
        "nitpick",
        "maintainability",
        "A temporary marker landed in changed code.",
        "Link it to an issue or resolve it before merging.",
    ),
]
def normalize_diff(raw_input: str) -> str:
    """Turn the user's input into diff text.

    The input is stripped. A ``github.com`` URL whose path contains ``/pull/``,
    or any http(s) URL ending in ``.diff``, is downloaded via
    ``fetch_public_diff``; anything else is returned as pasted text. Empty or
    missing input yields ``""``.
    """
    text = (raw_input or "").strip()
    if text:
        location = urlparse(text)
        is_github_pull = location.netloc == "github.com" and "/pull/" in location.path
        is_remote_diff = location.scheme in {"http", "https"} and text.endswith(".diff")
        if is_github_pull or is_remote_diff:
            return fetch_public_diff(text)
    return text
def fetch_public_diff(url: str) -> str:
    """Download a public unified diff and return its text.

    A ``github.com`` pull-request URL is reduced to the canonical
    ``https://github.com/<owner>/<repo>/pull/<number>.diff`` form, so sub-pages
    (``/files``, ``/commits``), query strings and fragments no longer yield a
    URL that cannot be fetched. Any other URL is used unchanged when it already
    ends in ``.diff``; otherwise ``.diff`` is appended after trailing slashes
    are removed.

    At most 1,500,000 bytes of the response are read and decoded as UTF-8 with
    undecodable bytes replaced.

    Raises:
        gr.Error: if the request fails, or the body contains no ``"@@ "`` hunk marker.
    """
    location = urlparse(url)
    pull = re.match(r"/(?P<repo>[^/]+/[^/]+)/pull/(?P<number>\d+)", location.path)
    if location.netloc == "github.com" and pull:
        # Drop everything after the PR number (sub-page, query, fragment).
        diff_url = f"https://github.com/{pull.group('repo')}/pull/{pull.group('number')}.diff"
    elif url.endswith(".diff"):
        diff_url = url
    else:
        diff_url = f"{url.rstrip('/')}.diff"

    request = Request(diff_url, headers={"User-Agent": "DiffSense/1.0"})
    try:
        with urlopen(request, timeout=FETCH_TIMEOUT_SECONDS) as response:
            content_type = response.headers.get("content-type", "")
            body = response.read(1_500_000).decode("utf-8", errors="replace")
    except Exception as exc:
        raise gr.Error(f"Could not fetch the public diff from {diff_url}: {exc}") from exc

    # An HTML page or error document would not contain a hunk marker.
    if "@@ " not in body:
        raise gr.Error(
            f"Fetched {diff_url}, but it did not look like a unified diff "
            f"(content-type: {content_type or 'unknown'})."
        )
    return body
def parse_hunk_header(header: str) -> tuple[int, int]:
    """Extract the old and new start line numbers from a ``@@ -a,b +c,d @@`` header.

    Returns ``(old_start, new_start)``; the line counts are ignored. A header
    that does not match yields ``(0, 0)``.
    """
    found = re.search(r"@@ -(?P<old>\d+)(?:,\d+)? \+(?P<new>\d+)(?:,\d+)? @@", header)
    if found is None:
        return 0, 0
    old_start, new_start = (int(found.group(name)) for name in ("old", "new"))
    return old_start, new_start
def parse_unified_diff(diff_text: str) -> list[FileDiff]:
    """Parse unified-diff text into ``FileDiff`` objects with numbered lines.

    A ``+++ `` line starts a new file (a leading ``b/`` is removed from the
    path) and an ``@@ `` line starts a new hunk. A hunk seen before any file
    header is stored under the path ``pasted.diff``. Lines outside a hunk and
    lines starting with a backslash are ignored. Inside a hunk, ``+`` lines are
    additions, ``-`` lines are deletions and everything else is context.
    """
    parsed: list[FileDiff] = []
    active_file: FileDiff | None = None
    active_hunk: Hunk | None = None
    old_cursor = 0
    new_cursor = 0

    for entry in diff_text.splitlines():
        if entry.startswith("diff --git "):
            # A new git section: forget the previous file and hunk.
            active_file = None
            active_hunk = None
            continue

        if entry.startswith("+++ "):
            target = entry[4:].strip()
            if target.startswith("b/"):
                target = target[2:]
            active_file = FileDiff(path=target)
            parsed.append(active_file)
            active_hunk = None
            continue

        if entry.startswith("@@ "):
            if active_file is None:
                active_file = FileDiff(path="pasted.diff")
                parsed.append(active_file)
            old_cursor, new_cursor = parse_hunk_header(entry)
            active_hunk = Hunk(header=entry, old_start=old_cursor, new_start=new_cursor)
            active_file.hunks.append(active_hunk)
            continue

        # Skip lines outside a hunk and backslash-prefixed marker lines.
        if active_hunk is None or entry.startswith("\\"):
            continue

        marker = entry[:1]
        if marker == "+" and entry[:3] != "+++":
            active_hunk.lines.append(DiffLine("add", entry[1:], new_no=new_cursor))
            new_cursor += 1
        elif marker == "-" and entry[:3] != "---":
            active_hunk.lines.append(DiffLine("del", entry[1:], old_no=old_cursor))
            old_cursor += 1
        else:
            # Context: drop the single leading space when present.
            content = entry[1:] if marker == " " else entry
            active_hunk.lines.append(DiffLine("ctx", content, old_no=old_cursor, new_no=new_cursor))
            old_cursor += 1
            new_cursor += 1

    return parsed
def _first_matching_line(added_lines: list[DiffLine], matches: Any, fallback: int | None) -> int | None:
    """Return the new-file number of the first added line whose text satisfies *matches*, else *fallback*."""
    return next((line.new_no for line in added_lines if matches(line.text)), fallback)


def review_diff(files: list[FileDiff]) -> list[Finding]:
    """Run the deterministic review over parsed diffs.

    Every added line is checked against ``RULES``. Three hunk-level heuristics
    follow: a new ``return None`` without ``Optional`` in the added text, a
    ``len(`` added where a ``/ len(`` was removed, and a hunk with at least 25
    added lines in a file whose path does not contain "test".

    Hunk-level findings are anchored to the added line that triggered them
    (falling back to the first added line), so the comment appears next to the
    relevant code instead of always at the top of the hunk.

    Returns:
        The findings after ``dedupe_findings`` has removed duplicates and sorted them.
    """
    findings: list[Finding] = []
    for file_diff in files:
        in_test_file = "test" in file_diff.path.lower()
        for hunk in file_diff.hunks:
            added_lines = [line for line in hunk.lines if line.kind == "add"]
            if not added_lines:
                # Every check below needs at least one added line.
                continue
            removed_lines = [line for line in hunk.lines if line.kind == "del"]

            for line in added_lines:
                for rule in RULES:
                    if rule["pattern"].search(line.text):
                        findings.append(
                            Finding(
                                file=file_diff.path,
                                hunk=hunk.header,
                                line=line.new_no,
                                severity=rule["severity"],
                                category=rule["category"],
                                comment=rule["comment"],
                                suggestion=rule["suggestion"],
                            )
                        )

            added_text = "\n".join(line.text for line in added_lines)
            removed_text = "\n".join(line.text for line in removed_lines)
            first_added = added_lines[0].new_no

            if re.search(r"return\s+None", added_text) and "Optional" not in added_text:
                findings.append(
                    Finding(
                        file=file_diff.path,
                        hunk=hunk.header,
                        line=_first_matching_line(
                            added_lines,
                            lambda text: re.search(r"return\s+None", text) is not None,
                            first_added,
                        ),
                        severity="warning",
                        category="logic",
                        comment="The new branch returns None, which may change the function's return contract.",
                        suggestion="Return a neutral value of the same type or update callers and tests to handle None explicitly.",
                    )
                )

            if "len(" in added_text and "/ len(" in removed_text:
                findings.append(
                    Finding(
                        file=file_diff.path,
                        hunk=hunk.header,
                        line=_first_matching_line(added_lines, lambda text: "len(" in text, first_added),
                        severity="warning",
                        category="test",
                        comment="This change appears to address an empty collection path; make sure the regression is locked down.",
                        suggestion="Add a test covering an empty input and a non-empty input for the same function.",
                    )
                )

            if len(added_lines) >= 25 and not in_test_file:
                findings.append(
                    Finding(
                        file=file_diff.path,
                        hunk=hunk.header,
                        line=first_added,
                        severity="nitpick",
                        category="test",
                        comment="This hunk adds a substantial amount of behavior outside a test file.",
                        suggestion="Add or update a focused test that exercises the new branch.",
                    )
                )
    return dedupe_findings(findings)
def dedupe_findings(findings: list[Finding]) -> list[Finding]:
    """Drop repeated findings and order the rest by severity, file and line.

    Two findings are duplicates when file, category, line and comment all
    match; the first occurrence is kept. Sorting puts critical before warning
    before nitpick (unknown severities last), then by file, then by line with a
    missing line treated as 0.
    """
    rank = {"critical": 0, "warning": 1, "nitpick": 2}
    first_seen: dict[tuple[str, str, int | None, str], Finding] = {}
    for candidate in findings:
        identity = (candidate.file, candidate.category, candidate.line, candidate.comment)
        # setdefault keeps the earliest finding for each identity.
        first_seen.setdefault(identity, candidate)
    return sorted(
        first_seen.values(),
        key=lambda item: (rank.get(item.severity, 9), item.file, item.line or 0),
    )
def summarize_with_model(
    files: list[FileDiff],
    findings: list[Finding],
    enabled: bool,
    hf_token: gr.OAuthToken | None = None,
) -> str:
    """Summarize the review with the Mellum model, falling back to the deterministic summary.

    The deterministic summary is returned when the model is disabled, when
    there is neither a token (OAuth or ``HF_TOKEN``) nor a local "mellum"
    checkpoint, or when the model call raises.
    """
    if not enabled:
        return summarize_deterministic(files, findings, prefix="Deterministic review complete.")

    token = hf_token.token if hf_token else os.getenv("HF_TOKEN", "")
    if not token and not local_model_ready("mellum"):
        return summarize_deterministic(
            files,
            findings,
            prefix="Deterministic summary shown. Mellum bridge is armed for OAuth, HF_TOKEN, or a local checkpoint.",
        )

    # Excerpt limits: 6 files, 4 hunks per file, 80 lines per hunk.
    markers = {"add": "+", "del": "-"}
    file_sections = []
    for file in files[:6]:
        hunk_sections = []
        for hunk in file.hunks[:4]:
            rows = "\n".join(f"{markers.get(line.kind, ' ')} {line.text}" for line in hunk.lines[:80])
            hunk_sections.append(f"{hunk.header}\n{rows}")
        file_sections.append(f"{file.path}\n" + "\n".join(hunk_sections))
    compact_diff = "\n".join(file_sections)

    deterministic = json.dumps([finding_to_dict(item) for item in findings[:12]], indent=2)
    system_prompt = (
        "You are DiffSense, a terse senior code reviewer. Summarize the review risk in "
        "four bullets. Do not invent findings beyond the provided deterministic findings."
    )
    user_prompt = (
        f"Deterministic findings:\n{deterministic}\n\n"
        f"Diff excerpt:\n{compact_diff[:12000]}"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    try:
        return call_chat_model(MELLUM_MODEL, messages, token, local_alias="mellum", max_tokens=320)
    except Exception as exc:
        # The app must stay demoable when endpoints are unavailable.
        reason = friendly_model_error(MELLUM_MODEL, exc, "mellum")
        return summarize_deterministic(
            files,
            findings,
            prefix=f"Deterministic summary shown. Mellum bridge is armed. {reason}",
        )
def call_chat_model(
    model: str,
    messages: list[dict[str, Any]],
    token: str,
    local_alias: str | None = None,
    max_tokens: int = 320,
    temperature: float = 0.2,
) -> str:
    """Get a chat completion, trying the local checkpoint before the hosted model.

    When *local_alias* is given and ``try_local_text_model`` returns a
    non-empty string, that string is the result. Otherwise the request goes to
    ``InferenceClient.chat_completion``; an empty reply is replaced by a short
    notice naming the model.
    """
    if local_alias:
        local_answer = try_local_text_model(
            local_alias,
            messages,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        if local_answer:
            return local_answer

    completion = InferenceClient(token=token, model=model).chat_completion(
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=0.9,
    )
    remote_answer = completion.choices[0].message.content
    if remote_answer:
        return remote_answer
    return f"{model} returned an empty response."
def try_local_text_model(
    alias: str,
    messages: list[dict[str, Any]],
    max_tokens: int,
    temperature: float,
) -> str | None:
    """Generate a reply from the local checkpoint registered under *alias*.

    Returns ``None`` when the alias has no directory or the directory has no
    ``config.json``. In every other case a string is returned: the generated
    text, or a message explaining that dependencies are missing, that the
    checkpoint failed to run, or that it produced nothing.
    """
    checkpoint = LOCAL_MODEL_DIRS.get(alias)
    if not checkpoint or not (checkpoint / "config.json").exists():
        return None

    # Imported lazily so the app works without the local inference stack.
    try:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
    except Exception as exc:
        return (
            f"Local checkpoint detected at `{checkpoint}`, but local inference dependencies are not installed: "
            f"{type(exc).__name__}. Add torch/transformers or use the HF provider path."
        )

    try:
        # NOTE(review): trust_remote_code=True runs code shipped with the checkpoint.
        tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)
        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model = AutoModelForCausalLM.from_pretrained(
            checkpoint,
            device_map="auto",
            torch_dtype=dtype,
            trust_remote_code=True,
        )

        if hasattr(tokenizer, "apply_chat_template"):
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        else:
            turns = [f"{item.get('role', 'user')}: {item.get('content', '')}" for item in messages]
            prompt = "\n\n".join(turns)

        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        generated = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=temperature > 0,
            temperature=max(temperature, 0.01),
        )
        # Keep only the tokens produced after the prompt.
        prompt_length = inputs["input_ids"].shape[-1]
        reply = tokenizer.decode(generated[0][prompt_length:], skip_special_tokens=True).strip()
        if reply:
            return reply
        return f"Local checkpoint `{checkpoint}` returned an empty response."
    except Exception as exc:
        return f"Local checkpoint `{checkpoint}` could not run in this Space: {type(exc).__name__}: {exc}"
def friendly_model_error(model: str, exc: Exception, alias: str | None = None) -> str:
    """Translate a model-call exception into a short status sentence.

    The exception text is classified, in this order, as an execution problem
    (model missing or unsupported), an authorization problem ("401" or
    "unauthorized"), a capacity problem ("429" or "rate"), or execution again
    by default. When *alias* names a known checkpoint slot, its path is
    appended. *model* is accepted but not used.
    """
    raw = str(exc)
    lowered = raw.lower()
    execution_markers = (
        "model_not_found",
        "does not exist",
        "model_not_supported",
        "not supported by any provider",
    )

    if any(marker in raw for marker in execution_markers):
        reason = "provider execution is pending"
    elif "401" in raw or "unauthorized" in lowered:
        reason = "provider authorization is pending"
    elif "429" in raw or "rate" in lowered:
        reason = "provider capacity is pending"
    else:
        reason = "provider execution is pending"

    slot_hint = f" Checkpoint slot: `{LOCAL_MODEL_DIRS[alias]}`." if alias and alias in LOCAL_MODEL_DIRS else ""
    return f"{reason}; local-first fallback is active.{slot_hint}"
def compact_review_context(files: list[FileDiff], findings: list[Finding], max_chars: int = 9000) -> str:
    """Build the text context sent to the models: findings as JSON plus a diff excerpt.

    The excerpt covers at most 6 files, 4 hunks per file and 80 lines per hunk,
    and at most 15 findings are serialized. The combined text is cut to
    *max_chars* characters.
    """
    markers = {"add": "+", "del": "-"}
    file_sections = []
    for file in files[:6]:
        hunk_sections = []
        for hunk in file.hunks[:4]:
            rows = "\n".join(f"{markers.get(line.kind, ' ')} {line.text}" for line in hunk.lines[:80])
            hunk_sections.append(f"{hunk.header}\n{rows}")
        file_sections.append(f"{file.path}\n" + "\n".join(hunk_sections))
    diff_excerpt = "\n".join(file_sections)

    deterministic = json.dumps([finding_to_dict(item) for item in findings[:15]], indent=2)
    context = f"Deterministic findings:\n{deterministic}\n\nDiff excerpt:\n{diff_excerpt}"
    return context[:max_chars]
def run_nemotron_router(
    files: list[FileDiff],
    findings: list[Finding],
    enabled: bool,
    token: str | None,
) -> str:
    """Ask the Nemotron model for a triage plan, or explain why it was not run.

    Returns a notice when the router is disabled or when there is neither a
    token nor a local "nemotron" checkpoint. If the model call raises, the
    notice is followed by the error summary and the deterministic fallback.
    """
    if not enabled:
        return f"Nemotron router disabled. Model configured: `{NEMOTRON_MODEL}`."

    armed_notice = f"Nemotron router bridge is armed for `{NEMOTRON_MODEL}`. "
    if not token and not local_model_ready("nemotron"):
        return armed_notice + "Sign in, set `HF_TOKEN`, or add the local checkpoint to run model triage."

    system_prompt = (
        "You are the DiffSense routing agent. Prioritize code review findings for a PR reviewer. "
        "Return a concise markdown triage plan with: merge risk, files to inspect first, and follow-up tests."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": compact_review_context(files, findings)},
    ]

    try:
        return call_chat_model(NEMOTRON_MODEL, messages, token, local_alias="nemotron", max_tokens=360)
    except Exception as exc:
        reason = friendly_model_error(NEMOTRON_MODEL, exc, "nemotron")
        return armed_notice + f"{reason}\n\n" + deterministic_router_fallback(files, findings)
def deterministic_router_fallback(files: list[FileDiff], findings: list[Finding]) -> str:
    """Produce a three-bullet triage plan from the findings alone.

    Merge risk is "high" with any critical finding, "medium" with any other
    finding and "low" with none. Up to four files are listed, in the order they
    first appear in *findings*. *files* is accepted but not used.
    """
    if any(item.severity == "critical" for item in findings):
        risk = "high"
    elif findings:
        risk = "medium"
    else:
        risk = "low"

    # dict.fromkeys removes repeats while keeping first-seen order.
    hot_files = list(dict.fromkeys(item.file for item in findings))
    inspect_first = ", ".join(hot_files[:4]) if hot_files else "no risky files detected"

    bullets = (
        f"Deterministic router fallback: merge risk is **{risk}**.",
        f"Inspect first: {inspect_first}.",
        "Follow-up tests: cover changed auth/security paths and empty-input branches before merge.",
    )
    return "\n".join(f"- {bullet}" for bullet in bullets)
def run_tiny_titan_checker(
    files: list[FileDiff],
    findings: list[Finding],
    enabled: bool,
    token: str | None,
) -> str:
    """Ask the small checker model for a three-bullet sanity check, or explain why it was not run.

    Returns a notice when the checker is disabled or when there is neither a
    token nor a local "tiny_titan" checkpoint. If the model call raises, the
    notice is followed by the error summary and three fixed fallback bullets.
    """
    if not enabled:
        return f"Tiny Titan checker disabled. Model configured: `{TINY_TITAN_MODEL}`."

    armed_notice = f"Tiny Titan checker bridge is armed for `{TINY_TITAN_MODEL}`. "
    if not token and not local_model_ready("tiny_titan"):
        return armed_notice + "Sign in, set `HF_TOKEN`, or add the local checkpoint to run the <=4B checker."

    system_prompt = (
        "You are a compact <=4B code-review sanity checker. Given deterministic PR findings, "
        "return exactly three bullets: one missed-risk hypothesis, one test recommendation, and one merge decision."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": compact_review_context(files, findings, max_chars=7000)},
    ]

    try:
        return call_chat_model(TINY_TITAN_MODEL, messages, token, local_alias="tiny_titan", max_tokens=260)
    except Exception as exc:
        reason = friendly_model_error(TINY_TITAN_MODEL, exc, "tiny_titan")
        fallback_bullets = (
            "- Deterministic checker fallback: verify that critical security findings are fixed before merge.\n"
            "- Test recommendation: cover every changed auth, network, and empty-input branch.\n"
            "- Merge decision: hold if any critical finding remains."
        )
        return armed_notice + f"{reason}\n\n" + fallback_bullets
def run_minicpm_vision(
    image_files: list[Any] | None,
    files: list[FileDiff],
    findings: list[Finding],
    enabled: bool,
    token: str | None,
) -> str:
    """Send up to three uploaded images with review context to the vision model.

    A status message is returned instead of model output when no images were
    uploaded, the feature is disabled, none of the images could be encoded, a
    local MiniCPM checkpoint is present, no token is available, or the model
    call raises.
    """
    images = normalize_uploaded_files(image_files)
    if not images:
        return f"MiniCPM-V vision not used: no PR screenshots or diagrams uploaded. Model configured: `{MINICPM_MODEL}`."
    if not enabled:
        return f"MiniCPM-V vision disabled with {len(images)} image(s) attached. Model configured: `{MINICPM_MODEL}`."

    prompt = (
        "You are DiffSense vision context. Read these PR screenshots, UI diffs, or architecture diagrams. "
        "Return concise markdown notes that could affect code review: changed behavior, missing tests, security risks, "
        "or inconsistencies with the code diff.\n\n"
        + compact_review_context(files, findings, max_chars=3500)
    )

    # Only the first three images are considered; unreadable ones are counted.
    image_parts: list[dict[str, Any]] = []
    skipped = 0
    for path in images[:3]:
        data_url = image_to_data_url(path)
        if data_url:
            image_parts.append({"type": "image_url", "image_url": {"url": data_url}})
        else:
            skipped += 1

    attached = len(image_parts)
    if attached == 0:
        return f"MiniCPM-V vision could not read the uploaded image files. {skipped} file(s) were skipped."

    local_dir = LOCAL_MODEL_DIRS["minicpm"]
    if (local_dir / "config.json").exists():
        return (
            f"MiniCPM-V local checkpoint detected at `{local_dir}` with {attached} image(s). "
            "The app has the image ingestion path wired; run the custom MiniCPM-V loader from this mount for full local vision inference."
        )

    received_notice = f"MiniCPM-V bridge received {attached} image(s) for `{MINICPM_MODEL}`. "
    if not token:
        return (
            received_notice
            + f"Sign in, set `HF_TOKEN`, or add the local checkpoint at `{local_dir}` to run vision context."
        )

    content: list[dict[str, Any]] = [{"type": "text", "text": prompt}, *image_parts]
    messages = [{"role": "user", "content": content}]
    try:
        return call_chat_model(MINICPM_MODEL, messages, token, local_alias="minicpm", max_tokens=420)
    except Exception as exc:
        return received_notice + f"{friendly_model_error(MINICPM_MODEL, exc, 'minicpm')}"
def normalize_uploaded_files(image_files: list[Any] | None) -> list[str]:
    """Convert uploaded file objects into a list of existing filesystem paths.

    Accepted shapes, checked in this order: a plain string, a dict with a
    truthy ``"path"`` entry, an object with a ``name`` attribute, an object
    with a ``path`` attribute. Other items are ignored, and paths that do not
    exist are dropped.
    """

    def _path_of(upload: Any) -> str | None:
        if isinstance(upload, str):
            return upload
        if isinstance(upload, dict) and upload.get("path"):
            return str(upload["path"])
        if hasattr(upload, "name"):
            return str(upload.name)
        if hasattr(upload, "path"):
            return str(upload.path)
        return None

    resolved = (_path_of(upload) for upload in image_files or [])
    return [path for path in resolved if path is not None and Path(path).exists()]
def image_to_data_url(path: str) -> str | None:
    """Encode an image file as a base64 ``data:`` URL.

    Returns ``None`` when the file does not exist, is larger than
    ``MAX_IMAGE_BYTES``, or its name does not map to PNG, JPEG or WebP.
    """
    candidate = Path(path)
    if not candidate.exists():
        return None
    if candidate.stat().st_size > MAX_IMAGE_BYTES:
        return None

    # The type is guessed from the file name only.
    mime_type = mimetypes.guess_type(candidate.name)[0]
    if mime_type in {"image/png", "image/jpeg", "image/webp"}:
        payload = base64.b64encode(candidate.read_bytes()).decode("ascii")
        return f"data:{mime_type};base64,{payload}"
    return None
def run_modal_bridge(
    files: list[FileDiff],
    findings: list[Finding],
    enabled: bool,
) -> str:
    """POST the review context to the configured Modal endpoint and report the outcome.

    Returns a notice when the bridge is disabled or ``MODAL_ENDPOINT`` is
    empty. Otherwise the context, all findings and three model names are sent
    as JSON. Up to 20,000 bytes of the reply are shown; a failed request is
    reported as text rather than raised.
    """
    if not enabled:
        return "Modal bridge disabled."
    if not MODAL_ENDPOINT:
        return "Modal bridge ready, but `DIFFSENSE_MODAL_ENDPOINT` is not configured as a Space secret."

    document = {
        "context": compact_review_context(files, findings, max_chars=12000),
        "findings": [finding_to_dict(item) for item in findings],
        "models": {
            "mellum": MELLUM_MODEL,
            "nemotron": NEMOTRON_MODEL,
            "minicpm": MINICPM_MODEL,
        },
    }
    request = Request(
        MODAL_ENDPOINT,
        data=json.dumps(document).encode("utf-8"),
        headers={"Content-Type": "application/json", "User-Agent": "DiffSense/1.0"},
        method="POST",
    )

    try:
        with urlopen(request, timeout=20) as response:
            body = response.read(20_000).decode("utf-8", errors="replace")
    except Exception as exc:
        return f"Modal bridge attempted `{MODAL_ENDPOINT}` but failed: {exc}"
    return f"Modal endpoint `{MODAL_ENDPOINT}` responded:\n\n```json\n{body}\n```"
def summarize_deterministic(files: list[FileDiff], findings: list[Finding], prefix: str) -> str:
    """Render a markdown bullet summary of the deterministic review.

    The text starts with *prefix*, followed by file and hunk totals, counts per
    severity and the first three findings. With no findings, a closing bullet
    says that nothing matched.
    """
    total_hunks = sum(len(file.hunks) for file in files)
    counts = {
        level: sum(1 for item in findings if item.severity == level)
        for level in ("critical", "warning", "nitpick")
    }

    bullets = [
        f"- Reviewed {len(files)} files and {total_hunks} hunks.",
        f"- Found {counts['critical']} critical, {counts['warning']} warning, and {counts['nitpick']} nitpick findings.",
    ]
    for finding in findings[:3]:
        # A falsy line (None or 0) is shown as the bare file name.
        location = f"{finding.file}:{finding.line}" if finding.line else finding.file
        bullets.append(f"- {finding.severity.title()} in `{location}`: {finding.comment}")
    if not findings:
        bullets.append("- No high-signal risks matched the current deterministic rules.")

    return prefix + "\n\n" + "\n".join(bullets)
def finding_to_dict(finding: Finding) -> dict[str, Any]:
    """Return the finding's fields as a plain dict, in declaration order, ready for JSON."""
    exported = ("file", "hunk", "line", "severity", "category", "comment", "suggestion", "source")
    return {name: getattr(finding, name) for name in exported}
def render_scoreboard(files: list[FileDiff], findings: list[Finding]) -> str:
    """Render the four score cards (files, hunks, critical, warnings) as HTML."""
    severities = [item.severity for item in findings]
    tiles = (
        ("Files", len(files)),
        ("Hunks", sum(len(file.hunks) for file in files)),
        ("Critical", severities.count("critical")),
        ("Warnings", severities.count("warning")),
    )
    cards = "\n".join(
        f'<div class="score-card"><div class="score-label">{label}</div>'
        f'<div class="score-value">{value}</div></div>'
        for label, value in tiles
    )
    return f'\n<div class="score-grid">\n{cards}\n</div>\n'
def render_review(files: list[FileDiff], findings: list[Finding]) -> str:
    """Render the diff as HTML with findings placed inline.

    Findings are grouped by ``(file, hunk header, line)``. A finding whose
    ``line`` matches a diff line's new-side number is rendered directly after
    that line; a finding with ``line`` set to ``None`` is rendered once, after
    the last line of its hunk.

    Args:
        files: Parsed file diffs, each with ``path`` and ``hunks``; every hunk
            has ``header`` and ``lines``; every line has ``kind``, ``text``,
            ``old_no`` and ``new_no``.
        findings: Review findings to place.

    Returns:
        An HTML fragment, or an empty-state placeholder when ``files`` is empty.
    """
    if not files:
        return '<div class="empty-state">Paste a unified diff to see inline review findings.</div>'

    findings_by_location: dict[tuple[str, str, int | None], list[Finding]] = {}
    for finding in findings:
        findings_by_location.setdefault((finding.file, finding.hunk, finding.line), []).append(finding)

    chunks = [render_scoreboard(files, findings), '<div class="diff-wrap">']
    for file_diff in files:
        chunks.append(f'<div class="file-title">{html.escape(file_diff.path)}</div>')
        for hunk in file_diff.hunks:
            chunks.append(f'<div class="hunk-title">{html.escape(hunk.header)}</div>')
            for line in hunk.lines:
                # Added lines show their new-side number; everything else the old-side one.
                number = line.new_no if line.kind == "add" else line.old_no
                sign = "+" if line.kind == "add" else "-" if line.kind == "del" else " "
                chunks.append(
                    f'<div class="line {line.kind}">'
                    f'<div class="line-no">{number if number is not None else ""}</div>'
                    f'<div class="line-code">{html.escape(sign + line.text)}</div>'
                    f"</div>"
                )
                # A line without a new-side number would otherwise look up the
                # (file, hunk, None) key, which is the hunk-level bucket, and
                # repeat those findings after every such line.
                if line.new_no is not None:
                    line_key = (file_diff.path, hunk.header, line.new_no)
                    chunks.extend(render_finding(item) for item in findings_by_location.get(line_key, ()))
            # Hunk-level findings (no line number) are rendered exactly once per hunk.
            hunk_key = (file_diff.path, hunk.header, None)
            chunks.extend(render_finding(item) for item in findings_by_location.get(hunk_key, ()))
    chunks.append("</div>")
    return "\n".join(chunks)
def render_finding(finding: Finding) -> str:
    """Render one finding as an HTML card.

    Every field is passed through ``html.escape`` before being interpolated,
    including the severity, which is also used as a CSS class name.

    Args:
        finding: Object exposing string attributes ``severity``, ``category``,
            ``comment`` and ``suggestion``.

    Returns:
        An HTML fragment containing the severity badge, category, comment and
        suggested fix.
    """
    severity = html.escape(finding.severity)
    category = html.escape(finding.category)
    comment = html.escape(finding.comment)
    suggestion = html.escape(finding.suggestion)

    rows = [
        "",
        f'<div class="finding {severity}">',
        f'<span class="badge {severity}">{severity}</span>',
        f'<span class="category">{category}</span>',
        f'<div class="finding-body">{comment}</div>',
        f'<div class="suggestion"><strong>Fix:</strong> {suggestion}</div>',
        "</div>",
        "",
    ]
    return "\n".join(rows)
def run_review(
    diff_input: str,
    use_model_summary: bool,
    use_nemotron_router: bool,
    use_tiny_titan: bool,
    use_minicpm_vision: bool,
    use_modal_bridge: bool,
    image_files: list[Any] | None,
    hf_token: gr.OAuthToken | None = None,
) -> tuple[str, list[dict[str, Any]], str, str]:
    """Run the full review pipeline for the "Review diff" button.

    Args:
        diff_input: Raw textbox content; passed through ``normalize_diff``.
        use_model_summary: Flag forwarded to ``summarize_with_model``.
        use_nemotron_router: Flag forwarded to ``run_nemotron_router``.
        use_tiny_titan: Flag forwarded to ``run_tiny_titan_checker``.
        use_minicpm_vision: Flag forwarded to ``run_minicpm_vision``.
        use_modal_bridge: Flag forwarded to ``run_modal_bridge``.
        image_files: Uploaded images forwarded to ``run_minicpm_vision``.
        hf_token: OAuth token object, when one is supplied; otherwise the
            ``HF_TOKEN`` environment variable is used for the model steps.

    Returns:
        ``(review_html, findings_as_dicts, summary_markdown, agent_trace_markdown)``.

    Raises:
        gr.Error: If the normalized input is empty or contains no diff hunks.
    """
    normalized = normalize_diff(diff_input)
    if not normalized:
        raise gr.Error("Paste a unified diff first, or load the sample diff.")

    files = parse_unified_diff(normalized)
    has_hunks = bool(files) and any(parsed.hunks for parsed in files)
    if not has_hunks:
        raise gr.Error("I could not find unified diff hunks. Look for lines starting with @@.")

    findings = review_diff(files)

    if hf_token:
        token = hf_token.token
    else:
        token = os.getenv("HF_TOKEN")

    # NOTE(review): the summary step receives the OAuth object while the other
    # steps receive the resolved token string — confirm this matches the
    # signature of summarize_with_model.
    summary = summarize_with_model(files, findings, use_model_summary, hf_token)
    router_notes = run_nemotron_router(files, findings, use_nemotron_router, token)
    checker_notes = run_tiny_titan_checker(files, findings, use_tiny_titan, token)
    vision_notes = run_minicpm_vision(image_files, files, findings, use_minicpm_vision, token)
    bridge_notes = run_modal_bridge(files, findings, use_modal_bridge)
    trace = render_agent_trace(router_notes, checker_notes, vision_notes, bridge_notes)

    review_html = render_review(files, findings)
    structured = [finding_to_dict(entry) for entry in findings]
    return review_html, structured, summary, trace
def render_agent_trace(nemotron_notes: str, tiny_titan_notes: str, minicpm_notes: str, modal_notes: str) -> str:
    """Assemble the Markdown "agent trace" panel.

    Each section is a level-3 heading followed by its body. The first section
    is the current model runtime status; the remaining four are the notes
    passed in, in parameter order.

    Returns:
        All headings and bodies joined by blank lines.
    """
    sections = (
        ("### Model Runtime Status", render_model_runtime_status()),
        ("### Nemotron 3 Nano Router", nemotron_notes),
        ("### Tiny Titan 4B Checker", tiny_titan_notes),
        ("### MiniCPM-V 4.6 Vision Context", minicpm_notes),
        ("### Modal Provider Bridge", modal_notes),
    )
    return "\n\n".join(part for section in sections for part in section)
def render_model_runtime_status() -> str:
    """Describe the data mount and each model's local checkpoint slot.

    Returns:
        A Markdown bullet list: one bullet for ``DATA_ROOT`` (existence and
        write access), one per configured model, and a closing note about the
        deterministic fallback.
    """
    mount_state = "mounted" if DATA_ROOT.exists() else "not mounted"
    write_state = "writable" if os.access(DATA_ROOT, os.W_OK) else "read-only or unavailable"

    # (label, model id, alias passed to format_local_model_status)
    model_rows = (
        ("Mellum summary", MELLUM_MODEL, "mellum"),
        ("Nemotron router", NEMOTRON_MODEL, "nemotron"),
        ("Tiny Titan checker", TINY_TITAN_MODEL, "tiny_titan"),
        ("MiniCPM-V vision", MINICPM_MODEL, "minicpm"),
    )

    report = [f"- Data mount: `{DATA_ROOT}` is **{mount_state}** and **{write_state}**."]
    for label, model_id, alias in model_rows:
        report.append(f"- {label}: `{model_id}`; local path {format_local_model_status(alias)}.")
    report.append("- Deterministic reviewer remains the always-on fallback for a reliable demo.")
    return "\n".join(report)
def format_local_model_status(alias: str) -> str:
    """Describe the state of one local checkpoint directory as Markdown.

    Args:
        alias: Key into ``LOCAL_MODEL_DIRS``.

    Returns:
        The directory path in backticks followed by one of three states:
        ready (``config.json`` present), directory present without
        ``config.json``, or directory missing.

    Raises:
        KeyError: If ``alias`` is not a key of ``LOCAL_MODEL_DIRS``.
    """
    slot = LOCAL_MODEL_DIRS[alias]
    if (slot / "config.json").exists():
        state = "is **ready**"
    elif slot.exists():
        state = "slot is ready; waiting for `config.json`"
    else:
        state = "slot is configured; waiting for checkpoint files"
    return f"`{slot}` {state}"
def local_model_ready(alias: str) -> bool:
    """Return whether the local slot for ``alias`` contains a ``config.json``.

    An alias that is not present in ``LOCAL_MODEL_DIRS`` yields ``False``
    rather than raising.
    """
    slot = LOCAL_MODEL_DIRS.get(alias)
    if not slot:
        return False
    return (slot / "config.json").exists()
def load_sample() -> str:
    """Return the bundled sample diff (``SAMPLE_DIFF``) for the "Load sample diff" button."""
    sample_text = SAMPLE_DIFF
    return sample_text
# Theme handed to ``demo.launch`` at the bottom of the file.
APP_THEME = gr.themes.Soft(primary_hue="slate", neutral_hue="slate")

# UI definition: a hero banner, a sidebar with login and per-model toggles, and
# a two-column row (inputs and summaries on the left, detailed review on the right).
# NOTE(review): the nesting below was reconstructed from a listing that lost its
# indentation — confirm which components belong to the sidebar and to each column.
with gr.Blocks() as demo:
    gr.HTML(
        """
<div id="hero">
<h1>DiffSense</h1>
<p>Private, offline-first PR review for the Build Small hackathon. Paste a diff or public GitHub PR URL, get severity-tagged findings, keep your code out of SaaS review tools.</p>
</div>
"""
    )
    with gr.Sidebar():
        # Login button; run_review declares an optional gr.OAuthToken parameter.
        gr.LoginButton()
        # One checkbox per optional stage; all default to enabled.
        use_model_summary = gr.Checkbox(
            value=True,
            label="Add optional Mellum model summary",
            info="Tries local /data checkpoint first, then OAuth/HF_TOKEN provider, with deterministic fallback.",
        )
        use_nemotron_router = gr.Checkbox(
            value=True,
            label="Run Nemotron 3 Nano router",
            info=f"Uses local /data checkpoint or {NEMOTRON_MODEL}.",
        )
        use_tiny_titan = gr.Checkbox(
            value=True,
            label="Run Tiny Titan 4B checker",
            info=f"Uses local /data checkpoint or {TINY_TITAN_MODEL}.",
        )
        use_minicpm_vision = gr.Checkbox(
            value=True,
            label="Run MiniCPM-V 4.6 vision",
            info=f"Uses uploaded PR images with local /data checkpoint or {MINICPM_MODEL}.",
        )
        use_modal_bridge = gr.Checkbox(
            value=True,
            label="Send payload to Modal bridge",
            info="Uses DIFFSENSE_MODAL_ENDPOINT when configured.",
        )
        sample_btn = gr.Button("Load sample diff")
    with gr.Row(equal_height=False):
        # Left column: diff input, image upload, run button, summary and agent trace.
        with gr.Column(scale=4):
            diff_input = gr.Textbox(
                value="",
                lines=18,
                max_lines=24,
                label="Unified diff or public GitHub PR URL",
                placeholder="Paste a unified diff, paste https://github.com/org/repo/pull/123, or click Load sample diff.",
                interactive=True,
            )
            image_files = gr.File(
                label="PR screenshots or diagrams for MiniCPM-V",
                file_count="multiple",
                file_types=["image"],
            )
            run_btn = gr.Button("Review diff", variant="primary")
            summary_output = gr.Markdown(
                value="Run a review to get the risk summary.",
                label="Reviewer summary",
            )
            # The runtime status is computed once, when the UI is built, as the initial value.
            agent_output = gr.Markdown(
                value="### Model Runtime Status\n\n" + render_model_runtime_status(),
                label="Model agent trace",
            )
        # Right column: inline HTML review and the structured findings.
        with gr.Column(scale=6):
            review_output = gr.HTML(
                value='<div class="empty-state">Paste a unified diff or public GitHub PR URL, then click Review diff.</div>',
                label="Detailed inline review",
            )
            json_output = gr.JSON(label="Structured findings")
    # The sample button fills the diff textbox with the bundled sample.
    sample_btn.click(fn=load_sample, outputs=diff_input)
    # The inputs list matches run_review's first seven parameters in order; hf_token
    # is not listed here. The outputs list matches run_review's 4-tuple return order.
    run_btn.click(
        fn=run_review,
        inputs=[
            diff_input,
            use_model_summary,
            use_nemotron_router,
            use_tiny_titan,
            use_minicpm_vision,
            use_modal_bridge,
            image_files,
        ],
        outputs=[review_output, json_output, summary_output, agent_output],
    )

if __name__ == "__main__":
    # CSS and theme are supplied at launch time here rather than to gr.Blocks().
    demo.launch(css=CSS, theme=APP_THEME, ssr_mode=False)