Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import uuid | |
| import httpx | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
def _safe_object_name(filename: str) -> str:
    """Reduce a client-supplied filename to a single URL-safe path segment."""
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    from urllib.parse import quote

    # Drop any directory part (POSIX or Windows separators) so the name cannot
    # place the object outside the intended folder.
    base_name = os.path.basename(filename.replace("\\", "/"))
    # Percent-encode everything that has a meaning in a URL ('?', '#', ' ', ...).
    return quote(base_name or "file", safe="")


def upload_base64_file(file_base64: str, filename: str, folder: str = "attachments") -> str:
    """
    Decode a base64 string and upload it to Supabase Storage via the REST API.

    Configuration is read from the environment: SUPABASE_URL and
    SUPABASE_API_KEY are required; SUPABASE_BUCKET defaults to "complaints".

    Args:
        file_base64: Base64 payload, optionally prefixed with a data-URL
            header (e.g. "data:image/png;base64,...."); the header is dropped.
        filename: Original file name. Only its final path component is used,
            percent-encoded, and prefixed with a random UUID.
        folder: Folder (key prefix) inside the bucket.

    Returns:
        The URL built from the storage "public object" route for the uploaded
        file. NOTE(review): this is only reachable if the bucket is configured
        as public -- confirm against the Supabase project settings.

    Raises:
        RuntimeError: If SUPABASE_URL or SUPABASE_API_KEY is not set.
        binascii.Error: If the base64 payload cannot be decoded.
        httpx.HTTPStatusError: If Supabase answers with a non-success status.
        Any other exception raised by the HTTP client is logged and re-raised.
    """
    from urllib.parse import quote

    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_API_KEY")
    bucket = os.getenv("SUPABASE_BUCKET", "complaints")

    if not supabase_url or not supabase_key:
        raise RuntimeError(
            "Supabase is not configured. "
            "Set SUPABASE_URL and SUPABASE_API_KEY in Hugging Face Space secrets."
        )

    # A trailing slash in the configured URL would otherwise yield "//storage".
    base_url = supabase_url.rstrip("/")

    try:
        # Strip a data-URL header; maxsplit=1 keeps the whole payload even if
        # a stray comma appears later in the string.
        if "," in file_base64:
            file_base64 = file_base64.split(",", 1)[1]
        file_data = base64.b64decode(file_base64)

        # The UUID prefix keeps uploads with the same name from colliding.
        unique_id = str(uuid.uuid4())
        file_path = f"{quote(folder, safe='/')}/{unique_id}_{_safe_object_name(filename)}"

        # 1. Upload the file
        upload_url = f"{base_url}/storage/v1/object/{bucket}/{file_path}"
        headers = {
            "apikey": supabase_key,
            "Authorization": f"Bearer {supabase_key}",
            "Content-Type": "application/octet-stream",
        }
        with httpx.Client() as client:
            response = client.post(upload_url, headers=headers, content=file_data)

        # Any 2xx is a success; only log and raise for real failures.
        if not 200 <= response.status_code < 300:
            print(f"Supabase returned error: {response.status_code} - {response.text}")
            response.raise_for_status()

        # 2. Return the public URL
        return f"{base_url}/storage/v1/object/public/{bucket}/{file_path}"
    except Exception as e:
        print(f"Error uploading file to Supabase: {e}")
        # Bare raise preserves the original traceback.
        raise