Spaces:
Sleeping
Sleeping
import mimetypes
import os
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

import requests
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.responses import JSONResponse
from supabase import create_client

from src.components.each_necklace_video_gen import EachVideoCreator
from src.components.product_page_nto_cto_gen import ProductPageImageGeneration
from src.utils.logs import logger
# Supabase connection settings are read from the environment. Both values are
# handed straight to create_client, so an unset variable reaches it as None.
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")
supabase = create_client(supabase_url, supabase_key)

# Shared pipeline instance used by product_page_image_generation.
prod_page = ProductPageImageGeneration()
@dataclass
class MakeupColors:
    """Makeup colour selection passed to the product-page try-on pipeline.

    Declared as a dataclass so the annotated fields become constructor
    arguments and instance attributes. Without the decorator the class body
    only carries annotations and ``MakeupColors(lipstick=...)`` raises
    ``TypeError``.
    """

    lipstick: str  # colour to apply as lipstick
    eyeliner: str  # colour to apply as eyeliner
    eyeshadow: str  # colour to apply as eyeshadow
app = FastAPI()

# Root folder for bundled assets; the handlers below read its intro/, outro/,
# fonts/ and audio/ sub-folders.
RESOURCES_DIR = "resources"
os.makedirs(RESOURCES_DIR, exist_ok=True)

# Rendered videos are written here before being uploaded. Create it up front
# so the first render does not fail on a missing output directory.
TEMP_VIDEO_DIR = f"{RESOURCES_DIR}/temp_video"
os.makedirs(TEMP_VIDEO_DIR, exist_ok=True)

# NOTE(review): presumably consumed by the MoviePy-based video creator as its
# scratch directory -- confirm that the library actually reads this variable.
os.environ['MOVIEPY_TEMP_DIR'] = '/tmp/moviepy'
def upload_to_supabase(video_path, bucket_name="JewelmirrorVideoGeneration", model_name="MOD"):
    """Upload a video file to Supabase storage and return its public URL.

    The object key is produced by ``get_structured_path`` and has the form
    ``{year}/{month}/{day}/{model_name}/{name}_{random_hex}{ext}`` inside
    ``bucket_name``.

    Args:
        video_path: Local path of the file to upload.
        bucket_name: Supabase storage bucket that receives the object.
        model_name: Folder name placed below the date folders.

    Returns:
        The public URL of the uploaded object, or ``None`` when the file is
        missing, the upload raises, or the verification request does not
        answer with HTTP 200. Failures are logged, never raised.
    """
    # Upper bound (seconds) for the verification request so that a stalled
    # connection cannot block the caller indefinitely.
    verify_timeout = 30

    logger.info(f"Uploading video to Supabase: {video_path}")
    try:
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Single source of truth for the key layout (shared with other callers).
        structured_path = get_structured_path(video_path, model_name=model_name)

        content_type = mimetypes.guess_type(video_path)[0] or 'video/mp4'
        options = {
            "content-type": content_type,
            "x-upsert": "true",
            "cache-control": "max-age=3600"
        }
        logger.info(f"Uploading video with structured path: {structured_path}")

        with open(video_path, 'rb') as f:
            file_data = f.read()

        bucket = supabase.storage.from_(bucket_name)
        bucket.upload(
            path=structured_path,
            file=file_data,
            file_options=options
        )
        public_url = bucket.get_public_url(structured_path)

        # Confirm the object is actually reachable before reporting success.
        response = requests.head(public_url, timeout=verify_timeout)
        if response.status_code != 200:
            logger.error(f"Upload verification failed: {response.status_code}")
            raise Exception(f"Upload verification failed: {response.status_code}")

        logger.info(f"Video uploaded successfully: {public_url}")
        return public_url
    except Exception as e:
        logger.error(f"Error uploading to Supabase: {str(e)}")
        return None
def get_structured_path(video_path, model_name="MOD"):
    """Build a dated, collision-resistant storage key for a video file.

    The result has the shape ``YYYY/MM/DD/{model_name}/{stem}_{hex}{ext}``,
    where the date is the current local date, ``stem``/``ext`` come from the
    base name of ``video_path`` and ``hex`` is 16 random hex characters.
    """
    now = datetime.now()
    token = os.urandom(8).hex()
    stem, extension = os.path.splitext(os.path.basename(video_path))
    date_folders = now.strftime("%Y/%m/%d")
    return f"{date_folders}/{model_name}/{stem}_{token}{extension}"
