neurocaster-env / server /query_planner.py
NishithP2004's picture
Upload folder using huggingface_hub
9c75f36 verified
"""Deterministic query planning and audit logging for code-index evidence."""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List
def build_query_plan(task: Dict[str, Any], complexity: Dict[str, Any]) -> Dict[str, Any]:
    """Create a deterministic investigation plan from task and repo metadata.

    Args:
        task: Task metadata. Reads ``repo_id`` (falling back to ``repo_path``,
            then to ``"repo"``), ``curriculum_level`` (default ``"easy"``) and
            ``query`` (default ``"Explain this codebase."``). A key that is
            present but ``None`` is treated the same as a missing key.
        complexity: Repo complexity metadata; only ``difficulty_band`` is
            read. ``None`` is treated as an empty mapping.

    Returns:
        A dict with ``repo_id``, ``curriculum_level``, ``visible_query``,
        ``complexity_band``, ``goals`` and the ordered list of planned
        ``calls``.
    """
    repo_id = str(task.get("repo_id") or task.get("repo_path") or "repo")

    # Fall back on None as well as on a missing key, so an explicit null in
    # the task never becomes the literal string "None".
    raw_level = task.get("curriculum_level")
    level = "easy" if raw_level is None else str(raw_level)
    raw_query = task.get("query")
    query = "Explain this codebase." if raw_query is None else str(raw_query)

    goals = [
        "collect indexed code evidence",
        "identify architectural or control-flow entry points",
        "capture evidence paths for rubric checks",
    ]

    # Every plan starts with a semantic search for the visible task query.
    calls: List[Dict[str, Any]] = [
        {
            "type": "semantic_search",
            "repo_id": repo_id,
            "query": query,
            "purpose": "Retrieve high-signal chunks for the visible task.",
        }
    ]

    if level in {"medium", "hard"}:
        # code_indexing_mcp exposes `search_code_graph`, not raw Cypher. Use keyword search
        # for structural/cross-file terms; optional Neo4j Cypher MCP is separate.
        # NOTE: this call carries no `repo_id` here; apply_neo4j_repo_id adds one.
        calls.append(
            {
                "type": "graph_search",
                "searchType": "keyword",
                "query": "import class def",
                "purpose": "Keyword search over indexed CodeChunk fulltext (graph-adjacent discovery).",
            }
        )

    if level == "hard":
        # Hard tasks additionally gather architecture-level evidence.
        calls.append(
            {
                "type": "semantic_search",
                "repo_id": repo_id,
                "query": "Find the main modules, service boundaries, and call relationships.",
                "purpose": "Collect architecture-level evidence.",
            }
        )

    return {
        "repo_id": repo_id,
        "curriculum_level": level,
        "visible_query": query,
        # Tolerate callers that have no complexity metadata at all.
        "complexity_band": (complexity or {}).get("difficulty_band"),
        "goals": goals,
        "calls": calls,
    }
def apply_neo4j_repo_id(plan: Dict[str, Any], neo4j_repo_id: str) -> None:
    """Rewrite ``repo_id`` on the plan's search calls, in place.

    Every entry in ``plan["calls"]`` whose ``type`` is ``semantic_search`` or
    ``graph_search`` gets its ``repo_id`` set to ``neo4j_repo_id`` (the Neo4j
    `repo_id` from ingest — sha256 of realpath). Calls of any other type are
    left untouched, and a plan without a ``calls`` key is a no-op.
    """
    patched_types = ("semantic_search", "graph_search")
    for planned_call in plan.get("calls", []):
        if planned_call.get("type") not in patched_types:
            continue
        planned_call["repo_id"] = neo4j_repo_id
def write_query_plan(plan: Dict[str, Any], path: Path) -> None:
    """Write ``plan`` to ``path`` as JSON indented by two spaces.

    Missing parent directories of ``path`` are created first; an existing
    file at ``path`` is overwritten.
    """
    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(plan, indent=2)
    path.write_text(serialized)
def append_query_log(path: Path, entry: Dict[str, Any]) -> Dict[str, Any]:
    """Append ``entry`` to the JSON-lines log at ``path`` and return the record.

    The record starts with a ``timestamp`` key holding the current UTC time in
    ISO-8601 form, followed by the keys of ``entry``. If ``entry`` supplies its
    own ``timestamp``, that value replaces the generated one. Missing parent
    directories of ``path`` are created.

    Returns:
        The record that was written, as a new dict.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    record: Dict[str, Any] = {"timestamp": datetime.now(timezone.utc).isoformat()}
    record.update(entry)  # entry keys win on collision, including "timestamp"

    with path.open("a") as log_file:
        log_file.write(f"{json.dumps(record)}\n")
    return record
def summarize_query_logs(logs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the total count, success count and success rate for ``logs``.

    An entry counts as successful when its ``success`` value is truthy. The
    rate is ``0.0`` when ``logs`` is empty.
    """
    total = 0
    succeeded = 0
    for record in logs:
        total += 1
        if record.get("success"):
            succeeded += 1

    success_rate = succeeded / total if total else 0.0
    return {
        "query_count": total,
        "successful_query_count": succeeded,
        "query_success_rate": success_rate,
    }