Spaces:
Sleeping
Sleeping
| """ | |
| Storage abstraction for serving RAG images in production. | |
| Supports MinIO (S3-compatible) and Supabase Storage. | |
| When storage is disabled/unavailable, the RAG chain falls back to local /outputs. | |
| """ | |
| import io | |
| from datetime import timedelta | |
| from typing import Any, Optional | |
| from src.config.settings import settings | |
def _using_supabase() -> bool:
    """Report whether Supabase Storage is the configured provider.

    Reads ``settings.storage_provider`` (treated as ``"minio"`` when the
    attribute is absent) and compares it case-insensitively to
    ``"supabase"``.
    """
    provider = getattr(settings, "storage_provider", "minio")
    return provider.lower() == "supabase"
def get_active_bucket_name() -> str:
    """Return the bucket name of the active provider (for metadata / logging).

    Yields ``settings.supabase_bucket`` when Supabase is the active provider
    and ``settings.minio_bucket`` otherwise.
    """
    return settings.supabase_bucket if _using_supabase() else settings.minio_bucket
def _minio_client():
    """Build a MinIO client from settings, or return None if unavailable.

    Returns None when ``settings.minio_enabled`` is falsy (the flag is
    treated as True when absent) or when importing / constructing the client
    raises. Both outcomes are reported on stdout rather than raised.
    """
    enabled = getattr(settings, "minio_enabled", True)
    if not enabled:
        print("MinIO disabled")
        return None

    try:
        # Imported lazily so a failing import is handled like any other
        # client-creation error below.
        from minio import Minio

        endpoint = settings.minio_endpoint
        connection = Minio(
            endpoint,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            secure=settings.minio_secure,
        )
        print("MinIO client created:", endpoint)
    except Exception as err:
        print("MinIO client creation error:", err)
        return None
    return connection
def _supabase_client():
    """Build a Supabase client from settings, or return None if unavailable.

    Returns None when the project URL or the service role key is empty or
    absent, or when importing / constructing the client raises. Problems are
    printed rather than raised. The configured URL is always printed; the key
    never is.
    """
    project_url = getattr(settings, "supabase_url", "")
    service_key = getattr(settings, "supabase_service_role_key", "")
    print("SUPABASE URL:", project_url)

    if not (project_url and service_key):
        print("Supabase config missing (SUPABASE_URL/SUPABASE_SERVICE_ROLE_KEY)")
        return None

    try:
        # Imported lazily so a failing import is handled like any other
        # client-creation error below.
        from supabase import create_client

        connection = create_client(project_url, service_key)
    except Exception as err:
        print("Supabase client creation error:", err)
        return None
    return connection
| def _bucket_name_from_item(bucket: Any) -> Optional[str]: | |
| if isinstance(bucket, dict): | |
| return bucket.get("name") | |
| name = getattr(bucket, "name", None) | |
| if isinstance(name, str): | |
| return name | |
| return None | |
def ensure_bucket() -> bool:
    """Ensure the storage bucket is usable for the active provider.

    Supabase: the bucket is expected to be pre-created in the dashboard, so
    this only checks that ``settings.supabase_bucket`` appears in the
    project's bucket list; it never creates one.

    MinIO: creates ``settings.minio_bucket`` when it does not exist yet.

    Returns:
        True when the bucket exists (or was just created for MinIO). False
        when no client could be built, the Supabase bucket is missing, or
        the provider call raised. Errors are printed, never raised.
    """
    if _using_supabase():
        client = _supabase_client()
        if not client:
            return False
        try:
            buckets = client.storage.list_buckets() or []
            # Entries without a usable name are dropped before the lookup.
            names = [n for n in map(_bucket_name_from_item, buckets) if n]
            return settings.supabase_bucket in names
        except Exception as e:
            print("Supabase bucket check error:", e)
            return False

    client = _minio_client()
    if not client:
        return False
    try:
        if not client.bucket_exists(settings.minio_bucket):
            client.make_bucket(settings.minio_bucket)
        return True
    except Exception as e:
        # Report the failure like the other storage calls in this module
        # instead of swallowing it silently.
        print("MinIO bucket check error:", e)
        return False
