NTO-TCP-HF / src /api /nto_api.py
ishworrsubedii's picture
update: inference time, added nto_cto combined
5ec57bf
"""
project @ NTO-TCP-HF
created @ 2024-10-28
author @ github/ishworrsubedii
"""
import secrets
import tempfile
import time
import cv2
import numpy as np
from PIL.ImageOps import grayscale
from fastapi.encoders import jsonable_encoder
from src.utils import supabaseGetPublicURL, deductAndTrackCredit, returnBytesData
from fastapi import File, UploadFile, Header, HTTPException, Form, Depends, APIRouter
from src.pipelines.completePipeline import Pipeline
from fastapi.responses import JSONResponse
from supabase import create_client, Client
from typing import Dict, Union, List
from io import BytesIO
from PIL import Image
import pandas as pd
import base64
import os
from pydantic import BaseModel
import replicate
import requests
from src.utils.logger import logger
import secrets
import aiohttp
import asyncio
import gc
# Shared pipeline instance and the router on which every endpoint in this file is registered.
pipeline = Pipeline()
nto_cto_router = APIRouter()
# Settings are read from the environment; os.getenv returns None for any
# variable that is not set, so the `str` annotations are optimistic.
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase_storage: str = os.getenv("SUPABASE_STORAGE")
# Model reference passed as the first argument of replicate.run() in replicate_run_cto().
cto_replicate: str = os.getenv(
"CTO")
# Supabase client plus a handle on the "JewelMirrorOutputs" storage bucket
# (used to build public URLs for uploaded results).
supabase = create_client(supabase_url=url, supabase_key=key)
bucket = supabase.storage.from_("JewelMirrorOutputs")
def replicate_run_cto(input):
    """Run the clothing try-on model on Replicate and return its raw output.

    ``input`` is forwarded unchanged as the ``input`` keyword of
    ``replicate.run``; the model reference comes from the module-level
    ``cto_replicate`` setting.
    """
    return replicate.run(cto_replicate, input=input)
class NecklaceTryOnIDEntity(BaseModel):
    # Form fields shared by the necklace try-on endpoints; instances are built
    # by parse_necklace_try_on_id from multipart form data.

    # File stem of the necklace image inside the store/category folder.
    necklaceImageId: str
    # Category path segment used when building the jewellery image URL.
    necklaceCategory: str
    # Store path segment of the jewellery URL; also passed to the pipeline and
    # to credit tracking.
    storename: str
    # Token supplied by the client; no code visible in this file reads it.
    api_token: str
@nto_cto_router.post("/clothingTryOnV2")
async def clothing_try_on_v2(image: UploadFile = File(...), clothing_type: str = Form(...)):
    """Clothing try-on that derives the inpainting mask from the photo itself.

    Steps: decode the upload, ask the pipeline for a mask, encode both images as
    WEBP data URIs, run the Replicate model and return the first output URL.

    Args:
        image: Uploaded photo of the person.
        clothing_type: Free-text clothing description inserted into the prompt.

    Returns:
        JSONResponse with ``code``, ``output`` (model output as a string) and
        ``inference_time`` on success, or ``{"error", "code": 500}`` with HTTP
        500 when any stage fails.
    """
    logger.info("-" * 50)
    logger.info(">>> CLOTHING TRY ON V2 STARTED <<<")
    logger.info(f"Parameters: clothing_type={clothing_type}")
    start_time = time.time()

    try:
        image_bytes = await image.read()
        person_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        logger.info(">>> IMAGE LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500, content={"error": "Error reading image", "code": 500})

    try:
        mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=person_image)
        logger.info(">>> MASK GENERATION COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> MASK GENERATION ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500,
                            content={"error": "Error generating mask", "code": 500})

    try:
        mask_buffer, image_buffer = BytesIO(), BytesIO()
        mask.save(mask_buffer, format="WEBP")
        person_image.save(image_buffer, format="WEBP")
        mask_b64 = base64.b64encode(mask_buffer.getvalue()).decode("utf-8")
        image_b64 = base64.b64encode(image_buffer.getvalue()).decode("utf-8")
        mask_data_uri = f"data:image/webp;base64,{mask_b64}"
        image_data_uri = f"data:image/webp;base64,{image_b64}"
        logger.info(">>> IMAGE ENCODING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500,
                            content={"error": "Error converting images to base64", "code": 500})

    replicate_input = {
        "mask": mask_data_uri,
        "image": image_data_uri,
        "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
        "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
        "num_inference_steps": 25
    }

    try:
        # replicate.run blocks on the network; run it in a worker thread so the
        # event loop can keep serving other requests while the model runs.
        output = await asyncio.to_thread(replicate_run_cto, replicate_input)
        if not output:
            raise ValueError("Replicate returned no output")
        # Index inside the try so an unexpected output shape yields the JSON
        # error below instead of an unhandled exception.
        output_url = f"{output[0]}"
        logger.info(">>> REPLICATE PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error running CTO Replicate", "code": 500}, status_code=500)

    total_inference_time = round((time.time() - start_time), 2)
    logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
    logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
    logger.info("-" * 50)

    payload = {
        "code": 200,
        "output": output_url,
        "inference_time": total_inference_time
    }
    return JSONResponse(content=payload, status_code=200)
