Spaces:
Sleeping
Sleeping
INFINA-RD
refactor: DI infrastructure, service decomposition, repository helpers, test suite
e066621 | from __future__ import annotations | |
| import ast | |
| import pathlib | |
| import re | |
| import subprocess | |
| from typing import List, Tuple | |
| from langchain_core.tools import tool | |
# Default sandbox: a "generated_project" folder under the working directory
# the process had when this module was imported.
PROJECT_ROOT_DEFAULT = pathlib.Path.cwd().joinpath("generated_project").resolve()

# Root currently used by the tools in this module; init_project_root() rebinds it.
PROJECT_ROOT = PROJECT_ROOT_DEFAULT
def safe_path_for_project(path: str) -> pathlib.Path:
    """Resolve *path* against PROJECT_ROOT and reject anything outside it.

    Args:
        path: Path to join onto PROJECT_ROOT. ``..`` segments and symlinks are
            collapsed by ``resolve()`` before the containment check.

    Returns:
        The resolved path, which is either the project root itself or a
        descendant of it.

    Raises:
        ValueError: If the resolved path is neither the project root nor
            located underneath it.
    """
    root = PROJECT_ROOT.resolve()
    candidate = (PROJECT_ROOT / path).resolve()
    # The root itself is allowed; everything else must have the root as an ancestor.
    if candidate == root or root in candidate.parents:
        return candidate
    raise ValueError("Attempt to write outside project root")
def write_file(path: str, content: str) -> str:
    """Write *content* to *path* inside the project root, creating parent folders.

    Args:
        path: Destination, validated by ``safe_path_for_project``.
        content: Text to store; written as UTF-8, replacing any existing file.

    Returns:
        ``"WROTE:<resolved path>"``.
    """
    destination = safe_path_for_project(path)
    # Missing intermediate directories are created on demand.
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    return f"WROTE:{destination}"
def read_file(path: str) -> str:
    """Return the UTF-8 text of a file inside the project root.

    Args:
        path: File location, validated by ``safe_path_for_project``.

    Returns:
        The file's content, or an empty string when the path does not exist.
    """
    source = safe_path_for_project(path)
    return source.read_text(encoding="utf-8") if source.exists() else ""
def get_current_directory() -> str:
    """Return the active project root (PROJECT_ROOT) as a string.

    Note that this is the module's project root, not the process's
    ``os.getcwd()``.
    """
    active_root = PROJECT_ROOT
    return str(active_root)
def list_files(directory: str = ".") -> str:
    """List every file under *directory*, recursively, relative to the project root.

    Args:
        directory: Folder to scan, validated by ``safe_path_for_project``.

    Returns:
        One project-root-relative path per line in sorted order,
        ``"No files found."`` when the folder holds no files, or an
        ``"ERROR: ..."`` message when the path is not a directory.
    """
    base = safe_path_for_project(directory)
    if not base.is_dir():
        return f"ERROR: {base} is not a directory"
    # Glob order is filesystem-dependent; sort so repeated calls give the same
    # listing, as print_tree and search_files already do.
    files = [
        str(entry.relative_to(PROJECT_ROOT))
        for entry in sorted(base.rglob("*"))
        if entry.is_file()
    ]
    return "\n".join(files) if files else "No files found."
def print_tree(path: str = ".", depth: int = 2) -> str:
    """Render a directory tree for *path*, limited to *depth* levels.

    Args:
        path: Location inside the project root, validated by
            ``safe_path_for_project``.
        depth: Maximum number of levels below *path* to show; values below 1
            are treated as 1.

    Returns:
        A ``"Tree for ..."`` header followed by one indented line per entry
        (directories carry a trailing ``/``), ``"<name> (file)"`` when *path*
        is a file, or an ``"ERROR: ..."`` message when it does not exist.
    """
    depth = max(1, depth)
    target = safe_path_for_project(path)
    if not target.exists():
        return f"ERROR: Path does not exist: {target}"
    if target.is_file():
        return f"{target.name} (file)"

    base_len = len(target.parts)
    lines = []
    for entry in sorted(target.rglob("*")):
        level = len(entry.parts) - base_len
        if level <= depth:
            suffix = "/" if entry.is_dir() else ""
            lines.append(" " * level + entry.name + suffix)

    label = "." if target == PROJECT_ROOT else target.relative_to(PROJECT_ROOT)
    body = "\n".join(lines) if lines else "(empty directory)"
    return f"Tree for {label}\n{body}"
