HASHIRU / tools /fs.py
mulambo's picture
Initial commit
fea1bd1
"""
File system tools with security policies enforced
"""
import os
import shutil
import pathlib
from typing import Dict, Any, List, Optional
from send2trash import send2trash
from autonomous_config import is_write_path_allowed
from utils.format import format_file_info, format_directory_listing
from utils.paths import resolve_path, get_safe_path
async def handle_read(file_path: str) -> Dict[str, Any]:
"""Read file contents"""
try:
resolved_path = resolve_path(file_path)
if not resolved_path.exists():
return {"success": False, "error": f"File not found: {file_path}"}
if resolved_path.is_dir():
return {"success": False, "error": f"Path is a directory: {file_path}"}
content = resolved_path.read_text(encoding='utf-8')
return {
"success": True,
"content": content,
"file_path": str(resolved_path),
"size": len(content)
}
except Exception as e:
return {"success": False, "error": f"Failed to read file: {str(e)}"}
async def handle_write(file_path: str, content: str) -> Dict[str, Any]:
"""Write content to file with security policy enforcement"""
try:
resolved_path = resolve_path(file_path)
# 🚨 ENFORCE SECURITY POLICY
if not is_write_path_allowed(str(resolved_path)):
return {
"success": False,
"error": f"🚫 Caminho não permitido pela política de segurança: {file_path}"
}
# Create parent directories if needed
resolved_path.parent.mkdir(parents=True, exist_ok=True)
# Write content
resolved_path.write_text(content, encoding='utf-8')
return {
"success": True,
"file_path": str(resolved_path),
"bytes_written": len(content.encode('utf-8')),
"message": f"✅ Arquivo gravado com sucesso: {file_path}"
}
except Exception as e:
return {"success": False, "error": f"Failed to write file: {str(e)}"}
async def handle_list(directory_path: str = ".") -> Dict[str, Any]:
"""List directory contents"""
try:
resolved_path = resolve_path(directory_path)
if not resolved_path.exists():
return {"success": False, "error": f"Directory not found: {directory_path}"}
if not resolved_path.is_dir():
return {"success": False, "error": f"Path is not a directory: {directory_path}"}
items = []
for item in resolved_path.iterdir():
items.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else None,
"modified": item.stat().st_mtime
})
return {
"success": True,
"directory": str(resolved_path),
"items": items,
"total_items": len(items)
}
except Exception as e:
return {"success": False, "error": f"Failed to list directory: {str(e)}"}
async def handle_copy(source_path: str, dest_path: str) -> Dict[str, Any]:
"""Copy file or directory with security policy enforcement"""
try:
source = resolve_path(source_path)
dest = resolve_path(dest_path)
if not source.exists():
return {"success": False, "error": f"Source not found: {source_path}"}
# 🚨 ENFORCE SECURITY POLICY FOR DESTINATION
if not is_write_path_allowed(str(dest)):
return {
"success": False,
"error": f"🚫 Destino não permitido pela política: {dest_path}"
}
# Create parent directories if needed
dest.parent.mkdir(parents=True, exist_ok=True)
if source.is_file():
shutil.copy2(source, dest)
operation = "File copied"
else:
shutil.copytree(source, dest, dirs_exist_ok=True)
operation = "Directory copied"
return {
"success": True,
"source": str(source),
"destination": str(dest),
"operation": operation,
"message": f"✅ {operation}: {source_path}{dest_path}"
}
except Exception as e:
return {"success": False, "error": f"Failed to copy: {str(e)}"}
async def handle_move(source_path: str, dest_path: str) -> Dict[str, Any]:
"""Move file or directory with security policy enforcement"""
try:
source = resolve_path(source_path)
dest = resolve_path(dest_path)
if not source.exists():
return {"success": False, "error": f"Source not found: {source_path}"}
# 🚨 ENFORCE SECURITY POLICY FOR DESTINATION
if not is_write_path_allowed(str(dest)):
return {
"success": False,
"error": f"🚫 Destino não permitido pela política: {dest_path}"
}
# Create parent directories if needed
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source), str(dest))
return {
"success": True,
"source": str(source),
"destination": str(dest),
"message": f"✅ Movido: {source_path}{dest_path}"
}
except Exception as e:
return {"success": False, "error": f"Failed to move: {str(e)}"}
async def handle_delete(file_path: str, mode: str = "trash") -> Dict[str, Any]:
"""Delete file or directory (trash or hard delete)"""
try:
resolved_path = resolve_path(file_path)
if not resolved_path.exists():
return {"success": False, "error": f"Path not found: {file_path}"}
if mode == "trash":
send2trash(str(resolved_path))
message = f"🗑️ Movido para lixeira: {file_path}"
elif mode == "hard":
if resolved_path.is_file():
resolved_path.unlink()
message = f"💥 Arquivo deletado permanentemente: {file_path}"
else:
shutil.rmtree(resolved_path)
message = f"💥 Diretório deletado permanentemente: {file_path}"
else:
return {"success": False, "error": f"Invalid delete mode: {mode}"}
return {
"success": True,
"path": str(resolved_path),
"mode": mode,
"message": message
}
except Exception as e:
return {"success": False, "error": f"Failed to delete: {str(e)}"}
async def handle_find(pattern: str, directory: str = ".", max_results: int = 100) -> Dict[str, Any]:
"""Find files matching pattern"""
try:
resolved_dir = resolve_path(directory)
if not resolved_dir.exists() or not resolved_dir.is_dir():
return {"success": False, "error": f"Invalid directory: {directory}"}
matches = []
for path in resolved_dir.rglob(pattern):
if len(matches) >= max_results:
break
matches.append({
"path": str(path.relative_to(resolved_dir)),
"full_path": str(path),
"type": "directory" if path.is_dir() else "file",
"size": path.stat().st_size if path.is_file() else None
})
return {
"success": True,
"pattern": pattern,
"directory": str(resolved_dir),
"matches": matches,
"total_found": len(matches),
"truncated": len(matches) >= max_results
}
except Exception as e:
return {"success": False, "error": f"Failed to find files: {str(e)}"}