INFINA-RD
refactor: DI infrastructure, service decomposition, repository helpers, test suite
e066621
from __future__ import annotations
import ast
import pathlib
import re
import subprocess
from typing import List, Tuple
from langchain_core.tools import tool
# Fallback root: a "generated_project" folder under the working directory
# that was current when this module was imported.
PROJECT_ROOT_DEFAULT = pathlib.Path.cwd().joinpath("generated_project").resolve()

# Active root that the helpers in this module resolve paths against; it starts
# out as the default and is rebound by init_project_root().
PROJECT_ROOT = PROJECT_ROOT_DEFAULT
def safe_path_for_project(path: str) -> pathlib.Path:
    """Resolve *path* against PROJECT_ROOT and reject locations outside it.

    Returns the resolved absolute path. Raises ValueError when the resolved
    location is neither the project root itself nor somewhere beneath it.
    """
    candidate = (PROJECT_ROOT / path).resolve()
    root = PROJECT_ROOT.resolve()
    # A direct parent is always a member of ``parents``, so checking the root
    # itself plus the ancestor chain covers every permitted location.
    inside_root = candidate == root or root in candidate.parents
    if not inside_root:
        raise ValueError("Attempt to write outside project root")
    return candidate
@tool
def write_file(path: str, content: str) -> str:
    """Writes content to a file at the specified path within the project root."""
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    destination = safe_path_for_project(path)
    # Create any missing intermediate directories before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    return f"WROTE:{destination}"
@tool
def read_file(path: str) -> str:
    """Reads content from a file at the specified path within the project root."""
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    source = safe_path_for_project(path)
    # A missing path is reported as empty content rather than as an error.
    return source.read_text(encoding="utf-8") if source.exists() else ""
@tool
def get_current_directory() -> str:
    """Returns the current working directory."""
    # The value reported is the configured PROJECT_ROOT, not os.getcwd().
    root_as_text = str(PROJECT_ROOT)
    return root_as_text
@tool
def list_files(directory: str = ".") -> str:
    """Lists all files in the specified directory within the project root."""
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    base = safe_path_for_project(directory)
    if not base.is_dir():
        return f"ERROR: {base} is not a directory"

    # Recursive listing; each file is reported relative to PROJECT_ROOT, one
    # per line. Directories themselves are not listed.
    relative_names = []
    for entry in base.glob("**/*"):
        if entry.is_file():
            relative_names.append(str(entry.relative_to(PROJECT_ROOT)))

    if not relative_names:
        return "No files found."
    return "\n".join(relative_names)
@tool
def print_tree(path: str = ".", depth: int = 2) -> str:
    """
    Prints a directory tree for the given path up to the specified depth.
    """
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    max_depth = max(1, depth)  # depths below 1 are treated as 1
    root = safe_path_for_project(path)
    if not root.exists():
        return f"ERROR: Path does not exist: {root}"
    # A plain file has no tree to render; report it directly.
    if root.is_file():
        return f"{root.name} (file)"

    base_len = len(root.parts)
    rendered = []
    for node in sorted(root.rglob("*")):
        # Level 1 is a direct child of the requested path.
        level = len(node.parts) - base_len
        if level <= max_depth:
            suffix = "/" if node.is_dir() else ""
            rendered.append(" " * level + node.name + suffix)

    label = "." if root == PROJECT_ROOT else root.relative_to(PROJECT_ROOT)
    header = f"Tree for {label}"
    body = "\n".join(rendered) if rendered else "(empty directory)"
    return f"{header}\n{body}"