def download_image(url):
    """Download ``url`` into a temporary ``.png`` file and return its path.

    The file is created with ``delete=False``, so the caller owns it and is
    responsible for removing it once it is no longer needed.

    Args:
        url: HTTP(S) address of the image to fetch.

    Returns:
        Path of the downloaded temporary file, or ``None`` if the request or
        the write fails. Failures are logged, never raised.
    """
    # Upper bound (seconds) so a stalled server cannot block the request forever.
    request_timeout = 30

    logger.info(f"Downloading image from {url}")
    temp_path = None
    try:
        # Context managers close both the HTTP connection and the file handle,
        # which the streamed response / delete=False temp file would otherwise
        # keep open.
        with requests.get(url, stream=True, timeout=request_timeout) as response:
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
        logger.info(f"Image downloaded successfully: {temp_path}")
        return temp_path
    except Exception as e:
        logger.error(f"Error downloading image from {url}: {str(e)}")
        # Do not leave a partially written file behind on failure.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.error(f"Could not remove partial download {temp_path}: {str(cleanup_error)}")
        return None
| # | |
| # class VideoGenerator(BaseModel): | |
| # necklace_image: str | |
| # necklace_try_on_output_images: list[str] | |
| # clothing_output_images: list[str] | |
| # makeup_output_images: list[str] | |
| # intro_video_path: str = "JewelMirror_intro.mp4" | |
| # background_audio_path: str = "TraditionalIndianVlogMusic.mp3" | |
| # font_path: str = "PlayfairDisplay-VariableFont.ttf" | |
| # image_display_duration: float = 2.5 | |
| # fps: int = 30 | |
| # necklace_title: str = "Necklace Try-On" | |
| # clothing_title: str = "Clothing Try-On" | |
| # makeup_title: str = "Makeup Try-On" | |
| # necklace_image_title: str = "Necklace Preview" | |
| # nto_image_title: str = "Necklace Try-On" | |
| # nto_cto_image_title: str = "Clothing Try-On" | |
| # makeup_image_title: str = "Makeup Try-On" | |
| # transition_duration: float = 0.5 | |
| # | |
| # | |
| # @app.post("/createvideo/") | |
| # async def create_video(request: VideoGenerator): | |
| # logger.info(f"Creating video with request: {request.dict()}") | |
| # start_time = time.time() | |
| # try: | |
| # logger.info("Downloading images...") | |
| # temp_files = { | |
| # 'necklace': download_image(request.necklace_image), | |
| # 'necklace_tryon': [download_image(url) for url in request.necklace_try_on_output_images], | |
| # 'clothing': [download_image(url) for url in request.clothing_output_images], | |
| # 'makeup': [download_image(url) for url in request.makeup_output_images] | |
| # } | |
| # file_download_time = time.time() - start_time | |
| # | |
| # if any(file is None for files in temp_files.values() for file in | |
| # (files if isinstance(files, list) else [files])): | |
| # return JSONResponse(content={"status": "error", "message": "Failed to download all required images."}, | |
| # status_code=400) | |
| # | |
| # intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}" | |
| # output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4" | |
| # font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}" | |
| # audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}" | |
| # logger.info(f"Creating video with paths: {intro_path}, {output_path}, {font_path}, {audio_path}") | |
| # | |
| # video_creator = VideoCreator( | |
| # intro_video_path=intro_path, | |
| # necklace_image=temp_files['necklace'], | |
| # nto_outputs=temp_files['necklace_tryon'], | |
| # nto_cto_outputs=temp_files['clothing'], | |
| # makeup_outputs=temp_files['makeup'], | |
| # font_path=font_path, | |
| # output_path=output_path, | |
| # audio_path=audio_path, | |
| # image_display_duration=request.image_display_duration, | |
| # fps=request.fps, | |
| # necklace_image_title=request.necklace_image_title, | |
| # nto_image_title=request.nto_image_title, | |
| # nto_cto_image_title=request.nto_cto_image_title, | |
| # makeup_image_title=request.makeup_image_title, | |
| # ) | |
| # logger.info("Creating video...") | |
| # | |
| # video_creator.create_final_video() | |
| # video_creation_time = time.time() - start_time - file_download_time | |
| # | |
| # url = upload_to_supabase(video_path=output_path) | |
| # logger.info(f"Video created successfully: {url}") | |
| # supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time | |
| # logger.info(f"Video uploaded to Supabase: {url}") | |
| # | |
| # response = { | |
| # "status": "success", | |
| # "message": "Video created successfully.", | |
| # "video_url": url, | |
| # "timings": { | |
| # "file_download_time": file_download_time, | |
| # "video_creation_time": video_creation_time, | |
| # "supabase_upload_time": supabase_upload_time | |
| # } | |
| # } | |
| # logger.info(f"Response: {response}") | |
| # | |
| # for files in temp_files.values(): | |
| # if isinstance(files, list): | |
| # for file in files: | |
| # if os.path.exists(file): | |
| # os.unlink(file) | |
| # elif os.path.exists(files): | |
| # os.unlink(files) | |
| # | |
| # os.remove(output_path) | |
| # logger.info("Files cleaned up successfully.") | |
| # return JSONResponse(content=response, status_code=200) | |
| # | |
| # except Exception as e: | |
| # logger.error(f"Error creating video: {str(e)}") | |
| # return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) | |
class EachNecklaceVideoGeneratorRequest(BaseModel):
    """Request body for ``create_combined_video``.

    Fields that default to ``None`` are declared ``Optional`` so the
    annotation matches the default and an explicit ``null`` in the payload
    validates the same way as omitting the field.
    """

    # File names resolved under resources/intro, resources/fonts and
    # resources/audio by create_combined_video.
    intro_video_path: str = "JewelMirror_intro.mp4"
    font_path: str = "PlayfairDisplay-VariableFont.ttf"
    background_audio_path: str = "TraditionalIndianVlogMusic.mp3"

    # Rendering parameters forwarded unchanged to EachVideoCreator.
    image_display_duration: float = 2.5
    fps: int = 10

    # Titles forwarded to EachVideoCreator.
    # NOTE(review): presumably one entry (or one inner list) per necklace,
    # parallel to the image lists below -- confirm against EachVideoCreator.
    necklace_title: Optional[List[str]] = None
    nto_image_title: Optional[List[List[str]]] = None
    nto_cto_image_title: Optional[List[List[str]]] = None
    makeup_image_title: Optional[List[List[str]]] = None

    # Image URLs; each one is downloaded to a temp file before rendering.
    necklace_images: Optional[List[str]] = None
    necklace_try_on_output_images: Optional[List[List[str]]] = None
    clothing_output_images: Optional[List[List[str]]] = None
    makeup_output_images: Optional[List[List[str]]] = None

    # Three-integer colour tuples forwarded as ``backgrounds``.
    background_colors: Optional[list[tuple[int, int, int]]] = None

    # Outro content forwarded to EachVideoCreator.
    outro_title: str = "Reach out to us for more information"
    address: str = "123, ABC Street, XYZ City"
    phone_numbers: str = "1234567890"
    logo_url: str = "https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/MagicMirror/FullImages/default.png"

    # Transition settings forwarded to EachVideoCreator; note the default
    # type is the string "None", not the None singleton.
    transition_duration: float = 0.5
    transition_type: str = "None"
    direction: str = "left"

    # File name resolved under resources/outro.
    outro_video_path: str = "JewelMirror_outro.mp4"
    skip_necklace_video_portion: bool = False
