Spaces:
Sleeping
Sleeping
File size: 4,612 Bytes
b0b150b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
"""
MEXAR Core Engine - Storage Service
Handles file uploads to Supabase Storage.
"""
import os
import logging
from typing import Optional
from pathlib import Path
import uuid
from fastapi import UploadFile, HTTPException
from supabase import create_client, Client
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Apply a basic root logging configuration at INFO level
logging.basicConfig(level=logging.INFO)
class StorageService:
    """Service for managing file uploads to Supabase Storage.

    Wraps a Supabase client and exposes upload, delete and signed-URL
    helpers. Upload and signed-URL failures are raised as ``HTTPException``
    with status 500; delete failures are logged and reported through a
    ``False`` return value.
    """

    def __init__(self):
        """Initialize the Supabase client from environment variables.

        Raises:
            ValueError: If ``SUPABASE_URL`` or ``SUPABASE_KEY`` is unset
                or empty.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_KEY")
        if not supabase_url or not supabase_key:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
        self.client: Client = create_client(supabase_url, supabase_key)
        logger.info("Supabase Storage client initialized")

    @staticmethod
    def _build_object_path(filename: Optional[str], folder: str = "") -> str:
        """Return a unique storage path that keeps the original extension.

        Args:
            filename: Client-supplied file name; may be ``None`` or empty,
                in which case the generated name has no extension.
            folder: Optional folder prefix within the bucket.

        Returns:
            ``"<folder>/<uuid4><ext>"`` when ``folder`` is non-empty,
            otherwise ``"<uuid4><ext>"``.
        """
        # An upload may arrive without a filename; Path(None) would raise
        # TypeError, so fall back to an extension-less name instead.
        ext = Path(filename).suffix if filename else ""
        unique_name = f"{uuid.uuid4()}{ext}"
        return f"{folder}/{unique_name}" if folder else unique_name

    async def upload_file(
        self,
        file: UploadFile,
        bucket: str,
        folder: str = ""
    ) -> dict:
        """
        Upload file to Supabase Storage and return file info.

        Args:
            file: FastAPI UploadFile object
            bucket: Bucket name (e.g., 'agent-uploads', 'chat-media')
            folder: Optional folder path within bucket

        Returns:
            Dict containing:
            - path: File path in storage
            - url: Public URL (if bucket is public)
            - size: File size in bytes
            - bucket: Bucket the file was uploaded to
            - original_filename: File name as supplied by the client

        Raises:
            HTTPException: Status 500 if reading or uploading the file fails.
        """
        try:
            # Generate a unique object path (tolerates a missing filename)
            path = self._build_object_path(file.filename, folder)

            # Read the whole upload into memory; the size is derived from it
            content = await file.read()
            file_size = len(content)

            # Upload to Supabase
            logger.info(f"Uploading file to {bucket}/{path}")
            storage_bucket = self.client.storage.from_(bucket)
            storage_bucket.upload(
                path=path,
                file=content,
                file_options={"content-type": file.content_type or "application/octet-stream"}
            )

            # Get public URL (works for public buckets)
            public_url = storage_bucket.get_public_url(path)
            logger.info(f"File uploaded successfully: {path}")
            return {
                "path": path,
                "url": public_url,
                "size": file_size,
                "bucket": bucket,
                "original_filename": file.filename
            }
        except Exception as e:
            # logger.exception records the traceback alongside the message
            logger.exception("Error uploading file to Supabase Storage: %s", e)
            raise HTTPException(
                status_code=500,
                detail=f"Failed to upload file: {str(e)}"
            ) from e

    def delete_file(self, bucket: str, path: str) -> bool:
        """
        Delete file from storage.

        Deletion is best-effort: any error is logged and reported as
        ``False`` rather than raised.

        Args:
            bucket: Bucket name
            path: File path in bucket

        Returns:
            True if the remove call completed without raising, False otherwise
        """
        try:
            logger.info(f"Deleting file from {bucket}/{path}")
            self.client.storage.from_(bucket).remove([path])
            logger.info(f"File deleted successfully: {path}")
            return True
        except Exception as e:
            logger.exception("Error deleting file: %s", e)
            return False

    def get_signed_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
        """
        Generate a signed URL for private files.

        Args:
            bucket: Bucket name
            path: File path
            expires_in: URL expiration time in seconds (default: 1 hour)

        Returns:
            Signed URL string, or an empty string when the response has no
            "signedURL" entry

        Raises:
            HTTPException: Status 500 if the signed URL cannot be generated.
        """
        try:
            response = self.client.storage.from_(bucket).create_signed_url(
                path=path,
                expires_in=expires_in
            )
            return response.get("signedURL", "")
        except Exception as e:
            logger.exception("Error generating signed URL: %s", e)
            raise HTTPException(
                status_code=500,
                detail=f"Failed to generate signed URL: {str(e)}"
            ) from e
# Factory helper: builds a fresh service on every call
def create_storage_service() -> StorageService:
    """Build and return a new StorageService instance."""
    service = StorageService()
    return service
# Global instance, created when this module is imported.
# NOTE: StorageService() raises ValueError when SUPABASE_URL or SUPABASE_KEY
# is missing, so importing this module fails unless both are set.
storage_service = create_storage_service()
|