def search_files(query: str, path: str = ".", max_results: int = 20) -> str:
    """Search files under *path* for lines containing *query*.

    Args:
        query: Plain substring to look for (no regex, case-sensitive).
        path: File or directory inside the project root, validated by
            ``safe_path_for_project``.
        max_results: Stop and return as soon as this many matches are collected.

    Returns:
        Matches formatted as ``"<relative path>:<line number>: <line>"``, one
        per line; ``"No matches found."`` when nothing matched; or an
        ``"ERROR: ..."`` message when *path* does not exist.
    """
    root = safe_path_for_project(path)
    if not root.exists():
        return f"ERROR: Path does not exist: {root}"

    candidates = [root] if root.is_file() else sorted(f for f in root.rglob("*") if f.is_file())
    matches = []
    for file_path in candidates:
        try:
            with open(file_path, "r", encoding="utf-8") as handle:
                for line_num, line in enumerate(handle, 1):
                    if query in line:
                        rel_path = file_path.relative_to(PROJECT_ROOT)
                        matches.append(f"{rel_path}:{line_num}: {line.rstrip()}")
                        if len(matches) >= max_results:
                            return "\n".join(matches)
        except (UnicodeDecodeError, OSError):
            # Best-effort search: skip files that are not UTF-8 text or that
            # cannot be opened/read (e.g. permission denied) instead of
            # aborting the whole search.
            continue
    return "\n".join(matches) if matches else "No matches found."
| def _extract_python_symbols(content: str) -> Tuple[List[str], List[str]]: | |
| classes: List[str] = [] | |
| functions: List[str] = [] | |
| try: | |
| tree = ast.parse(content) | |
| except SyntaxError: | |
| return classes, functions | |
| for node in tree.body: | |
| if isinstance(node, ast.ClassDef): | |
| doc = ast.get_docstring(node) | |
| snippet = f" - {doc[:60].strip()}" if doc else "" | |
| classes.append(f"{node.name}{snippet}") | |
| elif isinstance(node, ast.FunctionDef): | |
| doc = ast.get_docstring(node) | |
| snippet = f" - {doc[:60].strip()}" if doc else "" | |
| functions.append(f"{node.name}{snippet}") | |
| return classes, functions | |
| def _extract_generic_symbols(content: str) -> Tuple[List[str], List[str]]: | |
| class_pattern = re.compile(r"^\s*(?:export\s+)?(?:abstract\s+)?class\s+([A-Za-z_][\w]*)", re.MULTILINE) | |
| func_pattern = re.compile(r"^\s*(?:export\s+)?(?:async\s+)?(?:def|function)\s+([A-Za-z_][\w]*)", re.MULTILINE) | |
| arrow_pattern = re.compile(r"^\s*const\s+([A-Za-z_][\w]*)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", re.MULTILINE) | |
| classes = class_pattern.findall(content) | |
| functions = sorted(set(func_pattern.findall(content)) | set(arrow_pattern.findall(content))) | |
| return classes, functions | |
def _summarize_file(path: pathlib.Path) -> str:
    """Build a short multi-line summary of one file under the project root.

    Args:
        path: File to summarise; must be located under PROJECT_ROOT because
            the reported path is made relative to it.

    Returns:
        A bullet line with the relative path, line count and byte size,
        followed by optional lines listing up to five classes, up to five
        functions, and the first non-blank line (cut to 120 characters).
    """
    try:
        content = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Fall back to latin-1 so non-UTF-8 files can still be summarised.
        content = path.read_text(encoding="latin-1", errors="ignore")

    relative_path = path.relative_to(PROJECT_ROOT)

    # A trailing newline terminates the last line; it does not start another
    # one, so "a\n" is one line, not two.
    if not content:
        lines = 0
    elif content.endswith("\n"):
        lines = content.count("\n")
    else:
        lines = content.count("\n") + 1
    size = path.stat().st_size

    if path.suffix == ".py":
        classes, functions = _extract_python_symbols(content)
    else:
        classes, functions = _extract_generic_symbols(content)

    # First non-blank line of the file, stripped of surrounding whitespace.
    snippet = next(
        (stripped for stripped in (raw.strip() for raw in content.splitlines()) if stripped),
        "",
    )

    file_summary = [f"- {relative_path} (lines={lines}, bytes={size})"]
    if classes:
        file_summary.append(f" Classes: {', '.join(classes[:5])}")
    if functions:
        file_summary.append(f" Functions: {', '.join(functions[:5])}")
    if snippet:
        file_summary.append(f" First code line: {snippet[:120]}")
    return "\n".join(file_summary)