@tool
def search_files(query: str, path: str = ".", max_results: int = 20) -> str:
    """
    Searches for the query string inside files rooted at the given path.
    Returns up to max_results matches with file path and line content.
    """
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    #
    # query:       plain substring looked for in each line (not a regex).
    # path:        file or directory, resolved inside the project root.
    # max_results: the search stops as soon as this many matches are collected.
    # Returns newline-joined "relative/path:line_no: text" entries, an
    # "ERROR: ..." string for a missing path, or "No matches found.".
    p = safe_path_for_project(path)
    if not p.exists():
        return f"ERROR: Path does not exist: {p}"
    matches = []
    files = [p] if p.is_file() else sorted(f for f in p.rglob("*") if f.is_file())
    for file_path in files:
        # The relative path is the same for every line of the file.
        rel_path = file_path.relative_to(PROJECT_ROOT)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    if query not in line:
                        continue
                    matches.append(f"{rel_path}:{line_num}: {line.rstrip()}")
                    if len(matches) >= max_results:
                        return "\n".join(matches)
        except (UnicodeDecodeError, OSError):
            # Best-effort search: skip files that are not UTF-8 text as well
            # as files that cannot be opened or read (permissions, removed
            # mid-scan) instead of aborting the whole search.
            continue
    return "\n".join(matches) if matches else "No matches found."
