from __future__ import annotations
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import textwrap
from dataclasses import dataclass, field
from typing import Any
# Repository root: the directory one level above the folder holding this file.
ROOT = pathlib.Path(__file__).resolve().parents[1]
# Directory that holds generated artifacts, including the report below.
OUTPUTS = ROOT.joinpath("outputs")
# Plain-text report that run_verification_suite() writes after all sections ran.
REPORT_PATH = OUTPUTS.joinpath("verification_report.txt")
@dataclass
class VerificationState:
    """Accumulates the outcome of every check performed during a run.

    ``info`` is the ordered log of everything that happened (section headers
    plus one ``PASS:``/``WARNING:``/``FAIL:`` line per check). ``failures``
    and ``warnings`` hold the bare messages of the corresponding entries.
    """

    failures: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    info: list[str] = field(default_factory=list)

    def fail(self, msg: str) -> None:
        """Record ``msg`` as a failure and log it with a ``FAIL:`` prefix."""
        self.failures.append(msg)
        self.info.append(f"FAIL: {msg}")

    def warn(self, msg: str) -> None:
        """Record ``msg`` as a warning and log it with a ``WARNING:`` prefix."""
        self.warnings.append(msg)
        self.info.append(f"WARNING: {msg}")

    def ok(self, msg: str) -> None:
        """Log ``msg`` as a passed check with a ``PASS:`` prefix."""
        self.info.append(f"PASS: {msg}")
