shreyas-joshi's picture
Add Phase 06 Plan for Adaptive Judging and Edge Intelligence; Create initial project outline for GraphReview RL Environment
86c3e08
from __future__ import annotations
import argparse
import ast
import os
from pathlib import Path
from pydantic import BaseModel
from db.schema import EdgeType
from db.store import Store
from llm.edge_summarizer import EdgeSummarizer, EdgeSummaryInput
from parser.linter import run_linters
from parser.summarizer import summarize_module
# Directory names whose contents are ignored when collecting source files
# (VCS metadata, tool caches, virtual environments, build output).
_SKIP_DIRS = {
    ".git",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".venv",
    "venv",
    "node_modules",
    "dist",
    "build",
}


def _iter_python_files(target_dir: Path) -> list[Path]:
    """Return the ``*.py`` files below *target_dir* in sorted order.

    Files located under a directory listed in ``_SKIP_DIRS`` are ignored.
    The number of returned files is capped by the ``GRAPHREVIEW_MAX_FILES``
    environment variable (default ``5000``).

    Args:
        target_dir: Root directory to scan recursively.

    Returns:
        At most ``GRAPHREVIEW_MAX_FILES`` paths, sorted.

    Raises:
        ValueError: If ``GRAPHREVIEW_MAX_FILES`` is not an integer.
    """
    max_files = int(os.getenv("GRAPHREVIEW_MAX_FILES", "5000"))
    py_files: list[Path] = []
    for path in sorted(target_dir.rglob("*.py")):
        # Check the cap before appending so a limit of 0 yields no files.
        if len(py_files) >= max_files:
            break
        # Only components *below* target_dir count: a project checked out
        # at e.g. ~/build/proj must not have every file filtered out
        # because of where the root itself lives.
        if not _SKIP_DIRS.isdisjoint(path.relative_to(target_dir).parts):
            continue
        # rglob matches on name only; skip directories (or dangling
        # symlinks) that happen to end in ".py".
        if not path.is_file():
            continue
        py_files.append(path)
    return py_files
class ImportRef(BaseModel):
    """A single import reference extracted from a module's source."""

    # Dotted name of the imported module (or candidate submodule).
    target_module: str
    # Source text of the import statement that produced this reference.
    import_line: str
    # "module_level" or "function_level", depending on where the import sits.
    scope: str = "module_level"
    # Edge weight; parse_python_file uses 0.5 for function-level imports, 1.0 otherwise.
    weight: float = 1.0
    # Kind of graph edge this reference maps to.
    edge_type: EdgeType = EdgeType.EXPLICIT_IMPORT
class ParsedModule(BaseModel):
    """Structured facts extracted from one Python source file."""

    # Dotted module id derived from the file's path relative to the scan root.
    module_id: str
    # Full source text of the file.
    raw_code: str
    # One "name(params)->return" string per function/method definition found.
    function_signatures: list[str]
    # Names of the classes defined in the file.
    classes: list[str]
    # Every import reference found, in visiting order.
    imports: list[ImportRef]
    # Upper-case names that are assigned a literal constant.
    constants: list[str]
    # Non-empty target module names taken from `imports`.
    dependencies: list[str]
