Spaces:
Runtime error
Runtime error
import time
import uuid
from typing import Any, Optional, Tuple

import cv2
import numpy as np
from fastapi import HTTPException, UploadFile
from rembg import remove

from db.supabase_client import SupabaseClient
# Module-level Supabase client, created once at import time and used by the
# storage upload and "products" table update functions in this module.
supabase = SupabaseClient().get_client()
async def read_image_file(file: UploadFile) -> np.ndarray:
    """Decode an uploaded image into an RGB pixel array.

    Args:
        file: The uploaded file. Its declared content type must start with
            ``image/``.

    Returns:
        The decoded image, converted from OpenCV's BGR channel order to RGB.

    Raises:
        HTTPException: 400 when the content type is missing or not an image,
            or when the payload is empty or cannot be decoded.
    """
    # content_type is None when the client sends no Content-Type for the
    # part; treat that as "not an image" instead of crashing on None.
    content_type = file.content_type or ""
    if not content_type.startswith("image/"):
        raise HTTPException(400, "File must be an image")

    image_data = await file.read()
    # Reject empty payloads up front: cv2.imdecode may raise on an empty
    # buffer rather than return None, which would bypass the check below.
    if not image_data:
        raise HTTPException(400, "Invalid image data")

    image = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise HTTPException(400, "Invalid image data")
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
def remove_background(image_bytes: bytes) -> bytes:
    """Remove the background from an encoded image using rembg.

    Alpha matting is enabled with fixed thresholds (background 5,
    foreground 220) and an erode size of 5.

    Args:
        image_bytes: Encoded image data handed to ``rembg.remove``.

    Returns:
        The result of ``rembg.remove`` for the given bytes.

    Raises:
        Exception: Any failure is logged to stdout and re-raised with a
            "Background removal error: ..." message; the original exception
            is attached as ``__cause__``.
    """
    try:
        return remove(
            image_bytes,
            alpha_matting=True,
            alpha_matting_background_threshold=5,
            alpha_matting_foreground_threshold=220,
            alpha_matting_erode_size=5,
        )
    except Exception as e:
        print(f"Error removing background: {str(e)}")
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise Exception(f"Background removal error: {str(e)}") from e
def upscale_image(image_bytes: bytes, scale_factor: int = 2) -> bytes:
    """Upscale an encoded image with bicubic interpolation.

    Args:
        image_bytes: Encoded image data. It is decoded with
            ``cv2.IMREAD_UNCHANGED``, so an alpha channel is kept if present.
        scale_factor: Multiplier applied to both width and height; must be
            positive.

    Returns:
        The upscaled image encoded as PNG bytes.

    Raises:
        Exception: Any failure (invalid arguments, undecodable data, encoding
            failure) is logged to stdout and re-raised with an
            "Image upscaling error: ..." message; the original exception is
            attached as ``__cause__``.
    """
    try:
        if scale_factor <= 0:
            raise ValueError(f"scale_factor must be positive, got {scale_factor}")
        if not image_bytes:
            raise ValueError("image data is empty")

        encoded = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(encoded, cv2.IMREAD_UNCHANGED)
        # imdecode signals undecodable data by returning None; report that
        # directly instead of failing later on an attribute access.
        if img is None:
            raise ValueError("image data could not be decoded")

        # cv2.resize interpolates every channel independently, so grayscale,
        # BGR and BGRA images all go through the same call; no separate
        # handling of the alpha channel is required.
        scaled_img = cv2.resize(
            img,
            None,
            fx=scale_factor,
            fy=scale_factor,
            interpolation=cv2.INTER_CUBIC,
        )

        # Encode the image back to bytes
        success, buffer = cv2.imencode('.png', scaled_img)
        if not success:
            raise Exception("Failed to encode upscaled image")
        return buffer.tobytes()
    except Exception as e:
        print(f"Error upscaling image: {str(e)}")
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise Exception(f"Image upscaling error: {str(e)}") from e
async def process_product_image(
    file: UploadFile,
    remove_bg: bool = True,
    upscale: bool = True,
    scale_factor: int = 2,
    process_order: str = "remove_first"
) -> Tuple[bytes, str]:
    """Process a product image with background removal and/or upscaling.

    Args:
        file: The uploaded image. Its content is read in full and the
            underlying file pointer is rewound afterwards.
        remove_bg: Run ``remove_background`` on the image.
        upscale: Run ``upscale_image`` on the image.
        scale_factor: Scale factor passed to ``upscale_image``.
        process_order: ``"remove_first"`` or ``"upscale_first"``. Only
            consulted when both steps are enabled.

    Returns:
        A ``(processed_bytes, filename)`` tuple. The filename is built from
        the upload's base name, a tag for every step that ran, and a Unix
        timestamp, and always ends in ``.png``.

    Raises:
        ValueError: If both steps are enabled and ``process_order`` is not
            one of the two supported values.
        Exception: Propagated from ``remove_background`` / ``upscale_image``.
    """
    run_both = remove_bg and upscale
    # An unrecognised order used to fall through to "remove background only",
    # silently skipping the upscale while the filename still claimed it ran.
    if run_both and process_order not in ("remove_first", "upscale_first"):
        raise ValueError(
            f"Unsupported process_order {process_order!r}; "
            "expected 'remove_first' or 'upscale_first'"
        )

    # Read the file content
    content = await file.read()
    file.file.seek(0)  # Reset file pointer for potential reuse

    # Timestamp keeps generated names distinct across uploads of one file.
    timestamp = int(time.time())

    # Derive the base name: the filename may be missing, may carry directory
    # components supplied by the client, and may contain several dots.
    raw_name = file.filename or ""
    leaf = raw_name.replace("\\", "/").rsplit("/", 1)[-1]
    stem, dot, _ = leaf.rpartition(".")
    base_name = (stem if dot else leaf) or "product"
    extension = 'png'  # Always use PNG to preserve transparency

    # Build the pipeline in the default order, then flip it when requested.
    steps = []
    if remove_bg:
        steps.append(remove_background)
    if upscale:
        steps.append(lambda data: upscale_image(data, scale_factor))
    if run_both and process_order == "upscale_first":
        steps.reverse()

    processed_content = content
    for step in steps:
        processed_content = step(processed_content)

    # Only include tags for steps that ran, so disabled steps do not leave
    # empty segments (doubled underscores) in the name.
    name_parts = [base_name]
    if remove_bg:
        name_parts.append("nobg")
    if upscale:
        name_parts.append(f"upx{scale_factor}")
    name_parts.append(str(timestamp))
    processed_filename = "_".join(name_parts) + f".{extension}"
    return processed_content, processed_filename