def put_object(object_key: str, data: bytes, content_type: str = "image/png") -> bool:
    """Upload ``data`` under ``object_key`` to the active provider's bucket.

    Args:
        object_key: Destination key. For Supabase, backslashes are converted
            to forward slashes; for MinIO the key is used unchanged.
        data: Raw bytes to store.
        content_type: MIME type recorded with the object.

    Returns:
        True when the upload call completed; False when no client could be
        created or the upload raised. Errors are printed, never raised.
    """
    supabase_active = _using_supabase()
    client = _supabase_client() if supabase_active else _minio_client()
    if not client:
        if not supabase_active:
            print("MinIO client is None")
        return False

    if supabase_active:
        try:
            normalized_key = object_key.replace("\\", "/")
            upload_options = {
                "content-type": content_type,
                "upsert": "true",
            }
            bucket_api = client.storage.from_(settings.supabase_bucket)
            # The payload is handed over as raw bytes, not a BytesIO wrapper.
            bucket_api.upload(normalized_key, data, upload_options)
            print("Uploaded to Supabase:", normalized_key)
        except Exception as err:
            print("SUPABASE UPLOAD ERROR:", err)
            return False
        return True

    try:
        # Best effort: the result is ignored and the upload is attempted
        # regardless.
        ensure_bucket()
        client.put_object(
            settings.minio_bucket,
            object_key,
            io.BytesIO(data),
            length=len(data),
            content_type=content_type,
        )
        print("Uploaded:", object_key)
    except Exception as err:
        print("UPLOAD ERROR:", err)
        return False
    return True
| def _extract_supabase_signed_url(signed: Any) -> Optional[str]: | |
| """Parse create_signed_url / SDK responses into a full HTTPS URL.""" | |
| if signed is None: | |
| return None | |
| if isinstance(signed, str) and signed.startswith("http"): | |
| return signed | |
| if not isinstance(signed, dict): | |
| return None | |
| url = ( | |
| signed.get("signedURL") | |
| or signed.get("signedUrl") | |
| or signed.get("signed_url") | |
| ) | |
| if isinstance(url, dict): | |
| url = url.get("signedURL") or url.get("signedUrl") | |
| if not url or not isinstance(url, str): | |
| return None | |
| if url.startswith("http"): | |
| return url | |
| base = (settings.supabase_url or "").rstrip("/") | |
| if not base: | |
| return None | |
| return f"{base}{url}" if url.startswith("/") else f"{base}/{url}" | |
def get_presigned_url(object_key: str, expire_seconds: Optional[int] = None) -> Optional[str]:
    """Create a time-limited download URL for ``object_key``.

    Args:
        object_key: Key of the stored object. For Supabase, backslashes are
            converted to forward slashes; for MinIO the key is used as-is.
        expire_seconds: Lifetime of the URL in seconds. A falsy value
            (None or 0) selects the provider's configured default, which is
            3600 when the corresponding setting is absent.

    Returns:
        The signed URL, or None when no client could be created, the
        provider call raised, or the Supabase response had an unrecognised
        shape. Errors are printed, never raised.
    """
    if _using_supabase():
        client = _supabase_client()
        if not client:
            return None
        try:
            ttl = expire_seconds or getattr(
                settings, "supabase_signed_url_expire_seconds", 3600
            )
            normalized_key = object_key.replace("\\", "/")
            bucket_api = client.storage.from_(settings.supabase_bucket)
            response = bucket_api.create_signed_url(
                path=normalized_key,
                expires_in=int(ttl),
            )
            # Some SDK versions wrap the payload (e.g. a response object
            # exposing .data); unwrap it when present.
            wrapped = getattr(response, "data", None)
            payload = response if wrapped is None else wrapped
            resolved = _extract_supabase_signed_url(payload)
            if not resolved:
                print("SUPABASE PRESIGN: unexpected response shape:", type(response), response)
                return None
            return resolved
        except Exception as err:
            print("SUPABASE PRESIGN ERROR:", err)
            return None

    client = _minio_client()
    if not client:
        return None
    try:
        ttl = expire_seconds or getattr(
            settings, "minio_presign_expire_seconds", 3600
        )
        lifetime = timedelta(seconds=int(ttl))
        return client.presigned_get_object(
            settings.minio_bucket,
            object_key,
            expires=lifetime,
        )
    except Exception as err:
        print("PRESIGN ERROR:", err)
        return None
