stage-backend / storage.py
github-actions[bot]
Sync backend folder from GitHub
6b13793
Raw
History Blame Contribute Delete
1.79 kB
import os
import base64
import uuid
import httpx
from dotenv import load_dotenv
load_dotenv()
def upload_base64_file(file_base64: str, filename: str, folder: str = "attachments") -> str:
"""
Decodes a base64 string and uploads it directly to Supabase Storage via REST API.
Returns the public URL of the uploaded file.
"""
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_API_KEY")
bucket = os.getenv("SUPABASE_BUCKET", "complaints")
if not supabase_url or not supabase_key:
raise RuntimeError(
"Supabase is not configured. "
"Set SUPABASE_URL and SUPABASE_API_KEY in Hugging Face Space secrets."
)
try:
if "," in file_base64:
file_base64 = file_base64.split(",")[1]
file_data = base64.b64decode(file_base64)
unique_id = str(uuid.uuid4())
file_path = f"{folder}/{unique_id}_{filename}"
# 1. Upload the file
upload_url = f"{supabase_url}/storage/v1/object/{bucket}/{file_path}"
headers = {
"apikey": supabase_key,
"Authorization": f"Bearer {supabase_key}",
"Content-Type": "application/octet-stream"
}
with httpx.Client() as client:
response = client.post(upload_url, headers=headers, content=file_data)
if response.status_code != 200:
print(f"Supabase returned error: {response.status_code} - {response.text}")
response.raise_for_status()
# 2. Return the public URL
public_url = f"{supabase_url}/storage/v1/object/public/{bucket}/{file_path}"
return public_url
except Exception as e:
print(f"Error uploading file to Supabase: {e}")
raise e