async def create_combined_video(request: EachNecklaceVideoGeneratorRequest):
    """Render a combined necklace video, upload it, and report the result.

    Steps: download every image referenced by the request, build the video
    with ``EachVideoCreator``, upload it through ``upload_to_supabase``, and
    finally remove the downloaded images and the rendered file.

    NOTE(review): no route decorator is visible on this function in this
    view -- confirm how it is registered on ``app``.

    Args:
        request: Rendering parameters and image URLs.

    Returns:
        ``JSONResponse`` with status 200 and the video URL plus timings on
        success; 400 when at least one image could not be downloaded; 500
        when rendering or uploading fails.
    """
    logger.info(f"Creating video with request: {request.dict()}")
    start_time = time.time()
    background_ = request.background_colors
    failed_urls = []

    def process_images(image_list):
        # Mirror the (possibly nested) list structure, replacing each URL by
        # the local path it was downloaded to. Absent inputs stay None.
        if image_list is None:
            return None
        if isinstance(image_list, list):
            return [process_images(item) for item in image_list]
        local_path = download_image(image_list)
        if local_path is None:
            failed_urls.append(image_list)
        return local_path

    def cleanup_files(files):
        # Recursively delete downloaded files; None marks an absent or failed
        # download and is skipped.
        if isinstance(files, list):
            for item in files:
                cleanup_files(item)
        elif files is not None and os.path.exists(files):
            os.unlink(files)

    temp_files = {}
    output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4"
    try:
        logger.info("Downloading images...")
        # Filled key by key so that cleanup sees everything downloaded so far
        # even if a later step raises.
        temp_files['necklaces'] = process_images(request.necklace_images)
        temp_files['necklace_tryon'] = process_images(request.necklace_try_on_output_images)
        temp_files['clothing'] = process_images(request.clothing_output_images)
        temp_files['makeup'] = process_images(request.makeup_output_images)
        temp_files['logo'] = process_images(request.logo_url)
        file_download_time = time.time() - start_time

        # Fail fast instead of handing missing files to the renderer.
        if failed_urls:
            logger.error(f"Failed to download images: {failed_urls}")
            return JSONResponse(content={"status": "error", "message": "Failed to download all required images."},
                                status_code=400)

        intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}"
        font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}"
        audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}"
        outro_path = f"{RESOURCES_DIR}/outro/{request.outro_video_path}"

        # The upload folder is taken from the second dash-separated token of
        # the first try-on image's file name; fall back to "MOD" when the
        # list is absent/empty or the name has no such token.
        try:
            model_name = request.necklace_try_on_output_images[0][0].split("/")[-1].split(".")[0].split("-")[1]
        except (TypeError, IndexError, AttributeError):
            model_name = "MOD"

        video_creator = EachVideoCreator(
            intro_video_path=intro_path,
            necklace_image=temp_files['necklaces'],
            nto_outputs=temp_files['necklace_tryon'],
            nto_cto_outputs=temp_files['clothing'],
            makeup_outputs=temp_files['makeup'],
            font_path=font_path,
            output_path=output_path,
            audio_path=audio_path,
            image_display_duration=request.image_display_duration,
            fps=request.fps,
            necklace_title=request.necklace_title,
            nto_title=request.nto_image_title,
            cto_title=request.nto_cto_image_title,
            makeup_title=request.makeup_image_title,
            backgrounds=background_,
            outro_title=request.outro_title,
            address=request.address,
            phone_numbers=request.phone_numbers,
            logo_image=temp_files['logo'],
            transition_duration=request.transition_duration,
            transition_type=request.transition_type,
            direction=request.direction,
            outro_video_path=outro_path,
            skip_necklace_video_portion=request.skip_necklace_video_portion
        )
        logger.info("Creating video...")
        video_creator.create_final_video()
        video_creation_time = time.time() - start_time - file_download_time
        logger.info("Video created successfully.")

        url = upload_to_supabase(video_path=output_path, model_name=model_name)
        supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time
        # upload_to_supabase signals failure by returning None; do not report
        # success without a usable URL.
        if url is None:
            logger.error("Video upload to Supabase failed.")
            return JSONResponse(content={"status": "error", "message": "Failed to upload video to Supabase."},
                                status_code=500)
        logger.info(f"Video uploaded to Supabase: {url}")

        response = {
            "status": "success",
            "message": "Video created successfully.",
            "video_url": url,
            "timings": {
                "file_download_time": file_download_time,
                "video_creation_time": video_creation_time,
                "supabase_upload_time": supabase_upload_time
            }
        }
        logger.info(f"Response: {response}")
        return JSONResponse(content=response, status_code=200)
    except Exception as e:
        logger.error(f"Error creating video: {str(e)}")
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
    finally:
        # Runs on every exit path so temp downloads and the rendered file
        # never accumulate on disk. Cleanup problems are logged, not raised,
        # so they cannot mask the real response.
        try:
            cleanup_files(list(temp_files.values()))
            if os.path.exists(output_path):
                os.remove(output_path)
            logger.info("Files cleaned up successfully.")
        except OSError as cleanup_error:
            logger.error(f"Error cleaning up files: {str(cleanup_error)}")
