Spaces:
Sleeping
Sleeping
| """Validate and upload chat/solve attachment images to Supabase Storage (image bucket).""" | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import uuid | |
| from typing import Any, Dict, Tuple | |
| from fastapi import HTTPException | |
| logger = logging.getLogger(__name__) | |
def _get_next_image_version(session_id: str) -> int:
    """Return the next ``version`` number for an image asset of *session_id*.

    Reads the highest existing ``version`` among ``session_assets`` rows with
    ``asset_type == "image"`` for the session and adds one. Returns 1 when the
    session has no image rows yet, and also when the lookup fails (the failure
    is logged, not raised).
    """
    from app.supabase_client import get_supabase

    client = get_supabase()
    try:
        latest_query = (
            client.table("session_assets")
            .select("version")
            .eq("session_id", session_id)
            .eq("asset_type", "image")
            .order("version", desc=True)
            .limit(1)
        )
        rows = latest_query.execute().data
        # Only the newest row is fetched, so rows[0] holds the current maximum.
        return (rows[0]["version"] + 1) if rows else 1
    except Exception as exc:
        logger.error("Error fetching image version: %s", exc)
        return 1
| _MAX_BYTES_DEFAULT = 10 * 1024 * 1024 | |
| _EXT_TO_MIME: dict[str, str] = { | |
| ".png": "image/png", | |
| ".jpg": "image/jpeg", | |
| ".jpeg": "image/jpeg", | |
| ".webp": "image/webp", | |
| ".gif": "image/gif", | |
| ".bmp": "image/bmp", | |
| } | |
| def _max_bytes() -> int: | |
| raw = os.getenv("CHAT_IMAGE_MAX_BYTES") | |
| if raw and raw.isdigit(): | |
| return min(int(raw), 50 * 1024 * 1024) | |
| return _MAX_BYTES_DEFAULT | |
| def _magic_ok(ext: str, body: bytes) -> bool: | |
| if len(body) < 12: | |
| return False | |
| if ext == ".png": | |
| return body.startswith(b"\x89PNG\r\n\x1a\n") | |
| if ext in (".jpg", ".jpeg"): | |
| return body.startswith(b"\xff\xd8\xff") | |
| if ext == ".webp": | |
| return body.startswith(b"RIFF") and body[8:12] == b"WEBP" | |
| if ext == ".gif": | |
| return body.startswith(b"GIF87a") or body.startswith(b"GIF89a") | |
| if ext == ".bmp": | |
| return body.startswith(b"BM") | |
| return False | |
def validate_chat_image_bytes(
    filename: str | None,
    body: bytes,
    declared_content_type: str | None,
) -> Tuple[str, str]:
    """Validate size, extension, and magic bytes of an uploaded image.

    Args:
        filename: Client-supplied file name; only its extension is used.
            May be ``None`` or lack an extension.
        body: Raw file content.
        declared_content_type: Content-Type sent by the client. It is only
            compared against the inferred type for logging; it never
            overrides the result.

    Returns:
        ``(extension_with_dot, content_type)`` for the accepted image.

    Raises:
        HTTPException: 400 for an empty body, an unsupported extension, or
            content whose magic bytes do not match the extension; 413 when
            the body exceeds the configured size limit.
    """
    max_b = _max_bytes()
    if not body:
        raise HTTPException(status_code=400, detail="Empty file.")
    if len(body) > max_b:
        raise HTTPException(
            status_code=413,
            detail=f"Image too large (max {max_b // (1024 * 1024)} MB).",
        )

    ext = os.path.splitext(filename or "")[1].lower()
    if not ext:
        # No extension to go by: detect the type from the content instead of
        # assuming PNG, so an extensionless JPEG/WebP/GIF/BMP is not rejected.
        # Unrecognised content keeps the ".png" fallback and fails the magic
        # check below.
        ext = next(
            (candidate for candidate in _EXT_TO_MIME if _magic_ok(candidate, body)),
            ".png",
        )
    if ext not in _EXT_TO_MIME:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported image type: {ext}. Allowed: {', '.join(sorted(_EXT_TO_MIME))}",
        )
    if not _magic_ok(ext, body):
        raise HTTPException(
            status_code=400,
            detail="File content does not match declared image type.",
        )

    mime = _EXT_TO_MIME[ext]
    if declared_content_type:
        # Drop parameters such as "; charset=..." before comparing.
        decl = declared_content_type.split(";")[0].strip().lower()
        if decl and decl not in ("application/octet-stream", mime):
            logger.warning(
                "Content-Type mismatch (declared=%s, inferred=%s); using inferred.",
                declared_content_type,
                mime,
            )
    return ext, mime
