NTO-TCP-HF / src /api /batch_api.py
ishworrsubedii's picture
update: inference time, added nto_cto combined
5ec57bf
"""
project @ CTO_TCP_ZERO_GPU
created @ 2024-11-14
author @ github.com/ishworrsubedii
"""
import base64
import gc
import time
from io import BytesIO
import json
import asyncio
import aiohttp
from PIL import Image
from fastapi import File, UploadFile, Form
from fastapi.routing import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List
from fastapi.responses import JSONResponse
from src.utils import returnBytesData
from src.utils.logger import logger
from src.api.nto_api import pipeline, replicate_run_cto, supabase_upload_and_return_url
batch_router = APIRouter()
class ClothingRequest(BaseModel):
c_list: List[str]
@batch_router.post("/rt_cto")
async def rt_cto(
image: UploadFile = File(...),
c_list: str = Form(...)
):
logger.info("-" * 50)
logger.info(">>> REAL-TIME CTO STARTED <<<")
logger.info(f"Parameters: clothing_list={c_list}")
setup_start_time = time.time()
try:
clothing_list = [item.strip() for item in c_list.split(",")]
image_bytes = await image.read()
pil_image = Image.open(BytesIO(image_bytes)).convert("RGB")
setup_time = round(time.time() - setup_start_time, 2)
logger.info(f">>> IMAGE LOADED SUCCESSFULLY in {setup_time}s <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return {"error": "Error reading image", "code": 500}
async def generate():
logger.info("-" * 50)
logger.info(">>> CLOTHING TRY ON V2 STARTED <<<")
# Mask generation timing
mask_start_time = time.time()
try:
mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=pil_image)
mask_time = round(time.time() - mask_start_time, 2)
logger.info(f">>> MASK GENERATION COMPLETED in {mask_time}s <<<")
except Exception as e:
logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
yield json.dumps({"error": "Error generating mask", "code": 500}) + "\n"
await asyncio.sleep(0.1)
return
# Encoding timing
encoding_start_time = time.time()
try:
mask_img_base_64, act_img_base_64 = BytesIO(), BytesIO()
mask.save(mask_img_base_64, format="WEBP")
pil_image.save(act_img_base_64, format="WEBP")
mask_bytes_ = base64.b64encode(mask_img_base_64.getvalue()).decode("utf-8")
image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
mask_data_uri = f"data:image/webp;base64,{mask_bytes_}"
image_data_uri = f"data:image/webp;base64,{image_bytes_}"
encoding_time = round(time.time() - encoding_start_time, 2)
logger.info(f">>> IMAGE ENCODING COMPLETED in {encoding_time}s <<<")
except Exception as e:
logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
yield json.dumps({"error": "Error converting images to base64", "code": 500}) + "\n"
await asyncio.sleep(0.1)
return
for idx, clothing_type in enumerate(clothing_list):
if not clothing_type:
continue
iteration_start_time = time.time()
try:
inference_start_time = time.time()
output = replicate_run_cto({
"mask": mask_data_uri,
"image": image_data_uri,
"prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
"negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
"num_inference_steps": 25
})
inference_time = round(time.time() - inference_start_time, 2)
logger.info(f">>> REPLICATE PROCESSING COMPLETED FOR {clothing_type} in {inference_time}s <<<")
output_url = str(output[0]) if output and output[0] else None
iteration_time = round(time.time() - iteration_start_time, 2)
result = {
"code": 200,
"output": output_url,
"timing": {
"setup": setup_time,
"mask_generation": mask_time,
"encoding": encoding_time,
"inference": inference_time,
"iteration": iteration_time
},
"clothing_type": clothing_type,
"progress": f"{idx + 1}/{len(clothing_list)}"
}
yield json.dumps(result) + "\n"
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
error_result = {
"error": "Error running CTO Replicate",
"details": str(e),
"code": 500,
"clothing_type": clothing_type,
"progress": f"{idx + 1}/{len(clothing_list)}"
}
yield json.dumps(error_result) + "\n"
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Transfer-Encoding": "chunked"
}
)
@batch_router.post("/rt_nto")
async def rt_nto(
image: UploadFile = File(...),
necklace_id_list: str = Form(...),
category_list: str = Form(...),
storename: str = Form(...)
):
logger.info("-" * 50)
logger.info(">>> REAL-TIME NECKLACE TRY ON STARTED <<<")
logger.info(f"Parameters: storename={storename}, categories={category_list}, necklace_ids={necklace_id_list}")
try:
# Parse the lists
necklace_ids = [id.strip() for id in necklace_id_list.split(",")]
categories = [cat.strip() for cat in category_list.split(",")]
if len(necklace_ids) != len(categories):
return JSONResponse(
content={"error": "Number of necklace IDs must match number of categories", "code": 400},
status_code=400
)
# Load the source image
image_bytes = await image.read()
source_image = Image.open(BytesIO(image_bytes))
logger.info(">>> SOURCE IMAGE LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<")
return JSONResponse(
content={"error": "Error in initial setup", "details": str(e), "code": 500},
status_code=500
)
async def generate():
setup_start_time = time.time() # Add setup timing
# After loading images
setup_time = round(time.time() - setup_start_time, 2)
logger.info(f">>> SETUP COMPLETED in {setup_time}s <<<")
for idx, (necklace_id, category) in enumerate(zip(necklace_ids, categories)):
iteration_start_time = time.time()
try:
# Load jewellery timing
jewellery_load_start = time.time()
jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{category}/image/{necklace_id}.png"
jewellery = Image.open(returnBytesData(url=jewellery_url))
jewellery_time = round(time.time() - jewellery_load_start, 2)
logger.info(f">>> JEWELLERY LOADED in {jewellery_time}s <<<")
# NTO timing
nto_start_time = time.time()
result, headetText, mask = await pipeline.necklaceTryOn_(
image=source_image,
jewellery=jewellery,
storename=storename
)
nto_time = round(time.time() - nto_start_time, 2)
# Upload timing
upload_start_time = time.time()
upload_tasks = [
supabase_upload_and_return_url(prefix="necklace_try_on", image=result),
supabase_upload_and_return_url(prefix="necklace_try_on_mask", image=mask)
]
result_url, mask_url = await asyncio.gather(*upload_tasks)
upload_time = round(time.time() - upload_start_time, 2)
result = {
"code": 200,
"output": result_url,
"mask": mask_url,
"timing": {
"setup": setup_time,
"jewellery_load": jewellery_time,
"nto_inference": nto_time,
"upload": upload_time,
"total_iteration": round(time.time() - iteration_start_time, 2)
},
"necklace_id": necklace_id,
"category": category,
"progress": f"{idx + 1}/{len(necklace_ids)}"
}
yield json.dumps(result) + "\n"
await asyncio.sleep(0.1)
del result
del mask
gc.collect()
except Exception as e:
logger.error(f">>> PROCESSING ERROR FOR {necklace_id}: {str(e)} <<<")
error_result = {
"error": f"Error processing necklace {necklace_id}",
"details": str(e),
"code": 500,
"necklace_id": necklace_id,
"category": category,
"progress": f"{idx + 1}/{len(necklace_ids)}"
}
yield json.dumps(error_result) + "\n"
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Transfer-Encoding": "chunked"
}
)
@batch_router.post("/rt_cto_nto")
async def rt_cto_nto(
image: UploadFile = File(...),
c_list: str = Form(...),
necklace_id: str = Form(...),
necklace_category: str = Form(...),
storename: str = Form(...)
):
logger.info("-" * 50)
logger.info(">>> REAL-TIME CTO-NTO STARTED <<<")
logger.info(f"Parameters: storename={storename}, necklace_category={necklace_category}, "
f"necklace_id={necklace_id}, clothing_list={c_list}")
try:
clothing_list = [item.strip() for item in c_list.split(",")]
image_bytes = await image.read()
source_image = Image.open(BytesIO(image_bytes)).convert("RGB")
jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{necklace_category}/image/{necklace_id}.png"
jewellery = Image.open(returnBytesData(url=jewellery_url)).convert("RGBA")
logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> INITIAL SETUP ERROR: {str(e)} <<<")
return JSONResponse(
content={"error": "Error in initial setup", "details": str(e), "code": 500},
status_code=500
)
async def generate():
setup_start_time = time.time()
# After mask generation
mask_time = round(time.time() - setup_start_time, 2)
# Encoding timing
encoding_start_time = time.time()
# After encoding
encoding_time = round(time.time() - encoding_start_time, 2)
for idx, clothing_type in enumerate(clothing_list):
iteration_start_time = time.time()
try:
# Perform CTO
cto_start_time = time.time()
cto_output = replicate_run_cto({
"mask": mask_data_uri,
"image": image_data_uri,
"prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
"negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
"num_inference_steps": 25
})
cto_time = round(time.time() - cto_start_time, 2)
logger.info(f">>> CTO COMPLETED for {clothing_type} in {cto_time}s <<<")
# Get CTO result and process NTO
nto_start_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(str(cto_output[0])) as response:
if response.status != 200:
raise ValueError("Failed to fetch CTO output")
cto_result_bytes = await response.read()
with BytesIO(cto_result_bytes) as buf:
cto_result_image = Image.open(buf).convert("RGB")
result, headerText, mask = await pipeline.necklaceTryOn_(
image=cto_result_image,
jewellery=jewellery,
storename=storename
)
nto_time = round(time.time() - nto_start_time, 2)
logger.info(f">>> NTO COMPLETED for {clothing_type} in {nto_time}s <<<")
# Upload result
upload_start_time = time.time()
result_url = await supabase_upload_and_return_url(
prefix="clothing_necklace_try_on",
image=result
)
upload_time = round(time.time() - upload_start_time, 2)
# Stream result with detailed timing
output_result = {
"code": 200,
"output": result_url,
"timing": {
"setup": mask_time, # Include setup time
"encoding": encoding_time,
"cto_inference": cto_time,
"nto_inference": nto_time,
"upload": upload_time,
"total_iteration": round(time.time() - iteration_start_time, 2)
},
"clothing_type": clothing_type,
"progress": f"{idx + 1}/{len(clothing_list)}"
}
yield json.dumps(output_result) + "\n"
await asyncio.sleep(0.1)
del result
gc.collect()
except Exception as e:
logger.error(f">>> PROCESSING ERROR FOR {clothing_type}: {str(e)} <<<")
error_result = {
"error": f"Error processing clothing {clothing_type}",
"details": str(e),
"code": 500,
"clothing_type": clothing_type,
"progress": f"{idx + 1}/{len(clothing_list)}"
}
yield json.dumps(error_result) + "\n"
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Transfer-Encoding": "chunked"
}
)