Spaces:
Sleeping
Sleeping
| """File management for code storage.""" | |
| from __future__ import annotations | |
| import logging | |
| import uuid | |
| from pathlib import Path | |
| from app.config import Settings, get_settings | |
| from app.models import FileResult | |
| logger = logging.getLogger(__name__) | |
class FileManager:
    """Manages code files in the storage directory.

    Every public operation reports its outcome through a ``FileResult``
    instead of raising: failures are logged and returned with
    ``success=False`` and a human-readable ``message``.
    """

    def __init__(self, settings: Settings | None = None) -> None:
        """Bind the manager to its settings and ensure the storage dir exists.

        Args:
            settings: Configuration object providing ``code_storage_dir`` and
                ``max_file_size``. When omitted (or falsy), ``get_settings()``
                is used.
        """
        self._settings = settings or get_settings()
        self._storage = Path(self._settings.code_storage_dir)
        self._storage.mkdir(parents=True, exist_ok=True)

    def _generate_filename(self, base: str | None = None) -> str:
        """Generate a unique ``.py`` filename from an optional name hint.

        Args:
            base: Caller-supplied name hint. A trailing ``.py`` is dropped,
                then only alphanumerics, ``_`` and ``-`` are kept (at most
                100 characters). Falls back to ``"code"`` when nothing
                usable remains.

        Returns:
            ``"<safe_base>_<8 hex chars>.py"``.
        """
        suffix = uuid.uuid4().hex[:8]
        base = base or "code"
        # The extension is always appended below; without this, a hint such
        # as "utils.py" would be sanitized into "utilspy".
        if base.lower().endswith(".py"):
            base = base[:-3]
        # Sanitize: dropping separators and dots keeps the name inside the
        # storage directory.
        safe_base = "".join(c for c in base if c.isalnum() or c in ("_", "-"))[:100]
        if not safe_base:
            safe_base = "code"
        return f"{safe_base}_{suffix}.py"

    def _validate_path(self, file_path: str) -> Path | None:
        """Validate that a file path is within the storage directory.

        Args:
            file_path: Path to check; it is resolved (symlinks and ``..``
                collapsed) before the containment test.

        Returns:
            The resolved path when it is the storage directory or lies below
            it, otherwise ``None``. ``None`` is also returned when resolving
            raises ``ValueError`` or ``OSError``.
        """
        try:
            path = Path(file_path).resolve()
            storage = self._storage.resolve()
            # Compare path components, not string prefixes: a sibling such as
            # "<storage>_evil/x.py" shares the string prefix but is outside
            # the storage directory. relative_to raises ValueError for it.
            path.relative_to(storage)
        except (ValueError, OSError):
            return None
        return path

    def create_file(self, content: str, filename: str | None = None) -> FileResult:
        """Create a new Python file with content.

        Args:
            content: Text to write; its UTF-8 encoded size must not exceed
                ``settings.max_file_size``.
            filename: Optional name hint passed to ``_generate_filename``.

        Returns:
            A ``FileResult`` carrying the new file's path on success, or
            ``success=False`` with an explanatory message.
        """
        try:
            if len(content.encode("utf-8")) > self._settings.max_file_size:
                return FileResult(
                    success=False,
                    message=f"Content exceeds maximum file size of {self._settings.max_file_size} bytes",
                )
            name = self._generate_filename(filename)
            file_path = self._storage / name
            file_path.write_text(content, encoding="utf-8")
            logger.info("Created file: %s", file_path)
            return FileResult(
                success=True,
                file_path=str(file_path),
                message=f"File created: {name}",
            )
        except Exception as e:
            # Boundary handler: callers receive a FileResult, never a raise.
            logger.exception("Error creating file")
            return FileResult(success=False, message=f"Error creating file: {e}")

    def append_to_file(self, file_path: str, content: str) -> FileResult:
        """Append content to an existing file.

        Args:
            file_path: Path of the target file; must resolve to a location
                inside the storage directory.
            content: Text to append. The file's current size plus the UTF-8
                size of ``content`` must not exceed
                ``settings.max_file_size``.

        Returns:
            A ``FileResult`` echoing ``file_path``, with ``success=True`` and
            the number of appended bytes in the message, or
            ``success=False`` with the reason.
        """
        try:
            path = self._validate_path(file_path)
            if path is None:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message="Invalid file path: must be within code storage directory",
                )
            # is_file() rather than exists(): a directory is not an
            # appendable file and should be reported as not found.
            if not path.is_file():
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"File not found: {file_path}",
                )
            current_size = path.stat().st_size
            append_size = len(content.encode("utf-8"))
            if current_size + append_size > self._settings.max_file_size:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"Appending would exceed maximum file size of {self._settings.max_file_size} bytes",
                )
            with path.open("a", encoding="utf-8") as f:
                f.write(content)
            logger.info("Appended to file: %s (%d bytes)", file_path, append_size)
            return FileResult(
                success=True,
                file_path=file_path,
                message=f"Appended {append_size} bytes to file",
            )
        except Exception as e:
            # Boundary handler: callers receive a FileResult, never a raise.
            logger.exception("Error appending to file")
            return FileResult(
                success=False,
                file_path=file_path,
                message=f"Error appending to file: {e}",
            )

    def read_file(self, file_path: str) -> FileResult:
        """Read content of an existing file.

        Args:
            file_path: Path of the file to read; must resolve to a location
                inside the storage directory.

        Returns:
            A ``FileResult`` echoing ``file_path``, with the decoded UTF-8
            text in ``content`` on success, or ``success=False`` with the
            reason.
        """
        try:
            path = self._validate_path(file_path)
            if path is None:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message="Invalid file path: must be within code storage directory",
                )
            # is_file() rather than exists(): a directory cannot be read as
            # text and should be reported as not found.
            if not path.is_file():
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"File not found: {file_path}",
                )
            content = path.read_text(encoding="utf-8")
            return FileResult(
                success=True,
                file_path=file_path,
                message="File read successfully",
                content=content,
            )
        except Exception as e:
            # Boundary handler: callers receive a FileResult, never a raise.
            logger.exception("Error reading file")
            return FileResult(
                success=False,
                file_path=file_path,
                message=f"Error reading file: {e}",
            )