Spaces:
Sleeping
Sleeping
Add Phase 06 Plan for Adaptive Judging and Edge Intelligence; Create initial project outline for GraphReview RL Environment
from __future__ import annotations

import argparse
import ast
import os
import tokenize
from pathlib import Path

from pydantic import BaseModel

from db.schema import EdgeType
from db.store import Store
from llm.edge_summarizer import EdgeSummarizer, EdgeSummaryInput
from parser.linter import run_linters
from parser.summarizer import summarize_module
# Directory names whose contents are never collected by _iter_python_files.
_SKIP_DIRS = {
    ".git",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".venv",
    "venv",
    "node_modules",
    "dist",
    "build",
}


def _iter_python_files(target_dir: Path) -> list[Path]:
    """Collect the ``*.py`` files below *target_dir* in sorted order.

    Files located inside a directory listed in ``_SKIP_DIRS`` are ignored.
    The number of returned paths is capped by the ``GRAPHREVIEW_MAX_FILES``
    environment variable (default ``5000``); a cap of zero or less yields an
    empty list.

    Args:
        target_dir: Root directory to scan recursively.

    Returns:
        The matching file paths, sorted, at most ``GRAPHREVIEW_MAX_FILES`` long.

    Raises:
        ValueError: If ``GRAPHREVIEW_MAX_FILES`` is set to a non-integer value.
    """
    max_files = int(os.getenv("GRAPHREVIEW_MAX_FILES", "5000"))
    py_files: list[Path] = []
    for path in sorted(target_dir.rglob("*.py")):
        # Check the cap before appending so a limit of 0 really means "none".
        if len(py_files) >= max_files:
            break
        # Only components *below* target_dir may trigger a skip; otherwise a
        # project that itself lives under e.g. ".../build/" would be emptied.
        if any(part in _SKIP_DIRS for part in path.relative_to(target_dir).parts):
            continue
        # rglob matches on name only, so a directory called "x.py" would slip in.
        if not path.is_file():
            continue
        py_files.append(path)
    return py_files
class ImportRef(BaseModel):
    """One import reference found in a module.

    ``parse_python_file`` creates an instance per recorded import target and
    ``parse_directory`` turns the ones pointing at known modules into edges.
    """

    # Dotted name of the imported module (relative references already resolved
    # by the time parse_python_file builds the instance).
    target_module: str
    # Source text of the import statement that produced this reference.
    import_line: str
    # "module_level" or "function_level", as reported by the AST visitor.
    scope: str = "module_level"
    # Edge weight; parse_python_file uses 0.5 for function-level imports.
    weight: float = 1.0
    edge_type: EdgeType = EdgeType.EXPLICIT_IMPORT
class ParsedModule(BaseModel):
    """Result of parsing a single Python source file.

    Produced by ``parse_python_file``; when the file cannot be parsed every
    list field is empty and only ``module_id`` and ``raw_code`` are populated.
    """

    # Dotted identifier derived from the file path relative to the scan root.
    module_id: str
    # Full text of the source file.
    raw_code: str
    # Rendered signatures in the form "name(arg: ann, ...)->returns".
    function_signatures: list[str]
    # Names of the classes defined in the file.
    classes: list[str]
    # Every import reference recorded for the file.
    imports: list[ImportRef]
    # Upper-case names assigned a literal constant.
    constants: list[str]
    # Non-empty target module names taken from ``imports``.
    dependencies: list[str]