@nto_cto_router.post("/clothingTryOn")
async def clothing_try_on(image: UploadFile = File(...),
                          mask: UploadFile = File(...), clothing_type: str = Form(...)):
    """Clothing try-on using a caller-supplied jewellery mask.

    The masked (jewellery) pixels are cut out of the photo, the photo is
    inpainted and sent to the Replicate model with a mask that covers everything
    from the first masked row downwards, and the jewellery pixels are pasted
    back onto the model output.

    Args:
        image: Uploaded photo of the person.
        mask: Uploaded mask image; non-zero pixels mark the jewellery. It is
            combined element-wise with the photo, so both must have the same
            dimensions.
        clothing_type: Free-text clothing description inserted into the prompt.

    Returns:
        JSONResponse with ``output`` (WEBP data URI), ``code`` and
        ``inference_time`` on success, or ``{"error", "code": 500}`` with HTTP
        500 when any stage fails.
    """
    logger.info("-" * 50)
    logger.info(">>> CLOTHING TRY ON STARTED <<<")
    logger.info(f"Parameters: clothing_type={clothing_type}")
    start_time = time.time()

    try:
        image_bytes = await image.read()
        mask_bytes = await mask.read()
        image = Image.open(BytesIO(image_bytes)).convert("RGB")
        mask = Image.open(BytesIO(mask_bytes)).convert("RGB")
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500, content={"error": "Error reading image or mask", "code": 500})

    try:
        actual_image = image.copy()
        # Keep only the photo pixels that lie under the mask (the jewellery).
        jewellery_mask = Image.fromarray(np.bitwise_and(np.array(mask), np.array(image)))
        arr_orig = np.array(grayscale(mask))
        inpainted = cv2.inpaint(np.array(image), arr_orig, 15, cv2.INPAINT_TELEA)
        image = Image.fromarray(inpainted).resize((512, 512))

        # Extend the mask from its first non-empty row to the bottom of the image.
        mask_rows = np.flatnonzero(arr_orig.any(axis=1))
        if mask_rows.size == 0:
            raise ValueError("mask contains no non-zero pixels")
        arr = arr_orig.copy()
        arr[mask_rows[0]:, :] = 255
        mask = Image.fromarray(arr).resize((512, 512))
        logger.info(">>> IMAGE PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> IMAGE PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500,
                            content={"error": "Error processing image or mask", "code": 500})

    try:
        mask_buffer, image_buffer = BytesIO(), BytesIO()
        mask.save(mask_buffer, format="WEBP")
        image.save(image_buffer, format="WEBP")
        mask_b64 = base64.b64encode(mask_buffer.getvalue()).decode("utf-8")
        image_b64 = base64.b64encode(image_buffer.getvalue()).decode("utf-8")
        mask_data_uri = f"data:image/webp;base64,{mask_b64}"
        image_data_uri = f"data:image/webp;base64,{image_b64}"
        logger.info(">>> IMAGE ENCODING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500,
                            content={"error": "Error encoding images", "code": 500})

    replicate_input = {
        "mask": mask_data_uri,
        "image": image_data_uri,
        "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
        "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
        "num_inference_steps": 25
    }

    try:
        # replicate.run blocks on the network; keep it off the event loop.
        output = await asyncio.to_thread(replicate_run_cto, replicate_input)
        logger.info(">>> REPLICATE PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> REPLICATE PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error running clothing try on", "code": 500}, status_code=500)

    try:
        # Download in a worker thread, bounded by a timeout, and fail loudly on
        # a non-2xx status instead of trying to decode an error page as an image.
        download = await asyncio.to_thread(requests.get, str(output[0]), timeout=60)
        download.raise_for_status()
        # Normalise to RGB so the 3-channel bitwise operations below always line up.
        output_image = Image.open(BytesIO(download.content)).convert("RGB").resize(actual_image.size)
        outside_mask = np.bitwise_not(np.array(Image.fromarray(arr_orig).convert("RGB")))
        background = np.bitwise_and(np.array(output_image), outside_mask)
        result = Image.fromarray(np.bitwise_or(background, np.array(jewellery_mask)))

        in_mem_file = BytesIO()
        result.save(in_mem_file, format="WEBP", quality=85)
        base_64_output = base64.b64encode(in_mem_file.getvalue()).decode('utf-8')
        total_inference_time = round((time.time() - start_time), 2)
        logger.info(">>> OUTPUT IMAGE PROCESSING COMPLETED <<<")

        payload = {
            "output": f"data:image/WEBP;base64,{base_64_output}",
            "code": 200,
            "inference_time": total_inference_time
        }
    except Exception as e:
        logger.error(f">>> OUTPUT IMAGE PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500, content={"error": "Error processing output image", "code": 500})

    logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
    logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
    logger.info("-" * 50)
    return JSONResponse(content=payload, status_code=200)
@nto_cto_router.post("/productData/{storeId}")
async def product_data(
        storeId: str,
        filterattributes: List[Dict[str, Union[str, int, float]]],
        storename: str = Header(default="default")
):
    """Return the products of one store, filtered, sorted and grouped by category.

    Args:
        storeId: Value matched against the ``StoreName`` column.
        filterattributes: List of single-entry dicts. Only the first entry of
            each dict is used. Repeating a key ORs its values. The keys
            ``priceFrom``/``priceTo``/``weightFrom``/``weightTo`` set inclusive
            range bounds and ``priceAscending``/``weightAscending``/
            ``idAscending``/``dateAscending`` request a sort (``1`` ascending,
            anything else descending). Any other key that names a column keeps
            only rows whose value is in the supplied set.
        storename: Request header; accepted but not used by this handler.

    Returns:
        JSONResponse mapping each category to a list of product dicts; an empty
        object when the store has no rows.

    Raises:
        HTTPException: 500 when fetching or processing the data fails.
    """
    try:
        response = supabase.table('MagicMirror').select("*").execute()
        df = pd.DataFrame(response.dict()["data"])
        if not df.empty:
            df = df[df["StoreName"] == storeId]
        if df.empty:
            # Nothing to filter or decorate; avoids column errors on an empty frame.
            return JSONResponse(content={})

        # Group values by attribute name so repeated attributes act as an OR.
        attribute_dict = {}
        for attr in filterattributes:
            if not attr:
                continue
            attr_key, attr_value = next(iter(attr.items()))
            attribute_dict.setdefault(attr_key, []).append(attr_value)

        def first_value(name):
            """Return the first value supplied for ``name``, or None if absent."""
            values = attribute_dict.get(name)
            return values[0] if values else None

        price_from = first_value("priceFrom")
        price_to = first_value("priceTo")
        weight_from = first_value("weightFrom")
        weight_to = first_value("weightTo")

        df["image_url"] = df.apply(
            lambda row: supabaseGetPublicURL(f"{row['StoreName']}/{row['Category']}/image/{row['Id']}.png"),
            axis=1)
        df["thumbnail_url"] = df.apply(
            lambda row: supabaseGetPublicURL(f"{row['StoreName']}/{row['Category']}/thumbnail/{row['Id']}.png"),
            axis=1)
        df.reset_index(drop=True, inplace=True)

        # Equality filters apply only to keys that are real columns; control
        # keys such as priceFrom are skipped explicitly rather than by
        # swallowing every exception.
        for column, values in attribute_dict.items():
            if column in df.columns:
                df = df[df[column].isin(values)]

        # Inclusive range filters for price and weight.
        if price_from is not None:
            df = df[df["Price"] >= price_from]
        if price_to is not None:
            df = df[df["Price"] <= price_to]
        if weight_from is not None:
            df = df[df["Weight"] >= weight_from]
        if weight_to is not None:
            df = df[df["Weight"] <= weight_to]

        # Sorts run in this fixed order, so a later sort overrides an earlier one.
        sort_requests = (
            ("priceAscending", "Price"),
            ("weightAscending", "Weight"),
            ("idAscending", "Id"),
            ("dateAscending", "UpdatedAt"),
        )
        for flag_name, column in sort_requests:
            flag = first_value(flag_name)
            if flag is not None:
                df = df.sort_values(by=column, ascending=(flag == 1))

        df = df.drop(["CreatedAt", "EstimatedPrice"], axis=1)

        # Group rows by category, preserving the row order established above.
        result = {}
        for _, row in df.iterrows():
            result.setdefault(row["Category"], []).append(row.to_dict())

        return JSONResponse(content=jsonable_encoder(result))

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch or process data: {e}")
async def parse_necklace_try_on_id(necklaceImageId: str = Form(...),
                                   necklaceCategory: str = Form(...),
                                   storename: str = Form(...),
                                   api_token: str = Form(...)) -> NecklaceTryOnIDEntity:
    """FastAPI dependency that packs the four form fields into a NecklaceTryOnIDEntity."""
    form_fields = {
        "necklaceImageId": necklaceImageId,
        "necklaceCategory": necklaceCategory,
        "storename": storename,
        "api_token": api_token,
    }
    return NecklaceTryOnIDEntity(**form_fields)
async def supabase_upload_and_return_url(prefix: str, image: Image.Image, quality: int = 85):
    """Encode ``image`` as WEBP, upload it to the output bucket and return its public URL.

    Args:
        prefix: Leading part of the generated file name; a random hex suffix is
            appended so concurrent uploads do not collide.
        image: Image to upload.
        quality: WEBP quality passed to ``process_image``.

    Returns:
        The public URL of the uploaded object, or ``None`` when encoding or
        uploading fails (the failure is logged, not raised).
    """
    try:
        filename = f"{prefix}_{secrets.token_hex(8)}.webp"

        # Encoding is CPU-bound; run it in a worker thread. asyncio.to_thread
        # replaces the get_event_loop()/run_in_executor pair, which is
        # discouraged inside coroutines.
        image_bytes = await asyncio.to_thread(process_image, image, quality)

        headers = {
            "Authorization": f"Bearer {key}",
            "Content-Type": "image/webp"
        }
        upload_url = f"{url}/storage/v1/object/JewelMirrorOutputs/{filename}"

        async with aiohttp.ClientSession() as session:
            async with session.post(upload_url, data=image_bytes, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"Upload failed with status {response.status}")

        return bucket.get_public_url(filename)

    except Exception as e:
        logger.error(f"Failed to upload image: {str(e)}")
        return None
def process_image(image: Image.Image, quality: int) -> bytes:
    """Encode ``image`` as WEBP bytes, downscaling it if it is very large.

    RGBA and palette images are converted to RGB first. Images whose width or
    height exceeds 3000 px are shrunk, keeping the aspect ratio, so that the
    longer side fits within 3000 px.

    Args:
        image: Image to encode.
        quality: WEBP quality setting.

    Returns:
        The encoded WEBP file contents.

    Raises:
        Exception: Any error raised while converting, resizing or saving is
            logged and re-raised unchanged.
    """
    try:
        if image.mode in ('RGBA', 'P'):
            image = image.convert('RGB')

        max_size = 3000
        if image.width > max_size or image.height > max_size:
            ratio = min(max_size / image.width, max_size / image.height)
            # Clamp to 1 px: for extreme aspect ratios (e.g. 3001x1) the short
            # side would otherwise truncate to 0 and resize() would fail.
            new_size = (
                max(1, int(image.width * ratio)),
                max(1, int(image.height * ratio)),
            )
            image = image.resize(new_size, Image.Resampling.LANCZOS)

        with BytesIO() as buffer:
            image.save(
                buffer,
                format='WEBP',
                quality=quality,
                optimize=True,
                method=6
            )
            return buffer.getvalue()
    except Exception as e:
        logger.error(f"Image processing failed: {str(e)}")
        raise
@nto_cto_router.post("/necklaceTryOnID")
async def necklace_try_on_id(necklace_try_on_id: NecklaceTryOnIDEntity = Depends(parse_necklace_try_on_id),
                             image: UploadFile = File(...)):
    """Overlay a stored necklace on the uploaded photo and return the uploaded result URLs.

    Responds with 404 when either image cannot be loaded, 400 when the pipeline
    returns no result, 500 when processing or uploading fails, and otherwise 200
    with ``output``, ``mask`` and ``inference_time``.
    """
    entity = necklace_try_on_id
    logger.info("-" * 50)
    logger.info(">>> NECKLACE TRY ON ID STARTED <<<")
    logger.info(f"Parameters: storename={entity.storename}, "
                f"necklaceCategory={entity.necklaceCategory}, "
                f"necklaceImageId={entity.necklaceImageId}")
    started_at = time.time()

    # --- load the person photo and the necklace image ---
    try:
        upload_bytes = await image.read()
        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{entity.storename}/{entity.necklaceCategory}/image/{entity.necklaceImageId}.png"
        image = Image.open(BytesIO(upload_bytes))
        jewellery = Image.open(returnBytesData(url=jewellery_url))
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as exc:
        logger.error(f">>> IMAGE LOADING ERROR: {str(exc)} <<<")
        not_found_body = {
            "error": "The requested resource (Image, necklace category, or store) is not available. Please verify the availability and try again",
            "code": 404,
        }
        return JSONResponse(content=not_found_body, status_code=404)

    # --- run the try-on pipeline ---
    try:
        result, _header_text, mask = await pipeline.necklaceTryOn_(
            image=image, jewellery=jewellery, storename=entity.storename)

        if result is None:
            logger.error(">>> NO FACE DETECTED IN THE IMAGE <<<")
            no_face_body = {
                "error": "No face detected in the image please try again with a different image",
                "code": 400,
            }
            return JSONResponse(content=no_face_body, status_code=400)

        logger.info(">>> NECKLACE TRY ON PROCESSING COMPLETED <<<")
    except Exception as exc:
        logger.error(f">>> NECKLACE TRY ON PROCESSING ERROR: {str(exc)} <<<")
        return JSONResponse(content={"error": "Error during necklace try-on process", "code": 500},
                            status_code=500)

    # --- upload result and mask concurrently ---
    try:
        logger.info(">>> SAVING RESULT IMAGES <<<")
        saving_started_at = time.time()

        result_url, mask_url = await asyncio.gather(
            supabase_upload_and_return_url(prefix="necklace_try_on", image=result),
            supabase_upload_and_return_url(prefix="necklace_try_on_mask", image=mask),
        )

        if not (result_url and mask_url):
            raise Exception("Failed to upload one or both images")

        logger.info(f">>> RESULT IMAGES SAVED IN {round((time.time() - saving_started_at), 2)}s <<<")
        logger.info(">>> RESULT IMAGES SAVED <<<")
    except Exception as exc:
        logger.error(f">>> RESULT SAVING ERROR: {str(exc)} <<<")
        return JSONResponse(content={"error": "Error saving result images", "code": 500}, status_code=500)

    # --- build the response, then release the result image ---
    try:
        try:
            total_backend_time = round((time.time() - started_at), 2)
            body = {
                "code": 200,
                "output": f"{result_url}",
                "mask": f"{mask_url}",
                "inference_time": total_backend_time
            }
        except Exception as exc:
            logger.error(f">>> RESPONSE GENERATION ERROR: {str(exc)} <<<")
            return JSONResponse(content={"error": "Error generating response", "code": 500}, status_code=500)

        logger.info(f">>> TOTAL INFERENCE TIME: {total_backend_time}s <<<")
        logger.info(">>> NECKLACE TRY ON COMPLETED <<<")
        logger.info("-" * 50)

        return JSONResponse(content=body, status_code=200)
    finally:
        if 'result' in locals():
            del result
        gc.collect()
@nto_cto_router.post("/canvasPoints")
async def canvas_points(necklace_try_on_id: NecklaceTryOnIDEntity = Depends(parse_necklace_try_on_id),
                        image: UploadFile = File(...)):
    """Compute canvas points for placing a stored necklace on the uploaded photo.

    Credits are deducted only after the pipeline call succeeds, using the
    "/necklaceTryOnID" endpoint label.

    Returns:
        JSONResponse with ``code`` and ``output`` (the pipeline result) on
        success; 404 when an image cannot be loaded, 402 when no credits
        remain, 500 when processing or credit deduction fails.
    """
    logger.info("-" * 50)
    logger.info(">>> CANVAS POINTS STARTED <<<")
    logger.info(f"Parameters: storename={necklace_try_on_id.storename}, "
                f"necklaceCategory={necklace_try_on_id.necklaceCategory}, "
                f"necklaceImageId={necklace_try_on_id.necklaceImageId}")
    start_time = time.time()

    try:
        imageBytes = await image.read()
        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{necklace_try_on_id.storename}/{necklace_try_on_id.necklaceCategory}/image/{necklace_try_on_id.necklaceImageId}.png"
        image, jewellery = Image.open(BytesIO(imageBytes)), Image.open(returnBytesData(url=jewellery_url))
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        # Message aligned with the sibling endpoints: the trailing ". Error"
        # was a leftover from a removed exception interpolation.
        return JSONResponse(content={
            "error": "The requested resource (Image, necklace category, or store) is not available. Please verify the availability and try again",
            "code": 404}, status_code=404)

    try:
        points = await pipeline.canvasPoint(image=image, jewellery=jewellery,
                                            storename=necklace_try_on_id.storename)
        payload = {"code": 200, "output": points}
        logger.info(">>> CANVAS POINTS PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> CANVAS POINTS PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error during canvas point process", "code": 500},
                            status_code=500)

    try:
        creditResponse = deductAndTrackCredit(storename=necklace_try_on_id.storename, endpoint="/necklaceTryOnID")
        if creditResponse == "No Credits Available":
            logger.error(">>> NO CREDITS REMAINING <<<")
            return JSONResponse(content={"error": "No Credits Remaining", "code": 402}, status_code=402)
        logger.info(">>> CREDITS DEDUCTED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> CREDIT DEDUCTION ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error deducting credits", "code": 500}, status_code=500)

    total_inference_time = round((time.time() - start_time), 2)
    logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
    logger.info(">>> CANVAS POINTS COMPLETED <<<")
    logger.info("-" * 50)

    return JSONResponse(status_code=200, content=payload)
