import time
import uuid
from pathlib import Path
from typing import Any, Optional, Tuple

import cv2
import numpy as np
from fastapi import HTTPException, UploadFile
from rembg import remove

from db.supabase_client import SupabaseClient

# Initialize Supabase client
supabase = SupabaseClient().get_client()

# Accepted values for the ``process_order`` argument; it only matters when
# both background removal and upscaling are requested.
_VALID_PROCESS_ORDERS = ("remove_first", "upscale_first")


async def read_image_file(file: UploadFile) -> np.ndarray:
    """Read an uploaded file and decode it into an RGB image array.

    Args:
        file: The uploaded file to read.

    Returns:
        The decoded image, converted from OpenCV's BGR order to RGB.

    Raises:
        HTTPException: 400 if the upload is not declared as an image, is
            empty, or cannot be decoded.
    """
    # content_type may be absent on the upload; treat that as "not an image"
    # instead of failing with an AttributeError.
    content_type = file.content_type or ""
    if not content_type.startswith("image/"):
        raise HTTPException(400, "File must be an image")

    image_data = await file.read()
    # Reject an empty payload up front so it is reported as a 400 rather than
    # surfacing as a decoder error.
    if not image_data:
        raise HTTPException(400, "Invalid image data")

    image = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise HTTPException(400, "Invalid image data")

    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


def remove_background(image_bytes: bytes) -> bytes:
    """Remove the background from an encoded image using rembg.

    Alpha matting is enabled with fixed thresholds and erode size.

    Args:
        image_bytes: The encoded source image.

    Returns:
        Whatever ``rembg.remove`` returns for the given bytes.

    Raises:
        Exception: If rembg fails for any reason; the original error is
            chained as the cause.
    """
    try:
        return remove(
            image_bytes,
            alpha_matting=True,
            alpha_matting_background_threshold=5,
            alpha_matting_foreground_threshold=220,
            alpha_matting_erode_size=5,
        )
    except Exception as e:
        print(f"Error removing background: {str(e)}")
        raise Exception(f"Background removal error: {str(e)}") from e


def upscale_image(image_bytes: bytes, scale_factor: int = 2) -> bytes:
    """Upscale an encoded image with OpenCV bicubic interpolation.

    Args:
        image_bytes: The encoded source image.
        scale_factor: Multiplier applied to both width and height.

    Returns:
        The upscaled image encoded as PNG bytes.

    Raises:
        Exception: If the image cannot be decoded, resized or re-encoded;
            the original error is chained as the cause.
    """
    try:
        # Create a numpy array from the image bytes
        nparr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
        # imdecode signals undecodable data by returning None; fail with a
        # clear message instead of an AttributeError on ``img.shape``.
        if img is None:
            raise ValueError("Could not decode image data")

        if img.ndim > 2 and img.shape[2] == 4:
            # Four channels: scale the colour planes and the alpha plane
            # separately, then recombine them.
            b, g, r, a = cv2.split(img)
            scaled_rgb = cv2.resize(
                cv2.merge([b, g, r]),
                None,
                fx=scale_factor,
                fy=scale_factor,
                interpolation=cv2.INTER_CUBIC,
            )
            scaled_alpha = cv2.resize(
                a,
                None,
                fx=scale_factor,
                fy=scale_factor,
                interpolation=cv2.INTER_CUBIC,
            )
            scaled_img = cv2.merge([
                scaled_rgb[:, :, 0],
                scaled_rgb[:, :, 1],
                scaled_rgb[:, :, 2],
                scaled_alpha,
            ])
        else:
            # No alpha channel: resize the image as-is
            scaled_img = cv2.resize(
                img,
                None,
                fx=scale_factor,
                fy=scale_factor,
                interpolation=cv2.INTER_CUBIC,
            )

        # Encode the image back to bytes
        success, buffer = cv2.imencode('.png', scaled_img)
        if not success:
            raise Exception("Failed to encode upscaled image")

        return buffer.tobytes()
    except Exception as e:
        print(f"Error upscaling image: {str(e)}")
        raise Exception(f"Image upscaling error: {str(e)}") from e