def is_available() -> bool:
    """Report whether the active storage provider is configured and reachable.

    Delegates to ``ensure_bucket``, so for MinIO a missing bucket may be
    created as a side effect of this check.
    """
    bucket_ready = ensure_bucket()
    return bucket_ready
def storage_debug_snapshot() -> dict:
    """Collect storage diagnostics for GET /debug/storage and the upload pipeline.

    The report contains configuration flags, the network location of the
    Supabase URL, whether the ``supabase`` package imports, and, when
    Supabase is the active provider and a client can be built, the project's
    bucket names plus up to ten root-level object names from the configured
    bucket. The service role key itself is never included, only whether one
    is set.

    Returns early (with a partial report) when the ``supabase`` import fails,
    when the active provider is MinIO, or when no Supabase client can be
    created. Listing failures are recorded in the report instead of raised.
    """
    from urllib.parse import urlparse

    report: dict = {
        "storage_provider": getattr(settings, "storage_provider", "minio"),
        "using_supabase": _using_supabase(),
        "supabase_url_configured": bool((settings.supabase_url or "").strip()),
        "supabase_key_configured": bool((settings.supabase_service_role_key or "").strip()),
        "supabase_url_host": None,
        "supabase_import_ok": False,
        "supabase_import_error": None,
        "supabase_client_ok": False,
        "supabase_bucket": settings.supabase_bucket,
        "bucket_exists_in_project": None,
        "bucket_list_error": None,
        "sample_object_keys": [],
        "minio_enabled": settings.minio_enabled,
    }

    configured_url = (settings.supabase_url or "").strip()
    if configured_url:
        try:
            report["supabase_url_host"] = urlparse(configured_url).netloc or None
        except Exception:
            report["supabase_url_host"] = "(invalid URL)"

    # Import probe only; the module object itself is not used.
    try:
        import supabase as _sb  # noqa: F401
    except ImportError as err:
        report["supabase_import_error"] = str(err)
        return report
    report["supabase_import_ok"] = True

    if not _using_supabase():
        report["note"] = "Active provider is minio; Supabase checks skipped."
        return report

    client = _supabase_client()
    if not client:
        report["supabase_client_ok"] = False
        return report
    report["supabase_client_ok"] = True

    # Bucket listing: which buckets exist, and is the configured one there?
    try:
        buckets = client.storage.list_buckets() or []
        names = [n for n in map(_bucket_name_from_item, buckets) if n]
        report["bucket_names_in_project"] = names
        report["bucket_exists_in_project"] = settings.supabase_bucket in names
    except Exception as err:
        report["bucket_list_error"] = str(err)

    # Root-level listing of the configured bucket, for visibility.
    try:
        bucket_api = client.storage.from_(settings.supabase_bucket)
        listed = bucket_api.list() if hasattr(bucket_api, "list") else []
        listed_is_list = isinstance(listed, list)
        if listed_is_list:
            sample = [
                entry["name"]
                for entry in listed[:10]
                if isinstance(entry, dict) and entry.get("name")
            ]
        else:
            sample = []
        report["sample_object_keys"] = sample
        report["bucket_list_prefix_root_count"] = len(listed) if listed_is_list else None
    except Exception as err:
        report["bucket_list_files_error"] = str(err)

    return report