@nto_cto_router.post("/necklaceTryOnWithPoints")
async def necklace_try_on_with_points(necklace_try_on_id: NecklaceTryOnIDEntity = Depends(parse_necklace_try_on_id),
                                      image: UploadFile = File(...),
                                      left_x: int = Form(...),
                                      left_y: int = Form(...),
                                      right_x: int = Form(...),
                                      right_y: int = Form(...)):
    """Necklace try-on using caller-supplied left/right shoulder points.

    Args:
        necklace_try_on_id: Store, category and necklace identifiers.
        image: Uploaded photo of the person.
        left_x, left_y: Pixel coordinates passed as ``left_shoulder``.
        right_x, right_y: Pixel coordinates passed as ``right_shoulder``.

    Returns:
        JSONResponse with ``code``, ``output`` and ``mask`` (WEBP data URIs) and
        ``inference_time`` on success; 404 when an image cannot be loaded, 402
        when no credits remain, 500 on processing, encoding or credit errors.
    """
    logger.info("-" * 50)
    logger.info(">>> NECKLACE TRY ON WITH POINTS STARTED <<<")
    logger.info(f"Parameters: storename={necklace_try_on_id.storename}, "
                f"necklaceCategory={necklace_try_on_id.necklaceCategory}, "
                f"necklaceImageId={necklace_try_on_id.necklaceImageId}, "
                f"left_point=({left_x}, {left_y}), right_point=({right_x}, {right_y})")
    start_time = time.time()

    try:
        imageBytes = await image.read()
        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{necklace_try_on_id.storename}/{necklace_try_on_id.necklaceCategory}/image/{necklace_try_on_id.necklaceImageId}.png"
        image, jewellery = Image.open(BytesIO(imageBytes)), Image.open(returnBytesData(url=jewellery_url))
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        # The exception text is logged but no longer echoed to the client, in
        # line with the other endpoints; internal details stay server-side.
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        return JSONResponse(content={
            "error": "The requested resource (Image, necklace category, or store) is not available. Please verify the availability and try again",
            "code": 404}, status_code=404)

    try:
        result, _header_text, mask = await pipeline.necklaceTryOnWithPoints_(
            image=image, jewellery=jewellery, left_shoulder=(left_x, left_y), right_shoulder=(right_x, right_y),
            storename=necklace_try_on_id.storename
        )
        logger.info(">>> NECKLACE TRY ON PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> NECKLACE TRY ON PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error during necklace try-on process", "code": 500},
                            status_code=500)

    try:
        inMemFile = BytesIO()
        inMemFileMask = BytesIO()
        result.save(inMemFile, format="WEBP", quality=85)
        mask.save(inMemFileMask, format="WEBP", quality=85)
        outputBytes = inMemFile.getvalue()
        maskBytes = inMemFileMask.getvalue()
        logger.info(">>> RESULT IMAGES SAVED <<<")
    except Exception as e:
        logger.error(f">>> RESULT SAVING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error saving result images", "code": 500}, status_code=500)

    try:
        creditResponse = deductAndTrackCredit(storename=necklace_try_on_id.storename, endpoint="/necklaceTryOnID")
        # Check credits before building the base64 payload so it is not
        # constructed only to be discarded; the 402 body carries "code" like
        # every other error response in this file.
        if creditResponse == "No Credits Available":
            logger.error(">>> NO CREDITS REMAINING <<<")
            return JSONResponse(content={"error": "No Credits Remaining", "code": 402}, status_code=402)

        total_inference_time = round((time.time() - start_time), 2)
        payload = {
            "code": 200,
            "output": f"data:image/WEBP;base64,{base64.b64encode(outputBytes).decode('utf-8')}",
            "mask": f"data:image/WEBP;base64,{base64.b64encode(maskBytes).decode('utf-8')}",
            "inference_time": total_inference_time
        }
        logger.info(">>> CREDITS DEDUCTED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> CREDIT DEDUCTION ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error deducting credits", "code": 500}, status_code=500)

    logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
    logger.info(">>> NECKLACE TRY ON WITH POINTS COMPLETED <<<")
    logger.info("-" * 50)

    return JSONResponse(content=payload, status_code=200)
