Spaces:
Build error
Build error
| """ | |
| project @ CTO_TCP_ZERO_GPU | |
| created @ 2024-11-14 | |
| author @ github.com/ishworrsubedii | |
| """ | |
| import base64 | |
| import gc | |
| import time | |
| from io import BytesIO | |
| import json | |
| import asyncio | |
| import aiohttp | |
| from PIL import Image | |
| from fastapi import File, UploadFile, Form | |
| from fastapi.routing import APIRouter | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import List | |
| from fastapi.responses import JSONResponse | |
| from src.utils import returnBytesData | |
| from src.utils.logger import logger | |
| from src.api.nto_api import pipeline, replicate_run_cto, supabase_upload_and_return_url | |
# APIRouter instance for this module's real-time try-on handlers.
# NOTE(review): no route decorators referencing it are visible in this view —
# confirm where rt_cto / rt_nto / rt_cto_nto get registered.
batch_router = APIRouter()
class ClothingRequest(BaseModel):
    """Request schema carrying a list of clothing type names."""

    # Clothing descriptions, one entry per requested try-on.
    c_list: List[str]
async def rt_cto(
    image: UploadFile = File(...),
    c_list: str = Form(...)
):
    """Stream clothing try-on (CTO) results for one uploaded image as NDJSON.

    Args:
        image: Uploaded source image; it is read fully and converted to RGB.
        c_list: Comma-separated clothing types. Entries are stripped and
            blank entries are skipped during processing.

    Returns:
        A ``StreamingResponse`` (``application/x-ndjson``) that emits one JSON
        object per clothing type (``code`` 200 with ``output``/``timing``, or
        ``code`` 500 with ``error``/``details``). If the image cannot be
        read, a ``JSONResponse`` with HTTP status 500 is returned instead.
    """
    logger.info("-" * 50)
    logger.info(">>> REAL-TIME CTO STARTED <<<")
    logger.info(f"Parameters: clothing_list={c_list}")
    setup_start_time = time.time()
    try:
        clothing_list = [item.strip() for item in c_list.split(",")]
        image_bytes = await image.read()
        pil_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        setup_time = round(time.time() - setup_start_time, 2)
        logger.info(f">>> IMAGE LOADED SUCCESSFULLY in {setup_time}s <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        # Return a real HTTP 500 (same payload as before) so the status line
        # matches the sibling endpoints instead of reporting 200 on failure.
        return JSONResponse(
            content={"error": "Error reading image", "code": 500},
            status_code=500
        )

    async def generate():
        logger.info("-" * 50)
        logger.info(">>> CLOTHING TRY ON V2 STARTED <<<")

        # Mask generation timing
        mask_start_time = time.time()
        try:
            mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=pil_image)
            mask_time = round(time.time() - mask_start_time, 2)
            logger.info(f">>> MASK GENERATION COMPLETED in {mask_time}s <<<")
        except Exception as e:
            logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
            yield json.dumps({"error": "Error generating mask", "code": 500}) + "\n"
            await asyncio.sleep(0.1)
            return

        # Encoding timing: mask and source image become WEBP base64 data URIs.
        encoding_start_time = time.time()
        try:
            mask_img_base_64, act_img_base_64 = BytesIO(), BytesIO()
            mask.save(mask_img_base_64, format="WEBP")
            pil_image.save(act_img_base_64, format="WEBP")
            mask_bytes_ = base64.b64encode(mask_img_base_64.getvalue()).decode("utf-8")
            image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
            mask_data_uri = f"data:image/webp;base64,{mask_bytes_}"
            image_data_uri = f"data:image/webp;base64,{image_bytes_}"
            encoding_time = round(time.time() - encoding_start_time, 2)
            logger.info(f">>> IMAGE ENCODING COMPLETED in {encoding_time}s <<<")
        except Exception as e:
            logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
            yield json.dumps({"error": "Error converting images to base64", "code": 500}) + "\n"
            await asyncio.sleep(0.1)
            return

        loop = asyncio.get_running_loop()
        for idx, clothing_type in enumerate(clothing_list):
            if not clothing_type:
                continue
            iteration_start_time = time.time()
            try:
                inference_start_time = time.time()
                payload = {
                    "mask": mask_data_uri,
                    "image": image_data_uri,
                    "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
                    "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
                    "num_inference_steps": 25
                }
                # replicate_run_cto is called synchronously (its result is
                # indexed directly), so run it in the default executor to keep
                # the event loop free while inference is in flight.
                output = await loop.run_in_executor(None, replicate_run_cto, payload)
                inference_time = round(time.time() - inference_start_time, 2)
                logger.info(f">>> REPLICATE PROCESSING COMPLETED FOR {clothing_type} in {inference_time}s <<<")
                output_url = str(output[0]) if output and output[0] else None
                iteration_time = round(time.time() - iteration_start_time, 2)
                result = {
                    "code": 200,
                    "output": output_url,
                    "timing": {
                        "setup": setup_time,
                        "mask_generation": mask_time,
                        "encoding": encoding_time,
                        "inference": inference_time,
                        "iteration": iteration_time
                    },
                    "clothing_type": clothing_type,
                    "progress": f"{idx + 1}/{len(clothing_list)}"
                }
                yield json.dumps(result) + "\n"
                # Short pause between lines, as in the original flow.
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
                error_result = {
                    "error": "Error running CTO Replicate",
                    "details": str(e),
                    "code": 500,
                    "clothing_type": clothing_type,
                    "progress": f"{idx + 1}/{len(clothing_list)}"
                }
                yield json.dumps(error_result) + "\n"
                await asyncio.sleep(0.1)

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Transfer-Encoding": "chunked"
        }
    )
