codeMcp / app /file_manager.py
sarveshpatel's picture
Create app/file_manager.py
d54d7d2 verified
"""File management for code storage."""
from __future__ import annotations
import logging
import uuid
from pathlib import Path
from app.config import Settings, get_settings
from app.models import FileResult
logger = logging.getLogger(__name__)
class FileManager:
    """Create, extend and read Python source files inside one storage directory.

    Every public method reports its outcome through a ``FileResult`` instead
    of raising: unexpected errors are logged with a traceback and converted
    into ``FileResult(success=False, ...)``.
    """

    def __init__(self, settings: Settings | None = None):
        """Bind the manager to its settings and make sure the storage dir exists.

        Args:
            settings: Configuration to use. When omitted (or falsy) the value
                returned by ``get_settings()`` is used instead.
        """
        self._settings = settings or get_settings()
        self._storage = Path(self._settings.code_storage_dir)
        self._storage.mkdir(parents=True, exist_ok=True)

    def _generate_filename(self, base: str | None = None) -> str:
        """Build a unique ``.py`` filename from an optional caller-supplied base.

        Args:
            base: Preferred name stem. Only alphanumeric characters, ``_`` and
                ``-`` are kept, and the result is capped at 100 characters.
                ``None``, an empty string, or a string with no permitted
                characters all fall back to ``"code"``.

        Returns:
            ``"<stem>_<8 hex chars>.py"``, where the hex part comes from a
            fresh ``uuid4``.
        """
        suffix = uuid.uuid4().hex[:8]
        base = base or "code"
        # Dropping everything except [alnum, "_", "-"] also removes path
        # separators and dots, so the result is always a plain file name.
        safe_base = "".join(c for c in base if c.isalnum() or c in ("_", "-"))[:100]
        if not safe_base:
            safe_base = "code"
        return f"{safe_base}_{suffix}.py"

    def _validate_path(self, file_path: str) -> Path | None:
        """Resolve ``file_path`` and confirm it lies inside the storage directory.

        Args:
            file_path: Path supplied by the caller; relative paths are
                resolved against the current working directory.

        Returns:
            The fully resolved path when it is the storage directory or a
            descendant of it, otherwise ``None``. ``None`` is also returned
            when resolving the path raises ``ValueError`` or ``OSError``.
        """
        try:
            path = Path(file_path).resolve()
            storage = self._storage.resolve()
            # Compare path components, not string prefixes: a plain
            # ``startswith`` would accept a sibling such as "<storage>_evil".
            # ``relative_to`` raises ValueError when ``path`` is outside.
            path.relative_to(storage)
        except (ValueError, OSError):
            return None
        return path

    def create_file(self, content: str, filename: str | None = None) -> FileResult:
        """Write ``content`` to a new, uniquely named file in the storage dir.

        Args:
            content: Text to store; it is written as UTF-8.
            filename: Optional name hint passed to ``_generate_filename``.
                The final name always carries a random suffix and ``.py``.

        Returns:
            A successful ``FileResult`` carrying the new file's path, or a
            failed one when the UTF-8 encoded content is larger than
            ``max_file_size`` or when writing raises.
        """
        try:
            max_size = self._settings.max_file_size
            if len(content.encode("utf-8")) > max_size:
                return FileResult(
                    success=False,
                    message=f"Content exceeds maximum file size of {max_size} bytes",
                )

            name = self._generate_filename(filename)
            file_path = self._storage / name
            file_path.write_text(content, encoding="utf-8")
            logger.info("Created file: %s", file_path)
            return FileResult(
                success=True,
                file_path=str(file_path),
                message=f"File created: {name}",
            )
        except Exception as exc:
            logger.exception("Error creating file")
            return FileResult(success=False, message=f"Error creating file: {exc}")

    def append_to_file(self, file_path: str, content: str) -> FileResult:
        """Append ``content`` to an existing file inside the storage directory.

        Args:
            file_path: Path of the file to extend; must pass
                ``_validate_path``.
            content: Text to append; it is written as UTF-8.

        Returns:
            A successful ``FileResult`` reporting the number of bytes
            appended, or a failed one when the path is outside the storage
            directory, is not an existing regular file, the resulting size
            would exceed ``max_file_size``, or the write raises.
        """
        try:
            path = self._validate_path(file_path)
            if path is None:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message="Invalid file path: must be within code storage directory",
                )
            # ``is_file`` rather than ``exists``: a directory (including the
            # storage root itself) is not something we can append to.
            if not path.is_file():
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"File not found: {file_path}",
                )

            max_size = self._settings.max_file_size
            current_size = path.stat().st_size
            append_size = len(content.encode("utf-8"))
            if current_size + append_size > max_size:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"Appending would exceed maximum file size of {max_size} bytes",
                )

            with path.open("a", encoding="utf-8") as f:
                f.write(content)
            logger.info("Appended to file: %s (%d bytes)", file_path, append_size)
            return FileResult(
                success=True,
                file_path=file_path,
                message=f"Appended {append_size} bytes to file",
            )
        except Exception as exc:
            logger.exception("Error appending to file")
            return FileResult(
                success=False,
                file_path=file_path,
                message=f"Error appending to file: {exc}",
            )

    def read_file(self, file_path: str) -> FileResult:
        """Read the UTF-8 text of an existing file inside the storage directory.

        Args:
            file_path: Path of the file to read; must pass ``_validate_path``.

        Returns:
            A successful ``FileResult`` whose ``content`` holds the file text,
            or a failed one when the path is outside the storage directory,
            is not an existing regular file, or reading raises.
        """
        try:
            path = self._validate_path(file_path)
            if path is None:
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message="Invalid file path: must be within code storage directory",
                )
            # ``is_file`` rather than ``exists`` so that directories are
            # reported as missing files instead of failing inside read_text.
            if not path.is_file():
                return FileResult(
                    success=False,
                    file_path=file_path,
                    message=f"File not found: {file_path}",
                )

            content = path.read_text(encoding="utf-8")
            return FileResult(
                success=True,
                file_path=file_path,
                message="File read successfully",
                content=content,
            )
        except Exception as exc:
            logger.exception("Error reading file")
            return FileResult(
                success=False,
                file_path=file_path,
                message=f"Error reading file: {exc}",
            )