File size: 7,879 Bytes
fea1bd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""
File system tools with security policies enforced
"""
import os
import shutil
import pathlib
from typing import Dict, Any, List, Optional
from send2trash import send2trash

from autonomous_config import is_write_path_allowed
from utils.format import format_file_info, format_directory_listing
from utils.paths import resolve_path, get_safe_path


async def handle_read(file_path: str) -> Dict[str, Any]:
    """Return the UTF-8 text of a file along with basic metadata.

    Args:
        file_path: Path of the file to read; it is passed through
            ``resolve_path`` (imported from ``utils.paths``) before use.

    Returns:
        On success: ``success=True``, ``content`` (the decoded text),
        ``file_path`` (the resolved path as a string) and ``size`` (length of
        the decoded text in characters, not bytes).
        On failure: ``success=False`` and an ``error`` message. Exceptions
        are converted into a failure result instead of being raised.
    """
    try:
        target = resolve_path(file_path)

        # Work out whether the path is unusable before touching its contents.
        if not target.exists():
            problem = f"File not found: {file_path}"
        elif target.is_dir():
            problem = f"Path is a directory: {file_path}"
        else:
            problem = None

        if problem is not None:
            return {"success": False, "error": problem}

        text = target.read_text(encoding='utf-8')
        result: Dict[str, Any] = {"success": True}
        result["content"] = text
        result["file_path"] = str(target)
        result["size"] = len(text)
        return result
    except Exception as exc:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to read file: {exc!s}"}


async def handle_write(file_path: str, content: str) -> Dict[str, Any]:
    """Write text to a file as UTF-8, subject to the write-path policy.

    Args:
        file_path: Destination path; it is passed through ``resolve_path``
            before the policy check and the write.
        content: Text to store in the file.

    Returns:
        On success: ``success=True``, ``file_path`` (resolved path as a
        string), ``bytes_written`` (length of ``content`` encoded as UTF-8)
        and a confirmation ``message``.
        On failure (policy rejection or any exception): ``success=False`` and
        an ``error`` message.
    """
    try:
        target = resolve_path(file_path)

        if is_write_path_allowed(str(target)):
            # Missing parent directories are created on demand.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding='utf-8')

            utf8_length = len(content.encode('utf-8'))
            return {
                "success": True,
                "file_path": str(target),
                "bytes_written": utf8_length,
                "message": f"✅ Arquivo gravado com sucesso: {file_path}"
            }

        # The policy rejected the resolved path: nothing is created or written.
        denial = f"🚫 Caminho não permitido pela política de segurança: {file_path}"
        return {"success": False, "error": denial}
    except Exception as exc:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to write file: {exc!s}"}


async def handle_list(directory_path: str = ".") -> Dict[str, Any]:
    """List the immediate entries of a directory.

    Args:
        directory_path: Directory to list; it is passed through
            ``resolve_path`` before use. Defaults to ``"."``.

    Returns:
        On success: ``success=True``, ``directory`` (resolved path as a
        string), ``items`` and ``total_items``. Each item has ``name``,
        ``type`` (``"directory"`` or ``"file"``), ``size`` (``st_size`` for
        regular files, ``None`` otherwise) and ``modified`` (``st_mtime``).
        On failure: ``success=False`` and an ``error`` message. Exceptions
        are converted into a failure result instead of being raised.
    """
    try:
        resolved_path = resolve_path(directory_path)
        if not resolved_path.exists():
            return {"success": False, "error": f"Directory not found: {directory_path}"}

        if not resolved_path.is_dir():
            return {"success": False, "error": f"Path is not a directory: {directory_path}"}

        items = []
        for item in resolved_path.iterdir():
            # Stat each entry once and reuse the result for size and mtime.
            try:
                info = item.stat()
            except OSError:
                # stat() follows symlinks, so a dangling link raises here.
                # Fall back to the link's own metadata so that one broken
                # entry does not abort the whole listing.
                try:
                    info = item.lstat()
                except OSError:
                    # The entry disappeared while we were iterating; skip it.
                    continue

            items.append({
                "name": item.name,
                "type": "directory" if item.is_dir() else "file",
                "size": info.st_size if item.is_file() else None,
                "modified": info.st_mtime
            })

        return {
            "success": True,
            "directory": str(resolved_path),
            "items": items,
            "total_items": len(items)
        }
    except Exception as e:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to list directory: {str(e)}"}


async def handle_copy(source_path: str, dest_path: str) -> Dict[str, Any]:
    """Copy a file or directory, enforcing the write policy on the destination.

    Args:
        source_path: Path of the file or directory to copy.
        dest_path: Path to copy to. Only this path is checked with
            ``is_write_path_allowed``; the source is only read.

    Both paths are passed through ``resolve_path`` before use.

    Returns:
        On success: ``success=True``, ``source`` and ``destination`` (resolved
        paths as strings), ``operation`` (``"File copied"`` or
        ``"Directory copied"``) and a human-readable ``message``.
        On failure (missing source, policy rejection or any exception):
        ``success=False`` and an ``error`` message.
    """
    try:
        source = resolve_path(source_path)
        dest = resolve_path(dest_path)

        if not source.exists():
            return {"success": False, "error": f"Source not found: {source_path}"}

        # Enforce the security policy for the destination before writing anything.
        if not is_write_path_allowed(str(dest)):
            return {
                "success": False,
                "error": f"🚫 Destino não permitido pela política: {dest_path}"
            }

        # Create parent directories if needed
        dest.parent.mkdir(parents=True, exist_ok=True)

        if source.is_file():
            # copy2 also copies file metadata such as timestamps.
            shutil.copy2(source, dest)
            operation = "File copied"
        else:
            # dirs_exist_ok=True merges into an already existing destination tree.
            shutil.copytree(source, dest, dirs_exist_ok=True)
            operation = "Directory copied"

        return {
            "success": True,
            "source": str(source),
            "destination": str(dest),
            "operation": operation,
            # Separator added: the two paths used to be concatenated with
            # nothing between them, making the message unreadable.
            "message": f"✅ {operation}: {source_path} → {dest_path}"
        }
    except Exception as e:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to copy: {str(e)}"}


async def handle_move(source_path: str, dest_path: str) -> Dict[str, Any]:
    """Move a file or directory, enforcing the write policy on the destination.

    Args:
        source_path: Path of the file or directory to move.
        dest_path: Path to move to. Only this path is checked with
            ``is_write_path_allowed``.

    Both paths are passed through ``resolve_path`` before use.

    Returns:
        On success: ``success=True``, ``source`` and ``destination`` (resolved
        paths as strings) and a human-readable ``message``.
        On failure (missing source, policy rejection or any exception):
        ``success=False`` and an ``error`` message.
    """
    try:
        source = resolve_path(source_path)
        dest = resolve_path(dest_path)

        if not source.exists():
            return {"success": False, "error": f"Source not found: {source_path}"}

        # Enforce the security policy for the destination before moving anything.
        # NOTE(review): a move also removes the source, but the source location
        # is not checked against the policy here — confirm this is intended.
        if not is_write_path_allowed(str(dest)):
            return {
                "success": False,
                "error": f"🚫 Destino não permitido pela política: {dest_path}"
            }

        # Create parent directories if needed
        dest.parent.mkdir(parents=True, exist_ok=True)

        shutil.move(str(source), str(dest))

        return {
            "success": True,
            "source": str(source),
            "destination": str(dest),
            # Separator added: the two paths used to be concatenated with
            # nothing between them, making the message unreadable.
            "message": f"✅ Movido: {source_path} → {dest_path}"
        }
    except Exception as e:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to move: {str(e)}"}


async def handle_delete(file_path: str, mode: str = "trash") -> Dict[str, Any]:
    """Delete a file or directory, either via the trash or permanently.

    Args:
        file_path: Path to delete; it is passed through ``resolve_path``
            before use.
        mode: ``"trash"`` (default) hands the path to ``send2trash``;
            ``"hard"`` removes it permanently. Any other value is rejected.

    Returns:
        On success: ``success=True``, ``path`` (resolved path as a string),
        ``mode`` and a human-readable ``message``.
        On failure (missing path, invalid mode or any exception):
        ``success=False`` and an ``error`` message.

    NOTE(review): unlike the write/copy/move handlers, no
    ``is_write_path_allowed`` check is made here — confirm this is intended.
    """
    try:
        resolved_path = resolve_path(file_path)

        if not resolved_path.exists():
            return {"success": False, "error": f"Path not found: {file_path}"}

        if mode == "trash":
            send2trash(str(resolved_path))
            message = f"🗑️ Movido para lixeira: {file_path}"
        elif mode == "hard":
            # Only real directories go through rmtree. rmtree refuses symlinks
            # and fails on non-directories, so symlinks (even ones pointing at
            # a directory) and special files must be unlinked instead.
            if resolved_path.is_dir() and not resolved_path.is_symlink():
                shutil.rmtree(resolved_path)
                message = f"💥 Diretório deletado permanentemente: {file_path}"
            else:
                resolved_path.unlink()
                message = f"💥 Arquivo deletado permanentemente: {file_path}"
        else:
            return {"success": False, "error": f"Invalid delete mode: {mode}"}

        return {
            "success": True,
            "path": str(resolved_path),
            "mode": mode,
            "message": message
        }
    except Exception as e:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to delete: {str(e)}"}


async def handle_find(pattern: str, directory: str = ".", max_results: int = 100) -> Dict[str, Any]:
    """Recursively find entries under a directory that match a glob pattern.

    Args:
        pattern: Glob pattern handed to ``Path.rglob``.
        directory: Directory to search from; it is passed through
            ``resolve_path`` before use. Defaults to ``"."``.
        max_results: Maximum number of matches to return.

    Returns:
        On success: ``success=True``, ``pattern``, ``directory`` (resolved
        path as a string), ``matches``, ``total_found`` (number of matches
        returned) and ``truncated`` (True only when at least one further match
        existed beyond ``max_results``). Each match has ``path`` (relative to
        the search directory), ``full_path``, ``type`` (``"directory"`` or
        ``"file"``) and ``size`` (``st_size`` for regular files, else ``None``).
        On failure: ``success=False`` and an ``error`` message. Exceptions
        are converted into a failure result instead of being raised.
    """
    try:
        resolved_dir = resolve_path(directory)
        if not resolved_dir.exists() or not resolved_dir.is_dir():
            return {"success": False, "error": f"Invalid directory: {directory}"}

        matches = []
        truncated = False
        for path in resolved_dir.rglob(pattern):
            if len(matches) >= max_results:
                # We only get here when another match exists past the limit,
                # so this is the one place where results are really cut off.
                # Comparing len(matches) with max_results afterwards would
                # wrongly report truncation for exactly max_results matches.
                truncated = True
                break
            matches.append({
                "path": str(path.relative_to(resolved_dir)),
                "full_path": str(path),
                "type": "directory" if path.is_dir() else "file",
                "size": path.stat().st_size if path.is_file() else None
            })

        return {
            "success": True,
            "pattern": pattern,
            "directory": str(resolved_dir),
            "matches": matches,
            "total_found": len(matches),
            "truncated": truncated
        }
    except Exception as e:
        # Boundary handler: callers always get a dict, never an exception.
        return {"success": False, "error": f"Failed to find files: {str(e)}"}