File size: 7,528 Bytes
737cb33
 
 
e245176
 
 
 
15b9bc1
 
 
 
 
737cb33
 
 
 
 
 
 
 
 
 
 
 
 
e245176
15b9bc1
e245176
 
 
15b9bc1
 
 
 
 
e245176
 
 
 
15b9bc1
e245176
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15b9bc1
e245176
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15b9bc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1139dbb
15b9bc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import numpy as np
import cv2
from fastapi import UploadFile, HTTPException
from rembg import remove
import time
import uuid
from typing import Tuple, Optional
from db.supabase_client import SupabaseClient

# Initialize Supabase client
supabase = SupabaseClient().get_client()


async def read_image_file(file: UploadFile) -> np.ndarray:
    """Read and process an image file from FastAPI UploadFile"""
    if not file.content_type.startswith("image/"):
        raise HTTPException(400, "File must be an image")

    image_data = await file.read()
    image = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
    
    if image is None:
        raise HTTPException(400, "Invalid image data")
        
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


def remove_background(image_bytes: bytes) -> bytes:
    """Remove white background from image using rembg"""
    try:
        return remove(image_bytes,
                      alpha_matting=True,
                      alpha_matting_background_threshold=5,
                      alpha_matting_foreground_threshold=220,
                      alpha_matting_erode_size=5)
    except Exception as e:
        print(f"Error removing background: {str(e)}")
        raise Exception(f"Background removal error: {str(e)}")


def upscale_image(image_bytes: bytes, scale_factor: int = 2) -> bytes:
    """Upscale image using OpenCV"""
    try:
        # Create a numpy array from the image bytes
        nparr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
        
        # Handle images with alpha channel
        if len(img.shape) > 2 and img.shape[2] == 4:
            # Split channels
            b, g, r, a = cv2.split(img)
            
            # Scale RGB channels
            rgb_channels = cv2.merge([b, g, r])
            scaled_rgb = cv2.resize(rgb_channels, None, fx=scale_factor, fy=scale_factor, 
                                   interpolation=cv2.INTER_CUBIC)
            
            # Scale alpha channel separately
            scaled_alpha = cv2.resize(a, None, fx=scale_factor, fy=scale_factor, 
                                     interpolation=cv2.INTER_CUBIC)
            
            # Merge channels back together
            scaled_img = cv2.merge([
                scaled_rgb[:, :, 0],
                scaled_rgb[:, :, 1],
                scaled_rgb[:, :, 2],
                scaled_alpha
            ])
        else:
            # Regular RGB image
            scaled_img = cv2.resize(img, None, fx=scale_factor, fy=scale_factor, 
                                   interpolation=cv2.INTER_CUBIC)
        
        # Encode the image back to bytes
        success, buffer = cv2.imencode('.png', scaled_img)
        if not success:
            raise Exception("Failed to encode upscaled image")
        
        return buffer.tobytes()
    except Exception as e:
        print(f"Error upscaling image: {str(e)}")
        raise Exception(f"Image upscaling error: {str(e)}")


async def process_product_image(
    file: UploadFile, 
    remove_bg: bool = True, 
    upscale: bool = True, 
    scale_factor: int = 2,
    process_order: str = "remove_first"
) -> Tuple[bytes, str]:
    """Process a product image with background removal and upscaling"""
    # Read the file content
    content = await file.read()
    file.file.seek(0)  # Reset file pointer for potential reuse
    
    # Create a descriptive filename with timestamp for uniqueness
    timestamp = int(time.time())
    original_filename = file.filename.split('.')
    base_name = original_filename[0] if len(original_filename) > 0 else 'product'
    extension = 'png'  # Always use PNG to preserve transparency
    
    # Process the image based on the parameters and order
    processed_content = content
    
    if process_order == "remove_first" and remove_bg and upscale:
        processed_content = remove_background(processed_content)
        processed_content = upscale_image(processed_content, scale_factor)
    elif process_order == "upscale_first" and remove_bg and upscale:
        processed_content = upscale_image(processed_content, scale_factor)
        processed_content = remove_background(processed_content)
    elif remove_bg:
        processed_content = remove_background(processed_content)
    elif upscale:
        processed_content = upscale_image(processed_content, scale_factor)
    
    # Create descriptive filename with processing info
    processed_filename = f"{base_name}_{'nobg' if remove_bg else ''}_{'upx' + str(scale_factor) if upscale else ''}_{timestamp}.{extension}"
    
    return processed_content, processed_filename


async def upload_processed_image(
        processed_image: bytes,
        filename: str,
        bucket: str = "product-images"
) -> Tuple[str, str]:
    """
    Upload a processed image to Supabase Storage

    Returns:
        Tuple[str, str]: (image_path, image_url)
    """
    # Generate a unique ID for the image
    image_id = str(uuid.uuid4())
    image_path = f"{image_id}_{filename}"

    # Upload the processed image to Supabase Storage
    supabase.storage.from_(bucket).upload(
        file=processed_image,
        path=image_path,
        file_options={"content-type": "image/png", "upsert": "true"}
    )

    # Get the public URL for the uploaded image
    image_url = supabase.storage.from_(bucket).get_public_url(image_path)

    return image_path, image_url

async def update_product_image(product_id: str, image_url: str) -> dict[str, any]:
    """
    Update the product_image field for a product

    Returns:
        Dict[str, Any]: The updated product data
    """
    if not product_id:
        raise ValueError("Product ID is required")

    result = supabase.table("products").update({
        "product_image": image_url
    }).eq("product_id", product_id).execute()

    if not result.data:
        raise Exception(f"Failed to update product {product_id}")

    return result.data[0]

async def process_and_store_product_image(
        file: UploadFile,
        remove_bg: bool = True,
        upscale: bool = True,
        scale_factor: int = 2,
        process_order: str = "remove_first",
        product_id: Optional[str] = None
) -> dict[str, any]:
    """
    Complete workflow for processing a product image and storing it

    This function:
    1. Processes the image (remove background, upscale)
    2. Uploads it to storage
    3. Updates the product record if product_id is provided

    Returns:
        Dict[str, Any]: Result with status, urls, and processing info
    """
    # Process the image
    processed_image, filename = await process_product_image(
        file,
        remove_bg=remove_bg,
        upscale=upscale,
        scale_factor=scale_factor,
        process_order=process_order
    )

    # Upload to storage
    image_path, image_url = await upload_processed_image(processed_image, filename)

    # Update product record if needed
    product_data = None
    if product_id:
        product_data = await update_product_image(product_id, image_url)

    # Return comprehensive result
    return {
        "status": "success",
        "message": "Image processed successfully",
        "image_url": image_url,
        "image_path": image_path,
        "product_data": product_data,
        "processing": {
            "background_removed": remove_bg,
            "upscaled": upscale,
            "scale_factor": scale_factor if upscale else None,
            "process_order": process_order
        }
    }