| """Deterministic query planning and audit logging for code-index evidence.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List |
|
|
|
|
def build_query_plan(task: Dict[str, Any], complexity: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the investigation plan for one task.

    The plan depends only on the two input mappings, so identical inputs
    always yield an identical plan.

    Args:
        task: Task description. Keys read here: ``repo_id`` (falling back to
            ``repo_path``, then the literal ``"repo"``), ``curriculum_level``
            (default ``"easy"``) and ``query`` (default
            ``"Explain this codebase."``). Each value is coerced with ``str``.
        complexity: Repo metadata; only ``difficulty_band`` is read and it is
            copied through unchanged (``None`` when absent).

    Returns:
        A dict with the keys ``repo_id``, ``curriculum_level``,
        ``visible_query``, ``complexity_band``, ``goals`` and ``calls``.
        ``calls`` always starts with a semantic search for the task query;
        levels ``"medium"`` and ``"hard"`` add a keyword graph search, and
        ``"hard"`` adds a further architecture-oriented semantic search.
    """
    repo_id = str(task.get("repo_id") or task.get("repo_path") or "repo")
    level = str(task.get("curriculum_level", "easy"))
    query = str(task.get("query", "Explain this codebase."))

    # Baseline call issued for every curriculum level.
    task_search: Dict[str, Any] = {
        "type": "semantic_search",
        "repo_id": repo_id,
        "query": query,
        "purpose": "Retrieve high-signal chunks for the visible task.",
    }
    # NOTE: this call is emitted without a "repo_id" key.
    keyword_search: Dict[str, Any] = {
        "type": "graph_search",
        "searchType": "keyword",
        "query": "import class def",
        "purpose": "Keyword search over indexed CodeChunk fulltext (graph-adjacent discovery).",
    }
    architecture_search: Dict[str, Any] = {
        "type": "semantic_search",
        "repo_id": repo_id,
        "query": "Find the main modules, service boundaries, and call relationships.",
        "purpose": "Collect architecture-level evidence.",
    }

    is_hard = level == "hard"
    calls: List[Dict[str, Any]] = [task_search]
    if is_hard or level == "medium":
        calls.append(keyword_search)
    if is_hard:
        calls.append(architecture_search)

    plan: Dict[str, Any] = {}
    plan["repo_id"] = repo_id
    plan["curriculum_level"] = level
    plan["visible_query"] = query
    plan["complexity_band"] = complexity.get("difficulty_band")
    plan["goals"] = [
        "collect indexed code evidence",
        "identify architectural or control-flow entry points",
        "capture evidence paths for rubric checks",
    ]
    plan["calls"] = calls
    return plan
|
|
|
|
def apply_neo4j_repo_id(plan: Dict[str, Any], neo4j_repo_id: str) -> None:
    """Overwrite ``repo_id`` on the plan's search calls with the Neo4j id from ingest.

    The plan is modified in place. Only calls whose ``type`` is
    ``semantic_search`` or ``graph_search`` are touched; for those the
    ``repo_id`` key is set (added if it was missing). A plan with no
    ``calls`` key is left unchanged.

    Args:
        plan: Query plan holding a ``calls`` list of call dicts.
        neo4j_repo_id: Identifier to store on each matching call (sha256 of
            the repo realpath, per the ingest step).
    """
    search_types = ("semantic_search", "graph_search")
    planned_calls = plan.get("calls", [])
    for planned in planned_calls:
        if planned.get("type") not in search_types:
            continue
        planned["repo_id"] = neo4j_repo_id
|
|
|
|
def write_query_plan(plan: Dict[str, Any], path: Path) -> None:
    """Write ``plan`` to ``path`` as JSON indented by two spaces.

    Missing parent directories are created and an existing file is
    overwritten. The file is written as UTF-8 explicitly so the result does
    not depend on the platform's locale encoding.

    Args:
        plan: JSON-serializable query plan.
        path: Destination file.

    Raises:
        TypeError: If ``plan`` contains a value ``json`` cannot encode.
        OSError: If the directory or file cannot be created or written.
    """
    # Serialize before touching the filesystem so a bad plan has no side effects.
    payload = json.dumps(plan, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload, encoding="utf-8")
|
|
|
|
def append_query_log(path: Path, entry: Dict[str, Any]) -> Dict[str, Any]:
    """Append one timestamped record to a JSON Lines log file.

    The record is ``entry`` plus a ``timestamp`` key holding the current UTC
    time in ISO 8601 form. Keys from ``entry`` are merged after the generated
    timestamp, so an entry that already carries ``timestamp`` keeps its own
    value. Missing parent directories are created, and the file is opened in
    append mode as UTF-8 so the result does not depend on the locale.

    Args:
        path: Log file to append to; created if it does not exist.
        entry: JSON-serializable fields describing the query.

    Returns:
        The record exactly as it was written (a new dict; ``entry`` is not
        modified).

    Raises:
        TypeError: If ``entry`` contains a value ``json`` cannot encode.
        OSError: If the directory or file cannot be created or written.
    """
    enriched = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **entry,
    }
    # Serialize first: an unserializable entry must not leave behind a new
    # directory or an empty log file.
    line = json.dumps(enriched) + "\n"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(line)
    return enriched
|
|
|
|
def summarize_query_logs(logs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute totals and the success rate for a collection of query log entries.

    An entry counts as successful when its ``success`` value is truthy;
    entries without that key count as failures.

    Args:
        logs: Log entries; any iterable is accepted and consumed once.

    Returns:
        A dict with ``query_count``, ``successful_query_count`` and
        ``query_success_rate`` (``0.0`` when there are no entries).
    """
    entries = list(logs)
    total = len(entries)
    succeeded = sum(1 for entry in entries if entry.get("success"))
    # An empty log reports a rate of 0.0 instead of dividing by zero.
    rate = succeeded / total if total else 0.0
    return {
        "query_count": total,
        "successful_query_count": succeeded,
        "query_success_rate": rate,
    }
|
|