Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import hashlib | |
| import json | |
| import os | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from db.store import Store | |
| from analyzers.pipeline import AnalyzerPipeline | |
| from parser.ast_parser import parse_python_file | |
| from parser.chunker import chunk_module | |
| from parser.graph_builder import build_edges | |
| from parser.linter import run_linters | |
| from parser.summarizer import summarize_module | |
| from llm.edge_summarizer import EdgeSummarizer, EdgeSummaryInput | |
# Directory names that are excluded when collecting Python files; matched
# against individual path components in _iter_python_files.
_SKIP_DIRS = {
    # version control and tool caches
    ".git",
    ".mypy_cache",
    ".pytest_cache",
    "__pycache__",
    # virtual environments and third-party packages
    ".venv",
    "venv",
    "node_modules",
    # build output
    "build",
    "dist",
}
def _progress_enabled() -> bool:
    """Report whether progress output is switched on.

    Reads the ``GRAPHREVIEW_PROGRESS`` environment variable on every call.
    Progress is on when the variable is unset or equals ``"true"`` in any
    letter case, and off for every other value.
    """
    flag = os.environ.get("GRAPHREVIEW_PROGRESS")
    if flag is None:
        return True
    return flag.lower() == "true"
def _seed_workers() -> int:
    """Return the number of worker threads to use for file analysis.

    ``GRAPHREVIEW_SEED_WORKERS`` overrides the default of
    ``min(4, os.cpu_count())`` (a CPU count that cannot be determined counts
    as 1). The result is never less than 1.

    Raises:
        ValueError: if the environment variable is set to a non-integer.
    """
    override = os.getenv("GRAPHREVIEW_SEED_WORKERS")
    if override is None:
        requested = min(4, os.cpu_count() or 1)
    else:
        requested = int(override)
    return requested if requested > 1 else 1
def _iter_python_files(target_dir: Path) -> list[Path]:
    """Collect the ``*.py`` files under ``target_dir`` in sorted order.

    A file is skipped when any component of its path *relative to*
    ``target_dir`` is listed in ``_SKIP_DIRS``. Only the relative part is
    inspected so that a project which itself lives below a directory named
    e.g. ``build`` or ``venv`` is still scanned. Entries matching ``*.py``
    that are not regular files (directories, dangling symlinks) are ignored.

    The result holds at most ``GRAPHREVIEW_MAX_FILES`` entries (default
    5000); a limit of zero or less yields an empty list.

    Raises:
        ValueError: if ``GRAPHREVIEW_MAX_FILES`` is set to a non-integer.
    """
    max_files = int(os.getenv("GRAPHREVIEW_MAX_FILES", "5000"))
    py_files: list[Path] = []
    for path in sorted(target_dir.rglob("*.py")):
        # Enforce the cap before appending so a limit of 0 really means 0.
        if len(py_files) >= max_files:
            break
        # Match skip names only below target_dir, never in its ancestors.
        if any(part in _SKIP_DIRS for part in path.relative_to(target_dir).parts):
            continue
        # rglob matches on name only; callers read these paths as files.
        if not path.is_file():
            continue
        py_files.append(path)
    return py_files
def _codebase_hash(target_dir: Path) -> str:
    """Fingerprint the Python sources under ``target_dir``.

    For each file returned by ``_iter_python_files`` (in that order), feeds
    the file's POSIX-style path relative to ``target_dir`` and then its raw
    bytes into a single SHA-256 digest.

    Returns:
        The hexadecimal digest string.
    """
    hasher = hashlib.sha256()
    for source_file in _iter_python_files(target_dir):
        relative_name = source_file.relative_to(target_dir).as_posix()
        for piece in (relative_name.encode("utf-8"), source_file.read_bytes()):
            hasher.update(piece)
    return hasher.hexdigest()
def _seed_meta_key(source_root: str) -> str:
    """Build the metadata key under which the seed record for ``source_root`` is kept."""
    return "seeded:{}".format(source_root)