async def rt_nto(
    image: UploadFile = File(...),
    necklace_id_list: str = Form(...),
    category_list: str = Form(...),
    storename: str = Form(...)
):
    """Stream necklace try-on (NTO) results for one uploaded image as NDJSON.

    Args:
        image: Uploaded source image.
        necklace_id_list: Comma-separated necklace ids.
        category_list: Comma-separated categories, paired positionally with
            ``necklace_id_list``; both lists must have the same length.
        storename: Store name used in the jewellery image URL and passed to
            the try-on pipeline.

    Returns:
        A ``StreamingResponse`` (``application/x-ndjson``) emitting one JSON
        object per necklace (``code`` 200 with ``output``/``mask``/``timing``,
        or ``code`` 500 with ``error``/``details``). A ``JSONResponse`` with
        status 400 is returned when the list lengths differ, and with status
        500 when the initial setup fails.
    """
    logger.info("-" * 50)
    logger.info(">>> REAL-TIME NECKLACE TRY ON STARTED <<<")
    logger.info(f"Parameters: storename={storename}, categories={category_list}, necklace_ids={necklace_id_list}")

    # Time the real setup work (parsing + image load); the previous timer was
    # started and stopped back-to-back and therefore always reported ~0s.
    setup_start_time = time.time()
    try:
        # Parse the lists
        necklace_ids = [raw_id.strip() for raw_id in necklace_id_list.split(",")]
        categories = [cat.strip() for cat in category_list.split(",")]
        if len(necklace_ids) != len(categories):
            return JSONResponse(
                content={"error": "Number of necklace IDs must match number of categories", "code": 400},
                status_code=400
            )
        # Load the source image
        image_bytes = await image.read()
        source_image = Image.open(BytesIO(image_bytes))
        logger.info(">>> SOURCE IMAGE LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<")
        return JSONResponse(
            content={"error": "Error in initial setup", "details": str(e), "code": 500},
            status_code=500
        )
    setup_time = round(time.time() - setup_start_time, 2)
    logger.info(f">>> SETUP COMPLETED in {setup_time}s <<<")

    async def generate():
        total = len(necklace_ids)
        for idx, (necklace_id, category) in enumerate(zip(necklace_ids, categories)):
            iteration_start_time = time.time()
            try:
                # Load jewellery timing
                jewellery_load_start = time.time()
                # NOTE(review): storename/category/necklace_id come straight
                # from form fields and are interpolated unescaped into the URL
                # path — consider validating them upstream.
                jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{category}/image/{necklace_id}.png"
                jewellery = Image.open(returnBytesData(url=jewellery_url))
                jewellery_time = round(time.time() - jewellery_load_start, 2)
                logger.info(f">>> JEWELLERY LOADED in {jewellery_time}s <<<")

                # NTO timing; the second pipeline return value is unused here.
                nto_start_time = time.time()
                result_image, _, mask_image = await pipeline.necklaceTryOn_(
                    image=source_image,
                    jewellery=jewellery,
                    storename=storename
                )
                nto_time = round(time.time() - nto_start_time, 2)

                # Upload timing: both uploads run concurrently.
                upload_start_time = time.time()
                result_url, mask_url = await asyncio.gather(
                    supabase_upload_and_return_url(prefix="necklace_try_on", image=result_image),
                    supabase_upload_and_return_url(prefix="necklace_try_on_mask", image=mask_image)
                )
                upload_time = round(time.time() - upload_start_time, 2)

                payload = {
                    "code": 200,
                    "output": result_url,
                    "mask": mask_url,
                    "timing": {
                        "setup": setup_time,
                        "jewellery_load": jewellery_time,
                        "nto_inference": nto_time,
                        "upload": upload_time,
                        "total_iteration": round(time.time() - iteration_start_time, 2)
                    },
                    "necklace_id": necklace_id,
                    "category": category,
                    "progress": f"{idx + 1}/{total}"
                }
                yield json.dumps(payload) + "\n"
                await asyncio.sleep(0.1)

                # Drop the per-iteration images before the next inference.
                del result_image
                del mask_image
                gc.collect()
            except Exception as e:
                logger.error(f">>> PROCESSING ERROR FOR {necklace_id}: {str(e)} <<<")
                error_result = {
                    "error": f"Error processing necklace {necklace_id}",
                    "details": str(e),
                    "code": 500,
                    "necklace_id": necklace_id,
                    "category": category,
                    "progress": f"{idx + 1}/{total}"
                }
                yield json.dumps(error_result) + "\n"
                await asyncio.sleep(0.1)

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Transfer-Encoding": "chunked"
        }
    )
