|
|
""" |
|
|
File system tools with security policies enforced |
|
|
""" |
|
|
import os |
|
|
import shutil |
|
|
import pathlib |
|
|
from typing import Dict, Any, List, Optional |
|
|
from send2trash import send2trash |
|
|
|
|
|
from autonomous_config import is_write_path_allowed |
|
|
from utils.format import format_file_info, format_directory_listing |
|
|
from utils.paths import resolve_path, get_safe_path |
|
|
|
|
|
|
|
|
async def handle_read(file_path: str) -> Dict[str, Any]:
    """Read a text file as UTF-8 and return its contents.

    Args:
        file_path: Path of the file to read. It is passed through
            ``resolve_path`` before any filesystem access.

    Returns:
        On success, a dict with ``success=True``, the decoded ``content``,
        the resolved ``file_path`` as a string, and ``size`` -- the number
        of characters in ``content`` (not the byte size on disk).
        On failure, a dict with ``success=False`` and an ``error`` message.
        Exceptions are never propagated; they are reported in ``error``.
    """
    try:
        target = resolve_path(file_path)

        # Reject paths that cannot be read as a file: missing first, then
        # directories.
        problem = None
        if not target.exists():
            problem = f"File not found: {file_path}"
        elif target.is_dir():
            problem = f"Path is a directory: {file_path}"
        if problem is not None:
            return {"success": False, "error": problem}

        text = target.read_text(encoding="utf-8")
        result: Dict[str, Any] = {"success": True}
        result["content"] = text
        result["file_path"] = str(target)
        result["size"] = len(text)
        return result
    except Exception as exc:
        return {"success": False, "error": f"Failed to read file: {str(exc)}"}
|
|
|
|
|
|
|
|
async def handle_write(file_path: str, content: str) -> Dict[str, Any]:
    """Write text to a file as UTF-8, subject to the write-path policy.

    Args:
        file_path: Destination path; passed through ``resolve_path`` first.
        content: Text to write. Any existing file content is replaced.

    Returns:
        On success, a dict with ``success=True``, the resolved ``file_path``,
        ``bytes_written`` (length of ``content`` encoded as UTF-8) and a
        user-facing ``message``.
        On failure -- including when ``is_write_path_allowed`` rejects the
        resolved path -- a dict with ``success=False`` and an ``error``.
        Exceptions are never propagated; they are reported in ``error``.
    """
    try:
        target = resolve_path(file_path)

        if is_write_path_allowed(str(target)):
            # Create missing parent directories so the write cannot fail
            # just because the folder does not exist yet.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding="utf-8")

            encoded_length = len(content.encode("utf-8"))
            return {
                "success": True,
                "file_path": str(target),
                "bytes_written": encoded_length,
                "message": f"✅ Arquivo gravado com sucesso: {file_path}",
            }

        # The policy check failed: nothing is created or written.
        denial = f"🚫 Caminho não permitido pela política de segurança: {file_path}"
        return {"success": False, "error": denial}
    except Exception as exc:
        return {"success": False, "error": f"Failed to write file: {str(exc)}"}
|
|
|
|
|
|
|
|
async def handle_list(directory_path: str = ".") -> Dict[str, Any]:
    """List the immediate entries of a directory.

    Args:
        directory_path: Directory to list (defaults to ``"."``); passed
            through ``resolve_path`` first.

    Returns:
        On success, a dict with ``success=True``, the resolved ``directory``,
        ``items`` and ``total_items``. Each item has:

        * ``name``: the entry's file name.
        * ``type``: ``"directory"`` if the entry is a directory, otherwise
          ``"file"``.
        * ``size``: size in bytes for regular files, otherwise ``None``.
        * ``modified``: modification time as a POSIX timestamp, or ``None``
          when the entry could not be stat'ed at all.

        On failure, a dict with ``success=False`` and an ``error`` message.
        Exceptions are never propagated; they are reported in ``error``.
    """
    try:
        resolved_path = resolve_path(directory_path)
        if not resolved_path.exists():
            return {"success": False, "error": f"Directory not found: {directory_path}"}

        if not resolved_path.is_dir():
            return {"success": False, "error": f"Path is not a directory: {directory_path}"}

        items = []
        for item in resolved_path.iterdir():
            is_directory = item.is_dir()
            is_regular_file = item.is_file()

            # Stat each entry once. A dangling symlink or an entry removed
            # while we iterate makes stat() raise; that must not abort the
            # whole listing, so fall back to the link's own metadata and,
            # failing that, to no metadata at all.
            try:
                info = item.stat()
            except OSError:
                try:
                    info = item.lstat()
                except OSError:
                    info = None

            items.append({
                "name": item.name,
                "type": "directory" if is_directory else "file",
                "size": info.st_size if (is_regular_file and info is not None) else None,
                "modified": info.st_mtime if info is not None else None,
            })

        return {
            "success": True,
            "directory": str(resolved_path),
            "items": items,
            "total_items": len(items),
        }
    except Exception as e:
        return {"success": False, "error": f"Failed to list directory: {str(e)}"}