@nto_cto_router.post("/clothingAndNecklaceTryOn")
async def clothing_and_necklace_try_on(
        image: UploadFile = File(...),
        necklaceImageId: str = Form(...),
        necklaceCategory: str = Form(...),
        storename: str = Form(...),
        clothing_type: str = Form(...)
):
    """Run clothing try-on, then place a necklace using the detected shoulder points.

    The photo is resized to 512x512, a mask and shoulder points are generated,
    the Replicate clothing model is run, and the necklace is overlaid on its
    output at the shoulder points found on the original photo. The result is
    uploaded and its URL returned.

    Returns:
        JSONResponse with ``code``, ``output`` (URL) and ``inference_time`` on
        success; 400 with the message of any ``ValueError`` raised while
        processing; 500 for every other failure.
    """
    logger.info("-" * 50)
    logger.info(">>> CLOTHING AND NECKLACE TRY ON STARTED <<<")
    logger.info(f"Parameters: storename={storename}, "
                f"necklaceCategory={necklaceCategory}, "
                f"necklaceImageId={necklaceImageId}, "
                f"clothing_type={clothing_type}")
    start_time = time.time()

    def image_to_base64(img: Image.Image) -> str:
        """Encode ``img`` as a WEBP data URI."""
        buffer = BytesIO()
        img.save(buffer, format="WEBP", quality=85, optimize=True)
        return f"data:image/webp;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"

    try:
        person_bytes = await image.read()
        person_image = Image.open(BytesIO(person_bytes)).convert("RGB").resize((512, 512))

        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{necklaceCategory}/image/{necklaceImageId}.png"
        necklace_image = Image.open(returnBytesData(url=jewellery_url)).convert("RGBA")
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")

        mask, left_point, right_point = await pipeline.shoulderPointMaskGeneration_(image=person_image)
        logger.info(">>> MASK AND POINTS GENERATION COMPLETED <<<")

        mask_data_uri, image_data_uri = await asyncio.gather(
            asyncio.to_thread(image_to_base64, mask),
            asyncio.to_thread(image_to_base64, person_image)
        )

        replicate_input = {
            "mask": mask_data_uri,
            "image": image_data_uri,
            "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
            "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
            "num_inference_steps": 20
        }
        # replicate.run blocks on the network; run it in a worker thread so the
        # event loop stays free while the model runs.
        cto_output = await asyncio.to_thread(replicate_run_cto, replicate_input)

        if not cto_output or not isinstance(cto_output, (list, tuple)) or not cto_output[0]:
            raise ValueError("Invalid output from clothing try-on")

        async with aiohttp.ClientSession() as session:
            async with session.get(str(cto_output[0])) as cto_response:
                if cto_response.status != 200:
                    # Caught by the generic handler below, so the client sees a 500.
                    raise HTTPException(status_code=cto_response.status, detail="Failed to fetch CTO output")
                cto_result_bytes = await cto_response.read()

        with BytesIO(cto_result_bytes) as buf:
            cto_result_image = Image.open(buf).convert("RGB")

            result, _header_text, _ = await pipeline.necklaceTryOnWithPoints_(
                image=cto_result_image,
                jewellery=necklace_image,
                left_shoulder=left_point,
                right_shoulder=right_point,
                storename=storename
            )

        if result is None:
            raise ValueError("Failed to process necklace try-on")

        result_url = await supabase_upload_and_return_url(prefix="clothing_necklace_try_on", image=result)
        if not result_url:
            raise ValueError("Failed to upload result image")

        payload = {
            "code": 200,
            "output": result_url,
            "inference_time": round((time.time() - start_time), 2)
        }

    except ValueError as ve:
        logger.error(f">>> PROCESSING ERROR: {str(ve)} <<<")
        return JSONResponse(status_code=400, content={"error": str(ve), "code": 400})
    except Exception as e:
        logger.error(f">>> PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500, content={"error": "Error during image processing", "code": 500})
    finally:
        gc.collect()

    logger.info(f">>> TOTAL INFERENCE TIME: {payload['inference_time']}s <<<")
    logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
    logger.info("-" * 50)

    return JSONResponse(content=payload, status_code=200)
@nto_cto_router.post("/m_nto")
async def mannequin_nto(necklace_try_on_id: NecklaceTryOnIDEntity = Depends(parse_necklace_try_on_id),
                        image: UploadFile = File(...)):
    """Overlay a stored necklace on a mannequin photo and return the uploaded result URL.

    Returns:
        JSONResponse with ``code``, ``output`` (URL) and ``inference_time`` on
        success; 404 when an image cannot be loaded, 400 when the pipeline
        returns no result, 500 when processing or uploading fails.
    """
    logger.info("-" * 50)
    logger.info(">>> MANNEQUIN NTO STARTED <<<")
    logger.info(f"Parameters: storename={necklace_try_on_id.storename}, "
                f"necklaceCategory={necklace_try_on_id.necklaceCategory}, "
                f"necklaceImageId={necklace_try_on_id.necklaceImageId}")
    start_time = time.time()

    try:
        imageBytes = await image.read()
        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{necklace_try_on_id.storename}/{necklace_try_on_id.necklaceCategory}/image/{necklace_try_on_id.necklaceImageId}.png"
        image, jewellery = Image.open(BytesIO(imageBytes)), Image.open(returnBytesData(url=jewellery_url))
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        return JSONResponse(content={
            "error": "The requested resource (Image, necklace category, or store) is not available. Please verify the availability and try again",
            "code": 404}, status_code=404)

    try:
        result, _resized_img = await pipeline.necklaceTryOnMannequin_(image=image, jewellery=jewellery)

        if result is None:
            logger.error(">>> NO FACE DETECTED IN THE IMAGE <<<")
            return JSONResponse(
                content={"error": "No face detected in the image please try again with a different image",
                         "code": 400}, status_code=400)

        logger.info(">>> NECKLACE TRY ON PROCESSING COMPLETED <<<")
    except Exception as e:
        logger.error(f">>> NECKLACE TRY ON PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error during necklace try-on process", "code": 500},
                            status_code=500)

    try:
        logger.info(">>> SAVING RESULT IMAGES <<<")
        start_time_saving = time.time()

        # Only one image is uploaded here, so await it directly instead of
        # wrapping a single coroutine in asyncio.gather and indexing the list.
        result_url = await supabase_upload_and_return_url(prefix="necklace_try_on", image=result)

        if result_url is None:
            raise Exception("Failed to upload result image")

        logger.info(f">>> RESULT IMAGES SAVED IN {round((time.time() - start_time_saving), 2)}s <<<")
        logger.info(">>> RESULT IMAGES SAVED <<<")
    except Exception as e:
        logger.error(f">>> RESULT SAVING ERROR: {str(e)} <<<")
        return JSONResponse(content={"error": "Error saving result images", "code": 500}, status_code=500)

    try:
        try:
            total_backend_time = round((time.time() - start_time), 2)
            payload = {
                "code": 200,
                "output": f"{result_url}",
                "inference_time": total_backend_time
            }
        except Exception as e:
            logger.error(f">>> RESPONSE GENERATION ERROR: {str(e)} <<<")
            return JSONResponse(content={"error": "Error generating response", "code": 500}, status_code=500)

        logger.info(f">>> TOTAL INFERENCE TIME: {total_backend_time}s <<<")
        logger.info(f">>> NECKLACE TRY ON COMPLETED :: {necklace_try_on_id.storename} <<<")
        logger.info("-" * 50)

        return JSONResponse(content=payload, status_code=200)
    finally:
        # Drop the result image and force a collection before returning.
        if 'result' in locals():
            del result
        gc.collect()
