Spaces:
Sleeping
Sleeping
| """ | |
| Supabase Integration | |
| Handles Supabase Storage for document uploads (PDFs, DOCX, etc.) | |
| """ | |
| from fastapi import UploadFile, HTTPException | |
| from app.config import settings | |
| from supabase import create_client, Client | |
| from typing import Dict, Any | |
| import logging | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
| class SupabaseStorageService: | |
| """Service for uploading files to Supabase Storage""" | |
| def get_client() -> Client: | |
| """Get Supabase client""" | |
| return create_client(settings.SUPABASE_URL, settings.SUPABASE_SERVICE_KEY) | |
| def get_bucket_name(entity_type: str) -> str: | |
| """ | |
| Get Supabase storage bucket based on entity type | |
| Bucket structure: | |
| - documents-users - User documents (IDs, contracts, etc.) | |
| - documents-tickets - Ticket documents | |
| - documents-projects - Project documents | |
| - documents-general - Everything else | |
| """ | |
| bucket_map = { | |
| 'user': 'documents-users', | |
| 'ticket': 'documents-tickets', | |
| 'project': 'documents-projects', | |
| 'client': 'documents-clients', | |
| 'contractor': 'documents-contractors' | |
| } | |
| return bucket_map.get(entity_type, 'documents-general') | |
| def get_file_path(entity_type: str, entity_id: str, file_name: str) -> str: | |
| """ | |
| Generate file path in bucket | |
| Format: {entity_type}/{entity_id}/{filename} | |
| Note: filename should already be contextual and unique from FileNamingService | |
| """ | |
| return f"{entity_type}/{entity_id}/{file_name}" | |
| async def upload( | |
| file: UploadFile, | |
| entity_type: str, | |
| entity_id: str, | |
| document_type: str, | |
| contextual_filename: str = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Upload file to Supabase Storage | |
| Args: | |
| file: The file to upload | |
| entity_type: Type of entity (user, ticket, etc.) | |
| entity_id: ID of the entity | |
| document_type: Type of document | |
| contextual_filename: Optional contextual filename to use instead of original | |
| Returns: | |
| Dict containing upload response with public URL | |
| Raises: | |
| HTTPException: If upload fails | |
| """ | |
| try: | |
| client = SupabaseStorageService.get_client() | |
| # Read file content | |
| file_content = await file.read() | |
| # Use contextual filename if provided, otherwise use original | |
| filename_to_use = contextual_filename if contextual_filename else file.filename | |
| # Get bucket and file path | |
| bucket_name = SupabaseStorageService.get_bucket_name(entity_type) | |
| file_path = SupabaseStorageService.get_file_path(entity_type, entity_id, filename_to_use) | |
| # Upload to Supabase Storage | |
| response = client.storage.from_(bucket_name).upload( | |
| path=file_path, | |
| file=file_content, | |
| file_options={ | |
| "content-type": file.content_type, | |
| "upsert": "false" # Don't overwrite existing files | |
| } | |
| ) | |
| # Store permanent reference (bucket + path) | |
| # Don't store signed URLs as they expire | |
| # Instead, store a reference URL that can be used to generate signed URLs on-demand | |
| permanent_url = f"supabase://{bucket_name}/{file_path}" | |
| logger.info(f"Uploaded file to Supabase Storage: {permanent_url}") | |
| return { | |
| 'public_url': permanent_url, # Permanent reference, not a signed URL | |
| 'bucket': bucket_name, | |
| 'path': file_path, | |
| 'size': len(file_content), | |
| 'content_type': file.content_type, | |
| 'storage_type': 'supabase' | |
| } | |
| except Exception as e: | |
| logger.error(f"Supabase Storage upload failed: {str(e)}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to upload file to Supabase Storage: {str(e)}" | |
| ) | |
| def get_signed_url(bucket: str, file_path: str, expires_in: int = 3600) -> str: | |
| """ | |
| Generate a signed URL for accessing a private file | |
| Args: | |
| bucket: Bucket name | |
| file_path: Path to file in bucket | |
| expires_in: URL validity in seconds (default: 1 hour) | |
| Returns: | |
| Signed URL string | |
| """ | |
| try: | |
| client = SupabaseStorageService.get_client() | |
| response = client.storage.from_(bucket).create_signed_url(file_path, expires_in) | |
| if isinstance(response, dict) and 'signedURL' in response: | |
| return response['signedURL'] | |
| elif hasattr(response, 'signed_url'): | |
| return response.signed_url | |
| else: | |
| # Fallback: return authenticated URL | |
| return f"{settings.SUPABASE_URL}/storage/v1/object/authenticated/{bucket}/{file_path}" | |
| except Exception as e: | |
| logger.error(f"Failed to generate signed URL: {str(e)}") | |
| # Return authenticated URL as fallback | |
| return f"{settings.SUPABASE_URL}/storage/v1/object/authenticated/{bucket}/{file_path}" | |
| def delete(bucket: str, file_path: str) -> bool: | |
| """ | |
| Delete file from Supabase Storage | |
| Args: | |
| bucket: Bucket name | |
| file_path: Path to file in bucket | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| client = SupabaseStorageService.get_client() | |
| client.storage.from_(bucket).remove([file_path]) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Supabase Storage delete failed: {str(e)}") | |
| return False | |