async def rt_cto_nto(
    image: UploadFile = File(...),
    c_list: str = Form(...),
    necklace_id: str = Form(...),
    necklace_category: str = Form(...),
    storename: str = Form(...)
):
    """Stream combined clothing + necklace try-on results as NDJSON.

    For each clothing type the source image is run through the clothing
    try-on (``replicate_run_cto``), the CTO output is downloaded, the
    necklace is applied with ``pipeline.necklaceTryOn_``, and the final image
    is uploaded.

    Args:
        image: Uploaded source image; read fully and converted to RGB.
        c_list: Comma-separated clothing types; blank entries are skipped.
        necklace_id: Id of the necklace image to load.
        necklace_category: Category segment of the necklace image URL.
        storename: Store name used in the necklace URL and passed to the
            try-on pipeline.

    Returns:
        A ``StreamingResponse`` (``application/x-ndjson``) emitting one JSON
        object per clothing type (``code`` 200 with ``output``/``timing``, or
        ``code`` 500 with ``error``/``details``). A ``JSONResponse`` with
        status 500 is returned when the initial image loading fails.
    """
    logger.info("-" * 50)
    logger.info(">>> REAL-TIME CTO-NTO STARTED <<<")
    logger.info(f"Parameters: storename={storename}, necklace_category={necklace_category}, "
                f"necklace_id={necklace_id}, clothing_list={c_list}")
    try:
        clothing_list = [item.strip() for item in c_list.split(",")]
        image_bytes = await image.read()
        source_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{necklace_category}/image/{necklace_id}.png"
        jewellery = Image.open(returnBytesData(url=jewellery_url)).convert("RGBA")
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<")
        return JSONResponse(
            content={"error": "Error in initial setup", "details": str(e), "code": 500},
            status_code=500
        )

    async def generate():
        # Mask generation. The original left this step as a placeholder, so
        # mask_data_uri / image_data_uri were undefined and every iteration
        # failed with NameError. Mirrors the flow used by rt_cto.
        mask_start_time = time.time()
        try:
            clothing_mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=source_image)
            mask_time = round(time.time() - mask_start_time, 2)
            logger.info(f">>> MASK GENERATION COMPLETED in {mask_time}s <<<")
        except Exception as e:
            logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
            yield json.dumps({"error": "Error generating mask", "code": 500}) + "\n"
            await asyncio.sleep(0.1)
            return

        # Encoding timing: mask and source image become WEBP base64 data URIs.
        encoding_start_time = time.time()
        try:
            mask_buffer, image_buffer = BytesIO(), BytesIO()
            clothing_mask.save(mask_buffer, format="WEBP")
            source_image.save(image_buffer, format="WEBP")
            mask_b64 = base64.b64encode(mask_buffer.getvalue()).decode("utf-8")
            image_b64 = base64.b64encode(image_buffer.getvalue()).decode("utf-8")
            mask_data_uri = f"data:image/webp;base64,{mask_b64}"
            image_data_uri = f"data:image/webp;base64,{image_b64}"
            encoding_time = round(time.time() - encoding_start_time, 2)
            logger.info(f">>> IMAGE ENCODING COMPLETED in {encoding_time}s <<<")
        except Exception as e:
            logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
            yield json.dumps({"error": "Error converting images to base64", "code": 500}) + "\n"
            await asyncio.sleep(0.1)
            return

        loop = asyncio.get_running_loop()
        for idx, clothing_type in enumerate(clothing_list):
            # Skip blank entries (e.g. trailing comma), consistent with rt_cto.
            if not clothing_type:
                continue
            iteration_start_time = time.time()
            try:
                # Perform CTO
                cto_start_time = time.time()
                payload = {
                    "mask": mask_data_uri,
                    "image": image_data_uri,
                    "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
                    "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
                    "num_inference_steps": 25
                }
                # replicate_run_cto is called synchronously (its result is
                # indexed directly), so run it in the default executor to keep
                # the event loop free while inference is in flight.
                cto_output = await loop.run_in_executor(None, replicate_run_cto, payload)
                cto_time = round(time.time() - cto_start_time, 2)
                logger.info(f">>> CTO COMPLETED for {clothing_type} in {cto_time}s <<<")

                # Get CTO result and process NTO
                nto_start_time = time.time()
                async with aiohttp.ClientSession() as session:
                    async with session.get(str(cto_output[0])) as response:
                        if response.status != 200:
                            raise ValueError("Failed to fetch CTO output")
                        cto_result_bytes = await response.read()
                # convert() decodes the pixels, so the buffer can be closed.
                with BytesIO(cto_result_bytes) as buf:
                    cto_result_image = Image.open(buf).convert("RGB")
                # Only the composited image is used; the other two return
                # values are discarded.
                result, _, _ = await pipeline.necklaceTryOn_(
                    image=cto_result_image,
                    jewellery=jewellery,
                    storename=storename
                )
                nto_time = round(time.time() - nto_start_time, 2)
                logger.info(f">>> NTO COMPLETED for {clothing_type} in {nto_time}s <<<")

                # Upload result
                upload_start_time = time.time()
                result_url = await supabase_upload_and_return_url(
                    prefix="clothing_necklace_try_on",
                    image=result
                )
                upload_time = round(time.time() - upload_start_time, 2)

                # Stream result with detailed timing
                output_result = {
                    "code": 200,
                    "output": result_url,
                    "timing": {
                        # Key kept as "setup" for client compatibility; the
                        # value is the mask-generation time.
                        "setup": mask_time,
                        "encoding": encoding_time,
                        "cto_inference": cto_time,
                        "nto_inference": nto_time,
                        "upload": upload_time,
                        "total_iteration": round(time.time() - iteration_start_time, 2)
                    },
                    "clothing_type": clothing_type,
                    "progress": f"{idx + 1}/{len(clothing_list)}"
                }
                yield json.dumps(output_result) + "\n"
                await asyncio.sleep(0.1)

                # Drop the per-iteration image before the next inference.
                del result
                gc.collect()
            except Exception as e:
                logger.error(f">>> PROCESSING ERROR FOR {clothing_type}: {str(e)} <<<")
                error_result = {
                    "error": f"Error processing clothing {clothing_type}",
                    "details": str(e),
                    "code": 500,
                    "clothing_type": clothing_type,
                    "progress": f"{idx + 1}/{len(clothing_list)}"
                }
                yield json.dumps(error_result) + "\n"
                await asyncio.sleep(0.1)

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Transfer-Encoding": "chunked"
        }
    )