Spaces:
Runtime error
Runtime error
| """ | |
| Supabase file storage implementation for TreeTrack | |
| Handles images and audio uploads to private Supabase Storage buckets | |
| """ | |
| import os | |
| import uuid | |
| import logging | |
| import asyncio | |
| from typing import Dict, Any, Optional, List | |
| from pathlib import Path | |
| from supabase_client import get_supabase_client | |
| from config import get_settings | |
| logger = logging.getLogger(__name__) | |
class SupabaseFileStorage:
    """Supabase Storage implementation for file uploads.

    Wraps two private buckets (one for images, one for audio) and offers
    upload, signed-URL generation, deletion, and helpers that decorate tree
    records with signed URLs.

    Attributes:
        client: Object returned by ``get_supabase_client()``, or ``None`` when
            that call raised ``ValueError``.
        connected: ``True`` when ``client`` was obtained, ``False`` otherwise.
        image_bucket: Bucket name used for images.
        audio_bucket: Bucket name used for audio.
    """

    def __init__(self):
        """Create the storage wrapper and resolve bucket names from settings."""
        try:
            self.client = get_supabase_client()
            self.connected = True
            logger.info("SupabaseFileStorage initialized")
        except ValueError:
            # Keep the object usable for construction-time wiring; individual
            # operations report the missing client when they are called.
            self.client = None
            self.connected = False
            logger.warning("SupabaseFileStorage not configured")

        # Use configured bucket names from settings, falling back to defaults.
        settings = get_settings()
        self.image_bucket = settings.supabase.image_bucket or "tree-images"
        self.audio_bucket = settings.supabase.audio_bucket or "tree-audios"
        logger.info(f"Using storage buckets - images: {self.image_bucket}, audio: {self.audio_bucket}")

    def _bucket(self, bucket_name: str):
        """Return the storage handle for ``bucket_name``.

        Raises:
            RuntimeError: If no client is available, so callers get a clear
                message instead of an ``AttributeError`` on ``None``.
        """
        if self.client is None:
            raise RuntimeError("Supabase storage client is not configured")
        return self.client.storage.from_(bucket_name)

    def _upload(self, bucket_name: str, folder: str, file_data: bytes, filename: str) -> tuple:
        """Upload ``file_data`` under a fresh UUID name and return ``(path, file_id)``.

        The stored object is named ``<folder>/<uuid><lowercased extension>``.
        """
        import mimetypes  # local import: only needed on the upload path

        file_id = str(uuid.uuid4())
        extension = Path(filename).suffix.lower()
        object_path = f"{folder}/{file_id}{extension}"

        # Send an explicit content type when the extension is recognised;
        # otherwise the storage client applies its own generic default.
        content_type, _ = mimetypes.guess_type(f"file{extension}")
        bucket = self._bucket(bucket_name)
        if content_type:
            result = bucket.upload(
                object_path, file_data, file_options={"content-type": content_type}
            )
        else:
            result = bucket.upload(object_path, file_data)

        if not result:
            raise Exception("Upload failed")
        return object_path, file_id

    def upload_image(self, file_data: bytes, filename: str, category: str) -> Dict[str, Any]:
        """Upload an image to the image bucket.

        Args:
            file_data: Raw image bytes.
            filename: Original file name; only its extension is reused.
            category: Photo category; its lowercase form becomes the folder.

        Returns:
            Dict with ``success``, ``filename`` (stored object path),
            ``bucket``, ``category``, ``size`` and ``file_id``.

        Raises:
            Exception: If the upload fails for any reason; the original error
                is chained as ``__cause__``.
        """
        try:
            object_path, file_id = self._upload(
                self.image_bucket, category.lower(), file_data, filename
            )
            logger.info(f"Image uploaded successfully: {object_path}")
            return {
                "success": True,
                "filename": object_path,
                "bucket": self.image_bucket,
                "category": category,
                "size": len(file_data),
                "file_id": file_id,
            }
        except Exception as e:
            logger.error(f"Error uploading image {filename}: {e}")
            raise Exception(f"Failed to upload image: {str(e)}") from e

    def upload_audio(self, file_data: bytes, filename: str) -> Dict[str, Any]:
        """Upload an audio file to the audio bucket under the ``audio/`` folder.

        Args:
            file_data: Raw audio bytes.
            filename: Original file name; only its extension is reused.

        Returns:
            Dict with ``success``, ``filename`` (stored object path),
            ``bucket``, ``size`` and ``file_id``.

        Raises:
            Exception: If the upload fails for any reason; the original error
                is chained as ``__cause__``.
        """
        try:
            object_path, file_id = self._upload(
                self.audio_bucket, "audio", file_data, filename
            )
            logger.info(f"Audio uploaded successfully: {object_path}")
            return {
                "success": True,
                "filename": object_path,
                "bucket": self.audio_bucket,
                "size": len(file_data),
                "file_id": file_id,
            }
        except Exception as e:
            logger.error(f"Error uploading audio {filename}: {e}")
            raise Exception(f"Failed to upload audio: {str(e)}") from e

    def get_signed_url(self, bucket_name: str, file_path: str, expires_in: int = 3600) -> str:
        """Generate a signed URL for a file in a private bucket.

        Args:
            bucket_name: Bucket that holds the file.
            file_path: Object path inside the bucket.
            expires_in: Lifetime value passed through to ``create_signed_url``.

        Returns:
            The signed URL string.

        Raises:
            Exception: If the client is missing, the call fails, or the
                response carries no usable URL.
        """
        try:
            result = self._bucket(bucket_name).create_signed_url(file_path, expires_in)

            # Accept either key spelling, and never hand back an empty value
            # as if it were a URL.
            signed_url = None
            if result:
                for key in ("signedURL", "signedUrl"):
                    if key in result and result[key]:
                        signed_url = result[key]
                        break
            if not signed_url:
                raise Exception("Failed to generate signed URL")
            return signed_url
        except Exception as e:
            logger.error(f"Error generating signed URL for {file_path}: {e}")
            raise Exception(f"Failed to generate file URL: {str(e)}") from e

    def get_image_url(self, image_path: str, expires_in: int = 3600) -> str:
        """Get a signed URL for an object in the image bucket."""
        return self.get_signed_url(self.image_bucket, image_path, expires_in)

    def get_audio_url(self, audio_path: str, expires_in: int = 3600) -> str:
        """Get a signed URL for an object in the audio bucket."""
        return self.get_signed_url(self.audio_bucket, audio_path, expires_in)

    def delete_file(self, bucket_name: str, file_path: str) -> bool:
        """Delete a file from storage.

        Returns:
            ``True`` when the remove call completed without raising,
            ``False`` when it raised (the error is logged, not propagated).
        """
        try:
            self._bucket(bucket_name).remove([file_path])
            logger.info(f"File deleted: {bucket_name}/{file_path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting file {file_path}: {e}")
            return False

    def delete_image(self, image_path: str) -> bool:
        """Delete an object from the image bucket."""
        return self.delete_file(self.image_bucket, image_path)

    def delete_audio(self, audio_path: str) -> bool:
        """Delete an object from the audio bucket."""
        return self.delete_file(self.audio_bucket, audio_path)

    def process_tree_files(self, tree_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``tree_data`` with signed URLs added for its files.

        For every non-blank string in the ``photographs`` dict a
        ``<category>_url`` entry is added; for a non-blank string
        ``storytelling_audio`` a ``storytelling_audio_url`` entry is added.
        URLs are requested with a 7200 second lifetime. A URL that cannot be
        generated is stored as ``None`` and logged as a warning.

        The input record and its nested ``photographs`` dict are not modified.
        """
        processed_data = tree_data.copy()
        try:
            source_photos = processed_data.get('photographs')
            if source_photos and isinstance(source_photos, dict):
                # Copy the nested dict: the shallow copy above still shares it
                # with the caller, and the *_url keys must not leak back.
                photos = dict(source_photos)
                processed_data['photographs'] = photos
                for category, file_path in source_photos.items():
                    # Skip anything that is not a usable path so one bad
                    # entry cannot abort the rest of the record.
                    if not (isinstance(file_path, str) and file_path.strip()):
                        continue
                    try:
                        photos[f"{category}_url"] = self.get_image_url(file_path, expires_in=7200)
                    except Exception as e:
                        logger.warning(f"Failed to generate URL for photo {file_path}: {e}")
                        photos[f"{category}_url"] = None

            audio_path = processed_data.get('storytelling_audio')
            if isinstance(audio_path, str) and audio_path.strip():
                try:
                    processed_data['storytelling_audio_url'] = self.get_audio_url(audio_path, expires_in=7200)
                except Exception as e:
                    logger.warning(f"Failed to generate URL for audio {audio_path}: {e}")
                    processed_data['storytelling_audio_url'] = None
        except Exception as e:
            logger.error(f"Error processing tree files: {e}")
        return processed_data

    def process_multiple_trees(self, trees_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Apply ``process_tree_files`` to each tree, one at a time.

        A tree that cannot be processed is logged and passed through
        unchanged, so the output always has one entry per input entry.
        """
        processed_trees = []
        for tree_data in trees_data:
            try:
                processed_trees.append(self.process_tree_files(tree_data))
            except Exception as e:
                # The entry may not even be a dict, so look the id up safely;
                # failing inside this handler would abort the whole batch.
                tree_id = tree_data.get('id', 'unknown') if isinstance(tree_data, dict) else 'unknown'
                logger.error(f"Error processing tree {tree_id}: {e}")
                # Add original tree data without URLs on error
                processed_trees.append(tree_data)
        return processed_trees