def _run_python(code: str, timeout: int = 120) -> tuple[int, str, str]:
    """Run ``code`` via ``python -c`` with the repository root as working directory.

    The project virtualenv interpreter (``.venv/bin/python``) is used when it
    exists; otherwise the interpreter running this script is used, mirroring
    the fallback in ``_pyright_bin``.

    Args:
        code: Python source passed to ``-c``.
        timeout: Seconds to wait before the child is abandoned.

    Returns:
        ``(returncode, stdout, stderr)``. A timeout yields return code 124 and
        a failure to start the interpreter yields 127, each with an
        explanatory message in the stderr slot, so callers can treat both
        like any other non-zero exit instead of crashing the whole suite.
    """
    venv_python = ROOT / ".venv" / "bin" / "python"
    interpreter = str(venv_python) if venv_python.exists() else sys.executable
    try:
        proc = subprocess.run(
            [interpreter, "-c", code],
            cwd=str(ROOT),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return 124, "", f"python snippet timed out after {timeout}s"
    except OSError as exc:
        return 127, "", f"could not start {interpreter}: {exc}"
    return proc.returncode, proc.stdout, proc.stderr
def _run_cmd(cmd: list[str], timeout: int = 180, cwd: pathlib.Path | None = None) -> tuple[int, str, str]:
    """Run an external command and capture its output as text.

    Args:
        cmd: Argument vector; it is executed directly, without a shell.
        timeout: Seconds to wait before the child is abandoned.
        cwd: Working directory; the repository root when omitted.

    Returns:
        ``(returncode, stdout, stderr)``. A timeout yields return code 124 and
        an executable that cannot be started yields 127, each with an
        explanatory message in the stderr slot, so a missing tool is reported
        by the calling check instead of aborting the suite with an exception.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(cwd or ROOT),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return 124, "", f"{cmd[0] if cmd else 'command'} timed out after {timeout}s"
    except OSError as exc:
        return 127, "", f"could not start {cmd[0] if cmd else 'command'}: {exc}"
    return proc.returncode, proc.stdout, proc.stderr
def _pyright_bin() -> str:
    """Return the pyright executable to invoke.

    Prefers ``.venv/bin/pyright`` under the repository root and falls back to
    the bare name ``pyright`` (resolved through ``PATH``) when that file does
    not exist.
    """
    candidate = ROOT / ".venv" / "bin" / "pyright"
    if candidate.exists():
        return str(candidate)
    return "pyright"
def section_1_rocm_and_unsloth(state: VerificationState) -> None:
    """Section 1: check GPU availability, the Unsloth import and the LoRA config.

    Three checks are recorded on ``state``:

    * a torch probe run in a child interpreter (a warning, not a failure,
      when no accelerator is visible);
    * ``import unsloth, unsloth_zoo`` (a warning when the error message says
      no HIP accelerator is usable, a failure otherwise);
    * string checks on ``training/train_lora.py`` for the 4-bit/16-bit load
      flags and ``use_gradient_checkpointing="unsloth"``.
    """
    state.info.append("\n=== SECTION 1: AMD ROCm + Unsloth Setup ===")

    # The body of the `if` must stay indented inside the literal: the snippet
    # is executed verbatim (after dedent) by `python -c`.
    rc, out, err = _run_python(
        textwrap.dedent(
            """
            import torch
            print(f"cuda_available={torch.cuda.is_available()}")
            if torch.cuda.is_available():
                p = torch.cuda.get_device_properties(0)
                print(f"device={torch.cuda.get_device_name(0)}")
                print(f"hip={torch.version.hip}")
                print(f"vram={p.total_memory/1e9:.1f}")
            """
        )
    )
    if rc != 0:
        state.warn(f"ROCm detection script failed: {err.strip() or out.strip()}")
    elif "cuda_available=True" not in out:
        state.warn("CUDA/ROCm not available in current environment; set HSA_OVERRIDE_GFX_VERSION=11.0.0 on RX 7900 GRE")
    else:
        state.ok("ROCm/CUDA available")

    rc, out, err = _run_python(
        "import unsloth, unsloth_zoo; print(unsloth.__version__)"
    )
    if rc != 0:
        msg = err.strip() or out.strip()
        if "no usable HIP accelerator" in msg or "NotImplementedError" in msg:
            state.warn(f"Unsloth import requires ROCm torch wheels in this host env: {msg}")
        else:
            state.fail(f"Unsloth import failed: {msg}")
    else:
        state.ok("Unsloth import check passed")

    # An unreadable training script is reported as a failed check rather than
    # aborting every later section with an uncaught exception.
    try:
        train_src = (ROOT / "training" / "train_lora.py").read_text(encoding="utf-8")
    except OSError as exc:
        state.fail(f"Could not read training/train_lora.py: {exc}")
        return

    if "load_in_4bit=True" in train_src:
        state.fail("train_lora.py still has load_in_4bit=True")
    elif "load_in_4bit=False" in train_src and "load_in_16bit=True" in train_src:
        state.ok("QLoRA AMD guard check passed")
    else:
        state.fail("train_lora.py missing explicit load_in_4bit/load_in_16bit AMD config")

    if 'use_gradient_checkpointing="unsloth"' not in train_src:
        state.fail('train_lora.py missing use_gradient_checkpointing="unsloth"')
    else:
        state.ok("Gemma4 gradient checkpointing guard passed")
def section_2_static_analysis(state: VerificationState) -> None:
    """Section 2: verify the static-analysis toolchain.

    Records four checks on ``state``:

    * no ``semgrep`` references under ``analyzers/``, ``db/`` or
      ``inference.py`` (grep);
    * pyright reports at least one error on a file with a known type error;
    * ``analyzers.ast_checker.run_all`` reports the three expected rules on
      a fixture containing those patterns;
    * ``analyzers.pipeline.run_pipeline('sample_project')`` yields more than
      10 findings including at least one of severity ``high``.
    """
    # Function-scope import: only this section parses printed Python literals.
    import ast

    state.info.append("\n=== SECTION 2: Static Analysis Pipeline ===")

    rc, out, err = _run_cmd(["grep", "-r", "semgrep", "analyzers/", "db/", "inference.py"], timeout=30)
    matches = out.strip()
    if matches:
        # grep exits with 2 when any path is unreadable even if other paths
        # matched, so the decision is based on the output, not on rc == 0.
        state.fail(f"Semgrep references remain:\n{matches}")
    elif rc > 1:
        state.warn(f"Semgrep reference scan reported an error: {err.strip()}")
    else:
        state.ok("Semgrep removed from core runtime paths")

    test_file = pathlib.Path("/tmp/pyright_test.py")
    test_file.write_text("def f(x: int) -> str:\n return x\n", encoding="utf-8")
    rc, out, err = _run_cmd([_pyright_bin(), "--outputjson", str(test_file)], timeout=30)
    # Exit codes 0 and 1 are both accepted as a completed pyright run.
    if rc not in {0, 1}:
        state.fail(f"Pyright invocation failed: {err.strip()}")
    else:
        try:
            payload = json.loads(out)
            errors = [d for d in payload.get("generalDiagnostics", []) if d.get("severity") == "error"]
        except (ValueError, AttributeError, TypeError) as exc:
            state.fail(f"Pyright JSON decode failed: {exc}")
        else:
            if not errors:
                state.fail("Pyright failed to report known type error")
            else:
                state.ok(f"Pyright JSON check passed ({len(errors)} errors on test file)")

    # The fixture inside the ''' literal needs real block indentation,
    # otherwise the file written by the child is not valid Python.
    rc, out, err = _run_python(
        textwrap.dedent(
            """
            from analyzers.ast_checker import run_all
            import pathlib, textwrap
            p = pathlib.Path('/tmp/ast_test.py')
            p.write_text(textwrap.dedent('''
            def bad_default(x=[]):
                return x
            try:
                pass
            except:
                pass
            x = None
            if x == None:
                pass
            '''))
            findings = run_all(str(p))
            print(sorted({f.rule for f in findings}))
            """
        )
    )
    if rc != 0:
        state.fail(f"AST checker execution failed: {err.strip() or out.strip()}")
    else:
        # The child prints repr(list); literal_eval is the matching parser.
        printed = out.strip()
        try:
            parsed = ast.literal_eval(printed) if printed.startswith("[") else []
        except (ValueError, SyntaxError):
            parsed = []
        rules = {rule for rule in parsed if isinstance(rule, str)}
        expected = {"mutable_default_arg", "bare_except", "none_equality_check"}
        if not expected.issubset(rules):
            state.fail(f"AST checker missing expected rules. got={rules}")
        else:
            state.ok("AST checker known-pattern checks passed")

    rc, out, err = _run_python(
        textwrap.dedent(
            """
            from analyzers.pipeline import run_pipeline
            findings = run_pipeline('sample_project')
            print(len(findings))
            print(sorted({f.severity for f in findings}))
            """
        ),
        timeout=180,
    )
    if rc != 0:
        state.fail(f"Analyzer pipeline run failed: {err.strip() or out.strip()}")
        return

    lines = [line.strip() for line in out.splitlines() if line.strip()]
    try:
        count = int(lines[0]) if lines else 0
    except ValueError:
        state.fail(f"Analyzer pipeline printed a non-numeric finding count: {lines[0]!r}")
        return

    severities: set[str] = set()
    if len(lines) > 1:
        try:
            severities = {s for s in ast.literal_eval(lines[1]) if isinstance(s, str)}
        except (ValueError, SyntaxError, TypeError):
            # Unparseable severities stay empty and are reported below as
            # "no high severity findings".
            severities = set()

    if count <= 10:
        state.fail(f"Pipeline findings too low: {count}")
    elif "high" not in severities:
        state.fail(f"Pipeline produced no high severity findings: {severities}")
    else:
        state.ok(f"Pipeline findings check passed ({count})")
def section_3_agent_judge(state: VerificationState) -> None:
    """Section 3: verify thinking extraction, judge scoring and the reward formula.

    Records on ``state``:

    * ``llm.agent_runner.extract_thinking_and_action`` returns a thinking
      trace longer than 20 characters and the expected action fields;
    * ``llm.thinking_judge.score_thinking`` returns a score in ``[0, 1]``
      (skipped with a warning when ``HF_TOKEN`` is not set);
    * ``compute_composite_reward(0.6, 0.8)`` equals ``0.6*0.6 + 0.8*0.4``
      within 1e-3 and exceeds ``compute_composite_reward(0.6, 0.1)``.

    Output of the child interpreters that is not numeric where a number is
    expected is recorded as a failed check rather than raised.
    """
    state.info.append("\n=== SECTION 3: Agent + Judge ===")

    # NOTE(review): the sample output below carries no explicit reasoning
    # tags; confirm extract_thinking_and_action treats the leading text as
    # the thinking trace.
    rc, out, err = _run_python(
        textwrap.dedent(
            """
            from llm.agent_runner import extract_thinking_and_action
            import json
            test_output = '''
            root cause is config.py
            {"action_type": "FLAG_DEPENDENCY_ISSUE", "target_line": 34, "content": "x", "attributed_to": "config"}
            '''
            thinking, action = extract_thinking_and_action(test_output)
            print(len(thinking))
            print(action.get('action_type',''))
            print(action.get('attributed_to',''))
            """
        )
    )
    if rc != 0:
        state.fail(f"Thinking extraction check failed: {err.strip() or out.strip()}")
    else:
        vals = [line.strip() for line in out.splitlines() if line.strip()]
        try:
            valid = (
                len(vals) >= 3
                and int(vals[0]) > 20
                and vals[1] == "FLAG_DEPENDENCY_ISSUE"
                and vals[2] == "config"
            )
        except ValueError:
            # First printed line was not an integer length.
            valid = False
        if not valid:
            state.fail(f"Thinking extraction invalid output: {vals}")
        else:
            state.ok("Thinking trace extraction check passed")

    if not os.getenv("HF_TOKEN"):
        state.warn("HF_TOKEN missing; skipping live judge API scoring check")
    else:
        rc, out, err = _run_python(
            textwrap.dedent(
                """
                from llm.thinking_judge import score_thinking
                result = score_thinking(
                    thinking_trace='Bug is in config.py due to None timeout',
                    action={'action_type': 'FLAG_DEPENDENCY_ISSUE', 'attributed_to': 'config'},
                    finding={'module_id': 'config', 'severity': 'error', 'message': 'Missing key returns None'},
                    graph_context={'config': {'dependents': ['checkout']}}
                )
                print(result['score'])
                print('what_was_right' in result and 'what_was_wrong' in result)
                """
            ),
            timeout=90,
        )
        if rc != 0:
            state.fail(f"Judge scoring failed: {err.strip() or out.strip()}")
        else:
            lines = [line.strip() for line in out.splitlines() if line.strip()]
            if not lines:
                state.fail("Judge scoring returned empty output")
            else:
                try:
                    score = float(lines[0])
                except ValueError:
                    state.fail(f"Judge score is not numeric: {lines[0]!r}")
                else:
                    if not (0.0 <= score <= 1.0):
                        state.fail(f"Judge score out of range: {score}")
                    else:
                        state.ok("Judge scoring API check passed")

    rc, out, err = _run_python(
        "from training.trajectory_collector import compute_composite_reward as c; print(c(0.6,0.8)); print(c(0.6,0.1))"
    )
    if rc != 0:
        state.fail(f"Composite reward helper failed: {err.strip() or out.strip()}")
        return

    try:
        rewards = [float(x.strip()) for x in out.splitlines() if x.strip()]
    except ValueError:
        state.fail(f"Composite reward helper printed non-numeric output: {out.strip()!r}")
        return

    expected_first = 0.6 * 0.6 + 0.8 * 0.4
    if len(rewards) != 2 or abs(rewards[0] - expected_first) > 1e-3 or rewards[1] >= rewards[0]:
        state.fail("Composite reward formula verification failed")
    else:
        state.ok("Composite reward formula check passed")
def _load_jsonl(path: pathlib.Path) -> list[Any]:
    """Parse every non-blank line of ``path`` as JSON.

    Raises:
        ValueError: if a line is not valid JSON; the message names the file
            and the 1-based line number.
    """
    records: list[Any] = []
    for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"{path.name}:{lineno}: {exc}") from exc
    return records


def section_4_training_data(state: VerificationState) -> None:
    """Section 4: sanity-check the collected training data.

    Reads ``outputs/training/dataset.latest.jsonl`` (a warning and early
    return when it is missing) and records on ``state``:

    * at least 50 records;
    * at least 75% of records carry a reasoning marker in ``text`` or
      ``chosen``;
    * the first 20 entries of ``dpo_pairs.jsonl`` (when present) have
      non-empty, distinct ``prompt``/``chosen``/``rejected`` values;
    * no record's ``module_id`` is one of the evaluation modules.

    A malformed JSONL line is recorded as a failed check instead of raised.
    """
    state.info.append("\n=== SECTION 4: Training Data Quality ===")

    training_dir = OUTPUTS / "training"
    dataset_path = training_dir / "dataset.latest.jsonl"
    if not dataset_path.exists():
        state.warn("dataset.latest.jsonl missing; run inference.py or trajectory collection first")
        return

    try:
        records = _load_jsonl(dataset_path)
    except ValueError as exc:
        state.fail(f"dataset.latest.jsonl is not valid JSONL: {exc}")
        return

    if len(records) < 50:
        state.fail(f"Training records too low: {len(records)}")
    else:
        state.ok(f"Training record count OK: {len(records)}")

    # NOTE(review): the previous check tested `"" in ...`, which is true for
    # every string, so the ratio was always 100% and the check could never
    # fail. The tag literals appear to have been lost; the markers below are
    # an assumption - confirm them against the format the trajectory
    # collector actually writes.
    reasoning_markers = ("<think>", "<thinking>")

    def has_reasoning(record: Any) -> bool:
        text = str(record.get("text", ""))
        chosen = str(record.get("chosen", ""))
        return any(marker in text or marker in chosen for marker in reasoning_markers)

    thinking_count = sum(1 for r in records if has_reasoning(r))
    ratio = thinking_count / max(1, len(records))
    if ratio < 0.75:
        state.fail(f"Reasoning ratio too low: {ratio:.0%}")
    else:
        state.ok(f"Reasoning ratio check passed: {ratio:.0%}")

    dpo_path = training_dir / "dpo_pairs.jsonl"
    if dpo_path.exists():
        try:
            pairs = _load_jsonl(dpo_path)
        except ValueError as exc:
            state.fail(f"dpo_pairs.jsonl is not valid JSONL: {exc}")
        else:
            # Spot-check only the first 20 pairs.
            invalid = [
                p
                for p in pairs[:20]
                if not (
                    p.get("prompt")
                    and p.get("chosen")
                    and p.get("rejected")
                    and p.get("chosen") != p.get("rejected")
                )
            ]
            if invalid:
                state.fail("Invalid DPO pairs detected in spot-check")
            else:
                state.ok(f"DPO pairs spot-check passed ({len(pairs)})")
    else:
        state.warn("No dpo_pairs.jsonl yet (run trajectory collector first)")

    train_modules = {str(r.get("module_id", "")) for r in records}
    eval_modules = {"cart", "checkout", "auth", "config", "payments"}
    leaked = train_modules & eval_modules
    if leaked:
        state.fail(f"Eval leakage detected: {sorted(leaked)}")
    else:
        state.ok("No direct eval-module leakage in module_id field")
def section_5_env_integrity(state: VerificationState) -> None:
    """Section 5: verify the RL environment.

    Runs ``openenv validate`` when the ``openenv`` CLI is on ``PATH`` (a
    warning otherwise), then resets ``CodeReviewEnv`` on ``sample_project``,
    takes one ``REQUEST_CHANGES`` step in a child interpreter and checks that
    the printed reward lies in ``[-2.0, 2.0]``.
    """
    state.info.append("\n=== SECTION 5: RL Environment Integrity ===")

    if shutil.which("openenv"):
        rc, out, err = _run_cmd(["openenv", "validate"], timeout=120)
        if rc != 0:
            state.fail(f"openenv validate failed: {err.strip() or out.strip()}")
        else:
            state.ok("openenv validate passed")
    else:
        state.warn("openenv CLI not available; skipping openenv validate")

    rc, out, err = _run_python(
        textwrap.dedent(
            """
            from env.environment import CodeReviewEnv
            from env.action import ReviewAction, ActionType
            env = CodeReviewEnv(source_root='sample_project')
            obs = env.reset(task_id='style_review')
            assert obs.within_budget
            assert len(obs.available_actions) > 0
            result = env.step(ReviewAction(action_type=ActionType.REQUEST_CHANGES))
            reward_value = result.reward if isinstance(result.reward, (int,float)) else result.reward.raw_value
            print(reward_value)
            """
        ),
        timeout=120,
    )
    if rc != 0:
        state.fail(f"Environment step verification failed: {err.strip() or out.strip()}")
        return

    # The reward is the last non-blank line the child printed; empty or
    # non-numeric output is a failed check, not an exception.
    printed = [line for line in out.splitlines() if line.strip()]
    try:
        reward = float(printed[-1])
    except (IndexError, ValueError):
        state.fail(f"Environment step printed no numeric reward: {out.strip()!r}")
        return

    if not (-2.0 <= reward <= 2.0):
        state.fail(f"Reward out of expected range: {reward}")
    else:
        state.ok("Environment reward-range check passed")
def section_6_hf_readiness(state: VerificationState) -> None:
    """Section 6: string-level checks for Hugging Face Spaces deployment.

    Records on ``state``:

    * ``Dockerfile`` mentions ``7860`` and ``CMD``;
    * ``server/app.py`` contains none of the banned GPU imports (only the
      first one found is reported);
    * ``inference.py`` references ``os.getenv`` or ``os.environ``.

    A file that cannot be read fails its own check; the other checks still run.
    """
    state.info.append("\n=== SECTION 6: HF Deployment Readiness ===")

    def read(relative: str) -> str | None:
        # Report an unreadable file as a failed check instead of raising.
        try:
            return (ROOT / relative).read_text(encoding="utf-8")
        except OSError as exc:
            state.fail(f"Could not read {relative}: {exc}")
            return None

    dockerfile = read("Dockerfile")
    if dockerfile is not None:
        if "7860" not in dockerfile or "CMD" not in dockerfile:
            state.fail("Dockerfile missing required HF Spaces port/CMD settings")
        else:
            state.ok("Dockerfile port and CMD check passed")

    server_src = read("server/app.py")
    if server_src is not None:
        banned_imports = ["import torch", "import llama_cpp", "from unsloth"]
        first_hit = next((b for b in banned_imports if b in server_src), None)
        if first_hit is not None:
            state.fail(f"server/app.py contains banned runtime GPU import: {first_hit}")
        else:
            state.ok("server/app.py runtime GPU import guard passed")

    inf_src = read("inference.py")
    if inf_src is not None:
        if "os.getenv" not in inf_src and "os.environ" not in inf_src:
            state.fail("inference.py does not appear to read environment variables")
        else:
            state.ok("inference.py environment-variable check passed")
def _run_inference(env: dict[str, str], timeout: int = 1200) -> str:
    """Run ``inference.py sample_project`` from the repository root.

    Uses ``.venv/bin/python`` when it exists, otherwise the interpreter
    running this script.

    Returns:
        The captured standard output of the run.

    Raises:
        RuntimeError: if the interpreter cannot be started or the run
            exceeds ``timeout`` seconds.
    """
    venv_python = ROOT / ".venv" / "bin" / "python"
    interpreter = str(venv_python) if venv_python.exists() else sys.executable
    try:
        proc = subprocess.run(
            [interpreter, "inference.py", "sample_project"],
            cwd=str(ROOT),
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
            env=env,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"inference.py timed out after {timeout}s") from exc
    except OSError as exc:
        raise RuntimeError(f"inference.py could not be started: {exc}") from exc
    return proc.stdout


def _end_payload(stdout: str) -> dict[str, Any]:
    """Return the JSON object that follows the last ``[END]`` marker in ``stdout``.

    Raises:
        LookupError: if no line contains ``[END]``.
        ValueError: if the text after the marker is not a JSON object.
    """
    end_lines = [line for line in stdout.splitlines() if "[END]" in line]
    if not end_lines:
        raise LookupError("no [END] line in output")
    payload = json.loads(end_lines[-1].split("[END]", 1)[1].strip())
    if not isinstance(payload, dict):
        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
    return payload


def _metric(payload: dict[str, Any], key: str) -> float:
    """Return ``payload[key]`` as a float, or 0.0 when the key is absent.

    Raises:
        ValueError: if the stored value cannot be converted to a float.
    """
    try:
        return float(payload.get(key, 0.0))
    except (TypeError, ValueError) as exc:
        raise ValueError(f"{key} is not numeric: {payload.get(key)!r}") from exc


def section_7_inference_logs(state: VerificationState) -> None:
    """Section 7: verify the log contract and reproducibility of ``inference.py``.

    Runs the script three times with ``GRAPHREVIEW_AGENT_INFERENCE_ENABLED``
    defaulting to ``false`` and records on ``state``:

    * ``[START]`` and ``[END]`` markers are present;
    * the ``[END]`` JSON payload has all required fields;
    * the output does not contain ``agent_llm_disabled``;
    * recall is above 0.05;
    * the spread of precision over the three runs is below 0.1.

    A run that times out, cannot start, or prints an unparseable payload is
    recorded as a failed check and ends the section.
    """
    state.info.append("\n=== SECTION 7: Inference Script Compliance ===")

    env = os.environ.copy()
    env.setdefault("GRAPHREVIEW_AGENT_INFERENCE_ENABLED", "false")

    try:
        stdout = _run_inference(env)
    except RuntimeError as exc:
        state.fail(str(exc))
        return

    if "[START]" not in stdout or "[END]" not in stdout:
        state.fail("inference.py missing START/END logs")
        return

    try:
        end_data = _end_payload(stdout)
    except ValueError as exc:
        state.fail(f"END payload JSON parse failed: {exc}")
        return

    required = ["agent_findings", "deterministic_findings", "model", "precision", "recall", "run_id"]
    missing = [k for k in required if k not in end_data]
    if missing:
        state.fail(f"END payload missing fields: {missing}")
    else:
        state.ok("END payload fields check passed")

    if "agent_llm_disabled" in stdout:
        state.fail("inference logs still contain agent_llm_disabled marker")

    try:
        recall = _metric(end_data, "recall")
        baseline_precision = _metric(end_data, "precision")
    except ValueError as exc:
        state.fail(f"END payload metric invalid: {exc}")
        return

    if recall <= 0.05:
        state.fail(f"Recall too low: {recall:.3f}")
    else:
        state.ok(f"Recall threshold check passed ({recall:.3f})")

    # Two further runs; precision must stay within 0.1 of the first run.
    scores: list[float] = [baseline_precision]
    for _ in range(2):
        try:
            payload = _end_payload(_run_inference(env))
            scores.append(_metric(payload, "precision"))
        except LookupError:
            state.fail("Reproducibility run missing END log")
            return
        except (RuntimeError, ValueError) as exc:
            state.fail(f"Reproducibility run failed: {exc}")
            return

    # Reported as "variance" in the message; the value is max minus min.
    spread = max(scores) - min(scores)
    if spread >= 0.1:
        state.fail(f"Precision variance too high: scores={scores}, variance={spread:.3f}")
    else:
        state.ok(f"Baseline reproducibility check passed: {scores}")
def section_8_training_graph(state: VerificationState) -> None:
    """Section 8: verify the training graph HTML artifact.

    First asks a child interpreter to build the graph for the most recent
    training run, if the store lists one (a failure here is only a warning).
    Then checks ``outputs/NodeAudit_graph.html``: it must exist, be longer
    than 10,000 characters, mention ``vis-network`` or ``pyvis``, and contain
    ``training`` or ``avg_reward``. ``.js`` URLs that are not on ``cdnjs`` or
    ``unpkg`` produce a warning.
    """
    state.info.append("\n=== SECTION 8: Training Graph Output ===")

    # Build graph for latest run if needed. The body of `if runs:` must stay
    # indented inside the literal: it is executed verbatim by `python -c`.
    rc, out, err = _run_python(
        textwrap.dedent(
            """
            from db.store import Store
            from visualizer.training_graph import build_training_graph
            store = Store(source_root='sample_project')
            runs = store.list_training_runs(limit=1)
            if runs:
                path = build_training_graph(source_root='sample_project', run_id=runs[0].run_id)
                print(path)
            """
        ),
        timeout=180,
    )
    if rc != 0:
        state.warn(f"Graph build helper failed for latest run: {err.strip() or out.strip()}")

    graph_path = ROOT / "outputs" / "NodeAudit_graph.html"
    if not graph_path.exists():
        state.fail("Training graph HTML not generated at outputs/NodeAudit_graph.html")
        return

    content = graph_path.read_text(encoding="utf-8")
    lowered = content.lower()

    if len(content) <= 10_000:
        state.fail("Training graph HTML too small")
    elif "vis-network" not in content and "pyvis" not in lowered:
        state.fail("Training graph file does not look like a valid pyvis artifact")
    else:
        state.ok("Training graph structure check passed")

    cdn_refs = re.findall(r'https?://(?!localhost)[^\s"\']+\.js', content)
    external = [u for u in cdn_refs if "cdnjs" not in u and "unpkg" not in u]
    if external:
        # Only the first three offending URLs are shown.
        state.warn(f"External JS refs remain in graph HTML: {external[:3]}")

    if "training" not in lowered and "avg_reward" not in lowered:
        state.fail("Training graph is missing training outcome annotation text")
    else:
        state.ok("Training graph annotation text check passed")
def run_verification_suite() -> VerificationState:
    """Run all eight verification sections and write the text report.

    Creates the outputs directory if needed, runs the sections in order
    against one shared state and writes ``state.info`` to ``REPORT_PATH``.
    An exception escaping a section is recorded as a failure of that section
    so the remaining sections still run and the report is always written.

    Returns:
        The populated ``VerificationState``.
    """
    state = VerificationState()
    OUTPUTS.mkdir(parents=True, exist_ok=True)

    sections = (
        section_1_rocm_and_unsloth,
        section_2_static_analysis,
        section_3_agent_judge,
        section_4_training_data,
        section_5_env_integrity,
        section_6_hf_readiness,
        section_7_inference_logs,
        section_8_training_graph,
    )
    for section in sections:
        try:
            section(state)
        except Exception as exc:  # top-level boundary: keep the report intact
            state.fail(f"{section.__name__} crashed: {type(exc).__name__}: {exc}")

    REPORT_PATH.write_text("\n".join(state.info) + "\n", encoding="utf-8")
    return state
def test_verification_suite() -> None:
    """Pytest entry point: run the whole suite and require zero failures.

    On failure the assertion message lists every recorded failure, one per line.
    """
    state = run_verification_suite()
    assert not state.failures, "\n".join(state.failures)
if __name__ == "__main__":
    # Script entry point: print the full log and exit with status 1 when any
    # check failed.
    result = run_verification_suite()
    print("\n".join(result.info))
    if result.failures:
        print(f"\nVerification failed with {len(result.failures)} FAIL items")
        sys.exit(1)
    print("\nVerification passed with no FAIL items")