def _analyze_file(path: Path, target_dir: Path) -> tuple[Path, object, list[object], str, object]:
    """Run the per-file analysis steps for one source file.

    Parses the file, lints it, summarizes the parsed module together with its
    lint issues, and chunks the parsed module with ``max_lines=300``.

    Returns:
        ``(path, parsed, issues, summary, chunk_result)``. The input path is
        echoed back as the first element so the caller can key results by it.
    """
    module = parse_python_file(path, target_dir)
    lint_issues = run_linters(path)
    # Tuple elements evaluate left to right: summary first, then chunks.
    return (
        path,
        module,
        lint_issues,
        summarize_module(module, lint_issues),
        chunk_module(module, max_lines=300),
    )
def _render_progress(prefix: str, current: int, total: int, label: str) -> None:
    """Redraw a one-line text progress bar on standard output.

    The line starts with a carriage return so it overwrites the previous
    draw, and is space-padded to 120 characters. The bar is 24 cells wide;
    labels longer than 48 characters are cut to 45 characters plus ``...``.
    A newline is emitted once ``current`` reaches ``total``. Nothing is
    printed when ``total`` is zero or negative.
    """
    if total <= 0:
        return

    bar_width = 24
    # Clamp the completed fraction into [0, 1].
    fraction = current / total
    if fraction < 0.0:
        fraction = 0.0
    elif fraction > 1.0:
        fraction = 1.0

    done_cells = int(bar_width * fraction)
    gauge = "".join(("#" * done_cells, "-" * (bar_width - done_cells)))
    shown_label = f"{label[:45]}..." if len(label) > 48 else label
    text = "{} [{}] {}/{} {}".format(prefix, gauge, current, total, shown_label)
    print("\r" + text.ljust(120), end="", flush=True)
    if current >= total:
        # Finish the line so later output starts on a fresh one.
        print("", flush=True)
def seed_project(target_dir: Path, db_path: str | None = None, force: bool = False) -> dict[str, object]:
    """Parse a Python project and (re)build its graph in the store.

    If the store already has nodes, the recorded seed metadata is marked
    seeded, and its codebase hash matches the current sources, the cached
    counts are returned without re-parsing (unless ``force`` is set).
    Otherwise the source graph is cleared and rebuilt: files are analyzed in
    a thread pool, module and chunk nodes are stored, edges are built and
    summarized, analyzer results are recorded, and fresh seed metadata is
    written.

    Args:
        target_dir: Root of the codebase to seed; resolved to an absolute path.
        db_path: Passed through to ``Store``.
        force: Rebuild even when the cached seed is still valid.

    Returns:
        A dict with ``seeded``, ``loaded_from_cache``, ``codebase_hash``,
        ``node_count``, ``edge_count`` and ``deterministic_analyzer_findings``.
        On a cache hit the counts come from the stored metadata (0 when a
        count is missing there).
    """
    target_dir = target_dir.resolve()
    store = Store(source_root=str(target_dir), db_path=db_path)
    current_hash = _codebase_hash(target_dir)
    meta_key = _seed_meta_key(str(target_dir))
    existing = _load_seed_meta(store, meta_key)

    if (
        not force
        and store.has_nodes()
        and existing.get("codebase_hash") == current_hash
        and existing.get("seeded") is True
    ):
        return {
            "seeded": True,
            "loaded_from_cache": True,
            "codebase_hash": current_hash,
            "node_count": int(existing.get("node_count", 0)),
            "edge_count": int(existing.get("edge_count", 0)),
            # Same key set as the fresh-seed result below.
            "deterministic_analyzer_findings": int(existing.get("deterministic_analyzer_findings", 0)),
        }

    if existing.get("seeded") is True:
        # Invalidate the old marker before touching the graph, so that a run
        # interrupted mid-rebuild is not later mistaken for a complete seed.
        store.set_meta(meta_key, json.dumps({**existing, "seeded": False}))
    store.clear_source_graph()

    # Read the progress switch once instead of on every loop iteration.
    show_progress = _progress_enabled()
    py_files = _iter_python_files(target_dir)
    analysis_by_path = _run_file_analysis(py_files, target_dir, show_progress)

    parsed_modules = [analysis_by_path[path][0] for path in py_files]
    module_ids = {parsed.module_id for parsed in parsed_modules}
    chunk_ids_by_parent = _store_module_nodes(store, py_files, analysis_by_path, target_dir, show_progress)

    edges = build_edges(parsed_modules, module_ids, chunk_ids_by_parent)
    _store_edges(store, edges, show_progress)

    analyzer_finding_count = _store_analyzer_results(store, target_dir, module_ids)

    snapshot = store.get_full_graph()
    node_count = len(snapshot.nodes)
    edge_count = len(snapshot.edges)
    meta_payload = {
        "seeded": True,
        "seeded_at": datetime.now(UTC).isoformat(),
        "codebase_hash": current_hash,
        "node_count": node_count,
        "edge_count": edge_count,
        "deterministic_analyzer_findings": analyzer_finding_count,
    }
    store.set_meta(meta_key, json.dumps(meta_payload))
    return {
        "seeded": True,
        "loaded_from_cache": False,
        "codebase_hash": current_hash,
        "node_count": node_count,
        "edge_count": edge_count,
        "deterministic_analyzer_findings": analyzer_finding_count,
    }


