Spaces:
Sleeping
Sleeping
| """ | |
| File Service | |
| Audio file management and processing | |
| """ | |
| import os | |
| import uuid | |
| import shutil | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Tuple, Dict, Any | |
| from datetime import datetime | |
| from ..core.config import get_settings | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Application settings, fetched once when this module is imported.
settings = get_settings()
class FileService:
    """
    Service for managing audio file uploads and storage.

    Uploaded files are written beneath ``settings.upload_dir`` in a
    ``user_<id>`` or ``anonymous`` directory, further split by upload date
    (``YYYY/MM/DD``).
    """

    def __init__(self):
        """Initialize file service and ensure upload directory exists"""
        self.upload_dir = Path(settings.upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"File service initialized with upload dir: {self.upload_dir}")

    def _upload_subdir(self, user_id: Optional[int], date_prefix: str) -> Path:
        """
        Build the directory an upload should be stored in.

        Args:
            user_id: Owner of the upload, or None for an anonymous upload
            date_prefix: Relative date path, e.g. "2024/01/31"

        Returns:
            Directory path under the upload root (not created here)
        """
        # Compare against None explicitly: a user ID of 0 is falsy but is
        # still a user, and must not land in the "anonymous" tree that
        # cleanup_temp_files() purges.
        owner = "anonymous" if user_id is None else f"user_{user_id}"
        return self.upload_dir / owner / date_prefix

    def save_upload(
        self,
        file_content: bytes,
        original_filename: str,
        user_id: Optional[int] = None,
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Save an uploaded audio file

        Args:
            file_content: File bytes
            original_filename: Original filename from upload
            user_id: Optional user ID for organization; None stores the file
                under the anonymous directory

        Returns:
            Tuple of (storage_path, file_metadata)

        Raises:
            ValueError: If the extension is not in the configured list of
                supported audio formats, or the content exceeds the
                configured maximum upload size
        """
        # Validate file extension
        ext = Path(original_filename).suffix.lower()
        if ext.lstrip('.') not in settings.supported_audio_formats_list:
            raise ValueError(f"Unsupported audio format: {ext}")

        # Validate file size
        file_size = len(file_content)
        max_size = settings.max_upload_size_mb * 1024 * 1024
        if file_size > max_size:
            raise ValueError(f"File too large: {file_size / 1024 / 1024:.1f}MB (max {settings.max_upload_size_mb}MB)")

        # Generate unique filename; the original name is only kept in metadata
        unique_id = str(uuid.uuid4())
        date_prefix = datetime.now().strftime("%Y/%m/%d")

        # Create subdirectory for user or general
        subdir = self._upload_subdir(user_id, date_prefix)
        subdir.mkdir(parents=True, exist_ok=True)

        # Save file
        filename = f"{unique_id}{ext}"
        storage_path = subdir / filename
        with open(storage_path, "wb") as f:
            f.write(file_content)
        logger.info(f"Saved upload: {original_filename} -> {storage_path}")

        # Get file metadata
        metadata = self._get_file_metadata(storage_path)
        metadata["original_filename"] = original_filename
        metadata["file_size"] = file_size
        return str(storage_path), metadata

    def get_file(self, storage_path: str) -> Optional[bytes]:
        """
        Get file content by storage path

        Args:
            storage_path: Path to stored file

        Returns:
            File bytes or None if not found
        """
        path = Path(storage_path)
        if path.exists():
            try:
                with open(path, "rb") as f:
                    return f.read()
            except FileNotFoundError:
                # The file vanished between the existence check and the
                # open; report it as "not found" like any other missing file.
                pass
        logger.warning(f"File not found: {storage_path}")
        return None

    def delete_file(self, storage_path: str) -> bool:
        """
        Delete a stored file

        Args:
            storage_path: Path to stored file

        Returns:
            True if deleted, False if not found or the deletion failed
        """
        path = Path(storage_path)
        if not path.exists():
            return False
        try:
            path.unlink()
        except OSError as e:
            logger.error(f"Failed to delete file: {e}")
            return False
        logger.info(f"Deleted file: {storage_path}")
        return True

    def _get_file_metadata(self, file_path: Path) -> Dict[str, Any]:
        """
        Get metadata for an audio file

        Uses ffprobe if available, otherwise basic info

        Args:
            file_path: Path to audio file

        Returns:
            Dict with file metadata. "format" and "storage_path" are always
            present; "duration", "bit_rate", "sample_rate", "channels" and
            "codec" are added when ffprobe reports them.
        """
        ext = file_path.suffix.lower().lstrip('.')
        metadata = {
            "format": ext,
            "storage_path": str(file_path),
        }

        # Try to get audio metadata using ffprobe. Any failure is logged and
        # the basic metadata gathered so far is returned instead.
        try:
            import subprocess
            import json

            result = subprocess.run(
                [
                    "ffprobe",
                    "-v", "quiet",
                    "-print_format", "json",
                    "-show_format",
                    "-show_streams",
                    str(file_path)
                ],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                probe_data = json.loads(result.stdout)

                # Extract format info
                if "format" in probe_data:
                    fmt = probe_data["format"]
                    metadata["duration"] = float(fmt.get("duration", 0))
                    metadata["bit_rate"] = int(fmt.get("bit_rate", 0))

                # Extract stream info from the first audio stream only
                for stream in probe_data.get("streams", []):
                    if stream.get("codec_type") == "audio":
                        metadata["sample_rate"] = int(stream.get("sample_rate", 0))
                        metadata["channels"] = int(stream.get("channels", 0))
                        metadata["codec"] = stream.get("codec_name", "")
                        break
                logger.debug(f"Extracted metadata via ffprobe: {metadata}")
        except FileNotFoundError:
            # Raised by subprocess.run when the ffprobe executable is missing
            logger.debug("ffprobe not available, using basic metadata")
        except Exception as e:
            logger.warning(f"Failed to extract metadata: {e}")
        return metadata

    def cleanup_temp_files(self, max_age_hours: int = 24) -> int:
        """
        Clean up old temporary/anonymous files

        Args:
            max_age_hours: Delete files older than this

        Returns:
            Number of files deleted
        """
        deleted = 0
        anonymous_dir = self.upload_dir / "anonymous"
        if not anonymous_dir.exists():
            return 0

        cutoff = datetime.now().timestamp() - (max_age_hours * 3600)
        for file_path in anonymous_dir.rglob("*"):
            # stat() lives inside the try so that one file disappearing
            # mid-sweep cannot abort the whole cleanup.
            try:
                if not file_path.is_file():
                    continue
                if file_path.stat().st_mtime >= cutoff:
                    continue
                file_path.unlink()
                deleted += 1
            except FileNotFoundError:
                # Already removed by someone else; nothing left to do
                continue
            except OSError as e:
                logger.error(f"Failed to delete {file_path}: {e}")

        if deleted:
            logger.info(f"Cleaned up {deleted} old temporary files")
        return deleted
# Shared instance, created on first request rather than at import time
_file_service: Optional[FileService] = None


def get_file_service() -> FileService:
    """Return the shared FileService, constructing it on the first call"""
    global _file_service
    if _file_service is not None:
        return _file_service
    _file_service = FileService()
    return _file_service