# voiceforge/backend/app/services/file_service.py
# lordofgaming
# Initial VoiceForge deployment (clean)
# 673435a
"""
File Service
Audio file management and processing
"""
import os
import uuid
import shutil
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
from datetime import datetime
from ..core.config import get_settings
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
# Application settings, fetched once at import time and shared by the code below
settings = get_settings()
class FileService:
    """
    Service for managing audio file uploads and storage

    Uploads are written below ``settings.upload_dir``. Files saved with a
    user ID go to ``user_<id>/YYYY/MM/DD/``; all others go to
    ``anonymous/YYYY/MM/DD/``, which is the tree swept by
    ``cleanup_temp_files``.

    NOTE(review): ``get_file`` and ``delete_file`` accept any filesystem
    path and do not check that it lies under the upload directory. Callers
    must not pass untrusted paths to them.
    """

    def __init__(self):
        """Initialize file service and ensure upload directory exists"""
        self.upload_dir = Path(settings.upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"File service initialized with upload dir: {self.upload_dir}")

    def save_upload(
        self,
        file_content: bytes,
        original_filename: str,
        user_id: Optional[int] = None,
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Save an uploaded audio file

        Args:
            file_content: File bytes
            original_filename: Original filename from upload
            user_id: Optional user ID for organization

        Returns:
            Tuple of (storage_path, file_metadata)

        Raises:
            ValueError: If the extension is not a supported audio format or
                the content exceeds the configured maximum upload size
        """
        # Validate file extension (compared case-insensitively, without the dot)
        ext = Path(original_filename).suffix.lower()
        if ext.lstrip('.') not in settings.supported_audio_formats_list:
            raise ValueError(f"Unsupported audio format: {ext}")

        # Validate file size
        file_size = len(file_content)
        max_size = settings.max_upload_size_mb * 1024 * 1024
        if file_size > max_size:
            raise ValueError(f"File too large: {file_size / 1024 / 1024:.1f}MB (max {settings.max_upload_size_mb}MB)")

        # Generate unique filename
        unique_id = str(uuid.uuid4())
        date_prefix = datetime.now().strftime("%Y/%m/%d")

        # Create subdirectory for user or general.
        # Compare against None explicitly: a user ID of 0 is still a user and
        # must not land in the anonymous tree, which cleanup_temp_files purges.
        owner_dir = f"user_{user_id}" if user_id is not None else "anonymous"
        subdir = self.upload_dir / owner_dir / date_prefix
        subdir.mkdir(parents=True, exist_ok=True)

        # Save file under a generated name; the original name is kept only
        # in the returned metadata
        storage_path = subdir / f"{unique_id}{ext}"
        with open(storage_path, "wb") as f:
            f.write(file_content)
        logger.info(f"Saved upload: {original_filename} -> {storage_path}")

        # Get file metadata
        metadata = self._get_file_metadata(storage_path)
        metadata["original_filename"] = original_filename
        metadata["file_size"] = file_size
        return str(storage_path), metadata

    def get_file(self, storage_path: str) -> Optional[bytes]:
        """
        Get file content by storage path

        Args:
            storage_path: Path to stored file

        Returns:
            File bytes, or None if the path is missing or is not a regular
            file (for example a directory)
        """
        path = Path(storage_path)
        # is_file() rather than exists(): a directory must not reach the read
        if path.is_file():
            try:
                return path.read_bytes()
            except FileNotFoundError:
                # Removed between the check and the read; treat as not found
                pass
        logger.warning(f"File not found: {storage_path}")
        return None

    def delete_file(self, storage_path: str) -> bool:
        """
        Delete a stored file

        Args:
            storage_path: Path to stored file

        Returns:
            True if deleted, False if not found or the deletion failed
        """
        path = Path(storage_path)
        if not path.exists():
            return False
        try:
            path.unlink()
        except FileNotFoundError:
            # Someone else removed it after the existence check
            return False
        except OSError as e:
            logger.error(f"Failed to delete file: {e}")
            return False
        logger.info(f"Deleted file: {storage_path}")
        return True

    @staticmethod
    def _coerce(value: Any, cast: type, default: Any) -> Any:
        """Convert value with cast, returning default if it cannot be converted"""
        try:
            return cast(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _parse_probe_data(probe_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract audio fields from decoded ffprobe JSON output

        Each field is converted on its own, so one unusable value (such as
        the string "N/A" or a JSON null) falls back to 0 instead of
        discarding the fields that follow it.

        Args:
            probe_data: Decoded ffprobe JSON document

        Returns:
            Dict with "duration" and "bit_rate" when a format section is
            present, plus "sample_rate", "channels" and "codec" taken from
            the first audio stream, if any
        """
        coerce = FileService._coerce
        parsed: Dict[str, Any] = {}

        # Extract format info
        fmt = probe_data.get("format")
        if isinstance(fmt, dict):
            parsed["duration"] = coerce(fmt.get("duration", 0), float, 0.0)
            parsed["bit_rate"] = coerce(fmt.get("bit_rate", 0), int, 0)

        # Extract stream info (first audio stream only)
        for stream in probe_data.get("streams", []):
            if stream.get("codec_type") == "audio":
                parsed["sample_rate"] = coerce(stream.get("sample_rate", 0), int, 0)
                parsed["channels"] = coerce(stream.get("channels", 0), int, 0)
                parsed["codec"] = stream.get("codec_name", "")
                break
        return parsed

    def _get_file_metadata(self, file_path: Path) -> Dict[str, Any]:
        """
        Get metadata for an audio file

        Uses ffprobe if available, otherwise basic info

        Args:
            file_path: Path to audio file

        Returns:
            Dict with file metadata; always contains "format" and
            "storage_path", plus the ffprobe-derived fields when probing
            succeeds
        """
        ext = file_path.suffix.lower().lstrip('.')
        metadata = {
            "format": ext,
            "storage_path": str(file_path),
        }

        # Try to get audio metadata using ffprobe. This is best-effort: any
        # failure is logged and the basic metadata is returned unchanged.
        try:
            import subprocess
            import json

            result = subprocess.run(
                [
                    "ffprobe",
                    "-v", "quiet",
                    "-print_format", "json",
                    "-show_format",
                    "-show_streams",
                    str(file_path),
                ],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                probe_data = json.loads(result.stdout)
                metadata.update(self._parse_probe_data(probe_data))
                logger.debug(f"Extracted metadata via ffprobe: {metadata}")
        except FileNotFoundError:
            # Raised when the ffprobe executable cannot be found
            logger.debug("ffprobe not available, using basic metadata")
        except Exception as e:
            logger.warning(f"Failed to extract metadata: {e}")
        return metadata

    def cleanup_temp_files(self, max_age_hours: int = 24) -> int:
        """
        Clean up old temporary/anonymous files

        Only files below the "anonymous" subdirectory are considered;
        directories themselves are left in place.

        Args:
            max_age_hours: Delete files older than this

        Returns:
            Number of files deleted
        """
        anonymous_dir = self.upload_dir / "anonymous"
        if not anonymous_dir.exists():
            return 0

        cutoff = datetime.now().timestamp() - (max_age_hours * 3600)
        deleted = 0
        for file_path in anonymous_dir.rglob("*"):
            # The stat() call sits inside the try so that a file removed by
            # another process mid-sweep cannot abort the whole cleanup.
            try:
                if not file_path.is_file() or file_path.stat().st_mtime >= cutoff:
                    continue
                file_path.unlink()
                deleted += 1
            except FileNotFoundError:
                # Already gone; nothing left to clean up for this entry
                continue
            except OSError as e:
                logger.error(f"Failed to delete {file_path}: {e}")

        if deleted:
            logger.info(f"Cleaned up {deleted} old temporary files")
        return deleted
# Shared module-wide instance; stays None until first requested
_file_service: Optional[FileService] = None


def get_file_service() -> FileService:
    """Return the shared FileService, constructing it on the first call"""
    global _file_service
    if _file_service is not None:
        return _file_service
    _file_service = FileService()
    return _file_service