Spaces:
Running
Running
"""
ast_parser/cache.py
────────────────────
Per-repo AST and graph caching layer.

Cache strategy:
- Key: (repo_name, repo_commit_sha)
- Value: {file_path: FileSymbols JSON} + graph adjacency JSON
- Backend: diskcache (local) — zero external dependencies

On cache hit: skip all Tree-sitter parsing and graph construction.
On cache miss: parse all files, build graph, write to cache.
For a 500-file repo, this takes parsing from ~8s → ~0ms on repeat runs.

Cache invalidation:
- Individual file: SHA-256 of file content differs from cached hash
- Full repo: commit SHA changed (new cache entry created)
"""
| from __future__ import annotations | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional | |
| from ast_parser.python_parser import FileSymbols | |
| from ast_parser.dependency_graph import RepoDependencyGraph, graph_to_dict, graph_from_dict | |
logger = logging.getLogger(__name__)


class ASTCache:
    """Disk-backed cache for parsed file symbols and dependency graphs.

    Every value is stored as a JSON string under one of three key families:

    * ``symbols:<repo_key>:<file_path>`` - the ``FileSymbols`` of one file
    * ``all_symbols:<repo_key>``         - every ``FileSymbols`` of a repo
    * ``graph:<repo_key>``               - the repo dependency graph

    The ``diskcache`` package is used when it can be imported; otherwise each
    key is written to its own file under ``<cache_dir>/json_cache``.

    This class does not compare content hashes: an entry is exactly as fresh
    as the ``repo_key`` it was stored under, so callers should fold the commit
    into that key. Entries that cannot be decoded are reported as misses.
    """

    # Characters replaced by "_" when a cache key becomes a file name.
    _FILENAME_UNSAFE = str.maketrans({":": "_", "/": "_", "\\": "_"})

    def __init__(self, cache_dir: Path):
        """Create ``cache_dir`` if needed and select the storage backend.

        Args:
            cache_dir: Directory that holds every cache artefact.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._dc = None  # diskcache.Cache when available, else None (JSON files)
        self._try_init_diskcache()

    def _try_init_diskcache(self) -> None:
        """Use ``diskcache`` when it is importable; keep the JSON fallback otherwise."""
        try:
            import diskcache
            self._dc = diskcache.Cache(str(self.cache_dir / "diskcache"))
            logger.debug("ASTCache: using diskcache backend")
        except ImportError:
            logger.debug("ASTCache: diskcache not available, using JSON files")

    # ── FileSymbols cache ─────────────────────────────────────────────────────

    def get_file_symbols(self, repo_key: str, file_path: str) -> Optional[FileSymbols]:
        """Return the cached ``FileSymbols`` of one file, or ``None`` on a miss.

        A missing entry and an entry that cannot be decoded are both misses.
        """
        key = f"symbols:{repo_key}:{file_path}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return FileSymbols.from_dict(json.loads(raw))
        except Exception as e:
            # Same policy as the other getters: a corrupt or schema-drifted
            # entry must degrade to a cache miss, never crash the caller.
            logger.debug("Cache decode error for %s: %s", key, e)
            return None

    def set_file_symbols(self, repo_key: str, fs: FileSymbols) -> None:
        """Store ``fs`` under ``repo_key`` and its own ``file_path``."""
        key = f"symbols:{repo_key}:{fs.file_path}"
        self._set(key, json.dumps(fs.to_dict()))

    def get_all_file_symbols(self, repo_key: str) -> Optional[list[FileSymbols]]:
        """Return every cached ``FileSymbols`` of a repo, or ``None`` on a miss."""
        key = f"all_symbols:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            data = json.loads(raw)
            return [FileSymbols.from_dict(d) for d in data]
        except Exception as e:
            logger.debug("Cache decode error for all_symbols: %s", e)
            return None

    def set_all_file_symbols(self, repo_key: str, symbols: list[FileSymbols]) -> None:
        """Store the complete symbol list of a repo as a single entry."""
        key = f"all_symbols:{repo_key}"
        self._set(key, json.dumps([fs.to_dict() for fs in symbols]))

    # ── Graph cache ───────────────────────────────────────────────────────────

    def get_graph(self, repo_key: str) -> Optional[RepoDependencyGraph]:
        """Return the cached dependency graph of a repo, or ``None`` on a miss."""
        key = f"graph:{repo_key}"
        raw = self._get(key)
        if raw is None:
            return None
        try:
            return graph_from_dict(json.loads(raw))
        except Exception as e:
            logger.debug("Graph cache decode error: %s", e)
            return None

    def set_graph(self, repo_key: str, graph: RepoDependencyGraph) -> None:
        """Serialise ``graph`` with ``graph_to_dict`` and store it."""
        key = f"graph:{repo_key}"
        self._set(key, json.dumps(graph_to_dict(graph)))

    # ── Combined: parse + cache a whole repo ──────────────────────────────────

    def get_or_parse_repo(
        self,
        repo_root: Path,
        repo_key: str,
        force_reparse: bool = False,
    ) -> tuple[list[FileSymbols], RepoDependencyGraph]:
        """Return ``(symbols, graph)`` from the cache, parsing the repo on a miss.

        Args:
            repo_root: Path to the cloned repository.
            repo_key: Unique key, e.g. 'django__django_abc1234' (repo + commit).
            force_reparse: Skip the cache lookup; the fresh result is still
                written back to the cache.

        Returns:
            (file_symbols_list, dependency_graph)
        """
        if not force_reparse:
            cached_symbols = self.get_all_file_symbols(repo_key)
            # A hit needs both halves, so do not deserialise the graph once
            # the symbols lookup has already missed.
            cached_graph = self.get_graph(repo_key) if cached_symbols is not None else None
            if cached_symbols is not None and cached_graph is not None:
                logger.info(
                    "Cache HIT for %s — %d files, %d graph nodes",
                    repo_key, len(cached_symbols), cached_graph.graph.number_of_nodes()
                )
                return cached_symbols, cached_graph

        logger.info("Cache MISS for %s — parsing repo from scratch", repo_key)

        # Parse all files (the parser is imported only on this path).
        from ast_parser.python_parser import PythonASTParser
        parser = PythonASTParser()
        symbols = list(parser.parse_repo(repo_root))

        # Build graph
        graph = RepoDependencyGraph()
        graph.build(symbols, repo_root)

        # Write to cache
        self.set_all_file_symbols(repo_key, symbols)
        self.set_graph(repo_key, graph)
        logger.info(
            "Cached %d file symbols + graph (%d nodes) for %s",
            len(symbols), graph.graph.number_of_nodes(), repo_key
        )
        return symbols, graph

    # ── Backend helpers ───────────────────────────────────────────────────────

    def _get(self, key: str) -> Optional[str]:
        """Return the raw string stored under ``key``, or ``None`` if absent."""
        if self._dc is not None:
            return self._dc.get(key)
        # Fallback: JSON file. Read first and handle absence afterwards so a
        # concurrent delete between a check and the read cannot raise.
        try:
            return self._json_path(key).read_text(encoding="utf-8")
        except FileNotFoundError:
            return None

    def _set(self, key: str, value: str) -> None:
        """Store ``value`` under ``key`` in the active backend."""
        if self._dc is not None:
            self._dc.set(key, value)
            return
        p = self._json_path(key)
        p.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding keeps the files independent of the locale.
        p.write_text(value, encoding="utf-8")

    def _safe_name(self, key: str) -> str:
        """Replace ':', '/' and '\\' in ``key`` with '_'.

        NOTE(review): the mapping is lossy ('a/b.py' and 'a_b.py' give the
        same name). It is kept as is because changing it would orphan every
        JSON cache file written so far.
        """
        return key.translate(self._FILENAME_UNSAFE)

    def _json_path(self, key: str) -> Path:
        """Convert cache key to a safe filesystem path."""
        return self.cache_dir / "json_cache" / f"{self._safe_name(key)}.json"

    def _delete(self, key: str) -> None:
        """Remove the entry stored under ``key``; a missing entry is not an error."""
        if self._dc is not None:
            self._dc.delete(key)
            return
        try:
            self._json_path(key).unlink()
        except FileNotFoundError:
            pass  # already gone, which is the state we want

    def _delete_prefixed(self, key_prefix: str) -> None:
        """Remove every entry whose key starts with ``key_prefix``.

        With the JSON backend the match is made on sanitised file names, so
        it can also remove entries of another repo whose sanitised key shares
        the prefix. That only costs an extra cache miss.
        """
        if self._dc is not None:
            # Snapshot the keys so nothing is deleted while iterating.
            doomed = [
                k for k in list(self._dc)
                if isinstance(k, str) and k.startswith(key_prefix)
            ]
            for k in doomed:
                self._dc.delete(k)
            return

        json_dir = self.cache_dir / "json_cache"
        if not json_dir.is_dir():
            return
        name_prefix = self._safe_name(key_prefix)
        for entry in list(json_dir.iterdir()):
            if entry.suffix == ".json" and entry.name.startswith(name_prefix):
                try:
                    entry.unlink()
                except FileNotFoundError:
                    pass  # removed concurrently

    def invalidate_repo(self, repo_key: str) -> None:
        """Remove all cached data for a repo.

        Covers the aggregate ``all_symbols`` and ``graph`` entries as well as
        every per-file ``symbols`` entry written by ``set_file_symbols``.
        """
        for prefix in ("all_symbols", "graph"):
            self._delete(f"{prefix}:{repo_key}")
        self._delete_prefixed(f"symbols:{repo_key}:")
        logger.info("Cache invalidated for %s", repo_key)