|
|
|
|
|
|
|
|
async def handle_copy(source_path: str, dest_path: str) -> Dict[str, Any]:
    """Copy a file or directory, subject to the write-path policy.

    Regular files are copied with ``shutil.copy2``; anything else is copied
    with ``shutil.copytree(..., dirs_exist_ok=True)``, which merges into an
    existing destination directory.

    Args:
        source_path: Path to copy from; passed through ``resolve_path``.
        dest_path: Path to copy to; passed through ``resolve_path``. Only
            this path is checked with ``is_write_path_allowed``.

    Returns:
        On success, a dict with ``success=True``, the resolved ``source`` and
        ``destination``, the ``operation`` performed and a user-facing
        ``message``.
        On failure -- missing source, destination rejected by the policy,
        directory copied into itself, or any exception -- a dict with
        ``success=False`` and an ``error`` message.
    """
    try:
        source = resolve_path(source_path)
        dest = resolve_path(dest_path)

        if not source.exists():
            return {"success": False, "error": f"Source not found: {source_path}"}

        if not is_write_path_allowed(str(dest)):
            return {
                "success": False,
                "error": f"🚫 Destino não permitido pela política: {dest_path}"
            }

        # Refuse to copy a directory onto itself or into its own subtree.
        # Without this check the mkdir below creates directories inside the
        # source and copytree then keeps descending into the tree it is
        # creating. Done before any filesystem change so a refusal leaves
        # nothing behind.
        # Assumes resolve_path returns pathlib.Path objects (the code below
        # already relies on .parent/.exists/.is_file).
        if source.is_dir():
            real_source = source.resolve()
            real_dest = dest.resolve()
            if real_dest == real_source or real_source in real_dest.parents:
                return {
                    "success": False,
                    "error": f"Cannot copy a directory into itself: {source_path} → {dest_path}"
                }

        dest.parent.mkdir(parents=True, exist_ok=True)

        if source.is_file():
            shutil.copy2(source, dest)
            operation = "File copied"
        else:
            shutil.copytree(source, dest, dirs_exist_ok=True)
            operation = "Directory copied"

        return {
            "success": True,
            "source": str(source),
            "destination": str(dest),
            "operation": operation,
            "message": f"✅ {operation}: {source_path} → {dest_path}"
        }
    except Exception as e:
        return {"success": False, "error": f"Failed to copy: {str(e)}"}
|
|
|
|
|
|
|
|
async def handle_move(source_path: str, dest_path: str) -> Dict[str, Any]:
    """Move a file or directory, subject to the write-path policy.

    Args:
        source_path: Path to move from; passed through ``resolve_path``.
        dest_path: Path to move to; passed through ``resolve_path``. Only
            this path is checked with ``is_write_path_allowed``.

    Returns:
        On success, a dict with ``success=True``, the resolved ``source`` and
        ``destination`` and a user-facing ``message``.
        On failure -- missing source, destination rejected by the policy, or
        any exception -- a dict with ``success=False`` and an ``error``.
    """
    try:
        origin = resolve_path(source_path)
        target = resolve_path(dest_path)

        if not origin.exists():
            return {"success": False, "error": f"Source not found: {source_path}"}

        destination_allowed = is_write_path_allowed(str(target))
        if not destination_allowed:
            rejection = f"🚫 Destino não permitido pela política: {dest_path}"
            return {"success": False, "error": rejection}

        # Make sure the destination's parent folder exists before moving.
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(origin), str(target))

        outcome: Dict[str, Any] = {"success": True}
        outcome["source"] = str(origin)
        outcome["destination"] = str(target)
        outcome["message"] = f"✅ Movido: {source_path} → {dest_path}"
        return outcome
    except Exception as exc:
        return {"success": False, "error": f"Failed to move: {str(exc)}"}