class _Visitor(ast.NodeVisitor):
    """Collect signatures, class names, constants and imports from a module AST.

    Results accumulate on the instance:

    * ``function_signatures`` - ``"name(params)->return"`` for every
      (async) function or method, at any nesting depth.
    * ``classes`` - class names.
    * ``constants`` - upper-case names assigned a literal constant.
    * ``imports`` - ``(dotted_target, statement_text, scope)`` tuples, where
      scope is ``"module_level"`` or ``"function_level"``.

    Call :meth:`parse` so import statements can be recorded with their
    original source text.
    """

    def __init__(self) -> None:
        self.function_signatures: list[str] = []
        self.classes: list[str] = []
        self.constants: list[str] = []
        self.imports: list[tuple[str, str, str]] = []
        # Names of the enclosing function/class definitions, innermost last.
        self._scope_stack: list[str] = []
        # Filled in by parse(); defined here so that calling visit()
        # directly does not fail with AttributeError on the first import.
        self._source: str = ""

    @property
    def _scope(self) -> str:
        """Scope label for the node being visited.

        Anything nested inside a function *or class* body is reported as
        ``"function_level"`` because both push onto the scope stack.
        """
        return "function_level" if self._scope_stack else "module_level"

    @staticmethod
    def _format_param(arg: ast.arg, prefix: str = "") -> str:
        """Render one parameter as ``name`` or ``name: annotation``."""
        if arg.annotation is None:
            return f"{prefix}{arg.arg}"
        return f"{prefix}{arg.arg}: {ast.unparse(arg.annotation)}"

    def _record_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
        """Record *node*'s signature, then visit its body one scope deeper."""
        spec = node.args
        params = [self._format_param(arg) for arg in spec.posonlyargs]
        if spec.posonlyargs:
            params.append("/")
        params.extend(self._format_param(arg) for arg in spec.args)
        if spec.vararg is not None:
            params.append(self._format_param(spec.vararg, "*"))
        elif spec.kwonlyargs:
            # Bare "*" separator when keyword-only params follow without *args.
            params.append("*")
        params.extend(self._format_param(arg) for arg in spec.kwonlyargs)
        if spec.kwarg is not None:
            params.append(self._format_param(spec.kwarg, "**"))

        # A missing return annotation is reported as "None".
        returns = ast.unparse(node.returns) if node.returns is not None else "None"
        self.function_signatures.append(f"{node.name}({', '.join(params)})->{returns}")

        self._scope_stack.append(node.name)
        try:
            self.generic_visit(node)
        finally:
            self._scope_stack.pop()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Record a synchronous function or method."""
        self._record_function(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Record an ``async def`` exactly like a regular function."""
        self._record_function(node)

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Record the class name and visit its body one scope deeper."""
        self.classes.append(node.name)
        self._scope_stack.append(node.name)
        try:
            self.generic_visit(node)
        finally:
            self._scope_stack.pop()

    def _statement_text(self, node: ast.AST, fallback: str) -> str:
        """Return the source text of *node*, or *fallback* if unavailable."""
        if not self._source:
            return fallback
        return ast.get_source_segment(self._source, node) or fallback

    def visit_Import(self, node: ast.Import) -> None:
        """Record one entry per module named in an ``import a, b`` statement."""
        line = self._statement_text(node, "import")
        for alias in node.names:
            self.imports.append((alias.name, line, self._scope))

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Record the base module of a ``from ... import ...`` and each name.

        Imported names are recorded as *candidate* submodules
        (``base.name``); callers are expected to discard candidates that
        do not correspond to a real module.
        """
        module = node.module or ""
        # Relative imports keep their leading dots; they are resolved later.
        dotted = "." * (node.level or 0) + module
        line = self._statement_text(node, "from")
        if dotted:
            self.imports.append((dotted, line, self._scope))
        for alias in node.names:
            if alias.name == "*":
                continue
            if not dotted:
                candidate = alias.name
            elif dotted.endswith("."):
                # "from . import x" -> ".x" (no doubled separator).
                candidate = f"{dotted}{alias.name}"
            else:
                candidate = f"{dotted}.{alias.name}"
            self.imports.append((candidate, line, self._scope))

    def visit_Assign(self, node: ast.Assign) -> None:
        """Record upper-case names that are bound directly to a literal."""
        if isinstance(node.value, ast.Constant):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id.isupper():
                    self.constants.append(target.id)
        self.generic_visit(node)

    def parse(self, tree: ast.AST, source: str) -> None:
        """Walk *tree*, using *source* to recover import statement text."""
        self._source = source
        self.visit(tree)
def _to_module_id(path: Path, root: Path) -> str:
    """Convert a source file path into a dotted module id relative to *root*.

    ``root/pkg/sub/mod.py`` becomes ``"pkg.sub.mod"``. Package initialisers
    keep their file name, so ``root/pkg/__init__.py`` becomes
    ``"pkg.__init__"``.

    Args:
        path: Path of the Python file.
        root: Directory the id is computed relative to.

    Returns:
        The dotted module id.

    Raises:
        ValueError: If *path* is not located under *root*.
    """
    rel = path.resolve().relative_to(root.resolve())
    # Join path components instead of replacing "/" so the result is the
    # same on platforms whose path separator is not a forward slash.
    return ".".join(rel.with_suffix("").parts)
def _resolve_relative_import(current_module: str, ref: str) -> str:
    """Turn a possibly-relative import reference into an absolute dotted name.

    Args:
        current_module: Dotted id of the module containing the import.
        ref: Import target; leading dots mark a relative import.

    Returns:
        *ref* unchanged when it has no leading dots. Otherwise one trailing
        component of *current_module* is dropped per dot and the rest of
        *ref* is appended. If there are more dots than components, only the
        rest of *ref* is returned.
    """
    remainder = ref.lstrip(".")
    level = len(ref) - len(remainder)
    if level == 0:
        return ref

    components = current_module.split(".")
    if level > len(components):
        # More dots than package levels: nothing left to anchor on.
        anchor: list[str] = []
    else:
        anchor = components[: len(components) - level]

    if remainder:
        anchor.append(remainder)
    # Drop empty pieces so the result never has stray or doubled dots.
    return ".".join(filter(None, anchor))
