Spaces:
Running
Running
| """ | |
| MEXAR Core Engine - Storage Service | |
| Handles file uploads to Supabase Storage. | |
| """ | |
| import os | |
| import logging | |
| from typing import Optional | |
| from pathlib import Path | |
| import uuid | |
| from fastapi import UploadFile, HTTPException | |
| from supabase import create_client, Client | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class StorageService: | |
| """Service for managing file uploads to Supabase Storage.""" | |
| def __init__(self): | |
| """Initialize Supabase client.""" | |
| supabase_url = os.getenv("SUPABASE_URL") | |
| supabase_key = os.getenv("SUPABASE_KEY") | |
| if not supabase_url or not supabase_key: | |
| raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables") | |
| self.client: Client = create_client(supabase_url, supabase_key) | |
| logger.info("Supabase Storage client initialized") | |
| async def upload_file( | |
| self, | |
| file: UploadFile, | |
| bucket: str, | |
| folder: str = "" | |
| ) -> dict: | |
| """ | |
| Upload file to Supabase Storage and return file info. | |
| Args: | |
| file: FastAPI UploadFile object | |
| bucket: Bucket name (e.g., 'agent-uploads', 'chat-media') | |
| folder: Optional folder path within bucket | |
| Returns: | |
| Dict containing: | |
| - path: File path in storage | |
| - url: Public URL (if bucket is public) | |
| - size: File size in bytes | |
| """ | |
| try: | |
| # Generate unique filename | |
| ext = Path(file.filename).suffix | |
| filename = f"{uuid.uuid4()}{ext}" | |
| path = f"{folder}/{filename}" if folder else filename | |
| # Read file content | |
| content = await file.read() | |
| file_size = len(content) | |
| # Upload to Supabase | |
| logger.info(f"Uploading file to {bucket}/{path}") | |
| response = self.client.storage.from_(bucket).upload( | |
| path=path, | |
| file=content, | |
| file_options={"content-type": file.content_type or "application/octet-stream"} | |
| ) | |
| # Get public URL (works for public buckets) | |
| public_url = self.client.storage.from_(bucket).get_public_url(path) | |
| logger.info(f"File uploaded successfully: {path}") | |
| return { | |
| "path": path, | |
| "url": public_url, | |
| "size": file_size, | |
| "bucket": bucket, | |
| "original_filename": file.filename | |
| } | |
| except Exception as e: | |
| logger.error(f"Error uploading file to Supabase Storage: {str(e)}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to upload file: {str(e)}" | |
| ) | |
| def delete_file(self, bucket: str, path: str) -> bool: | |
| """ | |
| Delete file from storage. | |
| Args: | |
| bucket: Bucket name | |
| path: File path in bucket | |
| Returns: | |
| True if successful | |
| """ | |
| try: | |
| logger.info(f"Deleting file from {bucket}/{path}") | |
| self.client.storage.from_(bucket).remove([path]) | |
| logger.info(f"File deleted successfully: {path}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error deleting file: {str(e)}") | |
| return False | |
| def get_signed_url(self, bucket: str, path: str, expires_in: int = 3600) -> str: | |
| """ | |
| Generate a signed URL for private files. | |
| Args: | |
| bucket: Bucket name | |
| path: File path | |
| expires_in: URL expiration time in seconds (default: 1 hour) | |
| Returns: | |
| Signed URL string | |
| """ | |
| try: | |
| response = self.client.storage.from_(bucket).create_signed_url( | |
| path=path, | |
| expires_in=expires_in | |
| ) | |
| return response.get("signedURL", "") | |
| except Exception as e: | |
| logger.error(f"Error generating signed URL: {str(e)}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to generate signed URL: {str(e)}" | |
| ) | |
| # Factory function for easy instantiation | |
| def create_storage_service() -> StorageService: | |
| """Create a new StorageService instance.""" | |
| return StorageService() | |
| # Global instance | |
| storage_service = create_storage_service() | |