class _Visitor(ast.NodeVisitor):
    """AST visitor that records functions, classes, constants and imports.

    Call :meth:`parse` with the tree and the source text it was built from;
    the results are then available on the public list attributes.

    Attributes:
        function_signatures: ``"name(arg: ann, ...)->returns"`` strings, one
            per (sync or async) function definition. Only the regular
            positional-or-keyword parameters are rendered.
        classes: Names of all class definitions encountered.
        constants: Upper-case names assigned a literal constant.
        imports: ``(target, statement_text, scope)`` tuples where *scope* is
            ``"module_level"`` or ``"function_level"``.
    """

    def __init__(self) -> None:
        self.function_signatures: list[str] = []
        self.classes: list[str] = []
        self.constants: list[str] = []
        self.imports: list[tuple[str, str, str]] = []
        # Names of the enclosing function/class definitions, innermost last.
        self._scope_stack: list[str] = []
        # Source text used to recover the exact import statements.
        self._source: str = ""

    @property
    def _scope(self) -> str:
        """Scope label for the node currently being visited.

        This must be a property: the import visitors read ``self._scope`` as a
        value, and a plain method would store a bound method in the tuple.
        Anything nested in a function *or* class body counts as
        ``"function_level"``.
        """
        return "function_level" if self._scope_stack else "module_level"

    def _statement_text(self, node: ast.AST, fallback: str) -> str:
        """Return the source text of *node*, or *fallback* when unavailable."""
        if not self._source:
            return fallback
        return ast.get_source_segment(self._source, node) or fallback

    def _record_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
        """Record the signature of *node* and visit its children in its scope."""
        args: list[str] = []
        for arg in node.args.args:
            if arg.annotation is not None:
                args.append(f"{arg.arg}: {ast.unparse(arg.annotation)}")
            else:
                args.append(arg.arg)
        # A missing return annotation is rendered as "None".
        returns = ast.unparse(node.returns) if node.returns is not None else "None"
        self.function_signatures.append(f"{node.name}({', '.join(args)})->{returns}")
        self._scope_stack.append(node.name)
        try:
            self.generic_visit(node)
        finally:
            self._scope_stack.pop()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Record a synchronous function definition."""
        self._record_function(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Record an ``async def`` exactly like a regular function."""
        self._record_function(node)

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Record the class name and visit its body as a nested scope."""
        self.classes.append(node.name)
        self._scope_stack.append(node.name)
        try:
            self.generic_visit(node)
        finally:
            self._scope_stack.pop()

    def visit_Import(self, node: ast.Import) -> None:
        """Record one entry per module named in an ``import`` statement."""
        line = self._statement_text(node, "import")
        scope = self._scope
        for alias in node.names:
            self.imports.append((alias.name, line, scope))

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Record the base module and candidate submodules of a ``from`` import."""
        module = node.module or ""
        level = node.level or 0
        dotted = "." * level + module
        line = self._statement_text(node, "from")
        scope = self._scope
        # Keep the base module edge and add candidate submodule edges.
        # Non-module symbols are filtered later against known module ids.
        if dotted:
            self.imports.append((dotted, line, scope))
        for alias in node.names:
            if alias.name == "*":
                continue
            if dotted.endswith("."):
                # Pure relative form ("from . import x"): no extra separator.
                candidate = f"{dotted}{alias.name}"
            else:
                candidate = f"{dotted}.{alias.name}" if dotted else alias.name
            self.imports.append((candidate, line, scope))

    def visit_Assign(self, node: ast.Assign) -> None:
        """Record upper-case names that are assigned a literal constant."""
        if isinstance(node.value, ast.Constant):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id.isupper():
                    self.constants.append(target.id)
        self.generic_visit(node)

    def parse(self, tree: ast.AST, source: str) -> None:
        """Walk *tree*, using *source* to recover import statement text."""
        self._source = source
        self.visit(tree)
def _to_module_id(path: Path, root: Path) -> str:
    """Convert a source file path into a dotted module identifier.

    Both paths are resolved first, then the file's location relative to
    *root* is joined with dots after dropping the final suffix, e.g.
    ``root/pkg/mod.py`` becomes ``"pkg.mod"``.

    Args:
        path: Path of the source file.
        root: Directory the identifier is computed relative to.

    Returns:
        The dotted identifier.

    Raises:
        ValueError: If *path* is not located under *root*.
    """
    rel = path.resolve().relative_to(root.resolve())
    # Join the path components rather than replacing "/" in the string form,
    # which would leave backslashes in the id on Windows.
    return ".".join(rel.with_suffix("").parts)
def _resolve_relative_import(current_module: str, ref: str) -> str:
    """Turn a possibly relative import reference into a dotted module name.

    References that do not start with a dot are returned untouched. For
    relative ones, each leading dot strips one trailing component from
    *current_module* and the remainder of *ref* is appended. When there are
    more dots than components, only the remainder is returned.

    Args:
        current_module: Dotted id of the module containing the import.
        ref: Import target, e.g. ``"os.path"``, ``".sibling"`` or ``"..pkg.x"``.

    Returns:
        The resolved dotted name (possibly an empty string).
    """
    if ref[:1] != ".":
        return ref

    remainder = ref.lstrip(".")
    level = len(ref) - len(remainder)
    segments = current_module.split(".")

    if level <= len(segments):
        anchor = segments[: len(segments) - level]
    else:
        anchor = []

    resolved = [*anchor, remainder] if remainder else anchor
    return ".".join(filter(None, resolved))
