""" project @ CTO_TCP_ZERO_GPU created @ 2024-11-14 author @ github.com/ishworrsubedii """ import base64 import gc import time from io import BytesIO import json import asyncio import aiohttp from PIL import Image from fastapi import File, UploadFile, Form from fastapi.routing import APIRouter from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import List from fastapi.responses import JSONResponse from src.utils import returnBytesData from src.utils.logger import logger from src.api.nto_api import pipeline, replicate_run_cto, supabase_upload_and_return_url batch_router = APIRouter() class ClothingRequest(BaseModel): c_list: List[str] @batch_router.post("/rt_cto") async def rt_cto( image: UploadFile = File(...), c_list: str = Form(...) ): logger.info("-" * 50) logger.info(">>> REAL-TIME CTO STARTED <<<") logger.info(f"Parameters: clothing_list={c_list}") setup_start_time = time.time() try: clothing_list = [item.strip() for item in c_list.split(",")] image_bytes = await image.read() pil_image = Image.open(BytesIO(image_bytes)).convert("RGB") setup_time = round(time.time() - setup_start_time, 2) logger.info(f">>> IMAGE LOADED SUCCESSFULLY in {setup_time}s <<<") except Exception as e: logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<") return {"error": "Error reading image", "code": 500} async def generate(): logger.info("-" * 50) logger.info(">>> CLOTHING TRY ON V2 STARTED <<<") # Mask generation timing mask_start_time = time.time() try: mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=pil_image) mask_time = round(time.time() - mask_start_time, 2) logger.info(f">>> MASK GENERATION COMPLETED in {mask_time}s <<<") except Exception as e: logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<") yield json.dumps({"error": "Error generating mask", "code": 500}) + "\n" await asyncio.sleep(0.1) return # Encoding timing encoding_start_time = time.time() try: mask_img_base_64, act_img_base_64 = BytesIO(), BytesIO() mask.save(mask_img_base_64, format="WEBP") pil_image.save(act_img_base_64, format="WEBP") mask_bytes_ = base64.b64encode(mask_img_base_64.getvalue()).decode("utf-8") image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8") mask_data_uri = f"data:image/webp;base64,{mask_bytes_}" image_data_uri = f"data:image/webp;base64,{image_bytes_}" encoding_time = round(time.time() - encoding_start_time, 2) logger.info(f">>> IMAGE ENCODING COMPLETED in {encoding_time}s <<<") except Exception as e: logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<") yield json.dumps({"error": "Error converting images to base64", "code": 500}) + "\n" await asyncio.sleep(0.1) return for idx, clothing_type in enumerate(clothing_list): if not clothing_type: continue iteration_start_time = time.time() try: inference_start_time = time.time() output = replicate_run_cto({ "mask": mask_data_uri, "image": image_data_uri, "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse", "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly", "num_inference_steps": 25 }) inference_time = round(time.time() - inference_start_time, 2) logger.info(f">>> REPLICATE PROCESSING COMPLETED FOR {clothing_type} in {inference_time}s <<<") output_url = str(output[0]) if output and output[0] else None iteration_time = round(time.time() - iteration_start_time, 2) result = { "code": 200, "output": output_url, "timing": { "setup": setup_time, "mask_generation": mask_time, "encoding": encoding_time, "inference": inference_time, "iteration": iteration_time }, "clothing_type": clothing_type, "progress": f"{idx + 1}/{len(clothing_list)}" } yield json.dumps(result) + "\n" await asyncio.sleep(0.1) except Exception as e: logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<") error_result = { "error": "Error running CTO Replicate", "details": str(e), "code": 500, "clothing_type": clothing_type, "progress": f"{idx + 1}/{len(clothing_list)}" } yield json.dumps(error_result) + "\n" await asyncio.sleep(0.1) return StreamingResponse( generate(), media_type="application/x-ndjson", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Transfer-Encoding": "chunked" } ) @batch_router.post("/rt_nto") async def rt_nto( image: UploadFile = File(...), necklace_id_list: str = Form(...), category_list: str = Form(...), storename: str = Form(...) ): logger.info("-" * 50) logger.info(">>> REAL-TIME NECKLACE TRY ON STARTED <<<") logger.info(f"Parameters: storename={storename}, categories={category_list}, necklace_ids={necklace_id_list}") try: # Parse the lists necklace_ids = [id.strip() for id in necklace_id_list.split(",")] categories = [cat.strip() for cat in category_list.split(",")] if len(necklace_ids) != len(categories): return JSONResponse( content={"error": "Number of necklace IDs must match number of categories", "code": 400}, status_code=400 ) # Load the source image image_bytes = await image.read() source_image = Image.open(BytesIO(image_bytes)) logger.info(">>> SOURCE IMAGE LOADED SUCCESSFULLY <<<") except Exception as e: logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<") return JSONResponse( content={"error": "Error in initial setup", "details": str(e), "code": 500}, status_code=500 ) async def generate(): setup_start_time = time.time() # Add setup timing # After loading images setup_time = round(time.time() - setup_start_time, 2) logger.info(f">>> SETUP COMPLETED in {setup_time}s <<<") for idx, (necklace_id, category) in enumerate(zip(necklace_ids, categories)): iteration_start_time = time.time() try: # Load jewellery timing jewellery_load_start = time.time() jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{category}/image/{necklace_id}.png" jewellery = Image.open(returnBytesData(url=jewellery_url)) jewellery_time = round(time.time() - jewellery_load_start, 2) logger.info(f">>> JEWELLERY LOADED in {jewellery_time}s <<<") # NTO timing nto_start_time = time.time() result, headetText, mask = await pipeline.necklaceTryOn_( image=source_image, jewellery=jewellery, storename=storename ) nto_time = round(time.time() - nto_start_time, 2) # Upload timing upload_start_time = time.time() upload_tasks = [ supabase_upload_and_return_url(prefix="necklace_try_on", image=result), supabase_upload_and_return_url(prefix="necklace_try_on_mask", image=mask) ] result_url, mask_url = await asyncio.gather(*upload_tasks) upload_time = round(time.time() - upload_start_time, 2) result = { "code": 200, "output": result_url, "mask": mask_url, "timing": { "setup": setup_time, "jewellery_load": jewellery_time, "nto_inference": nto_time, "upload": upload_time, "total_iteration": round(time.time() - iteration_start_time, 2) }, "necklace_id": necklace_id, "category": category, "progress": f"{idx + 1}/{len(necklace_ids)}" } yield json.dumps(result) + "\n" await asyncio.sleep(0.1) del result del mask gc.collect() except Exception as e: logger.error(f">>> PROCESSING ERROR FOR {necklace_id}: {str(e)} <<<") error_result = { "error": f"Error processing necklace {necklace_id}", "details": str(e), "code": 500, "necklace_id": necklace_id, "category": category, "progress": f"{idx + 1}/{len(necklace_ids)}" } yield json.dumps(error_result) + "\n" await asyncio.sleep(0.1) return StreamingResponse( generate(), media_type="application/x-ndjson", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Transfer-Encoding": "chunked" } ) @batch_router.post("/rt_cto_nto") async def rt_cto_nto( image: UploadFile = File(...), c_list: str = Form(...), necklace_id: str = Form(...), necklace_category: str = Form(...), storename: str = Form(...) ): logger.info("-" * 50) logger.info(">>> REAL-TIME CTO-NTO STARTED <<<") logger.info(f"Parameters: storename={storename}, necklace_category={necklace_category}, " f"necklace_id={necklace_id}, clothing_list={c_list}") try: clothing_list = [item.strip() for item in c_list.split(",")] image_bytes = await image.read() source_image = Image.open(BytesIO(image_bytes)).convert("RGB") jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{necklace_category}/image/{necklace_id}.png" jewellery = Image.open(returnBytesData(url=jewellery_url)).convert("RGBA") logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<") except Exception as e: logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<") return JSONResponse( content={"error": "Error in initial setup", "details": str(e), "code": 500}, status_code=500 ) async def generate(): setup_start_time = time.time() # After mask generation mask_time = round(time.time() - setup_start_time, 2) # Encoding timing encoding_start_time = time.time() # After encoding encoding_time = round(time.time() - encoding_start_time, 2) for idx, clothing_type in enumerate(clothing_list): iteration_start_time = time.time() try: # Perform CTO cto_start_time = time.time() cto_output = replicate_run_cto({ "mask": mask_data_uri, "image": image_data_uri, "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse", "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly", "num_inference_steps": 25 }) cto_time = round(time.time() - cto_start_time, 2) logger.info(f">>> CTO COMPLETED for {clothing_type} in {cto_time}s <<<") # Get CTO result and process NTO nto_start_time = time.time() async with aiohttp.ClientSession() as session: async with session.get(str(cto_output[0])) as response: if response.status != 200: raise ValueError("Failed to fetch CTO output") cto_result_bytes = await response.read() with BytesIO(cto_result_bytes) as buf: cto_result_image = Image.open(buf).convert("RGB") result, headerText, mask = await pipeline.necklaceTryOn_( image=cto_result_image, jewellery=jewellery, storename=storename ) nto_time = round(time.time() - nto_start_time, 2) logger.info(f">>> NTO COMPLETED for {clothing_type} in {nto_time}s <<<") # Upload result upload_start_time = time.time() result_url = await supabase_upload_and_return_url( prefix="clothing_necklace_try_on", image=result ) upload_time = round(time.time() - upload_start_time, 2) # Stream result with detailed timing output_result = { "code": 200, "output": result_url, "timing": { "setup": mask_time, # Include setup time "encoding": encoding_time, "cto_inference": cto_time, "nto_inference": nto_time, "upload": upload_time, "total_iteration": round(time.time() - iteration_start_time, 2) }, "clothing_type": clothing_type, "progress": f"{idx + 1}/{len(clothing_list)}" } yield json.dumps(output_result) + "\n" await asyncio.sleep(0.1) del result gc.collect() except Exception as e: logger.error(f">>> PROCESSING ERROR FOR {clothing_type}: {str(e)} <<<") error_result = { "error": f"Error processing clothing {clothing_type}", "details": str(e), "code": 500, "clothing_type": clothing_type, "progress": f"{idx + 1}/{len(clothing_list)}" } yield json.dumps(error_result) + "\n" await asyncio.sleep(0.1) return StreamingResponse( generate(), media_type="application/x-ndjson", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Transfer-Encoding": "chunked" } )