YussefGAFeer commited on
Commit
035cf1a
·
verified ·
1 Parent(s): e741a9e

Create system_utils.py

Browse files
Files changed (1) hide show
  1. system_utils.py +401 -0
system_utils.py ADDED
@@ -0,0 +1,401 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import subprocess
3
+ import asyncio
4
+ import shutil
5
+ import logging
6
+ import stat
7
+ import platform
8
+ from typing import Dict, List, Optional, Tuple, Union, Any
9
+ from pathlib import Path
10
+
11
+ # Configure logging
12
+ logging.basicConfig(level=logging.INFO)
13
+ logger = logging.getLogger(__name__)
14
+
15
+ class SystemUtils:
16
+ def __init__(self, allowed_dirs: List[str] = None, allowed_commands: List[str] = None):
17
+ """
18
+ Initialize SystemUtils with security constraints
19
+
20
+ Args:
21
+ allowed_dirs: List of directories where file operations are allowed
22
+ allowed_commands: List of allowed shell commands (if None, all commands are allowed - NOT RECOMMENDED)
23
+ """
24
+ # Default allowed directories (current working directory and home directory)
25
+ self.allowed_dirs = allowed_dirs or [
26
+ os.getcwd(),
27
+ os.path.expanduser("~"),
28
+ ]
29
+
30
+ # Default allowed commands (empty list means all commands are allowed - BE CAREFUL!)
31
+ self.allowed_commands = allowed_commands or []
32
+
33
+ # Platform-specific settings
34
+ self.is_windows = platform.system() == 'Windows'
35
+ self.shell = True if self.is_windows else False
36
+
37
+ # For tracking command history
38
+ self.command_history = []
39
+
40
+ async def execute_command(self, command: Union[str, List[str]], cwd: str = None,
41
+ timeout: int = 30) -> Dict[str, Any]:
42
+ """
43
+ Execute a shell command asynchronously
44
+
45
+ Args:
46
+ command: Command to execute (string or list of args)
47
+ cwd: Working directory for the command
48
+ timeout: Maximum time to wait for command completion (seconds)
49
+
50
+ Returns:
51
+ Dict containing command results or error information
52
+ """
53
+ if not command:
54
+ return {"status": "error", "message": "No command provided"}
55
+
56
+ # Convert command to string for logging and history
57
+ cmd_str = ' '.join(command) if isinstance(command, list) else command
58
+
59
+ # Log the command
60
+ logger.info(f"Executing command: {cmd_str}")
61
+
62
+ # Add to command history
63
+ self.command_history.append({
64
+ 'command': cmd_str,
65
+ 'time': asyncio.get_event_loop().time(),
66
+ 'cwd': cwd or os.getcwd()
67
+ })
68
+
69
+ # Check if command is allowed
70
+ if self.allowed_commands and not any(
71
+ cmd_str.startswith(allowed) for allowed in self.allowed_commands
72
+ ):
73
+ return {
74
+ "status": "error",
75
+ "message": "Command not allowed by security policy",
76
+ "command": cmd_str
77
+ }
78
+
79
+ # Set working directory
80
+ work_dir = cwd or os.getcwd()
81
+
82
+ try:
83
+ # Create subprocess
84
+ process = await asyncio.create_subprocess_shell(
85
+ cmd_str,
86
+ stdout=asyncio.subprocess.PIPE,
87
+ stderr=asyncio.subprocess.PIPE,
88
+ cwd=work_dir,
89
+ shell=self.shell
90
+ )
91
+
92
+ # Wait for the process to complete with timeout
93
+ try:
94
+ stdout, stderr = await asyncio.wait_for(
95
+ process.communicate(),
96
+ timeout=timeout
97
+ )
98
+ except asyncio.TimeoutError:
99
+ process.kill()
100
+ return {
101
+ "status": "error",
102
+ "message": f"Command timed out after {timeout} seconds",
103
+ "command": cmd_str
104
+ }
105
+
106
+ # Get return code and output
107
+ return_code = process.returncode
108
+ stdout_str = stdout.decode().strip()
109
+ stderr_str = stderr.decode().strip()
110
+
111
+ return {
112
+ "status": "success" if return_code == 0 else "error",
113
+ "return_code": return_code,
114
+ "stdout": stdout_str,
115
+ "stderr": stderr_str,
116
+ "command": cmd_str
117
+ }
118
+
119
+ except Exception as e:
120
+ logger.error(f"Error executing command: {str(e)}")
121
+ return {
122
+ "status": "error",
123
+ "message": str(e),
124
+ "command": cmd_str
125
+ }
126
+
127
+ async def list_directory(self, path: str = ".") -> Dict[str, Any]:
128
+ """List contents of a directory"""
129
+ try:
130
+ abs_path = os.path.abspath(path)
131
+
132
+ # Check if path is in allowed directories
133
+ if not any(abs_path.startswith(allowed) for allowed in self.allowed_dirs):
134
+ return {
135
+ "status": "error",
136
+ "message": f"Access to {abs_path} is not allowed by security policy"
137
+ }
138
+
139
+ if not os.path.exists(abs_path):
140
+ return {
141
+ "status": "error",
142
+ "message": f"Path does not exist: {abs_path}"
143
+ }
144
+
145
+ if not os.path.isdir(abs_path):
146
+ return {
147
+ "status": "error",
148
+ "message": f"Path is not a directory: {abs_path}"
149
+ }
150
+
151
+ # Get directory contents
152
+ contents = []
153
+ for entry in os.scandir(abs_path):
154
+ try:
155
+ stat_info = entry.stat()
156
+ contents.append({
157
+ 'name': entry.name,
158
+ 'path': entry.path,
159
+ 'type': 'file' if entry.is_file() else 'directory',
160
+ 'size': stat_info.st_size,
161
+ 'created': stat_info.st_ctime,
162
+ 'modified': stat_info.st_mtime,
163
+ 'mode': stat_info.st_mode
164
+ })
165
+ except OSError as e:
166
+ logger.warning(f"Error accessing {entry.path}: {str(e)}")
167
+
168
+ return {
169
+ "status": "success",
170
+ "path": abs_path,
171
+ "contents": contents
172
+ }
173
+
174
+ except Exception as e:
175
+ logger.error(f"Error listing directory {path}: {str(e)}")
176
+ return {
177
+ "status": "error",
178
+ "message": str(e)
179
+ }
180
+
181
+ async def read_file(self, file_path: str) -> Dict[str, Any]:
182
+ """Read the contents of a file"""
183
+ try:
184
+ abs_path = os.path.abspath(file_path)
185
+
186
+ # Check if path is in allowed directories
187
+ if not any(abs_path.startswith(allowed) for allowed in self.allowed_dirs):
188
+ return {
189
+ "status": "error",
190
+ "message": f"Access to {abs_path} is not allowed by security policy"
191
+ }
192
+
193
+ if not os.path.exists(abs_path):
194
+ return {
195
+ "status": "error",
196
+ "message": f"File does not exist: {abs_path}"
197
+ }
198
+
199
+ if not os.path.isfile(abs_path):
200
+ return {
201
+ "status": "error",
202
+ "message": f"Path is not a file: {abs_path}"
203
+ }
204
+
205
+ # Check file size before reading (limit to 10MB)
206
+ file_size = os.path.getsize(abs_path)
207
+ if file_size > 10 * 1024 * 1024: # 10MB
208
+ return {
209
+ "status": "error",
210
+ "message": f"File is too large ({file_size} bytes). Maximum allowed size is 10MB."
211
+ }
212
+
213
+ # Read file content
214
+ with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
215
+ content = f.read()
216
+
217
+ return {
218
+ "status": "success",
219
+ "path": abs_path,
220
+ "content": content,
221
+ "size": file_size
222
+ }
223
+
224
+ except Exception as e:
225
+ logger.error(f"Error reading file {file_path}: {str(e)}")
226
+ return {
227
+ "status": "error",
228
+ "message": str(e)
229
+ }
230
+
231
+ async def write_file(self, file_path: str, content: str,
232
+ mode: str = 'w', append: bool = False) -> Dict[str, Any]:
233
+ """
234
+ Write content to a file
235
+
236
+ Args:
237
+ file_path: Path to the file
238
+ content: Content to write
239
+ mode: File open mode ('w' for write, 'x' for exclusive creation, etc.)
240
+ append: If True, append to the file instead of overwriting
241
+ """
242
+ try:
243
+ abs_path = os.path.abspath(file_path)
244
+
245
+ # Check if parent directory is in allowed directories
246
+ parent_dir = os.path.dirname(abs_path)
247
+ if not any(parent_dir.startswith(allowed) for allowed in self.allowed_dirs):
248
+ return {
249
+ "status": "error",
250
+ "message": f"Writing to {abs_path} is not allowed by security policy"
251
+ }
252
+
253
+ # Check if file exists and we're not in append mode
254
+ if os.path.exists(abs_path) and 'x' in mode:
255
+ return {
256
+ "status": "error",
257
+ "message": f"File already exists: {abs_path}"
258
+ }
259
+
260
+ # Ensure parent directory exists
261
+ os.makedirs(parent_dir, exist_ok=True)
262
+
263
+ # Write content to file
264
+ with open(abs_path, 'a' if append else mode, encoding='utf-8') as f:
265
+ f.write(content)
266
+
267
+ return {
268
+ "status": "success",
269
+ "path": abs_path,
270
+ "bytes_written": len(content.encode('utf-8'))
271
+ }
272
+
273
+ except Exception as e:
274
+ logger.error(f"Error writing to file {file_path}: {str(e)}")
275
+ return {
276
+ "status": "error",
277
+ "message": str(e)
278
+ }
279
+
280
+ async def create_directory(self, dir_path: str, parents: bool = True,
281
+ exist_ok: bool = True) -> Dict[str, Any]:
282
+ """Create a directory"""
283
+ try:
284
+ abs_path = os.path.abspath(dir_path)
285
+
286
+ # Check if parent directory is in allowed directories
287
+ parent_dir = os.path.dirname(abs_path)
288
+ if not any(parent_dir.startswith(allowed) for allowed in self.allowed_dirs):
289
+ return {
290
+ "status": "error",
291
+ "message": f"Creating directory in {parent_dir} is not allowed by security policy"
292
+ }
293
+
294
+ # Create directory
295
+ os.makedirs(abs_path, exist_ok=exist_ok)
296
+
297
+ return {
298
+ "status": "success",
299
+ "path": abs_path
300
+ }
301
+
302
+ except Exception as e:
303
+ logger.error(f"Error creating directory {dir_path}: {str(e)}")
304
+ return {
305
+ "status": "error",
306
+ "message": str(e)
307
+ }
308
+
309
+ async def delete_path(self, path: str, recursive: bool = False) -> Dict[str, Any]:
310
+ """
311
+ Delete a file or directory
312
+
313
+ Args:
314
+ path: Path to delete
315
+ recursive: If True, delete directory and all its contents
316
+ """
317
+ try:
318
+ abs_path = os.path.abspath(path)
319
+
320
+ # Check if path is in allowed directories
321
+ if not any(abs_path.startswith(allowed) for allowed in self.allowed_dirs):
322
+ return {
323
+ "status": "error",
324
+ "message": f"Deleting {abs_path} is not allowed by security policy"
325
+ }
326
+
327
+ if not os.path.exists(abs_path):
328
+ return {
329
+ "status": "error",
330
+ "message": f"Path does not exist: {abs_path}"
331
+ }
332
+
333
+ # Delete file or directory
334
+ if os.path.isfile(abs_path):
335
+ os.remove(abs_path)
336
+ elif os.path.isdir(abs_path):
337
+ if recursive:
338
+ shutil.rmtree(abs_path)
339
+ else:
340
+ os.rmdir(abs_path)
341
+ else:
342
+ return {
343
+ "status": "error",
344
+ "message": f"Path is neither a file nor a directory: {abs_path}"
345
+ }
346
+
347
+ return {
348
+ "status": "success",
349
+ "path": abs_path,
350
+ "deleted": True
351
+ }
352
+
353
+ except Exception as e:
354
+ logger.error(f"Error deleting {path}: {str(e)}")
355
+ return {
356
+ "status": "error",
357
+ "message": str(e)
358
+ }
359
+
360
+ def get_command_history(self) -> List[Dict[str, Any]]:
361
+ """Get command execution history"""
362
+ return self.command_history
363
+
364
+ # Example usage
365
+ async def example_usage():
366
+ # Initialize with default settings (restricted to current directory and home)
367
+ utils = SystemUtils()
368
+
369
+ # List current directory
370
+ print("Current directory:")
371
+ result = await utils.list_directory(".")
372
+ if result["status"] == "success":
373
+ for item in result["contents"][:5]: # Show first 5 items
374
+ print(f"- {item['name']} ({item['type']}, {item['size']} bytes)")
375
+
376
+ # Create a test directory
377
+ test_dir = "./test_dir"
378
+ await utils.create_directory(test_dir)
379
+
380
+ # Write a test file
381
+ test_file = f"{test_dir}/test.txt"
382
+ await utils.write_file(test_file, "Hello, World!")
383
+
384
+ # Read the test file
385
+ print("\nTest file content:")
386
+ result = await utils.read_file(test_file)
387
+ if result["status"] == "success":
388
+ print(result["content"])
389
+
390
+ # Execute a command
391
+ print("\nSystem information:")
392
+ result = await utils.execute_command("uname -a" if not utils.is_windows else "ver")
393
+ if result["status"] == "success":
394
+ print(result["stdout"])
395
+
396
+ # Clean up
397
+ await utils.delete_path(test_dir, recursive=True)
398
+
399
+ if __name__ == "__main__":
400
+ import asyncio
401
+ asyncio.run(example_usage())