async def upload_processed_image(
    processed_image: bytes,
    filename: str,
    bucket: str = "product-images"
) -> Tuple[str, str]:
    """
    Store processed image bytes in a Supabase Storage bucket.

    The object path is a freshly generated UUID4 followed by an underscore
    and the given filename; the object is uploaded as ``image/png``.

    Returns:
        Tuple[str, str]: (image_path, image_url)
    """
    # Prefix with a random UUID so identical filenames do not collide.
    storage_key = f"{uuid.uuid4()}_{filename}"
    upload_options = {"content-type": "image/png", "upsert": "true"}

    supabase.storage.from_(bucket).upload(
        file=processed_image,
        path=storage_key,
        file_options=upload_options,
    )

    # Resolve the public URL of the object that was just written.
    public_url = supabase.storage.from_(bucket).get_public_url(storage_key)
    return storage_key, public_url
async def update_product_image(product_id: str, image_url: str) -> dict[str, Any]:
    """
    Update the product_image field for a product.

    Sets ``product_image`` to ``image_url`` on rows of the "products" table
    whose ``product_id`` equals the given id.

    Args:
        product_id: Identifier of the product to update; must be non-empty.
        image_url: Value written to the ``product_image`` column.

    Returns:
        Dict[str, Any]: The first row of the update result.

    Raises:
        ValueError: If ``product_id`` is empty.
        Exception: If the update returns no rows.
    """
    if not product_id:
        raise ValueError("Product ID is required")

    result = supabase.table("products").update({
        "product_image": image_url
    }).eq("product_id", product_id).execute()

    # An empty result means no row was returned for this product_id.
    if not result.data:
        raise Exception(f"Failed to update product {product_id}")
    return result.data[0]
async def process_and_store_product_image(
    file: UploadFile,
    remove_bg: bool = True,
    upscale: bool = True,
    scale_factor: int = 2,
    process_order: str = "remove_first",
    product_id: Optional[str] = None
) -> dict[str, Any]:
    """
    Complete workflow for processing a product image and storing it.

    This function:
    1. Processes the image (remove background, upscale)
    2. Uploads it to storage (default bucket of ``upload_processed_image``)
    3. Updates the product record if product_id is provided

    Args:
        file: The uploaded image.
        remove_bg: Forwarded to ``process_product_image``.
        upscale: Forwarded to ``process_product_image``.
        scale_factor: Forwarded to ``process_product_image``.
        process_order: Forwarded to ``process_product_image``.
        product_id: When truthy, the product's image URL is updated via
            ``update_product_image``.

    Returns:
        Dict[str, Any]: Result with status, urls, and processing info.
        ``product_data`` is None when no product_id was given, and
        ``processing.scale_factor`` is None when upscaling was disabled.

    Raises:
        Exception: Whatever the processing, upload, or update step raises is
            propagated unchanged.
    """
    # Process the image
    processed_image, filename = await process_product_image(
        file,
        remove_bg=remove_bg,
        upscale=upscale,
        scale_factor=scale_factor,
        process_order=process_order
    )

    # Upload to storage
    image_path, image_url = await upload_processed_image(processed_image, filename)

    # Update product record if needed
    product_data = None
    if product_id:
        product_data = await update_product_image(product_id, image_url)

    # Return comprehensive result
    return {
        "status": "success",
        "message": "Image processed successfully",
        "image_url": image_url,
        "image_path": image_path,
        "product_data": product_data,
        "processing": {
            "background_removed": remove_bg,
            "upscaled": upscale,
            "scale_factor": scale_factor if upscale else None,
            "process_order": process_order
        }
    }