def _extract_python_symbols(content: str) -> Tuple[List[str], List[str]]:
classes: List[str] = []
functions: List[str] = []
try:
tree = ast.parse(content)
except SyntaxError:
return classes, functions
for node in tree.body:
if isinstance(node, ast.ClassDef):
doc = ast.get_docstring(node)
snippet = f" - {doc[:60].strip()}" if doc else ""
classes.append(f"{node.name}{snippet}")
elif isinstance(node, ast.FunctionDef):
doc = ast.get_docstring(node)
snippet = f" - {doc[:60].strip()}" if doc else ""
functions.append(f"{node.name}{snippet}")
return classes, functions
def _extract_generic_symbols(content: str) -> Tuple[List[str], List[str]]:
class_pattern = re.compile(r"^\s*(?:export\s+)?(?:abstract\s+)?class\s+([A-Za-z_][\w]*)", re.MULTILINE)
func_pattern = re.compile(r"^\s*(?:export\s+)?(?:async\s+)?(?:def|function)\s+([A-Za-z_][\w]*)", re.MULTILINE)
arrow_pattern = re.compile(r"^\s*const\s+([A-Za-z_][\w]*)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", re.MULTILINE)
classes = class_pattern.findall(content)
functions = sorted(set(func_pattern.findall(content)) | set(arrow_pattern.findall(content)))
return classes, functions
def _summarize_file(path: pathlib.Path) -> str:
    """Build a short multi-line text summary of a single file.

    The summary lists the path relative to PROJECT_ROOT with its line and
    byte counts, then (when present) up to five classes, up to five
    functions, and the first non-blank line truncated to 120 characters.
    ``.py`` files are analysed with the AST-based extractor; every other
    suffix goes through the regex-based one.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Fall back to latin-1 so that non-UTF-8 files still get summarised.
        content = path.read_text(encoding="latin-1", errors="ignore")
    relative_path = path.relative_to(PROJECT_ROOT)
    # Count newline-terminated lines plus one for an unterminated last line,
    # so a file ending in "\n" is not reported with one line too many.
    if content:
        lines = content.count("\n") + (0 if content.endswith("\n") else 1)
    else:
        lines = 0
    size = path.stat().st_size
    if path.suffix == ".py":
        classes, functions = _extract_python_symbols(content)
    else:
        classes, functions = _extract_generic_symbols(content)
    # First line that still has text after stripping whitespace, or "".
    stripped_lines = (line.strip() for line in content.splitlines())
    snippet = next((text for text in stripped_lines if text), "")
    file_summary = [
        f"- {relative_path} (lines={lines}, bytes={size})"
    ]
    if classes:
        file_summary.append(f" Classes: {', '.join(classes[:5])}")
    if functions:
        file_summary.append(f" Functions: {', '.join(functions[:5])}")
    if snippet:
        file_summary.append(f" First code line: {snippet[:120]}")
    return "\n".join(file_summary)
def _build_directory_tree(directory: pathlib.Path) -> str:
lines = []
for path in sorted(directory.rglob("*")):
depth = len(path.relative_to(directory).parts)
prefix = " " * depth
name = path.name + ("/" if path.is_dir() else "")
lines.append(f"{prefix}{name}")
return "No files found." if not lines else "\n".join(lines)
@tool
def summarize_project(directory: str = ".") -> str:
    """Summarizes the structure of a directory under the project root, including files, classes, and functions."""
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    target = safe_path_for_project(directory)
    if not target.exists():
        return f"Directory {directory} does not exist under project root."
    # A single file gets just its own summary, without tree or header.
    if target.is_file():
        return _summarize_file(target)

    tree = _build_directory_tree(target)

    MAX_FILES = 200  # cap on per-file summaries to keep the output bounded
    summaries = []
    files_only = (entry for entry in sorted(target.rglob("*")) if entry.is_file())
    for index, entry in enumerate(files_only, start=1):
        summaries.append(_summarize_file(entry))
        if index >= MAX_FILES:
            summaries.append("...output truncated after 200 files...")
            break

    if target == PROJECT_ROOT:
        rel_target = "."
    else:
        rel_target = str(target.relative_to(PROJECT_ROOT))

    summary_text = "\n".join(summaries) if summaries else "No files found in this directory."
    sections = [
        f"PROJECT SUMMARY: {rel_target}",
        "",
        "DIRECTORY TREE:",
        tree,
        "",
        "FILE SUMMARIES:",
        summary_text,
    ]
    return "\n".join(sections)
@tool
def run_cmd(cmd: str, cwd: str = None, timeout: int = 30) -> Tuple[int, str, str]:
    """Runs a shell command in the specified directory and returns the result."""
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    #
    # The command string is handed to the shell as is (shell=True). It runs
    # in ``cwd`` when given (validated against the project root), otherwise in
    # PROJECT_ROOT. Returns (returncode, stdout, stderr) with text output;
    # subprocess.TimeoutExpired propagates when ``timeout`` seconds elapse.
    workdir = PROJECT_ROOT if not cwd else safe_path_for_project(cwd)
    completed = subprocess.run(
        cmd,
        shell=True,
        cwd=str(workdir),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return completed.returncode, completed.stdout, completed.stderr
def init_project_root(path: str | pathlib.Path | None = None) -> str:
    """
    Point the module-level PROJECT_ROOT at *path* and make sure it exists.

    *path* may be absolute or relative (relative paths are anchored at the
    current working directory; "~" is expanded). A falsy *path* selects
    PROJECT_ROOT_DEFAULT. The directory is created when missing, and its
    absolute path is returned as a string.
    """
    global PROJECT_ROOT
    if not path:
        chosen = PROJECT_ROOT_DEFAULT.resolve()
    else:
        expanded = pathlib.Path(path).expanduser()
        anchored = expanded if expanded.is_absolute() else pathlib.Path.cwd() / expanded
        chosen = anchored.resolve()
    PROJECT_ROOT = chosen
    chosen.mkdir(parents=True, exist_ok=True)
    return str(chosen)
@tool
def edit_file(path: str, updated_content: str) -> str:
    """
    Edits an existing file within the project root by replacing its content.
    Unlike write_file, this will NOT create the file if it doesn't exist.
    """
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    target = safe_path_for_project(path)
    # Refuse anything that is not an already-existing regular path.
    if not target.exists():
        return f"ERROR: File does not exist: {target}"
    if target.is_dir():
        return f"ERROR: {target} is a directory, not a file"
    # Whole-file replacement: previous content is discarded.
    target.write_text(updated_content, encoding="utf-8")
    return f"UPDATED:{target}"
@tool
def delete_file(path: str) -> str:
    """
    Deletes a file within the project root.
    Will NOT delete directories.
    Will NOT operate outside project root.
    """
    # NOTE(review): docstring kept verbatim; langchain's @tool is expected to
    # expose it as the tool description.
    victim = safe_path_for_project(path)
    # A directory necessarily exists, so checking it first cannot hide the
    # "does not exist" case below.
    if victim.is_dir():
        return f"ERROR: {victim} is a directory, not a file"
    if not victim.exists():
        return f"ERROR: File does not exist: {victim}"
    victim.unlink()
    return f"DELETED:{victim}"