SouravNath's picture
Initial commit
dc71cad
"""
ast_parser/cache.py
────────────────────
Per-repo AST and graph caching layer.
Cache strategy:
- Key: (repo_name, repo_commit_sha)
- Value: {file_path: FileSymbols JSON} + graph adjacency JSON
- Backend: diskcache (local) — zero external dependencies
On cache hit: skip all Tree-sitter parsing and graph construction.
On cache miss: parse all files, build graph, write to cache.
For a 500-file repo, this takes parsing from ~8s → ~0ms on repeat runs.
Cache invalidation:
- Individual file: SHA-256 of file content differs from cached hash
- Full repo: commit SHA changed (new cache entry created)
"""
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Optional
from urllib.parse import quote

from ast_parser.dependency_graph import RepoDependencyGraph, graph_to_dict, graph_from_dict
from ast_parser.python_parser import FileSymbols
logger = logging.getLogger(__name__)
class ASTCache:
    """
    Disk-backed cache for AST parse results and dependency graphs.

    Uses diskcache if available, falls back to raw JSON files stored under
    ``<cache_dir>/json_cache``. All values are stored as JSON strings under
    string keys of the form ``<kind>:<repo_key>[:<file_path>]``.

    The instance can be used as a context manager; leaving the ``with``
    block calls :meth:`close`.
    """

    def __init__(self, cache_dir: Path):
        """Create the cache directory (if needed) and select a backend."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._dc = None
        self._try_init_diskcache()

    def _try_init_diskcache(self) -> None:
        """Switch to the diskcache backend when the package is importable."""
        try:
            import diskcache
        except ImportError:
            logger.debug("ASTCache: diskcache not available, using JSON files")
            return
        self._dc = diskcache.Cache(str(self.cache_dir / "diskcache"))
        logger.debug("ASTCache: using diskcache backend")

    def close(self) -> None:
        """Release the diskcache handle, if one is open (no-op for JSON files)."""
        if self._dc is not None:
            self._dc.close()

    def __enter__(self) -> "ASTCache":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # ── FileSymbols cache ─────────────────────────────────────────────────────
    def get_file_symbols(self, repo_key: str, file_path: str) -> Optional[FileSymbols]:
        """Return cached FileSymbols, or None if not cached or undecodable."""
        key = f"symbols:{repo_key}:{file_path}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return FileSymbols.from_dict(json.loads(raw))
        except Exception as e:
            # Any malformed entry is treated as a miss (same policy as the
            # all_symbols / graph getters) so a corrupt cache never crashes
            # the caller; the entry is simply re-created on the next write.
            logger.debug("Cache decode error for %s: %s", key, e)
            return None

    def set_file_symbols(self, repo_key: str, fs: FileSymbols) -> None:
        """Store one file's symbols under ``symbols:<repo_key>:<fs.file_path>``."""
        key = f"symbols:{repo_key}:{fs.file_path}"
        self._set(key, json.dumps(fs.to_dict()))

    def get_all_file_symbols(self, repo_key: str) -> Optional[list[FileSymbols]]:
        """Return all cached FileSymbols for a repo, or None on miss/decode error."""
        key = f"all_symbols:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            data = json.loads(raw)
            return [FileSymbols.from_dict(d) for d in data]
        except Exception as e:
            logger.debug("Cache decode error for all_symbols: %s", e)
            return None

    def set_all_file_symbols(self, repo_key: str, symbols: list[FileSymbols]) -> None:
        """Store the full symbol list for a repo under ``all_symbols:<repo_key>``."""
        key = f"all_symbols:{repo_key}"
        self._set(key, json.dumps([fs.to_dict() for fs in symbols]))

    # ── Graph cache ───────────────────────────────────────────────────────────
    def get_graph(self, repo_key: str) -> Optional[RepoDependencyGraph]:
        """Return the cached dependency graph, or None on miss/decode error."""
        key = f"graph:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return graph_from_dict(json.loads(raw))
        except Exception as e:
            logger.debug("Graph cache decode error: %s", e)
            return None

    def set_graph(self, repo_key: str, graph: RepoDependencyGraph) -> None:
        """Store the dependency graph for a repo under ``graph:<repo_key>``."""
        key = f"graph:{repo_key}"
        self._set(key, json.dumps(graph_to_dict(graph)))

    # ── Combined: parse + cache a whole repo ──────────────────────────────────
    def get_or_parse_repo(
        self,
        repo_root: Path,
        repo_key: str,
        force_reparse: bool = False,
    ) -> tuple[list[FileSymbols], RepoDependencyGraph]:
        """
        High-level entry point: returns (symbols, graph) from cache or parses fresh.

        Args:
            repo_root: path to the cloned repository
            repo_key: unique key e.g. 'django__django_abc1234' (repo + commit)
            force_reparse: bypass cache entirely

        Returns:
            (file_symbols_list, dependency_graph)
        """
        if force_reparse:
            logger.info("Forced reparse for %s — bypassing cache", repo_key)
        else:
            cached_symbols = self.get_all_file_symbols(repo_key)
            # Only pay for graph deserialisation when the symbols were found;
            # a hit requires both halves anyway.
            cached_graph = (
                self.get_graph(repo_key) if cached_symbols is not None else None
            )
            if cached_symbols is not None and cached_graph is not None:
                logger.info(
                    "Cache HIT for %s — %d files, %d graph nodes",
                    repo_key, len(cached_symbols), cached_graph.graph.number_of_nodes()
                )
                return cached_symbols, cached_graph
            logger.info("Cache MISS for %s — parsing repo from scratch", repo_key)

        # Parse all files
        from ast_parser.python_parser import PythonASTParser
        parser = PythonASTParser()
        symbols = list(parser.parse_repo(repo_root))

        # Build graph
        graph = RepoDependencyGraph()
        graph.build(symbols, repo_root)

        # Write to cache
        self.set_all_file_symbols(repo_key, symbols)
        self.set_graph(repo_key, graph)
        logger.info(
            "Cached %d file symbols + graph (%d nodes) for %s",
            len(symbols), graph.graph.number_of_nodes(), repo_key
        )
        return symbols, graph

    # ── Backend helpers ───────────────────────────────────────────────────────
    def _get(self, key: str) -> Optional[str]:
        """Return the raw stored string for *key*, or None when absent."""
        if self._dc is not None:
            return self._dc.get(key)
        # Fallback: JSON file. Read directly and handle absence instead of
        # checking exists() first, which could race with a concurrent delete.
        try:
            return self._json_path(key).read_text(encoding="utf-8")
        except FileNotFoundError:
            return None

    def _set(self, key: str, value: str) -> None:
        """Store the raw string *value* under *key* in the active backend."""
        if self._dc is not None:
            self._dc.set(key, value)
        else:
            p = self._json_path(key)
            p.parent.mkdir(parents=True, exist_ok=True)
            # Explicit encoding so files do not depend on the platform locale.
            p.write_text(value, encoding="utf-8")

    @staticmethod
    def _encode_key(key: str) -> str:
        """
        Percent-encode *key* into a filesystem-safe name fragment.

        The encoding is injective, so two different keys can never share a
        file (the previous "replace separators with '_'" scheme mapped e.g.
        'a/b.py' and 'a_b.py' to the same file). It also preserves prefixes,
        which invalidate_repo relies on.
        """
        return quote(key, safe="", encoding="utf-8", errors="surrogatepass")

    def _json_path(self, key: str) -> Path:
        """Convert cache key to a safe, collision-free filesystem path."""
        return self.cache_dir / "json_cache" / f"{self._encode_key(key)}.json"

    @staticmethod
    def _remove_file(path: Path) -> None:
        """Delete *path*, ignoring the case where it is already gone."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def invalidate_repo(self, repo_key: str) -> None:
        """
        Remove all cached data for a repo.

        Deletes the aggregated ``all_symbols`` and ``graph`` entries as well
        as every per-file ``symbols:<repo_key>:*`` entry.
        """
        exact_keys = [f"{prefix}:{repo_key}" for prefix in ("all_symbols", "graph")]
        symbols_prefix = f"symbols:{repo_key}:"

        if self._dc is not None:
            for key in exact_keys:
                self._dc.delete(key)
            # Collect first, then delete, so the key scan is not disturbed.
            stale = [
                k for k in self._dc.iterkeys()
                if isinstance(k, str) and k.startswith(symbols_prefix)
            ]
            for k in stale:
                self._dc.delete(k)
        else:
            for key in exact_keys:
                self._remove_file(self._json_path(key))
            json_dir = self.cache_dir / "json_cache"
            encoded_prefix = self._encode_key(symbols_prefix)
            if json_dir.is_dir():
                for entry in list(json_dir.iterdir()):
                    if entry.name.startswith(encoded_prefix):
                        self._remove_file(entry)

        logger.info("Cache invalidated for %s", repo_key)