def _load_seed_meta(store: Store, meta_key: str) -> dict[str, object]:
    """Return the stored seed metadata as a dict, or ``{}`` if absent or unreadable."""
    raw = store.get_meta(meta_key)
    if not raw:
        return {}
    try:
        loaded = json.loads(raw)
    except (TypeError, ValueError):
        # Corrupt metadata is treated as "never seeded" rather than fatal.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _run_file_analysis(
    py_files: list[Path],
    target_dir: Path,
    show_progress: bool,
) -> dict[Path, tuple[object, list[object], str, object]]:
    """Analyze every file in a thread pool and return the results keyed by path."""
    total_files = len(py_files)
    workers = _seed_workers()
    if show_progress:
        print(f"[SEED] Analyzing {total_files} files with {workers} workers...", flush=True)

    analysis_by_path: dict[Path, tuple[object, list[object], str, object]] = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(_analyze_file, path, target_dir) for path in py_files]
        # Results arrive in completion order; the path in each result tuple
        # is what ties a result back to its file.
        for idx, future in enumerate(as_completed(futures), start=1):
            path, parsed, issues, summary, chunk_result = future.result()
            analysis_by_path[path] = (parsed, issues, summary, chunk_result)
            if show_progress:
                rel = path.relative_to(target_dir).as_posix()
                _render_progress("[SEED] analyzed", idx, total_files, rel)
    return analysis_by_path


def _store_module_nodes(
    store: Store,
    py_files: list[Path],
    analysis_by_path: dict[Path, tuple[object, list[object], str, object]],
    target_dir: Path,
    show_progress: bool,
) -> dict[str, set[str]]:
    """Store module nodes, chunk nodes and linter findings; return chunk ids per parent module."""
    total_files = len(py_files)
    chunk_ids_by_parent: dict[str, set[str]] = {}
    for idx, path in enumerate(py_files, start=1):
        parsed, issues, summary, chunk_result = analysis_by_path[path]
        # Serialize each issue once; used for both the node flags and the findings.
        issue_dicts = [issue.model_dump() for issue in issues]
        linter_flags = json.dumps(issue_dicts)
        parent = chunk_result.parent
        if show_progress:
            rel = path.relative_to(target_dir).as_posix()
            _render_progress("[SEED] storing", idx, total_files, rel)
        store.upsert_node(
            module_id=parent.module_id,
            name=parent.name,
            raw_code=parent.code,
            ast_summary=summary,
            summary=summary,
            linter_flags=linter_flags,
            dependency_reason="Imports and symbol usage captured from AST",
            parent_module_id=parent.parent_module_id,
            is_chunk=parent.is_chunk,
        )
        if chunk_result.chunks:
            chunk_ids_by_parent[parent.module_id] = {chunk.module_id for chunk in chunk_result.chunks}
        for chunk in chunk_result.chunks:
            chunk_summary = f"Chunk {chunk.name} lines {chunk.start_line}-{chunk.end_line}"
            store.upsert_node(
                module_id=chunk.module_id,
                name=chunk.name,
                raw_code=chunk.code,
                ast_summary=chunk_summary,
                summary=chunk_summary,
                linter_flags="[]",
                dependency_reason="Top-level class/function chunk",
                parent_module_id=chunk.parent_module_id,
                is_chunk=chunk.is_chunk,
            )
        store.replace_findings_for_module(parsed.module_id, issue_dicts)
    return chunk_ids_by_parent