|
|
|
|
|
|
|
|
async def handle_delete(file_path: str, mode: str = "trash") -> Dict[str, Any]:
    """Delete a file or directory, either to the trash or permanently.

    Args:
        file_path: Path to delete; passed through ``resolve_path`` first.
        mode: ``"trash"`` (default) hands the path to ``send2trash``;
            ``"hard"`` removes it permanently. Any other value is rejected.

    Returns:
        On success, a dict with ``success=True``, the resolved ``path``, the
        ``mode`` used and a user-facing ``message``.
        On failure -- missing path, invalid mode, or any exception -- a dict
        with ``success=False`` and an ``error`` message.
    """
    # NOTE(review): unlike the write/copy/move handlers, no
    # is_write_path_allowed check is made here -- confirm this is intended.
    try:
        resolved_path = resolve_path(file_path)

        if not resolved_path.exists():
            return {"success": False, "error": f"Path not found: {file_path}"}

        if mode == "trash":
            send2trash(str(resolved_path))
            message = f"🗑️ Movido para lixeira: {file_path}"
        elif mode == "hard":
            # rmtree only accepts real directories; it raises for symlinks
            # (even ones pointing at a directory) and for non-directory
            # entries such as FIFOs or sockets. Everything that is not a
            # real directory is therefore removed with unlink().
            if resolved_path.is_dir() and not resolved_path.is_symlink():
                shutil.rmtree(resolved_path)
                message = f"💥 Diretório deletado permanentemente: {file_path}"
            else:
                resolved_path.unlink()
                message = f"💥 Arquivo deletado permanentemente: {file_path}"
        else:
            return {"success": False, "error": f"Invalid delete mode: {mode}"}

        return {
            "success": True,
            "path": str(resolved_path),
            "mode": mode,
            "message": message
        }
    except Exception as e:
        return {"success": False, "error": f"Failed to delete: {str(e)}"}
|
|
|
|
|
|
|
|
async def handle_find(pattern: str, directory: str = ".", max_results: int = 100) -> Dict[str, Any]:
    """Recursively find entries under a directory that match a glob pattern.

    Args:
        pattern: Glob pattern handed to ``Path.rglob``.
        directory: Directory to search (defaults to ``"."``); passed through
            ``resolve_path`` first.
        max_results: Maximum number of matches to return.

    Returns:
        On success, a dict with ``success=True``, the ``pattern``, the
        resolved ``directory``, the ``matches`` list, ``total_found`` (the
        number of matches returned) and ``truncated``. Each match has its
        ``path`` relative to the search directory, its ``full_path``, its
        ``type`` (``"directory"`` or ``"file"``) and ``size`` in bytes for
        regular files (``None`` otherwise).
        ``truncated`` is ``True`` only when at least one further match
        existed beyond ``max_results`` and was left out.
        On failure, a dict with ``success=False`` and an ``error`` message.
    """
    try:
        resolved_dir = resolve_path(directory)
        if not resolved_dir.exists() or not resolved_dir.is_dir():
            return {"success": False, "error": f"Invalid directory: {directory}"}

        matches = []
        # Set only when the loop stops on an extra match. Comparing
        # len(matches) with max_results afterwards would also report
        # truncation when exactly max_results matches exist.
        truncated = False
        for path in resolved_dir.rglob(pattern):
            if len(matches) >= max_results:
                truncated = True
                break
            matches.append({
                "path": str(path.relative_to(resolved_dir)),
                "full_path": str(path),
                "type": "directory" if path.is_dir() else "file",
                "size": path.stat().st_size if path.is_file() else None
            })

        return {
            "success": True,
            "pattern": pattern,
            "directory": str(resolved_dir),
            "matches": matches,
            "total_found": len(matches),
            "truncated": truncated
        }
    except Exception as e:
        return {"success": False, "error": f"Failed to find files: {str(e)}"}