Spaces:
Sleeping
Sleeping
| """Parse a Python repository (directory tree) into a KnowledgeGraph. | |
| Usage | |
| ----- | |
| from graphforge.repo_parser import parse_repo | |
| kg = parse_repo("/path/to/my_package") | |
| What it extracts | |
| ---------------- | |
| Nodes : repo, package, module, class, function, method | |
| Edges : contains, calls (same-file), imports, inherits | |
| Cross-file call resolution is best-effort: if function A in file X calls | |
| function B and B appears anywhere in the graph, an edge is added. | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| from graphforge.knowledge_graph import KGEdge, KGNode, KnowledgeGraph | |
| # ── helpers ─────────────────────────────────────────────────────────────────── | |
| def _node_id(node_type: str, file_path: str, *names: str) -> str: | |
| parts = [node_type, file_path] + list(names) | |
| return ":".join(p for p in parts if p) | |
| def _sig(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str: | |
| args = [] | |
| for arg in node.args.args: | |
| ann = f": {ast.unparse(arg.annotation)}" if arg.annotation else "" | |
| args.append(f"{arg.arg}{ann}") | |
| ret = f" -> {ast.unparse(node.returns)}" if node.returns else "" | |
| return f"({', '.join(args)}){ret}" | |
| def _source_slice(source_lines: list[str], start: int, end: int) -> str: | |
| """1-indexed, inclusive.""" | |
| return "\n".join(source_lines[start - 1 : end]) | |
| def _direct_calls(func_node: ast.FunctionDef | ast.AsyncFunctionDef) -> set[str]: | |
| """Collect names of directly called functions (Name-style calls only).""" | |
| calls: set[str] = set() | |
| for node in ast.walk(func_node): | |
| if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): | |
| calls.add(node.func.id) | |
| return calls | |
| # ── single-file parser ──────────────────────────────────────────────────────── | |
def _import_target(file_path: str, module: str, level: int = 0) -> str:
    """Return the module node id that an import of ``module`` points at.

    ``level`` is the number of leading dots of a ``from ... import``
    statement (0 for an absolute import). Relative imports are resolved
    against the directory of ``file_path``.
    """
    segments = module.split(".")
    if level:
        # Start from the importing file's directory and climb one directory
        # for every leading dot beyond the first.
        package_dirs = file_path.replace(os.sep, "/").split("/")[:-1]
        keep = max(len(package_dirs) - (level - 1), 0)
        segments = package_dirs[:keep] + segments
    return _node_id("module", "/".join(segments) + ".py")


def _parse_file(
    file_path: str,  # relative to repo root
    abs_path: str,
    kg: KnowledgeGraph,
    parent_id: str,
) -> None:
    """Parse one Python file and add its nodes and edges to ``kg``.

    Adds a module node (contained by ``parent_id``), an ``imports`` edge for
    every ``import`` / ``from ... import`` statement found anywhere in the
    file, nodes for top-level classes and functions, and finally ``calls``
    edges via ``_resolve_calls``.

    Files that cannot be read or parsed are skipped silently so that one
    bad file does not abort the whole repository walk.

    Parameters
    ----------
    file_path : str
        Path of the file relative to the repo root; used in node ids.
    abs_path : str
        Path used to actually read the file.
    kg : KnowledgeGraph
        Graph that receives the nodes and edges.
    parent_id : str
        Id of the node (repo or package) that contains this module.
    """
    try:
        source = Path(abs_path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        return
    try:
        tree = ast.parse(source, filename=abs_path)
    except (SyntaxError, ValueError, RecursionError):
        # ValueError: NUL bytes in the source on Python < 3.12.
        # RecursionError: pathologically nested code.
        return

    lines = source.splitlines()
    mod_id = _node_id("module", file_path)

    # Module node
    mod_doc = ast.get_docstring(tree) or ""
    kg.add_node(KGNode(
        node_id=mod_id,
        node_type="module",
        name=Path(file_path).stem,
        file_path=file_path,
        line_start=1,
        line_end=len(lines),
        source=source,
        docstring=mod_doc,
    ))
    kg.add_edge(KGEdge("contains", parent_id, mod_id))

    # Import edges (module names are mapped to the file they would live in)
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                kg.add_edge(KGEdge("imports", mod_id, _import_target(file_path, alias.name)))
        elif isinstance(node, ast.ImportFrom) and node.module:
            # ``from . import x`` (no module name) is skipped: ``x`` may be
            # a submodule or a plain attribute, which cannot be told apart here.
            imp_id = _import_target(file_path, node.module, node.level or 0)
            kg.add_edge(KGEdge("imports", mod_id, imp_id))

    # Top-level classes and functions
    func_name_to_id: dict[str, str] = {}  # for call resolution within file
    for stmt in tree.body:
        if isinstance(stmt, ast.ClassDef):
            _parse_class(stmt, file_path, lines, kg, mod_id, func_name_to_id)
        elif isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)):
            _parse_function(stmt, file_path, lines, kg, mod_id, func_name_to_id)

    # Call edges towards the functions/methods defined in this file
    _resolve_calls(func_name_to_id, kg)
def _parse_class(
    cls_node: ast.ClassDef,
    file_path: str,
    lines: list[str],
    kg: KnowledgeGraph,
    parent_id: str,
    func_name_to_id: dict[str, str],
) -> None:
    """Add a class node, its inheritance edges and its methods to ``kg``.

    Parameters
    ----------
    cls_node : ast.ClassDef
        The class definition being recorded.
    file_path : str
        Path of the defining file; part of every generated node id.
    lines : list[str]
        Source lines of the file, used to cut out the class source.
    kg : KnowledgeGraph
        Graph that receives the nodes and edges.
    parent_id : str
        Id of the node that contains the class.
    func_name_to_id : dict[str, str]
        Name-to-id registry passed through to ``_parse_method``.
    """
    class_name = cls_node.name
    first_line, last_line = cls_node.lineno, cls_node.end_lineno
    cls_id = _node_id("class", file_path, class_name)

    class_record = KGNode(
        node_id=cls_id,
        node_type="class",
        name=class_name,
        file_path=file_path,
        line_start=first_line,
        line_end=last_line,
        source=_source_slice(lines, first_line, last_line),
        docstring=ast.get_docstring(cls_node) or "",
    )
    kg.add_node(class_record)
    kg.add_edge(KGEdge("contains", parent_id, cls_id))

    # Inheritance: only bases written as a bare name are recorded, and the
    # base id is built with this file's path.
    named_bases = [base for base in cls_node.bases if isinstance(base, ast.Name)]
    for base in named_bases:
        parent_class_id = _node_id("class", file_path, base.id)
        kg.add_edge(KGEdge("inherits", cls_id, parent_class_id))

    # Methods: function definitions directly in the class body.
    for member in cls_node.body:
        if not isinstance(member, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        _parse_method(member, file_path, lines, kg, cls_id, class_name, func_name_to_id)
def _parse_function(
    fn: ast.FunctionDef | ast.AsyncFunctionDef,
    file_path: str,
    lines: list[str],
    kg: KnowledgeGraph,
    parent_id: str,
    func_name_to_id: dict[str, str],
) -> None:
    """Add a top-level function node to ``kg`` and register it by name.

    The node's metadata carries the rendered signature and the names of the
    functions it calls directly. The call names are stored sorted so the
    metadata (and the order in which call edges are later added) does not
    vary between runs.

    Parameters
    ----------
    fn : ast.FunctionDef | ast.AsyncFunctionDef
        The function definition being recorded.
    file_path : str
        Path of the defining file; part of the node id.
    lines : list[str]
        Source lines of the file, used to cut out the function source.
    kg : KnowledgeGraph
        Graph that receives the node and the ``contains`` edge.
    parent_id : str
        Id of the node that contains the function.
    func_name_to_id : dict[str, str]
        Registry updated in place with ``fn.name -> node id``.
    """
    fn_id = _node_id("function", file_path, fn.name)
    doc = ast.get_docstring(fn) or ""
    kg.add_node(KGNode(
        node_id=fn_id,
        node_type="function",
        name=fn.name,
        file_path=file_path,
        line_start=fn.lineno,
        line_end=fn.end_lineno,
        source=_source_slice(lines, fn.lineno, fn.end_lineno),
        docstring=doc,
        # sorted(): a set has no stable iteration order across interpreter runs
        metadata={"signature": _sig(fn), "calls": sorted(_direct_calls(fn))},
    ))
    kg.add_edge(KGEdge("contains", parent_id, fn_id))
    func_name_to_id[fn.name] = fn_id
def _parse_method(
    fn: ast.FunctionDef | ast.AsyncFunctionDef,
    file_path: str,
    lines: list[str],
    kg: KnowledgeGraph,
    parent_id: str,
    class_name: str,
    func_name_to_id: dict[str, str],
) -> None:
    """Add a method node to ``kg`` and register it for call resolution.

    The node's metadata carries the rendered signature and the sorted names
    of the functions the method calls directly.

    Parameters
    ----------
    fn : ast.FunctionDef | ast.AsyncFunctionDef
        The method definition being recorded.
    file_path : str
        Path of the defining file; part of the node id.
    lines : list[str]
        Source lines of the file, used to cut out the method source.
    kg : KnowledgeGraph
        Graph that receives the node and the ``contains`` edge.
    parent_id : str
        Id of the class node that contains the method.
    class_name : str
        Name of the enclosing class; part of the node id.
    func_name_to_id : dict[str, str]
        Registry updated in place with the method's unqualified name.
    """
    method_id = _node_id("method", file_path, class_name, fn.name)
    doc = ast.get_docstring(fn) or ""
    kg.add_node(KGNode(
        node_id=method_id,
        node_type="method",
        name=fn.name,
        file_path=file_path,
        line_start=fn.lineno,
        line_end=fn.end_lineno,
        source=_source_slice(lines, fn.lineno, fn.end_lineno),
        docstring=doc,
        # sorted(): a set has no stable iteration order across interpreter runs
        metadata={"signature": _sig(fn), "calls": sorted(_direct_calls(fn))},
    ))
    kg.add_edge(KGEdge("contains", parent_id, method_id))

    # Register under the unqualified name for call resolution, but never
    # displace a top-level function of the same name: a bare-name call
    # ``foo()`` binds to the module-level ``foo``, not to a method.
    top_level_id = _node_id("function", file_path, fn.name)
    if func_name_to_id.get(fn.name) != top_level_id:
        func_name_to_id[fn.name] = method_id
def _resolve_calls(func_name_to_id: dict[str, str], kg: KnowledgeGraph) -> None:
    """Add ``calls`` edges for recorded call names found in ``func_name_to_id``.

    Every function and method node currently stored in ``kg`` is inspected,
    not only those of one file. For each name in a node's ``"calls"``
    metadata that has an entry in ``func_name_to_id``, an edge from the
    caller to the registered id is added. A node never gets an edge to
    itself.
    """
    # Snapshot the callers before adding any edges.
    callers = [
        (node_id, node)
        for node_id, node in kg._nodes.items()
        if node.node_type in ("function", "method")
    ]
    for caller_id, caller in callers:
        called_names: list[str] = caller.metadata.get("calls", [])
        for name in called_names:
            if name not in func_name_to_id:
                continue
            target_id = func_name_to_id[name]
            if target_id == caller_id:
                continue
            kg.add_edge(KGEdge("calls", caller_id, target_id))
| # ── repo walker ─────────────────────────────────────────────────────────────── | |
def parse_repo(repo_path: str, exclude_dirs: set[str] | None = None) -> KnowledgeGraph:
    """Walk repo_path recursively and return a KnowledgeGraph.

    A ``repo`` node is created for the root, a ``package`` node for every
    visited sub-directory, and every ``*.py`` file is handed to
    ``_parse_file``. Each package is contained by its parent package
    (top-level packages by the repo node), and each module by the package
    of its directory.

    Relative paths stored in the graph always use ``/`` as separator so
    that module ids match the targets of import edges on every platform.
    Directories and files are visited in sorted order so the resulting
    graph does not depend on filesystem enumeration order.

    Parameters
    ----------
    repo_path : str
        Absolute or relative path to the root of the repo.
    exclude_dirs : set[str], optional
        Directory names to skip (e.g. {"__pycache__", ".git", "tests"}).
        When omitted, a built-in set of cache, VCS, virtualenv and build
        directory names is used.

    Returns
    -------
    KnowledgeGraph
        The populated graph.
    """
    if exclude_dirs is None:
        exclude_dirs = {"__pycache__", ".git", ".venv", "venv", "env",
                        "node_modules", ".mypy_cache", ".pytest_cache", "dist", "build"}

    abs_root = str(Path(repo_path).resolve())
    kg = KnowledgeGraph(repo_path=repo_path)

    # Root repo node
    repo_name = Path(abs_root).name
    repo_id = _node_id("repo", "", repo_name)
    kg.add_node(KGNode(
        node_id=repo_id,
        node_type="repo",
        name=repo_name,
        file_path="",
    ))

    # Walk directory tree (top-down, so a parent directory's package node
    # always exists before its children are visited)
    for dirpath, dirnames, filenames in os.walk(abs_root):
        # Prune excluded dirs in-place (modifies os.walk traversal) and fix
        # the visiting order.
        dirnames[:] = sorted(d for d in dirnames if d not in exclude_dirs)

        rel_dir = os.path.relpath(dirpath, abs_root).replace(os.sep, "/")
        if rel_dir == ".":
            rel_dir = ""

        parent_id = repo_id
        if rel_dir:
            pkg_id = _node_id("package", rel_dir)
            if pkg_id not in kg._nodes:
                kg.add_node(KGNode(
                    node_id=pkg_id,
                    node_type="package",
                    name=Path(rel_dir).name,
                    file_path=rel_dir,
                ))
                # Attach to the enclosing package, or to the repo when the
                # directory sits directly under the root.
                outer_dir = rel_dir.rpartition("/")[0]
                container_id = _node_id("package", outer_dir) if outer_dir else repo_id
                kg.add_edge(KGEdge("contains", container_id, pkg_id))
            parent_id = pkg_id

        for fname in sorted(filenames):
            if not fname.endswith(".py"):
                continue
            rel_file = f"{rel_dir}/{fname}" if rel_dir else fname
            abs_file = os.path.join(dirpath, fname)
            _parse_file(rel_file, abs_file, kg, parent_id)

    return kg