async def process_product_image(
    file: UploadFile,
    remove_bg: bool = True,
    upscale: bool = True,
    scale_factor: int = 2,
    process_order: str = "remove_first"
) -> Tuple[bytes, str]:
    """Process a product image with background removal and/or upscaling.

    Args:
        file: The uploaded image.
        remove_bg: Whether to remove the background.
        upscale: Whether to upscale the image.
        scale_factor: Upscaling multiplier; used only when ``upscale`` is true.
        process_order: ``"remove_first"`` or ``"upscale_first"``. Only
            consulted when both steps are requested.

    Returns:
        A ``(processed_bytes, filename)`` tuple. The filename is built from
        the upload's base name, a tag for each applied step and a timestamp,
        and always ends in ``.png``.

    Raises:
        ValueError: If both steps are requested and ``process_order`` is not
            one of the accepted values.
        Exception: Propagated from ``remove_background`` / ``upscale_image``.
    """
    # Validate before doing any work. Previously an unknown order silently
    # skipped the upscaling step even though it had been requested.
    if remove_bg and upscale and process_order not in _VALID_PROCESS_ORDERS:
        raise ValueError(
            f"Invalid process_order {process_order!r}; "
            f"expected one of {_VALID_PROCESS_ORDERS}"
        )

    # Read the file content
    content = await file.read()
    file.file.seek(0)  # Reset file pointer for potential reuse

    # Create a descriptive filename with timestamp for uniqueness.
    # Path(...).stem drops any directory part and only the final extension,
    # and the fallback covers a missing or empty filename.
    timestamp = int(time.time())
    base_name = Path(file.filename or "").stem or 'product'
    extension = 'png'  # Always use PNG to preserve transparency

    # Process the image based on the parameters and order
    processed_content = content
    if remove_bg and upscale:
        if process_order == "upscale_first":
            processed_content = upscale_image(processed_content, scale_factor)
            processed_content = remove_background(processed_content)
        else:
            processed_content = remove_background(processed_content)
            processed_content = upscale_image(processed_content, scale_factor)
    elif remove_bg:
        processed_content = remove_background(processed_content)
    elif upscale:
        processed_content = upscale_image(processed_content, scale_factor)
    # NOTE: when neither step is requested the bytes are returned unchanged,
    # yet the filename below still carries a .png extension.

    # Create descriptive filename with processing info; tags for skipped
    # steps are omitted so the name never contains empty "__" segments.
    name_parts = [base_name]
    if remove_bg:
        name_parts.append('nobg')
    if upscale:
        name_parts.append(f"upx{scale_factor}")
    name_parts.append(str(timestamp))
    processed_filename = f"{'_'.join(name_parts)}.{extension}"

    return processed_content, processed_filename


async def upload_processed_image(
    processed_image: bytes,
    filename: str,
    bucket: str = "product-images"
) -> Tuple[str, str]:
    """
    Upload a processed image to Supabase Storage

    Args:
        processed_image: The image bytes to upload.
        filename: Name to store the image under; a UUID is prefixed to it.
        bucket: Name of the storage bucket to upload into.

    Returns:
        Tuple[str, str]: (image_path, image_url)
    """
    # Generate a unique ID for the image
    image_id = str(uuid.uuid4())
    image_path = f"{image_id}_{filename}"

    # Upload the processed image to Supabase Storage
    supabase.storage.from_(bucket).upload(
        file=processed_image,
        path=image_path,
        file_options={"content-type": "image/png", "upsert": "true"}
    )

    # Get the public URL for the uploaded image
    image_url = supabase.storage.from_(bucket).get_public_url(image_path)

    return image_path, image_url


async def update_product_image(product_id: str, image_url: str) -> dict[str, Any]:
    """
    Update the product_image field for a product

    Args:
        product_id: Value matched against the ``product_id`` column of the
            ``products`` table.
        image_url: Value written to the ``product_image`` column.

    Returns:
        Dict[str, Any]: The updated product data

    Raises:
        ValueError: If ``product_id`` is empty.
        Exception: If the update returns no rows.
    """
    if not product_id:
        raise ValueError("Product ID is required")

    result = supabase.table("products").update({
        "product_image": image_url
    }).eq("product_id", product_id).execute()

    # An empty result set means no row was updated
    if not result.data:
        raise Exception(f"Failed to update product {product_id}")

    return result.data[0]


async def process_and_store_product_image(
    file: UploadFile,
    remove_bg: bool = True,
    upscale: bool = True,
    scale_factor: int = 2,
    process_order: str = "remove_first",
    product_id: Optional[str] = None
) -> dict[str, Any]:
    """
    Complete workflow for processing a product image and storing it

    This function:
    1. Processes the image (remove background, upscale)
    2. Uploads it to storage
    3. Updates the product record if product_id is provided

    Args:
        file: The uploaded image.
        remove_bg: Whether to remove the background.
        upscale: Whether to upscale the image.
        scale_factor: Upscaling multiplier.
        process_order: Order of the two steps when both are requested.
        product_id: If given, the product whose record is updated with the
            new image URL.

    Returns:
        Dict[str, Any]: Result with status, urls, and processing info
    """
    # Process the image
    processed_image, filename = await process_product_image(
        file,
        remove_bg=remove_bg,
        upscale=upscale,
        scale_factor=scale_factor,
        process_order=process_order
    )

    # Upload to storage
    image_path, image_url = await upload_processed_image(processed_image, filename)

    # Update product record if needed
    product_data = None
    if product_id:
        product_data = await update_product_image(product_id, image_url)

    # Return comprehensive result
    return {
        "status": "success",
        "message": "Image processed successfully",
        "image_url": image_url,
        "image_path": image_path,
        "product_data": product_data,
        "processing": {
            "background_removed": remove_bg,
            "upscaled": upscale,
            "scale_factor": scale_factor if upscale else None,
            "process_order": process_order
        }
    }