def parse_python_file(path: Path, root_dir: Path) -> ParsedModule:
    """Parse one Python file into a :class:`ParsedModule`.

    Args:
        path: File to read and parse.
        root_dir: Scan root; used to derive the module id and to resolve
            relative imports.

    Returns:
        The parsed module. If the file cannot be parsed, the result still
        carries the module id and raw source but all extracted lists are
        empty.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If *path* is not located under *root_dir*.
    """
    try:
        # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, so
        # the BOM never reaches the parser.
        source = path.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        # One legacy-encoded file must not abort a whole directory scan;
        # keep what can be decoded and let the parser decide below.
        source = path.read_text(encoding="utf-8-sig", errors="replace")
    module_id = _to_module_id(path, root_dir)

    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # ValueError covers sources containing null bytes on interpreters
        # that do not report them as SyntaxError.
        return ParsedModule(
            module_id=module_id,
            raw_code=source,
            function_signatures=[],
            classes=[],
            imports=[],
            constants=[],
            dependencies=[],
        )

    visitor = _Visitor()
    visitor.parse(tree, source)

    imports = [
        ImportRef(
            target_module=_resolve_relative_import(module_id, name),
            import_line=line,
            scope=scope,
            # Imports nested in a function/class body count half.
            weight=0.5 if scope == "function_level" else 1.0,
            edge_type=EdgeType.EXPLICIT_IMPORT,
        )
        for name, line, scope in visitor.imports
    ]
    dependencies = [imp.target_module for imp in imports if imp.target_module]

    return ParsedModule(
        module_id=module_id,
        raw_code=source,
        function_signatures=visitor.function_signatures,
        classes=visitor.classes,
        imports=imports,
        constants=visitor.constants,
        dependencies=dependencies,
    )
def parse_directory(target_dir: Path, db_path: str | None = None) -> Store:
    """Rebuild the source graph in a store from the Python files under *target_dir*.

    The store's existing source graph is cleared first. Every discovered
    file is parsed and linted; a node and its findings are written for each
    module, and an edge is written for each import whose target is one of
    the parsed modules.

    Args:
        target_dir: Directory to scan.
        db_path: Passed through to ``Store``; ``None`` leaves the choice to it.

    Returns:
        The populated store.
    """
    root = target_dir.resolve()
    store = Store(source_root=str(root), db_path=db_path)
    store.clear_source_graph()

    files = _iter_python_files(root)
    modules = [parse_python_file(file_path, root) for file_path in files]
    # Imports are only turned into edges when they point at one of these ids.
    known_module_ids = {module.module_id for module in modules}
    summarizer = EdgeSummarizer()
    dependency_reason = "Imports used by module-level and callable logic"

    for file_path, module in zip(files, modules):
        findings = run_linters(file_path)
        module_summary = summarize_module(module, findings)
        store.upsert_node(
            module_id=module.module_id,
            raw_code=module.raw_code,
            ast_summary=module_summary,
            dependency_reason=dependency_reason,
        )
        store.replace_findings_for_module(
            module.module_id,
            [finding.model_dump() for finding in findings],
        )

        for ref in module.imports:
            target = ref.target_module
            # Skip stdlib/third-party imports and non-module candidates.
            if not target or target not in known_module_ids:
                continue
            request = EdgeSummaryInput(
                source_module_id=module.module_id,
                target_module_id=target,
                edge_type=ref.edge_type.value,
                import_line=ref.import_line,
                scope=ref.scope,
            )
            connection_summary = summarizer.summarize(request)
            store.upsert_edge(
                source_module_id=module.module_id,
                target_module_id=target,
                edge_type=ref.edge_type,
                import_line=ref.import_line,
                weight=ref.weight,
                connection_summary=connection_summary,
            )

    return store
def _build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser: a positional ``target`` and an optional ``--db-path``."""
    cli = argparse.ArgumentParser(description="Parse Python codebase into SQLite graph")
    argument_specs = (
        (("target",), {"help": "Path to target codebase"}),
        (("--db-path",), {"default": None, "help": "Path to SQLite database"}),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli
def main() -> None:
    """CLI entry point: parse the target directory and print graph totals."""
    cli_args = _build_parser().parse_args()
    target_dir = Path(cli_args.target)

    populated = parse_directory(target_dir=target_dir, db_path=cli_args.db_path)
    graph = populated.get_full_graph()
    node_count = len(graph.nodes)
    edge_count = len(graph.edges)

    print(f"Populated DB for {target_dir} with {node_count} nodes and {edge_count} edges")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()