"""Deterministic query planning and audit logging for code-index evidence.""" from __future__ import annotations import json from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List def build_query_plan(task: Dict[str, Any], complexity: Dict[str, Any]) -> Dict[str, Any]: """Create a deterministic investigation plan from task and repo metadata.""" repo_id = str(task.get("repo_id") or task.get("repo_path") or "repo") level = str(task.get("curriculum_level", "easy")) query = str(task.get("query", "Explain this codebase.")) goals = [ "collect indexed code evidence", "identify architectural or control-flow entry points", "capture evidence paths for rubric checks", ] calls: List[Dict[str, Any]] = [ { "type": "semantic_search", "repo_id": repo_id, "query": query, "purpose": "Retrieve high-signal chunks for the visible task.", } ] if level in {"medium", "hard"}: # code_indexing_mcp exposes `search_code_graph`, not raw Cypher. Use keyword search # for structural/cross-file terms; optional Neo4j Cypher MCP is separate. calls.append( { "type": "graph_search", "searchType": "keyword", "query": "import class def", "purpose": "Keyword search over indexed CodeChunk fulltext (graph-adjacent discovery).", } ) if level == "hard": calls.append( { "type": "semantic_search", "repo_id": repo_id, "query": "Find the main modules, service boundaries, and call relationships.", "purpose": "Collect architecture-level evidence.", } ) return { "repo_id": repo_id, "curriculum_level": level, "visible_query": query, "complexity_band": complexity.get("difficulty_band"), "goals": goals, "calls": calls, } def apply_neo4j_repo_id(plan: Dict[str, Any], neo4j_repo_id: str) -> None: """Patch planned calls to use the Neo4j `repo_id` from ingest (sha256 of realpath).""" for call in plan.get("calls", []): if call.get("type") in ("semantic_search", "graph_search"): call["repo_id"] = neo4j_repo_id def write_query_plan(plan: Dict[str, Any], path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(plan, indent=2)) def append_query_log(path: Path, entry: Dict[str, Any]) -> Dict[str, Any]: path.parent.mkdir(parents=True, exist_ok=True) enriched = { "timestamp": datetime.now(timezone.utc).isoformat(), **entry, } with path.open("a") as handle: handle.write(json.dumps(enriched) + "\n") return enriched def summarize_query_logs(logs: Iterable[Dict[str, Any]]) -> Dict[str, Any]: items = list(logs) successes = [item for item in items if item.get("success")] return { "query_count": len(items), "successful_query_count": len(successes), "query_success_rate": len(successes) / max(1, len(items)), }