"""File management for code storage.""" from __future__ import annotations import logging import uuid from pathlib import Path from app.config import Settings, get_settings from app.models import FileResult logger = logging.getLogger(__name__) class FileManager: """Manages code files in the storage directory.""" def __init__(self, settings: Settings | None = None): self._settings = settings or get_settings() self._storage = Path(self._settings.code_storage_dir) self._storage.mkdir(parents=True, exist_ok=True) def _generate_filename(self, base: str | None = None) -> str: """Generate a unique filename.""" suffix = uuid.uuid4().hex[:8] base = base or "code" # Sanitize safe_base = "".join(c for c in base if c.isalnum() or c in ("_", "-"))[:100] if not safe_base: safe_base = "code" return f"{safe_base}_{suffix}.py" def _validate_path(self, file_path: str) -> Path | None: """Validate that a file path is within the storage directory.""" try: path = Path(file_path).resolve() storage = self._storage.resolve() if not str(path).startswith(str(storage)): return None return path except (ValueError, OSError): return None def create_file(self, content: str, filename: str | None = None) -> FileResult: """Create a new Python file with content.""" try: if len(content.encode("utf-8")) > self._settings.max_file_size: return FileResult( success=False, message=f"Content exceeds maximum file size of {self._settings.max_file_size} bytes", ) name = self._generate_filename(filename) file_path = self._storage / name file_path.write_text(content, encoding="utf-8") logger.info("Created file: %s", file_path) return FileResult( success=True, file_path=str(file_path), message=f"File created: {name}", ) except Exception as e: logger.exception("Error creating file") return FileResult(success=False, message=f"Error creating file: {str(e)}") def append_to_file(self, file_path: str, content: str) -> FileResult: """Append content to an existing file.""" try: path = self._validate_path(file_path) if path is None: return FileResult( success=False, file_path=file_path, message="Invalid file path: must be within code storage directory", ) if not path.exists(): return FileResult( success=False, file_path=file_path, message=f"File not found: {file_path}", ) current_size = path.stat().st_size append_size = len(content.encode("utf-8")) if current_size + append_size > self._settings.max_file_size: return FileResult( success=False, file_path=file_path, message=f"Appending would exceed maximum file size of {self._settings.max_file_size} bytes", ) with path.open("a", encoding="utf-8") as f: f.write(content) logger.info("Appended to file: %s (%d bytes)", file_path, append_size) return FileResult( success=True, file_path=file_path, message=f"Appended {append_size} bytes to file", ) except Exception as e: logger.exception("Error appending to file") return FileResult( success=False, file_path=file_path, message=f"Error appending to file: {str(e)}", ) def read_file(self, file_path: str) -> FileResult: """Read content of an existing file.""" try: path = self._validate_path(file_path) if path is None: return FileResult( success=False, file_path=file_path, message="Invalid file path: must be within code storage directory", ) if not path.exists(): return FileResult( success=False, file_path=file_path, message=f"File not found: {file_path}", ) content = path.read_text(encoding="utf-8") return FileResult( success=True, file_path=file_path, message="File read successfully", content=content, ) except Exception as e: logger.exception("Error reading file") return FileResult( success=False, file_path=file_path, message=f"Error reading file: {str(e)}", )