async def get_transitions():
    """Return a 200 JSON response listing the transition names "crossfade" and "slide"."""
    transition_names = ["crossfade", "slide"]
    return JSONResponse(content={"transitions": transition_names}, status_code=200)
async def get_information():
    """List the files available in each resources sub-folder.

    Returns a 200 JSON response with the keys ``music``, ``fonts``,
    ``intro`` and ``outro``, each holding the directory listing of
    ``audio``, ``fonts``, ``intro`` and ``outro`` under ``RESOURCES_DIR``.
    """
    logger.info("Getting resources information")
    # (response key, sub-folder) pairs, in the order they are listed.
    folders = (("music", "audio"), ("fonts", "fonts"), ("intro", "intro"), ("outro", "outro"))
    listing = {key: os.listdir(f"{RESOURCES_DIR}/{folder}") for key, folder in folders}
    logger.info(f"Resources: {listing['music']}, {listing['fonts']}, {listing['intro']}")
    return JSONResponse(content=listing, status_code=200)
| def ensure_json_serializable(data): | |
| if isinstance(data, list): | |
| return [ensure_json_serializable(item) for item in data] | |
| elif isinstance(data, dict): | |
| return {key: ensure_json_serializable(value) for key, value in data.items()} | |
| elif hasattr(data, "__dict__"): | |
| return ensure_json_serializable(data.__dict__) | |
| else: | |
| return data | |
async def product_page_image_generation(model_image: str, necklace_id: str, necklace_category: str, storename: str,
                                        x_offset: str,
                                        y_offset: str,
                                        clothing_list: List[str],
                                        makeup_colors: MakeupColors):
    """Run the full try-on sequence for one model image and return the results.

    The model image is downloaded to a temporary file, passed to
    ``prod_page.process_full_tryon_sequence`` together with the necklace
    description, clothing list and makeup colours, and removed again once
    the call has finished.

    Args:
        model_image: URL of the model image; its file name without extension
            is forwarded as ``model_name``.
        necklace_id: Necklace identifier, passed both positionally and as
            ``"id"`` in the necklace description.
        necklace_category: Forwarded as ``"category"``.
        storename: Forwarded as ``"store_name"``.
        x_offset: Forwarded as ``"offset_x"``.
        y_offset: Forwarded as ``"offset_y"``.
        clothing_list: Forwarded unchanged to the pipeline.
        makeup_colors: Forwarded unchanged to the pipeline.

    Returns:
        ``JSONResponse`` with status 200 and the ``nto_results``,
        ``cto_results`` and ``mto_results`` of the pipeline; 400 when the
        model image cannot be downloaded; 500 on any other error.
    """
    model_image_path = None
    try:
        model_name = model_image.split("/")[-1].split(".")[0]
        model_image_path = download_image(model_image)
        if model_image_path is None:
            return JSONResponse(content={"status": "error", "message": "Failed to download model image"},
                                status_code=400)

        response = prod_page.process_full_tryon_sequence(
            model_image_path,
            necklace_id,
            {
                "id": necklace_id,
                "category": necklace_category,
                "store_name": storename,
                "offset_x": x_offset,
                "offset_y": y_offset
            },
            clothing_list,
            makeup_colors,
            model_name=model_name
        )

        json_response = {
            "status": "success",
            "message": "Product page images generated successfully.",
            "nto_results": ensure_json_serializable(response["nto_results"]),
            "cto_results": ensure_json_serializable(response["cto_results"]),
            "mto_results": ensure_json_serializable(response["mto_results"])
        }
        return JSONResponse(content=json_response, status_code=200)
    except Exception as e:
        logger.error(f"Error generating product page images: {str(e)}")
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
    finally:
        # download_image creates the file with delete=False, so it must be
        # removed here or it accumulates on disk with every request.
        # NOTE(review): assumes the pipeline no longer needs the local file
        # once process_full_tryon_sequence has returned -- confirm.
        if model_image_path is not None and os.path.exists(model_image_path):
            try:
                os.unlink(model_image_path)
            except OSError as cleanup_error:
                logger.error(f"Could not remove temp file {model_image_path}: {str(cleanup_error)}")
if __name__ == "__main__":
    # Direct-execution entry point: serve the app on all interfaces, port 8000.
    # uvicorn is imported here so that merely importing this module does not
    # require it.
    import uvicorn

    uvicorn.run(app=app, host="0.0.0.0", port=8000)