def parse_python_file(path: Path, root_dir: Path) -> ParsedModule:
    """Parse one Python file into a :class:`ParsedModule`.

    The file is read with the encoding Python itself would use (BOM or
    PEP 263 coding cookie, UTF-8 otherwise). If it cannot be decoded that
    way, it is read as UTF-8 with undecodable bytes replaced so that a single
    odd file does not abort a whole directory scan.

    Args:
        path: Source file to parse.
        root_dir: Scan root used to derive the module id.

    Returns:
        The parsed module. When the source cannot be compiled to an AST, the
        result carries the module id and raw code with all list fields empty.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If *path* is not located under *root_dir*.
    """
    try:
        with tokenize.open(path) as handle:
            source = handle.read()
    except (SyntaxError, UnicodeDecodeError):
        # Bad/unknown coding cookie or bytes that do not match the encoding.
        source = path.read_text(encoding="utf-8", errors="replace")
    module_id = _to_module_id(path, root_dir)
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # ValueError: older interpreters reject NUL bytes this way.
        return ParsedModule(
            module_id=module_id,
            raw_code=source,
            function_signatures=[],
            classes=[],
            imports=[],
            constants=[],
            dependencies=[],
        )
    visitor = _Visitor()
    visitor.parse(tree, source)
    imports = [
        ImportRef(
            target_module=_resolve_relative_import(module_id, name),
            import_line=line,
            scope=scope,
            # Imports nested in a function/class body count half.
            weight=0.5 if scope == "function_level" else 1.0,
            edge_type=EdgeType.EXPLICIT_IMPORT,
        )
        for name, line, scope in visitor.imports
    ]
    dependencies = [imp.target_module for imp in imports if imp.target_module]
    return ParsedModule(
        module_id=module_id,
        raw_code=source,
        function_signatures=visitor.function_signatures,
        classes=visitor.classes,
        imports=imports,
        constants=visitor.constants,
        dependencies=dependencies,
    )
def parse_directory(target_dir: Path, db_path: str | None = None) -> Store:
    """Parse every Python file under *target_dir* and load the result into a store.

    The store's source graph is cleared first. Each file then gets a node
    (raw code plus summary) and its linter findings; an edge is written for
    every import whose resolved target is one of the parsed modules.

    Args:
        target_dir: Root of the code base to scan; resolved before use.
        db_path: Forwarded to ``Store`` unchanged.

    Returns:
        The populated ``Store``.
    """
    target_dir = target_dir.resolve()
    store = Store(source_root=str(target_dir), db_path=db_path)
    store.clear_source_graph()

    source_files = _iter_python_files(target_dir)
    modules = [parse_python_file(source_file, target_dir) for source_file in source_files]
    # Imports pointing outside this set (stdlib, third party, plain symbols)
    # never become edges.
    known_module_ids = {module.module_id for module in modules}
    edge_summarizer = EdgeSummarizer()
    node_reason = "Imports used by module-level and callable logic"

    for source_file, module in zip(source_files, modules):
        findings = run_linters(source_file)
        store.upsert_node(
            module_id=module.module_id,
            raw_code=module.raw_code,
            ast_summary=summarize_module(module, findings),
            dependency_reason=node_reason,
        )
        store.replace_findings_for_module(
            module.module_id,
            [finding.model_dump() for finding in findings],
        )

        for ref in module.imports:
            target_id = ref.target_module
            if not target_id or target_id not in known_module_ids:
                continue
            edge_request = EdgeSummaryInput(
                source_module_id=module.module_id,
                target_module_id=target_id,
                edge_type=ref.edge_type.value,
                import_line=ref.import_line,
                scope=ref.scope,
            )
            store.upsert_edge(
                source_module_id=module.module_id,
                target_module_id=target_id,
                edge_type=ref.edge_type,
                import_line=ref.import_line,
                weight=ref.weight,
                connection_summary=edge_summarizer.summarize(edge_request),
            )

    return store
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    Returns:
        A parser accepting a positional ``target`` and an optional
        ``--db-path`` (default ``None``).
    """
    cli = argparse.ArgumentParser(
        description="Parse Python codebase into SQLite graph",
    )
    cli.add_argument("target", help="Path to target codebase")
    cli.add_argument("--db-path", default=None, help="Path to SQLite database")
    return cli
def main() -> None:
    """Command-line entry point: parse the target directory and report totals."""
    cli_args = _build_parser().parse_args()
    root = Path(cli_args.target)
    graph = parse_directory(target_dir=root, db_path=cli_args.db_path).get_full_graph()
    node_count = len(graph.nodes)
    edge_count = len(graph.edges)
    print(f"Populated DB for {root} with {node_count} nodes and {edge_count} edges")


if __name__ == "__main__":
    main()