from __future__ import annotations import argparse from datetime import UTC, datetime import json import os from pathlib import Path import uuid from openai import OpenAI from db.seed import seed_project from db.store import Store from env.runtime_config import load_runtime_config from parser.semantic_checks import detect_semantic_issues from training.run_manager import TrainingRunManager from training.weights import WeightSafetyManager # Submission-required runtime variables. HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HFTOKEN") LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") # Hosted fallback: if HF_TOKEN exists and endpoint/model are not explicitly provided, # use Hugging Face Router with a stable instruct model. if HF_TOKEN and not os.getenv("API_BASE_URL") and not os.getenv("GRAPHREVIEW_LLM_BASE_URL"): API_BASE_URL = "https://router.huggingface.co/v1" else: API_BASE_URL = os.getenv("API_BASE_URL", os.getenv("GRAPHREVIEW_LLM_BASE_URL", "http://localhost:11434/v1")) if HF_TOKEN and not os.getenv("MODEL_NAME"): MODEL_NAME = "meta-llama/Meta-Llama-3.1-8B-Instruct" else: MODEL_NAME = os.getenv("MODEL_NAME", "gemma4:e4b") # Keep current behavior for local Ollama while supporting hosted providers via HF_TOKEN. 
API_KEY = HF_TOKEN or os.getenv("GRAPHREVIEW_LLM_API_KEY", "ollama")


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the harness.

    Returns:
        A parser with one positional ``target`` argument plus options for the
        DB path, forced re-seeding, weight registration, the output file and
        the precision/recall regression guard.
    """
    parser = argparse.ArgumentParser(description="GraphReview deterministic inference/training harness")
    parser.add_argument("target", help="Path to target Python project")
    parser.add_argument("--db-path", default=None, help="Optional DB path")
    parser.add_argument("--force-seed", action="store_true", help="Force re-seed")
    parser.add_argument(
        "--register-weights",
        action="store_true",
        help="Register model weights and write verification manifest",
    )
    parser.add_argument(
        "--deterministic-output",
        default="outputs/training/deterministic_findings.jsonl",
        help="Path to write normalized deterministic findings",
    )
    parser.add_argument("--baseline-precision", type=float, default=None, help="Optional precision floor baseline")
    parser.add_argument("--baseline-recall", type=float, default=None, help="Optional recall floor baseline")
    parser.add_argument(
        "--regression-tolerance",
        type=float,
        default=0.01,
        help="Allowed drop from baseline precision/recall",
    )
    return parser


def _finding_key(analyzer: str, module_id: str, rule_id: str, line: int) -> str:
    """Return the ``analyzer:module_id:rule_id:line`` identity string of a finding."""
    return f"{analyzer}:{module_id}:{rule_id}:{line}"


def _target_key(module_id: str, line: int) -> str:
    """Return the ``module_id:line`` location string used to compare findings."""
    return f"{module_id}:{line}"


def _safe_float(raw: str | None, default: float) -> float:
    """Parse ``raw`` as a float, returning ``default`` for ``None`` or unparsable text."""
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        return default


def _build_agent_prompt(module_id: str, code: str, ast_summary: str) -> str:
    """Assemble the user prompt asking the model to review one module.

    Args:
        module_id: Identifier of the module, embedded in the prompt.
        code: Source text of the module, embedded verbatim.
        ast_summary: Summary text embedded under "AST Summary".

    Returns:
        The instruction text followed by the module id, AST summary and code.
    """
    return (
        "You are reviewing one Python module in a dependency-aware code review environment. "
        "Do not rely on prior analyzer findings because they are hidden from you. "
        "Find concrete, actionable issues only, with line numbers and confidence.\n\n"
        "Your objectives are:\n"
        "1) Identify real bug, security, or dependency-risk issues in the provided code.\n"
        "2) Prefer deterministic evidence over speculative style feedback.\n"
        "3) If you suspect cascade risk, explain likely upstream/downstream impact in rationale.\n"
        "4) Return strictly valid JSON matching this schema: "
        "{\"findings\": [{\"line\": int, \"category\": \"bug|security|dependency\", \"rule_hint\": str, \"message\": str, \"confidence\": float}]}.\n\n"
        f"Module: {module_id}\n"
        f"AST Summary: {ast_summary}\n"
        "Code:\n"
        f"{code}\n"
    )


def _agent_keys_from_rows(module_id: str, rows: list) -> set[str]:
    """Convert the ``findings`` list of one LLM response into finding keys.

    Each item is validated on its own: entries that are not dicts, whose
    confidence is below 0.45 or not a number, or whose ``line`` cannot be read
    as an integer are skipped without affecting the remaining entries.
    """
    keys: set[str] = set()
    for item in rows:
        if not isinstance(item, dict):
            continue
        confidence = _safe_float(str(item.get("confidence", "0.0")), 0.0)
        # Negated comparison on purpose: NaN fails ">=" and is rejected too.
        if not confidence >= 0.45:
            continue
        try:
            line = max(1, int(item.get("line", 1)))
        except (TypeError, ValueError, OverflowError):
            # e.g. null, "12-14" or a non-finite float: drop just this item.
            continue
        category = str(item.get("category", "bug")).lower()
        analyzer = "agent-security" if category == "security" else "agent-logic"
        rule_hint = str(item.get("rule_hint") or "agent")[:80]
        keys.add(_finding_key(analyzer, module_id, rule_hint, line))
    return keys


def _extract_agent_findings(store: Store, config) -> set[str]:
    """Collect agent-side finding keys for every module in the store's graph.

    When inference is enabled (``GRAPHREVIEW_AGENT_INFERENCE_ENABLED``) and a
    base URL and model are configured, each module is sent to the chat
    completions endpoint and the JSON reply is turned into finding keys. If
    listing models fails, the model is not listed, or a completion request or
    its JSON decoding fails, the LLM is switched off for the rest of the run
    and the offline fallback is used instead.

    Args:
        store: Source of graph nodes, module rows and analyzer findings.
        config: Not read by this function; kept in the signature for callers.

    Returns:
        A set of ``analyzer:module_id:rule:line`` keys.
    """
    model = MODEL_NAME
    base_url = API_BASE_URL
    api_key = API_KEY
    enabled = os.getenv("GRAPHREVIEW_AGENT_INFERENCE_ENABLED", "true").strip().lower() == "true"
    findings: set[str] = set()
    node_snapshot = store.get_full_graph().nodes

    use_llm = enabled and base_url and model
    client = OpenAI(api_key=api_key, base_url=base_url, timeout=12.0) if use_llm else None
    llm_enabled = client is not None
    if llm_enabled:
        # Probe the endpoint once so an unreachable server or unknown model
        # falls back immediately instead of failing on every module.
        try:
            models = client.models.list()
            available = {item.id for item in models.data if getattr(item, "id", None)}
            if model not in available:
                print(
                    f"[STEP] agent_llm_fallback reason=model-not-found model={model} "
                    f"available_count={len(available)}"
                )
                llm_enabled = False
        except Exception as exc:
            print(f"[STEP] agent_llm_fallback reason=model-list-failed error={type(exc).__name__}")
            llm_enabled = False

    for node in node_snapshot:
        node_row = store.get_node(node.module_id)
        if node_row is None:
            continue
        module_id = node_row.module_id
        code = node_row.raw_code
        ast_summary = node_row.ast_summary

        collected = False
        if llm_enabled and client is not None:
            prompt = _build_agent_prompt(module_id=module_id, code=code, ast_summary=ast_summary)
            try:
                resp = client.chat.completions.create(
                    model=model,
                    temperature=0.0,
                    response_format={"type": "json_object"},
                    messages=[
                        {
                            "role": "system",
                            "content": "Return only JSON. Do not include markdown. Keep claims concrete and line-specific.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                )
                text = (resp.choices[0].message.content or "{}").strip()
                payload = json.loads(text)
                rows = payload.get("findings", []) if isinstance(payload, dict) else []
                if isinstance(rows, list):
                    findings.update(_agent_keys_from_rows(module_id, rows))
                    # NOTE(review): the original indentation of this flag was
                    # lost; it is assumed to be set once a findings list was
                    # processed (even an empty one) - confirm.
                    collected = True
            except Exception as exc:
                print(
                    f"[STEP] agent_llm_fallback reason=completion-failed error={type(exc).__name__} "
                    f"module={module_id}"
                )
                llm_enabled = False
                collected = False

        if collected:
            continue

        # Deterministic fallback so training bootstrap still works offline.
        deterministic_rows = store.get_analyzer_findings_for_module(module_id)
        for finding in deterministic_rows[:2]:
            findings.add(_finding_key("agent-fallback", module_id, finding.rule_id, finding.line))
        for issue in detect_semantic_issues(code):
            findings.add(_finding_key("agent-heuristic", module_id, issue.stage, max(issue.line, 1)))

    return findings


def _register_local_weights(weight_manager: WeightSafetyManager, config) -> None:
    """Register the weights at ``config.llm_model_agent_path`` and log the manifest.

    Exceptions from ``register_existing`` are not handled here; callers catch
    ``FileNotFoundError``.
    """
    manifest = weight_manager.register_existing(
        model_name=MODEL_NAME,
        weight_path=Path(config.llm_model_agent_path),
    )
    print(
        "[STEP] weights_registered "
        + json.dumps(
            {
                "model": manifest.model_name,
                "sha256": manifest.sha256,
                "size_bytes": manifest.size_bytes,
            },
            sort_keys=True,
        )
    )


def _flag_bug_action(target_line, content) -> str:
    """Serialize a ``FLAG_BUG`` action for the given line and message as JSON."""
    return json.dumps(
        {
            "action_type": "FLAG_BUG",
            "target_line": target_line,
            "content": content,
            "attributed_to": None,
        },
        sort_keys=True,
    )


def _training_record(
    manager: TrainingRunManager,
    target_name: str,
    finding,
    prompt: str,
    agent_output: str,
) -> dict[str, object]:
    """Build one training record for ``finding`` from a prompt/output pair.

    The preference record produced by ``manager`` is extended with the
    qualified module id and with ``text``/``chosen`` copies of ``agent_output``.
    """
    return {
        **manager.build_preference_record(
            prompt=prompt,
            agent_output=agent_output,
            deterministic_targets=[
                _finding_key(
                    finding.analyzer,
                    finding.module_id,
                    finding.rule_id,
                    finding.line,
                )
            ],
            reward=1.0,
        ),
        "module_id": f"{target_name}/{finding.module_id}",
        "text": agent_output,
        "chosen": agent_output,
    }


def main() -> None:
    """Run the harness end to end.

    Steps: resolve/verify model weights, seed the project DB, compare
    analyzer findings with agent findings by ``module:line`` target, write two
    training records per analyzer finding, apply the non-regression guard
    against explicit or previously stored baselines, and persist the run.
    """
    args = _build_parser().parse_args()
    config = load_runtime_config()
    target = Path(args.target).resolve()

    print(f"[START] target={target} model={MODEL_NAME} mode=deterministic-ground-truth")

    weight_manager = WeightSafetyManager(Path(config.llm_weight_manifest_dir))
    verified_weight_path: str | None = None

    if args.register_weights:
        try:
            _register_local_weights(weight_manager, config)
        except FileNotFoundError:
            print(
                f"[STEP] weights_register_skipped reason=missing-local-weights model={MODEL_NAME} "
                f"path={config.llm_model_agent_path}"
            )

    try:
        verified_weight_path = str(weight_manager.load_verified(MODEL_NAME))
    except FileNotFoundError:
        # Nothing verified yet: try to register the local weights, then load again.
        try:
            _register_local_weights(weight_manager, config)
            verified_weight_path = str(weight_manager.load_verified(MODEL_NAME))
        except FileNotFoundError:
            print(
                f"[STEP] weights_unavailable reason=missing-local-weights model={MODEL_NAME} "
                f"path={config.llm_model_agent_path}"
            )

    if verified_weight_path is not None:
        print(f"[STEP] weights_verified path={verified_weight_path}")
    else:
        print("[STEP] weights_verified path=unavailable mode=api-only")

    seed_result = seed_project(target_dir=target, db_path=args.db_path, force=args.force_seed)
    print(f"[STEP] seeded {json.dumps(seed_result, sort_keys=True)}")

    store = Store(source_root=str(target), db_path=args.db_path)
    deterministic_findings = store.get_analyzer_findings()
    deterministic_keys = {
        _finding_key(item.analyzer, item.module_id, item.rule_id, item.line)
        for item in deterministic_findings
    }
    deterministic_targets = {
        _target_key(item.module_id, item.line)
        for item in deterministic_findings
    }

    agent_keys = _extract_agent_findings(store=store, config=config)
    # Reduce agent keys to module:line targets. The module id is the second
    # field and the line is the last one, so colons inside the rule hint are
    # harmless here.
    agent_targets: set[str] = set()
    for item in agent_keys:
        parts = item.split(":")
        if len(parts) < 4:
            continue
        module_id = parts[1]
        try:
            line = int(parts[-1])
        except ValueError:
            continue
        agent_targets.add(_target_key(module_id, line))

    manager = TrainingRunManager()
    comparison = manager.compare(deterministic_findings=deterministic_targets, agent_findings=agent_targets)

    records: list[dict[str, object]] = []
    for finding in deterministic_findings:
        reasoning_text = (
            "\n"
            f"Deterministic analyzer {finding.analyzer} reported {finding.rule_id} at line {finding.line} in {finding.module_id}. "
            "This is treated as supervised high-confidence signal for bootstrap training.\n"
            "\n"
            "\n"
            + _flag_bug_action(finding.line, finding.message)
            + "\n"
        )
        records.append(
            _training_record(
                manager,
                target.name,
                finding,
                prompt=(
                    "Review the module and detect concrete bugs, security issues, and "
                    "dependency-attributed cascade problems without relying on prior findings."
                ),
                agent_output=reasoning_text,
            )
        )

        # Add a second deterministic variant to keep training volume healthy for small corpora.
        reasoning_text_variant = (
            "\n"
            f"Cross-check confirms a reproducible issue in {finding.module_id} at line {finding.line}. "
            f"Rule hint={finding.rule_id}; analyzer={finding.analyzer}. "
            "Action should prioritize precise attribution and concrete remediation notes.\n"
            "\n"
            "\n"
            + _flag_bug_action(finding.line, f"[verified] {finding.message}")
            + "\n"
        )
        records.append(
            _training_record(
                manager,
                target.name,
                finding,
                prompt=(
                    "Re-check this module and emit an evidence-based action with strict line attribution."
                ),
                agent_output=reasoning_text_variant,
            )
        )

    output_path = Path(args.deterministic_output)
    manager.save_records(output_path, records)

    # Explicit CLI baselines win; otherwise use the best stored prior run.
    baseline_precision = args.baseline_precision
    baseline_recall = args.baseline_recall
    prior_runs = store.list_training_runs(limit=100)
    if baseline_precision is None and prior_runs:
        baseline_precision = max(item.precision for item in prior_runs)
    if baseline_recall is None and prior_runs:
        baseline_recall = max(item.recall for item in prior_runs)

    passed_non_regression = True
    if baseline_precision is not None and baseline_recall is not None:
        try:
            manager.assert_non_regression(
                baseline_precision=baseline_precision,
                baseline_recall=baseline_recall,
                current_precision=comparison.precision,
                current_recall=comparison.recall,
                tolerance=args.regression_tolerance,
            )
        except ValueError as exc:
            # A failed guard is recorded on the run rather than aborting it.
            passed_non_regression = False
            print(f"[STEP] non_regression_guard_failed reason={exc!s}")
        else:
            print(
                "[STEP] non_regression_guard "
                + json.dumps(
                    {
                        "baseline_precision": baseline_precision,
                        "baseline_recall": baseline_recall,
                        "tolerance": args.regression_tolerance,
                    },
                    sort_keys=True,
                )
            )

    print(
        "[STEP] training_dataset "
        + json.dumps(
            {
                "output": str(output_path),
                "records": len(records),
                "precision": comparison.precision,
                "recall": comparison.recall,
                "false_negatives": comparison.false_negatives,
            },
            sort_keys=True,
        )
    )

    run_id = f"tr-{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"
    run_config = {
        "target": str(target),
        "model": MODEL_NAME,
        "model_path": config.llm_model_agent_path,
        "agent_inference_enabled": os.getenv("GRAPHREVIEW_AGENT_INFERENCE_ENABLED", "true"),
        "regression_tolerance": args.regression_tolerance,
        "baseline_precision": baseline_precision,
        "baseline_recall": baseline_recall,
    }

    sha256 = "unavailable"
    if verified_weight_path is not None:
        sha256 = weight_manager.checksum(Path(verified_weight_path))

    store.create_training_run(
        run_id=run_id,
        model_name=MODEL_NAME,
        model_sha256=sha256,
        deterministic_findings=len(deterministic_keys),
        agent_findings=len(agent_keys),
        true_positives=comparison.true_positives,
        false_positives=comparison.false_positives,
        false_negatives=comparison.false_negatives,
        precision=comparison.precision,
        recall=comparison.recall,
        passed_non_regression=passed_non_regression,
        output_path=str(output_path),
        run_config_json=json.dumps(run_config, sort_keys=True),
    )
    print(f"[STEP] training_run_id={run_id}")

    print(
        "[END] "
        + json.dumps(
            {
                "ok": True,
                "deterministic_findings": len(deterministic_findings),
                "agent_findings": len(agent_keys),
                "model_weight": verified_weight_path or "unavailable",
                "model": MODEL_NAME,
                "precision": comparison.precision,
                "recall": comparison.recall,
                "run_id": run_id,
            },
            sort_keys=True,
        )
    )


if __name__ == "__main__":
    main()