| def _build_directory_tree(directory: pathlib.Path) -> str: | |
| lines = [] | |
| for path in sorted(directory.rglob("*")): | |
| depth = len(path.relative_to(directory).parts) | |
| prefix = " " * depth | |
| name = path.name + ("/" if path.is_dir() else "") | |
| lines.append(f"{prefix}{name}") | |
| return "No files found." if not lines else "\n".join(lines) | |
def summarize_project(directory: str = ".") -> str:
    """Summarize a directory under the project root: tree plus per-file details.

    Args:
        directory: Folder (or single file) inside the project root, validated
            by ``safe_path_for_project``.

    Returns:
        For a directory, a report with a header, the directory tree, and one
        summary per file (at most 200 files, followed by a truncation marker
        when more exist). For a file, just that file's summary. For a missing
        path, a message saying it does not exist.
    """
    target = safe_path_for_project(directory)
    if not target.exists():
        return f"Directory {directory} does not exist under project root."
    if target.is_file():
        return _summarize_file(target)

    tree = _build_directory_tree(target)

    max_files = 200
    summaries = []
    for path in sorted(target.rglob("*")):
        if not path.is_file():
            continue
        # Check the cap before summarising so the marker is only emitted when
        # a file beyond the limit actually exists.
        if len(summaries) >= max_files:
            summaries.append(f"...output truncated after {max_files} files...")
            break
        summaries.append(_summarize_file(path))

    rel_target = "." if target == PROJECT_ROOT else str(target.relative_to(PROJECT_ROOT))
    report = [
        f"PROJECT SUMMARY: {rel_target}",
        "",
        "DIRECTORY TREE:",
        tree,
        "",
        "FILE SUMMARIES:",
        "\n".join(summaries) if summaries else "No files found in this directory.",
    ]
    return "\n".join(report)
def run_cmd(cmd: str, cwd: str = None, timeout: int = 30) -> Tuple[int, str, str]:
    """Run a shell command inside the project root and capture its output.

    Args:
        cmd: Command line handed to the shell as-is.
        cwd: Working directory inside the project root (validated by
            ``safe_path_for_project``); the project root itself when omitted.
        timeout: Seconds to wait before ``subprocess.run`` raises
            ``subprocess.TimeoutExpired``.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as text.
    """
    workdir = PROJECT_ROOT if not cwd else safe_path_for_project(cwd)
    # NOTE(review): shell=True executes `cmd` verbatim through the shell; only
    # the working directory is sandboxed, not what the command itself touches.
    completed = subprocess.run(
        cmd,
        shell=True,
        cwd=str(workdir),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return completed.returncode, completed.stdout, completed.stderr
| def init_project_root(path: str | pathlib.Path | None = None) -> str: | |
| """ | |
| Sets the project root to the provided path (absolute or relative). Defaults to the generated_project folder. | |
| Returns the absolute path that will be used by all tools. | |
| """ | |
| global PROJECT_ROOT | |
| if path: | |
| target = pathlib.Path(path).expanduser() | |
| if not target.is_absolute(): | |
| target = (pathlib.Path.cwd() / target).resolve() | |
| else: | |
| target = target.resolve() | |
| else: | |
| target = PROJECT_ROOT_DEFAULT.resolve() | |
| PROJECT_ROOT = target | |
| PROJECT_ROOT.mkdir(parents=True, exist_ok=True) | |
| return str(PROJECT_ROOT) | |
def edit_file(path: str, updated_content: str) -> str:
    """Overwrite an existing file inside the project root with new content.

    Unlike write_file, a missing file is reported as an error rather than
    created.

    Args:
        path: File to overwrite, validated by ``safe_path_for_project``.
        updated_content: Replacement text, written as UTF-8.

    Returns:
        ``"UPDATED:<resolved path>"`` on success, or an ``"ERROR: ..."``
        message when the path is missing or is a directory.
    """
    target = safe_path_for_project(path)
    if not target.exists():
        return f"ERROR: File does not exist: {target}"
    if target.is_dir():
        return f"ERROR: {target} is a directory, not a file"
    target.write_text(updated_content, encoding="utf-8")
    return f"UPDATED:{target}"
def delete_file(path: str) -> str:
    """Remove a single file inside the project root.

    Directories are never removed, and paths outside the project root are
    rejected by ``safe_path_for_project``.

    Args:
        path: File to delete.

    Returns:
        ``"DELETED:<resolved path>"`` on success, or an ``"ERROR: ..."``
        message when the path is missing or is a directory.
    """
    doomed = safe_path_for_project(path)
    # A directory always exists, so testing it first yields the same messages
    # as testing existence first.
    if doomed.is_dir():
        return f"ERROR: {doomed} is a directory, not a file"
    if not doomed.exists():
        return f"ERROR: File does not exist: {doomed}"
    doomed.unlink()
    return f"DELETED:{doomed}"