def _store_edges(store: Store, edges: list[object], show_progress: bool) -> None:
    """Summarize each edge with ``EdgeSummarizer`` and store it."""
    edge_summarizer = EdgeSummarizer()
    total_edges = len(edges)
    for idx, edge in enumerate(edges, start=1):
        if show_progress:
            edge_label = f"{edge.source_module_id} -> {edge.target_module_id}"
            _render_progress("[SEED] edges", idx, total_edges, edge_label)
        connection_summary = edge_summarizer.summarize(
            EdgeSummaryInput(
                source_module_id=edge.source_module_id,
                target_module_id=edge.target_module_id,
                edge_type=edge.edge_type.value,
                import_line=edge.import_line,
                scope=edge.scope,
            )
        )
        store.upsert_edge(
            source_module_id=edge.source_module_id,
            target_module_id=edge.target_module_id,
            edge_type=edge.edge_type,
            import_line=edge.import_line,
            weight=edge.weight,
            connection_summary=connection_summary,
        )


def _store_analyzer_results(store: Store, target_dir: Path, module_ids: set[str]) -> int:
    """Run the analyzer pipeline, store its runs and findings, and return the raw finding count."""
    analyzer_pipeline = AnalyzerPipeline(target_dir=target_dir)
    analyzer_findings, analyzer_runs = analyzer_pipeline.run_all()

    analyzer_findings_by_tool: dict[str, list[dict[str, str | int]]] = {}
    for run in analyzer_runs:
        created = store.create_analyzer_run(
            analyzer=run.analyzer,
            analyzer_version=run.analyzer_version,
            status=run.status,
            findings_count=run.findings,
            command=run.command,
            command_hash=run.command_hash,
            error_message=run.error_message,
        )
        # Keep this analyzer's findings for known modules, dropping repeats of
        # the same (module, line, rule, message) combination.
        seen_keys: set[tuple[object, object, object, object]] = set()
        serialized: list[dict[str, str | int]] = []
        for item in analyzer_findings:
            if item.analyzer != run.analyzer or item.module_id not in module_ids:
                continue
            key = (item.module_id, item.line, item.rule_id, item.message)
            if key in seen_keys:
                continue
            seen_keys.add(key)
            serialized.append(
                {
                    "module_id": item.module_id,
                    "line": item.line,
                    "severity": item.severity,
                    "rule_id": item.rule_id,
                    "message": item.message,
                    "evidence": item.evidence,
                }
            )
        if serialized:
            store.add_analyzer_findings(
                analyzer_run_id=int(created.id or 0),
                analyzer=run.analyzer,
                findings=serialized,
            )
        analyzer_findings_by_tool[run.analyzer] = serialized

    # Second pass: attach each tool's findings to the modules they refer to.
    for tool, findings in analyzer_findings_by_tool.items():
        findings_by_module: dict[str, list[dict[str, str | int]]] = {}
        for item in findings:
            module = str(item["module_id"])
            findings_by_module.setdefault(module, []).append(
                {
                    "tool": tool,
                    "line": int(item["line"]),
                    "severity": str(item["severity"]),
                    "code": str(item["rule_id"]),
                    "message": str(item["message"]),
                }
            )
        for module_id, module_findings in findings_by_module.items():
            store.append_findings_for_module(module_id=module_id, findings=module_findings)

    return len(analyzer_findings)
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the seeding script.

    Arguments: positional ``target``, optional ``--db-path`` (default
    ``None``) and the boolean switch ``--force``.
    """
    cli = argparse.ArgumentParser(description="Seed graph database from Python project")
    argument_specs = (
        (("target",), {"help": "Path to target codebase"}),
        (("--db-path",), {"default": None, "help": "Path to SQLite database"}),
        (("--force",), {"action": "store_true", "help": "Force re-parse even if seeded"}),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli
def main() -> None:
    """Command-line entry point: seed the given project and print the result as indented JSON."""
    cli_args = _build_parser().parse_args()
    outcome = seed_project(
        Path(cli_args.target),
        db_path=cli_args.db_path,
        force=cli_args.force,
    )
    print(json.dumps(outcome, indent=2))


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()