@nto_cto_router.post("/nto_mto_combined")
async def combined_cto_nto(
        image: UploadFile = File(...),
        clothing_type: str = Form(...),
        necklace_id: str = Form(...),
        necklace_category: str = Form(...),
        storename: str = Form(...)
):
    """Run clothing try-on followed by automatic necklace try-on, with per-stage timings.

    The photo is resized to 512x512, a mask is generated, the Replicate clothing
    model is run, the necklace is placed on its output by the pipeline, and the
    result is uploaded.

    Returns:
        JSONResponse with ``code``, ``output`` (URL) and a ``timing`` dict
        (mask_generation, encoding, cto_inference, nto_inference, upload,
        total) on success; 404 when the images cannot be loaded; 400 with the
        message of any ``ValueError`` raised while processing; 500 otherwise.
    """
    logger.info("-" * 50)
    logger.info(">>> COMBINED CTO-NTO STARTED <<<")
    logger.info(f"Parameters: storename={storename}, necklace_category={necklace_category}, "
                f"necklace_id={necklace_id}, clothing_type={clothing_type}")
    start_time = time.time()

    def image_to_base64(img: Image.Image) -> str:
        """Encode ``img`` as a WEBP data URI."""
        buffer = BytesIO()
        img.save(buffer, format="WEBP", quality=85, optimize=True)
        return f"data:image/webp;base64,{base64.b64encode(buffer.getvalue()).decode('utf-8')}"

    try:
        # Load source image and necklace
        image_bytes = await image.read()
        source_image = Image.open(BytesIO(image_bytes)).convert("RGB").resize((512, 512))

        jewellery_url = f"https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/Stores/{storename}/{necklace_category}/image/{necklace_id}.png"
        necklace_image = Image.open(returnBytesData(url=jewellery_url)).convert("RGBA")
        logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
    except Exception as e:
        logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
        return JSONResponse(content={
            "error": "Error loading images. Please verify the image and necklace availability.",
            "code": 404
        }, status_code=404)

    try:
        # Generate mask (the shoulder points are not needed here)
        mask_start_time = time.time()
        mask, _, _ = await pipeline.shoulderPointMaskGeneration_(image=source_image)
        mask_time = round(time.time() - mask_start_time, 2)
        logger.info(f">>> MASK GENERATION COMPLETED in {mask_time}s <<<")

        # Convert images to base64
        encoding_start_time = time.time()
        mask_data_uri, image_data_uri = await asyncio.gather(
            asyncio.to_thread(image_to_base64, mask),
            asyncio.to_thread(image_to_base64, source_image)
        )
        encoding_time = round(time.time() - encoding_start_time, 2)
        logger.info(f">>> IMAGE ENCODING COMPLETED in {encoding_time}s <<<")

        # Perform CTO. replicate.run blocks on the network, so run it in a
        # worker thread to keep the event loop responsive.
        cto_start_time = time.time()
        replicate_input = {
            "mask": mask_data_uri,
            "image": image_data_uri,
            "prompt": f"Dull {clothing_type}, non-reflective clothing, properly worn, natural setting, elegant, natural look, neckline without jewellery, simple, perfect eyes, perfect face, perfect body, high quality, realistic, photorealistic, high resolution,traditional full sleeve blouse",
            "negative_prompt": "necklaces, jewellery, jewelry, necklace, neckpiece, garland, chain, neck wear, jewelled neck, jeweled neck, necklace on neck, jewellery on neck, accessories, watermark, text, changed background, wider body, narrower body, bad proportions, extra limbs, mutated hands, changed sizes, altered proportions, unnatural body proportions, blury, ugly",
            "num_inference_steps": 20
        }
        cto_output = await asyncio.to_thread(replicate_run_cto, replicate_input)
        cto_time = round(time.time() - cto_start_time, 2)
        logger.info(f">>> CTO COMPLETED in {cto_time}s <<<")

        if not cto_output or not isinstance(cto_output, (list, tuple)) or not cto_output[0]:
            raise ValueError("Invalid output from clothing try-on")

        # Get CTO result image
        async with aiohttp.ClientSession() as session:
            async with session.get(str(cto_output[0])) as cto_response:
                if cto_response.status != 200:
                    # Caught by the generic handler below, so the client sees a 500.
                    raise HTTPException(status_code=cto_response.status, detail="Failed to fetch CTO output")
                cto_result_bytes = await cto_response.read()

        # Perform NTO
        nto_start_time = time.time()
        with BytesIO(cto_result_bytes) as buf:
            cto_result_image = Image.open(buf).convert("RGB")
            result, _header_text, _ = await pipeline.necklaceTryOn_(
                image=cto_result_image,
                jewellery=necklace_image,
                storename=storename
            )
        nto_time = round(time.time() - nto_start_time, 2)
        logger.info(f">>> NTO COMPLETED in {nto_time}s <<<")

        if result is None:
            raise ValueError("Failed to process necklace try-on")

        upload_start_time = time.time()
        result_url = await supabase_upload_and_return_url(
            prefix="combined_cto_nto",
            image=result
        )
        upload_time = round(time.time() - upload_start_time, 2)
        logger.info(f">>> RESULT UPLOADED in {upload_time}s <<<")

        if not result_url:
            raise ValueError("Failed to upload result image")

        total_time = round(time.time() - start_time, 2)
        payload = {
            "code": 200,
            "output": result_url,
            "timing": {
                "mask_generation": mask_time,
                "encoding": encoding_time,
                "cto_inference": cto_time,
                "nto_inference": nto_time,
                "upload": upload_time,
                "total": total_time
            }
        }

    except ValueError as ve:
        logger.error(f">>> PROCESSING ERROR: {str(ve)} <<<")
        return JSONResponse(status_code=400, content={"error": str(ve), "code": 400})
    except Exception as e:
        logger.error(f">>> PROCESSING ERROR: {str(e)} <<<")
        return JSONResponse(status_code=500, content={"error": "Error during image processing", "code": 500})
    finally:
        if 'result' in locals():
            del result
        gc.collect()

    logger.info(f">>> TOTAL PROCESSING TIME: {total_time}s <<<")
    logger.info(">>> COMBINED CTO-NTO COMPLETED SUCCESSFULLY <<<")
    logger.info("-" * 50)

    return JSONResponse(content=payload, status_code=200)