def upload_session_chat_image(
    session_id: str,
    job_id: str,
    file_bytes: bytes,
    ext_with_dot: str,
    content_type: str,
) -> Dict[str, Any]:
    """Upload an image to the image bucket and record it in ``session_assets``.

    The bucket name comes from ``SUPABASE_IMAGE_BUCKET`` (default ``image``).
    The object is stored at
    ``sessions/<session_id>/image_v<version>_<job_id>.<ext>``.

    Args:
        session_id: Session the image belongs to.
        job_id: Job identifier; part of the object name and stored on the row.
        file_bytes: Raw image content.
        ext_with_dot: File extension; a leading dot is stripped.
        content_type: MIME type sent to storage as the object's content-type.

    Returns:
        Dict with ``public_url``, ``storage_path``, ``version`` and
        ``session_asset_id`` (``None`` when the insert returned no id).

    Raises:
        Whatever the storage upload or the table insert raises. When the
        insert fails, the just-uploaded object is removed (best effort)
        before the error is re-raised, so no unreferenced file is left behind.
    """
    from app.supabase_client import get_supabase

    supabase = get_supabase()
    bucket_name = os.getenv("SUPABASE_IMAGE_BUCKET", "image")
    raw_ext = ext_with_dot.lstrip(".").lower()
    version = _get_next_image_version(session_id)
    file_name = f"image_v{version}_{job_id}.{raw_ext}"
    storage_path = f"sessions/{session_id}/{file_name}"

    bucket = supabase.storage.from_(bucket_name)
    bucket.upload(
        path=storage_path,
        file=file_bytes,
        file_options={"content-type": content_type},
    )
    public_url = bucket.get_public_url(storage_path)
    # Some client versions return a dict instead of a plain URL string.
    if isinstance(public_url, dict):
        public_url = public_url.get("publicUrl") or public_url.get("public_url") or str(public_url)

    row = {
        "session_id": session_id,
        "job_id": job_id,
        "asset_type": "image",
        "storage_path": storage_path,
        "public_url": public_url,
        "version": version,
    }
    try:
        # NOTE(review): confirm the installed Supabase client supports chaining
        # .select() after .insert(); this call is kept exactly as it was.
        ins = supabase.table("session_assets").insert(row).select("id").execute()
    except Exception:
        # The object is already in storage; without its row nothing references
        # it, so remove it before propagating the original error.
        try:
            bucket.remove([storage_path])
        except Exception as cleanup_err:
            logger.warning(
                "Could not remove orphaned upload path=%s: %s",
                storage_path,
                cleanup_err,
            )
        raise

    asset_id = ins.data[0].get("id") if ins.data else None
    result: Dict[str, Any] = {
        "public_url": public_url,
        "storage_path": storage_path,
        "version": version,
        "session_asset_id": str(asset_id) if asset_id else None,
    }
    logger.info("Uploaded chat image: %s", result)
    return result
def upload_ephemeral_ocr_blob(
    file_bytes: bytes,
    ext_with_dot: str,
    content_type: str,
) -> Tuple[str, str]:
    """Store *file_bytes* under ``_ocr_temp/`` in the image bucket for worker-only OCR.

    No ``session_assets`` row is written. The object gets a random hex name;
    an empty extension falls back to ``png``.

    Returns ``(storage_path, public_url)``. The caller is responsible for
    calling ``delete_storage_object`` once the blob is no longer needed.
    """
    from app.supabase_client import get_supabase

    bucket_name = os.getenv("SUPABASE_IMAGE_BUCKET", "image")
    suffix = ext_with_dot.lstrip(".").lower() or "png"
    object_path = f"_ocr_temp/{uuid.uuid4().hex}.{suffix}"

    client = get_supabase()
    upload_options = {"content-type": content_type}
    client.storage.from_(bucket_name).upload(
        path=object_path,
        file=file_bytes,
        file_options=upload_options,
    )

    url = client.storage.from_(bucket_name).get_public_url(object_path)
    # The client may hand back a dict rather than a plain URL string.
    if isinstance(url, dict):
        url = url.get("publicUrl") or url.get("public_url") or str(url)
    return object_path, url
def delete_storage_object(bucket_name: str, storage_path: str) -> None:
    """Remove *storage_path* from *bucket_name*, best effort.

    Any failure (including a failing client import) is logged as a warning
    and swallowed; nothing is raised to the caller.
    """
    try:
        from app.supabase_client import get_supabase

        bucket = get_supabase().storage.from_(bucket_name)
        bucket.remove([storage_path])
    except Exception as exc:
        logger.warning("delete_storage_object failed path=%s: %s", storage_path, exc)