import os os.environ['CUDA_LAUNCH_BLOCKING'] = '1' web_url ="https://api.quantumgrove.tech" from fastapi import FastAPI, UploadFile, File, BackgroundTasks import sqlite3 def increment_video_hits(video_id): conn = sqlite3.connect('content.db') cursor = conn.cursor() cursor.execute(''' UPDATE videos SET hits = hits + 1 WHERE id = ? ''', (video_id,)) conn.commit() conn.close() def create_database(): conn = sqlite3.connect('api.db') cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS ban_list ( id INTEGER PRIMARY KEY, ip TEXT NOT NULL, reason TEXT NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS request_log ( id INTEGER PRIMARY KEY, ip TEXT NOT NULL, url TEXT NOT NULL, method TEXT NOT NULL, endpoint TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''') cursor.execute('''CREATE TABLE IF NOT EXISTS output ( id INTEGER PRIMARY KEY AUTOINCREMENT, uuid TEXT NOT NULL, timestamp DATETIME NOT NULL )''') conn.commit() conn.close() def output_entry_id(output_id): conn = sqlite3.connect('api.db') cursor = conn.cursor() timestamp = datetime.now() cursor.execute("INSERT INTO output (uuid, timestamp) VALUES (?, ?)", (output_id, timestamp)) conn.commit() conn.close() from datetime import datetime, timedelta def delete_uuid(uuid_to_delete): path = "./outputs" directories = [] for root, dirs, files in os.walk(path): for dir in dirs: directories.append(os.path.join(root, dir)) for i in directories: temp = i.split('/') if temp[-1] == uuid_to_delete: shutil.rmtree(i) return True return False def get_old_out_ids(): conn = sqlite3.connect('api.db') cursor = conn.cursor() ten_minutes_ago = datetime.now() - timedelta(minutes=5) cursor.execute("SELECT uuid FROM output WHERE timestamp < ?", (ten_minutes_ago,)) rows = cursor.fetchall() if len(rows) == 0: return None for entry in rows: if delete_uuid(entry[0]): conn.execute("DELETE FROM output WHERE uuid = ?", (entry[0],)) conn.commit() conn.close() return True def insert_ban(ip, reason): conn = sqlite3.connect('api.db') cursor = conn.cursor() cursor.execute('''INSERT INTO ban_list (ip, reason) VALUES (?, ?)''', (ip, reason)) conn.commit() conn.close() def search_ban(ip): conn = sqlite3.connect('api.db') cursor = conn.cursor() cursor.execute('''SELECT * FROM ban_list WHERE ip = ?''', (ip,)) rows = cursor.fetchall() conn.close() return rows def log_request(ip, url, method, endpoint): conn = sqlite3.connect('api.db') cursor = conn.cursor() cursor.execute('''INSERT INTO request_log (ip, url, method, endpoint) VALUES (?, ?, ?, ?)''', (ip, url, method, endpoint)) conn.commit() conn.close() def create_database_port(): conn = sqlite3.connect('./subprocess/port.db', check_same_thread=False) cursor = conn.cursor() cursor.execute('''DROP TABLE IF EXISTS ports''') conn.commit() cursor.execute('''CREATE TABLE IF NOT EXISTS ports (port INTEGER)''') conn.commit() conn.close() def search_port(port): conn = sqlite3.connect('./subprocess/port.db') cursor = conn.cursor() cursor.execute('''SELECT * FROM ports WHERE port = ?''', (port,)) rows = cursor.fetchall() conn.close() if len(rows) > 0: return True return False def is_table_empty(): return False #test remove conn = sqlite3.connect('./subprocess/port.db') cursor = conn.cursor() cursor.execute('''SELECT COUNT(*) FROM ports''') count = cursor.fetchone()[0] conn.close() if count == 0: for item in os.listdir('/tmp/gradio'): item_path = os.path.join('/tmp/gradio', item) if os.path.isdir(item_path): shutil.rmtree(item_path) def remove_port(port): if(search_port(port)): conn = sqlite3.connect('./subprocess/port.db') cursor = conn.cursor() cursor.execute('''DELETE FROM ports WHERE port = ?''', (port,)) conn.commit() conn.close() return True return False def insert_port(port): if( not search_port(port)): conn = sqlite3.connect('./subprocess/port.db') cursor = conn.cursor() cursor.execute('''INSERT INTO ports (port) VALUES (?)''', (port,)) conn.commit() conn.close() return True else: return False def generate_response(response_message , response_status ,uuid_code , age , gender , metadata): response_dict = { "response_message" : response_message, "response_status" : response_status, "data" :{ "UUID" : uuid_code, "info" : {"age" : age , "gender" : gender}, "metadata" : metadata } } return response_dict import numpy as np import cv2 import time import io import gc import shutil import uuid import torch import subprocess import torchvision.transforms as transforms from scripts.psp import pSp from argparse import Namespace import dlib from scripts.align_all_parallel import align_face from scripts.augmentations import AgeTransformer from scripts.common import tensor2im from torchvision.transforms.functional import normalize from scripts.basicsr.utils import img2tensor, tensor2img from scripts.basicsr.utils.misc import get_device from scripts.facelib.utils.face_restoration_helper import FaceRestoreHelper from scripts.basicsr.utils.registry import ARCH_REGISTRY from scripts.basicsr.archs.rrdbnet_arch import RRDBNet from scripts.basicsr.utils.realesrgan_utils import RealESRGANer from scripts.facelib.utils.misc import is_gray import PIL.Image from scripts.erasescratches.models import Pix2PixHDModel_Mapping from scripts.erasescratches.options import Options from scripts.maskscratches import ScratchesDetector from scripts.util import irregular_hole_synthesize, tensor_to_ndarray import insightface from insightface.app import FaceAnalysis from pydub import AudioSegment import scripts.RRDBNet_arch as arch from RealESRGAN import RealESRGAN from PIL import Image import socket import subprocess from gradio_client import Client, handle_file device = ("cuda" if torch.cuda.is_available() else "cpu") #age model EXPERIMENT_TYPE = 'ffhq_aging' model_path = "./models/sam_ffhq_aging.pt" model_age_slider = None EXPERIMENT_DATA_ARGS = { "ffhq_aging": { "transform": transforms.Compose([ transforms.Resize((256, 256)), transforms.ToTensor(), transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]) } } EXPERIMENT_ARGS = EXPERIMENT_DATA_ARGS[EXPERIMENT_TYPE] def load_model_age(): try: if not get_model_state_age(): ckpt = torch.load(model_path, map_location='cpu') opts = ckpt['opts'] del ckpt torch.cuda.empty_cache() opts['checkpoint_path'] = model_path opts = Namespace(**opts) global model_age_slider model_age_slider = pSp(opts) del opts torch.cuda.empty_cache() model_age_slider.eval() model_age_slider.cuda() torch.cuda.empty_cache() return True except: return False def unload_model_age(): try: global model_age_slider if model_age_slider is not None: model_age_slider = None torch.cuda.empty_cache() return True except: return False def get_model_state_age(): global model_age_slider if model_age_slider is None: return False return True #model bg from scripts.BG import BG bg_model = None def load_model_bg(): try: global bg_model if bg_model is None: bg_model = BG() return True except: return False def unload_model_bg(): try: global bg_model if bg_model is not None: bg_model = None return True except: return False def get_model_state_bg(): global bg_model if bg_model is None: return False return True #model code_former model_upsampler = None model_code_former = None def set_realesrgan_cf(): use_half = True if torch.cuda.is_available() else False model = RRDBNet( num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2, ) upsampler = RealESRGANer( scale=2, model_path="./models/realesrgan/RealESRGAN_x2plus.pth", model=model, tile=400, tile_pad=40, pre_pad=0, half=use_half ) return upsampler def load_model_cf(): try: device = get_device() global model_code_former model_code_former = ARCH_REGISTRY.get('CodeFormer')(dim_embd=512, codebook_size=1024, n_head=8, n_layers=9, connect_list=['32', '64', '128', '256']).to(device) ckpt_path = 'models/CodeFormer/codeformer.pth' checkpoint = torch.load(ckpt_path)['params_ema'] model_code_former.load_state_dict(checkpoint) model_code_former.eval() global model_upsampler model_upsampler = set_realesrgan_cf() return True except: return False def get_model_state_cf(): global model_code_former if model_code_former is None: return False else: return True def unload_model_cf(): if get_model_state_cf() == False: return True try: global model_code_former model_code_former = None torch.cuda.empty_cache() global model_upsampler model_upsampler = None torch.cuda.empty_cache() return True except: return False #model swap swap_model = None app_model = None def load_model_swap(): try: global swap_model global app_model if swap_model is None: swap_model = insightface.model_zoo.get_model("./models/inswapper_128.onnx",download=False,download_zip=False) app_model = FaceAnalysis(name="buffalo_l") app_model.prepare(ctx_id=0,det_size=(640,640)) return True except: return False def unload_model_swap(): try: global swap_model global app_model if swap_model is not None: swap_model = None if app_model is not None: app_model = None return True except: return False def get_model_state_swap(): global swap_model if swap_model is None: return False return True #model rest model_scratches_remove = None model_scratches_remove_detector = None model_scratches_remove_options = None def load_model_rest(): try: model_path = "./models/zeroscratches/restoration" global model_scratches_remove_detector model_scratches_remove_detector = ScratchesDetector('./models/zeroscratches') gpu_ids = [] if torch.cuda.is_available(): gpu_ids = [d for d in range(torch.cuda.device_count())] global model_scratches_remove_options model_scratches_remove_options = Options(model_path, gpu_ids) model_scratches = Pix2PixHDModel_Mapping() global model_scratches_remove model_scratches_remove = Pix2PixHDModel_Mapping() model_scratches_remove.initialize(model_scratches_remove_options) model_scratches_remove.eval() return True except: return False def get_model_state_rest(): global model_scratches_remove if model_scratches_remove is None: return False else: return True def unload_model_rest(): if get_model_state_rest() == False: return True try: global model_scratches_remove model_scratches_remove = None global model_scratches_remove_detector model_scratches_remove_detector = None global model_scratches_remove_options model_scratches_remove_options = None torch.cuda.empty_cache() return True except: return False #model talk from scripts.talker import sad_talker sad_talker_model = None def load_model_talk(): try: global sad_talker_model if sad_talker_model is None: sad_talker_model = sad_talker() return True except: return False def unload_model_talk(): try: global sad_talker_model if sad_talker_model is not None: sad_talker_model = None return True except: return False def get_model_state_talk(): global sad_talker_model if sad_talker_model is None: return False return True #model sky from scripts.SKY import SKY sky_model = None def load_model_sky(): try: global sky_model if sky_model is None: sky_model = SKY() return True except: return False def unload_model_sky(): try: global sky_model if sky_model is not None: sky_model = None return True except: return False def get_model_state_sky(): global sky_model if sky_model is None: return False return True #model up model_upscale = None def load_model_up(): try: model_path_upscale = './models/RRDB_ESRGAN_x4.pth' global model_upscale model_upscale = arch.RRDBNet(3, 3, 64, 23, gc=32) model_upscale.load_state_dict(torch.load(model_path_upscale), strict=True) model_upscale = model_upscale.half() model_upscale.eval() model_upscale = model_upscale.to(device) return True except: return False def get_model_state_up(): global model_upscale if model_upscale is None: return False else: return True def unload_model_up(): if get_model_state_up() == False: return True try: global model_upscale model_upscale = None torch.cuda.empty_cache() return True except: return False #agr slider helper def run_alignment(image_path): predictor = dlib.shape_predictor("./models/shape_predictor_68_face_landmarks.dat") aligned_image = align_face(filepath=image_path, predictor=predictor) return aligned_image def run_on_batch(inputs, model_age_slider): result_batch = model_age_slider(inputs.to("cuda").float(), randomize_noise=False, resize=False) return result_batch #model model_esrgan model_esrgan = None def load_model_esrgan(): try: global model_esrgan model_esrgan = RealESRGAN( torch.device(device), scale=2) model_esrgan.load_weights('./models/realesrgan/RealESRGAN_x2plus.pth', download=False) return True except: return False def get_model_state_esrgan(): global model_esrgan if model_esrgan is None: return False else: return True def unload_model_esrgan(): if get_model_state_esrgan() == False: return True try: global model_esrgan model_esrgan = None torch.cuda.empty_cache() return True except: return False #model lp def is_port_in_use(port, host='localhost'): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) try: s.bind((host, port)) except socket.error: return True return False import requests def is_server_ready(url, timeout=30): start_time = time.time() while True: try: response = requests.get(url) if response.status_code == 200: return True except requests.RequestException: pass if time.time() - start_time > timeout: return False time.sleep(1) model_port_lp = None def load_port_lp(): try: if not get_model_port_lp(): port_to_check = 60000 con_count = 0 while(True): if con_count > 100: break if is_port_in_use(port_to_check) or (search_port(port_to_check)): port_to_check = port_to_check + 1 con_count = con_count + 1 else: break port_to_check = str(port_to_check) script_dir = "./subprocess/LivePortrait" script_dir = os.path.abspath(script_dir) command = ["/home/qtech/miniconda3/envs/LivePortrait/bin/python", script_dir+"/app.py", "--server_port", port_to_check] process = subprocess.Popen(command, cwd=script_dir) global model_port_lp model_port_lp = {"port":port_to_check , "pid":process} insert_port(port_to_check) is_server_ready(f'http://127.0.0.1:{port_to_check}/') return True except: return False def unload_port_lp(): try: if get_model_port_lp(): global model_port_lp port = model_port_lp['port'] remove_port(port) pid = model_port_lp['pid'] pid.terminate() pid.kill() model_port_lp = None return True else: return True except: return False def get_model_port_lp(): global model_port_lp if model_port_lp is None: return False return True def unload_model_all(): try: unload_model_age() unload_model_bg() unload_model_cf() unload_model_up() unload_model_swap() unload_model_rest() unload_model_talk() unload_model_sky() unload_model_esrgan() unload_port_lp() try: is_table_empty() except: pass return True except: return False def start_http_file_server(): directory = './outputs' port = 8010 if not os.path.exists(directory): os.makedirs(directory) cmd = f"python -m http.server {port} --directory {directory}" return subprocess.Popen(cmd, shell=True) import asyncio from datetime import datetime time_last_call = None async def continuous_function(): while True: if get_old_out_ids():#test remove print(f"Old Data Deleted") if torch.cuda.is_available(): torch.cuda.empty_cache() global time_last_call if time_last_call is not None: current_time = datetime.now() time_difference = current_time - time_last_call time_elapsed_minutes = time_difference.total_seconds() / 60 if time_elapsed_minutes > 1: gc.collect() if time_elapsed_minutes > 5: model_delete_status = unload_model_all() print(f"model delete status = {model_delete_status}") time_last_call = None await asyncio.sleep(60*1) async def start_continuous_function(): await continuous_function() app = FastAPI() from fastapi.responses import HTMLResponse from fastapi import Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) from api_analytics.fastapi import Analytics app.add_middleware(Analytics, api_key="87686a97-20e4-42c9-99ab-9410c2ef2e65") #test remove # async def custom_middleware(request: Request, call_next): # client_ip = request.client.host # request_url = str(request.url) # request_method = request.method # endpoint = request.url.path # log_request(client_ip, request_url, request_method, endpoint) # print("\n\n\n\n\n") # print("request:") # headers = request.headers # query_params = request.query_params # path_params = request.path_params # method = request.method # print("\n\n\n\n\n") # print("Request Method:", method) # print("Request Path Params:", path_params) # print("Request Query Params:", query_params) # print("Request Headers:", headers) # print("\n\n\n\n\n") # print("\n\n\n\n\n") # if len(search_ban(client_ip)) > 0: # return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) # # user_agent = request.headers.get("user-agent", "").lower() # # print(user_agent) # # if "android" not in user_agent: # # insert_data(client_ip, "Access using unapproved device type") # # return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) # response = await call_next(request) # if (response.status_code == 404) or (response.status_code == 405): # insert_ban(client_ip, "Access using unapproved method/endpoint") # return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) # return response from fastapi import Request from fastapi.responses import JSONResponse, Response, StreamingResponse from fastapi import Request, Response from fastapi.responses import StreamingResponse import io async def custom_middleware(request: Request, call_next): client_ip = request.client.host request_url = str(request.url) request_method = request.method endpoint = request.url.path log_request(client_ip, request_url, request_method, endpoint) print("\n\n\n\n\n") print("Request Details:") headers = request.headers query_params = request.query_params path_params = request.path_params method = request.method print("Request Method:", method) print("Request Path Params:", path_params) print("Request Query Params:", query_params) print("Request Headers:", headers) print("\n\n\n\n\n") if len(search_ban(client_ip)) > 0: return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) response = await call_next(request) if isinstance(response, StreamingResponse): # Buffer the streaming content for inspection (be cautious of performance impacts) content = io.BytesIO() async for chunk in response.body_iterator: content.write(chunk) content.seek(0) response = StreamingResponse(content, media_type=response.media_type) # Now you can log the content if needed print("Response Content:", content.getvalue().decode('utf-8', errors='replace')) else: # Handle non-streaming responses as before response_body = b"".join([chunk async for chunk in response.body_iterator]).decode('utf-8', errors='replace') print("Response Content:", response_body) if (response.status_code == 404) or (response.status_code == 405): insert_ban(client_ip, "Access using unapproved method/endpoint") return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) return response app.middleware('http')(custom_middleware) @app.get("/", response_class=HTMLResponse) def read_index(): index_path = './assests/index.html' with open(index_path, 'r') as file: content = file.read() return HTMLResponse(content=content) from fastapi.responses import FileResponse @app.get("/favicon.ico") async def get_image(): image_path = "./assests/logo.png" return FileResponse(image_path, media_type="image/png") @app.on_event("startup") async def startup_event(): asyncio.create_task(start_continuous_function()) @app.on_event("shutdown") async def shutdown_event(): print("Clearing Models") model_status = unload_model_all() print("Model clear status = "+str(model_status)) asyncio.get_event_loop().stop() print("Stopping Server") ALLOWED_IMAGE_FORMATS = ["jpg", "jpeg", "png"] ALLOWED_AUDIO_FORMATS = ["wav", "mp3"] def delete_uuid(uuid_to_delete): path = "./outputs" directories = [] for root, dirs, files in os.walk(path): for dir in dirs: directories.append(os.path.join(root, dir)) for i in directories: temp = i.split('/') if temp[-1] == uuid_to_delete: shutil.rmtree(i) return True return False @app.delete("/Delete/{uuid}") async def delete_item(uuid: str): gc.collect() try: result = delete_uuid(uuid) if result: return {"error": f"UUID '{uuid}' does not exist"} return {"message": f"UUID '{uuid}' deleted"} except : return {"error": "unexpected error"} @app.put("/ClearModel_ALL") async def clearModel_ALL(): try: model_delete_status = unload_model_all() if model_delete_status: global time_last_call time_last_call = None return {"message": "vram cleared"} else : return {"message": "vram not cleared"} except : return {"error": "unexpected error"} @app.post("/AgeSlider") async def age_slider(background_tasks: BackgroundTasks , image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) app_face = FaceAnalysis(name="buffalo_l") app_face.prepare(ctx_id=0,det_size=(640,640)) faces = app_face.get(image) if len(faces)==0: response_message = "Error no face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) elif len(faces)!=1: response_message = "Error more than 1 face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) info_age = faces[0]['age'] info_gender = faces[0]['gender'] ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/AGE/{unique_id}" os.makedirs(output_path) # loading model if not get_model_state_age(): unload_model_all() model_status = load_model_age() print(f"model status =========={model_status}") cv2.imwrite(output_path+'/input.png',image) aligned_image = run_alignment(output_path+'/input.png') #aligned_image.save(output_path+"/input_aligned_cropped.png") copy_aligned_image = aligned_image.copy() aligned_image.resize((256, 256)) img_transforms = EXPERIMENT_ARGS['transform'] input_image = img_transforms(aligned_image) target_ages = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100] age_transformers = [AgeTransformer(target_age=age) for age in target_ages] images = [] for age_transformer in age_transformers: with torch.no_grad(): input_image_age = [age_transformer(input_image.cpu()).to('cuda')] input_image_age = torch.stack(input_image_age) result_tensor = run_on_batch(input_image_age, model_age_slider)[0] result_image = tensor2im(result_tensor) images.append(result_image) image_paths_temp = [] result_paths_list = [] for idx,i in enumerate(images): image_temp = np.array(i) image_temp = cv2.cvtColor(np.array(image_temp), cv2.COLOR_RGB2BGR) result_path_temp = output_path+"/output_"+str(idx)+".png" cv2.imwrite(result_path_temp,image_temp) image_url_temp = '/'.join(result_path_temp.split('/')[-3:]) image_url_temp = f"{web_url}:8001/{image_url_temp}" result_paths_list.append({"age" :str(target_ages[idx]), "image_url" : image_url_temp}) image_paths_temp.append(result_path_temp) closest_age = min(target_ages, key=lambda x: abs(x - info_age)) closest_age = target_ages.index(closest_age) os.remove(f"{output_path}/output_{closest_age}.png") copy_aligned_image = copy_aligned_image.resize((1024, 1024)) copy_aligned_image.save(f"{output_path}/output_{closest_age}.png") background_tasks.add_task(Age_Slider_Video,unique_id , image_paths_temp) os.remove(output_path+'/input.png') response_message = "AgeSlider Ran Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , info_age , ["Female","Male"][info_gender] , result_paths_list) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) def Age_Slider_Video(uuid_input , image_files): temp_file_uuid = str(uuid.uuid4()) directory_path = f'./outputs/{temp_file_uuid}' if os.path.exists(directory_path): shutil.rmtree(directory_path) os.mkdir(directory_path) from scripts.morph_video import doMorphing doMorphing(image_files, 0.3, 20, f"{directory_path}/output") video_path = f"{directory_path}/output_combined.mp4" gif_path = f"{directory_path}/output.gif" shutil.copy(video_path, f"./outputs/AGE/{uuid_input}/output_combined.mp4") shutil.copy(gif_path, f"./outputs/AGE/{uuid_input}/output.gif") shutil.rmtree(directory_path) output_entry_id(uuid_input) @app.post("/AgeSliderVideo/{uuid_input}") async def ageslidervideo(uuid_input: str): ################## try: input_path = f"./outputs/AGE/{uuid_input}" ################################################################################################################################################################################### ############################################################################### Test Input UUID ################################################################################## ################################################################################################################################################################################### if not os.path.exists(input_path): response_message = f"Invalid UUID , file does not exist" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Processing ################################################################################## ################################################################################################################################################################################### output_video_path = '' output_gif_path = '' duration = 120 start_time = time.time() while time.time() - start_time < duration: if output_video_path == '': if os.path.exists(input_path+"/output_combined.mp4"): output_video_path = f'{web_url}:8001/AGE/{uuid_input}'+"/output_combined.mp4" if output_gif_path == '': if os.path.exists(input_path+"/output.gif"): output_gif_path = f'{web_url}:8001/AGE/{uuid_input}'+"/output.gif" if output_video_path != '': if output_gif_path != '': #output_entry_id(uuid_input) #test remove response_message = "Video Genrated Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}]) if output_video_path != '': if output_gif_path == '': response_message = "Only Video Genrated" response_status = "Failure" return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}]) if output_gif_path != '': if output_video_path == '': response_message = "Only GIF Genrated" response_status = "Failure" return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}]) if output_gif_path == '': if output_video_path == '': response_message = "Timeout Reached , Retry Later" response_status = "Failure" return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}]) except Exception as e: return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/BG_remove") async def bg(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/BG/{unique_id}" # loading model if not get_model_state_bg(): unload_model_all() model_status = load_model_bg() print(f"model status =========={model_status}") output_image = bg_model.BG_remove(image) os.makedirs(output_path) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,output_image) output_entry_id(unique_id) #test remove response_message = "BG Removed Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/BG/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/CodeFormer") async def codeformer(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) height = image.shape[0] width = image.shape[1] if (height*width)>(500*500) : aspect_ratio = width / height new_height = int(((500*500) / aspect_ratio) ** 0.5) new_width = int(aspect_ratio * new_height) image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/CF/{unique_id}" # loading model if not get_model_state_cf(): unload_model_all() model_status = load_model_cf() print(f"model status =========={model_status}") #image = cv2.imread(output_path+'/input.png') has_aligned = False only_center_face = False draw_box = False detection_model = "retinaface_resnet50" background_enhance = True face_upsample = True upscale = 2 codeformer_fidelity = 0.5 img = image#cv2.imread(str(img_path), cv2.IMREAD_COLOR) upscale = int(upscale) # convert type to int if upscale > 4: # avoid memory exceeded due to too large upscale upscale = 4 if upscale > 2 and max(img.shape[:2])>1000: # avoid memory exceeded due to too large img resolution upscale = 2 if max(img.shape[:2]) > 1500: # avoid memory exceeded due to too large img resolution upscale = 1 background_enhance = False face_upsample = False face_helper = FaceRestoreHelper( upscale, face_size=512, crop_ratio=(1, 1), det_model=detection_model, save_ext="png", use_parse=True, device=device, ) bg_upsampler = model_upsampler if background_enhance else None face_upsampler = model_upsampler if face_upsample else None if has_aligned: # the input faces are already cropped and aligned img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR) face_helper.is_gray = is_gray(img, threshold=5) if face_helper.is_gray: print('\tgrayscale input: True') face_helper.cropped_faces = [img] else: face_helper.read_image(img) # get face landmarks for each face num_det_faces = face_helper.get_face_landmarks_5( only_center_face=only_center_face, resize=640, eye_dist_threshold=5 ) print(f'\tdetect {num_det_faces} faces') # align and warp each face face_helper.align_warp_face() # face restoration for each cropped face for idx, cropped_face in enumerate(face_helper.cropped_faces): # prepare data cropped_face_t = img2tensor( cropped_face / 255.0, bgr2rgb=True, float32=True ) normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True) cropped_face_t = cropped_face_t.unsqueeze(0).to(device) try: with torch.no_grad(): output = model_code_former(cropped_face_t, w=codeformer_fidelity, adain=True)[0] restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1)) del output torch.cuda.empty_cache() except RuntimeError as error: print(f"Failed inference for CodeFormer: {error}") restored_face = tensor2img( cropped_face_t, rgb2bgr=True, min_max=(-1, 1) ) restored_face = restored_face.astype("uint8") face_helper.add_restored_face(restored_face) # paste_back if not has_aligned: # upsample the background if bg_upsampler is not None: # Now only support RealESRGAN for upsampling background bg_img = bg_upsampler.enhance(img, outscale=upscale)[0] else: bg_img = None face_helper.get_inverse_affine(None) # paste each restored face to the input image if face_upsample and face_upsampler is not None: restored_img = face_helper.paste_faces_to_input_image( upsample_img=bg_img, draw_box=draw_box, face_upsampler=face_upsampler, ) else: restored_img = face_helper.paste_faces_to_input_image( upsample_img=bg_img, draw_box=draw_box ) output_image_path = output_path+'/output.png' os.makedirs(output_path) cv2.imwrite(output_image_path,restored_img) #cv2.imwrite(output_path+'/output.png',restored_img) output_entry_id(unique_id) #test remove response_message = "CodeFormer Image Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/CF/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/FaceSwap_single_image") async def Swap_single_image(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/SWAP/{unique_id}" # loading model if not get_model_state_swap(): unload_model_all() model_status = load_model_swap() print(f"model status =========={model_status}") faces = app_model.get(image) if len(faces)==0: response_message = "Error no face detected" response_status = "Failure" return generate_response(response_message,response_status,'' , '' , '' , []) elif len(faces)!=2: response_message = "Error more than 2 face detected" response_status = "Failure" return generate_response(response_message,response_status,'' , '' , '' , []) face1 = faces[0] face2 = faces[1] image = swap_model.get(image,face1,face2,paste_back=True) image = swap_model.get(image,face2,face1,paste_back=True) os.makedirs(output_path) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,image) output_entry_id(unique_id) #test remove response_message = "Face Swapped Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SWAP/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/FaceSwap_two_images") async def swap_two_image(image1: UploadFile = File(...), image2: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### images = [] for idx ,image in enumerate([image1,image2]): image_format = image.filename.split(".")[-1].lower() print(image.filename,image.filename.split(".")[-1].lower()) if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image{str(idx+1)} format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) images.append(image) if image is None: response_message = f"Image{str(idx+1)} is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/SWAP/{unique_id}" # loading model if not get_model_state_swap(): unload_model_all() model_status = load_model_swap() print(f"model status =========={model_status}") faces = [] faces.append(app_model.get(images[0])) faces.append(app_model.get(images[1])) for idx ,face in enumerate(faces): if len(face)==0: response_message = f"Error no face detected in image{idx}" response_status = "Failure" return generate_response(response_message,response_status,'' , '' , '' , []) elif len(face) >1: response_message = f"Error more than 1 face detected in image{idx}" response_status = "Failure" return generate_response(response_message,response_status,'' , '' , '' , []) face1 = faces[0][0] face2 = faces[1][0] image = swap_model.get(images[0],face1,face2,paste_back=True) os.makedirs(output_path) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,image) output_entry_id(unique_id) #test remove response_message = "Face Swapped Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SWAP/{unique_id}/output.png"]) except Exception as e: return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/Restore_Images") async def restore(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/Restore/{unique_id}" os.makedirs(output_path) cv2.imwrite(output_path+'/input.png',image) # loading model if not get_model_state_rest(): unload_model_all() model_status = load_model_rest() print(f"model status =========={model_status}") image = PIL.Image.open(output_path+'/input.png') os.remove(output_path+'/input.png') transformed, mask = model_scratches_remove_detector.process(image) img_transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] ) mask_transform = transforms.ToTensor() if model_scratches_remove_options.mask_dilation != 0: kernel = np.ones((3, 3), np.uint8) mask = np.array(mask) mask = cv2.dilate(mask, kernel, iterations=model_scratches_remove_options.mask_dilation) mask = PIL.Image.fromarray(mask.astype('uint8')) transformed = irregular_hole_synthesize(transformed, mask) mask = mask_transform(mask) mask = mask[:1, :, :] mask = mask.unsqueeze(0) transformed = img_transform(transformed) transformed = transformed.unsqueeze(0) generated = model_scratches_remove.inference(transformed, mask) tensor_restored = (generated.data.cpu() + 1.0) / 2.0 image_to_show = tensor_restored.squeeze().cpu().numpy().transpose((1, 2, 0)) image_to_show = (image_to_show * 255).astype(np.uint8)[:, :, ::-1] from scripts.colorizer import colorize_image image_to_show = colorize_image(image_to_show) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,image_to_show) output_entry_id(unique_id) #test remove response_message = "Restred Image Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Restore/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/Sad_Talker") async def sadtalker(image: UploadFile = File(...), audio: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Test Input Audio ################################################################################## ################################################################################################################################################################################### audio_format = audio.filename.split(".")[-1].lower() if audio_format not in ALLOWED_AUDIO_FORMATS: response_message = f"Unsupported audio format. Supported formats: {', '.join(ALLOWED_AUDIO_FORMATS)}" response_status = "Failure" return generate_response(response_message, response_status, '', '', '', []) audio_contents = await audio.read() if len(audio_contents) == 0: response_message = "Audio file is empty" response_status = "Failure" return generate_response(response_message, response_status, '', '', '', []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/Sad_Talker/{unique_id}" os.makedirs(output_path) image_input_path = output_path+'/input.png' cv2.imwrite(image_input_path,image) audio_segment = AudioSegment.from_file(io.BytesIO(audio_contents), format=audio_format) audio_input_path = os.path.join(output_path, f"audio.{audio_format}") audio_segment.export(audio_input_path, format=audio_format) app_face = FaceAnalysis(name="buffalo_l") app_face.prepare(ctx_id=0,det_size=(640,640)) faces = app_face.get(image) if len(faces)==0: response_message = "Error no face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) elif len(faces)!=1: response_message = "Error more than 1 face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) info_age = faces[0]['age'] info_gender = faces[0]['gender'] # loading model if not get_model_state_talk(): unload_model_all() model_status = load_model_talk() print(f"model status =========={model_status}") response = sad_talker_model.genrate_video(image_input_path,audio_input_path,output_path,True) # input_file = f"{output_path}/output.mp4 " # subprocess.run(['ffmpeg', '-i', input_file, '-c:v', 'libx264', '-preset', 'slow', '-crf', '23', '-c:a', 'aac', '-b:a', '192k', '-y', input_file], check=True) os.remove(image_input_path) os.remove(audio_input_path) if response: input_file = f"outputs/Sad_Talker/{unique_id}/output.mp4" output_file = f"outputs/Sad_Talker/{unique_id}/final.mp4" subprocess.run(['ffmpeg', '-i', input_file, '-c:v', 'libx264', '-preset', 'slow', '-crf', '23', '-c:a', 'aac', '-b:a', '192k', output_file], check=True) os.remove(input_file) output_entry_id(unique_id) #test remove response_message = "Video Generated Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , info_age , ["Female","Male"][info_gender] , [f"{web_url}:8001/Sad_Talker/{unique_id}/final.mp4"]) else: response_message = "Error Genrating Video, try another image" response_status = "Failure" #shutil.rmtree(output_path) return generate_response(response_message , response_status ,'','','',[]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/SKY_remove") async def sky(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/SKY/{unique_id}" # loading model if not get_model_state_sky(): unload_model_all() model_status = load_model_sky() print(f"model status =========={model_status}") output_image = sky_model.SKY_remove(image) os.makedirs(output_path) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,output_image) output_entry_id(unique_id) #test remove response_message = "SKY Removed Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SKY/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/SKY_replace_image") async def sky_replace_image(image1: UploadFile = File(...), image2: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### images = [] for idx ,image in enumerate([image1,image2]): image_format = image.filename.split(".")[-1].lower() print(image.filename,image.filename.split(".")[-1].lower()) if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image{str(idx+1)} format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) images.append(image) if image is None: response_message = f"Image{str(idx+1)} is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/SKY/{unique_id}" # loading model if not get_model_state_sky(): unload_model_all() model_status = load_model_sky() print(f"model status =========={model_status}") output_image = sky_model.SKY_replace_image(images[0],images[1]) os.makedirs(output_path) output_image_path = output_path+'/output.png' cv2.imwrite(output_image_path,output_image) output_entry_id(unique_id) #test remove response_message = "SKY Removed Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SKY/{unique_id}/output.png"]) except Exception as e: return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/ImageUpscale") async def upscale(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) height = image.shape[0] width = image.shape[1] if (height*width)>(200*200) : aspect_ratio = width / height new_height = int(((200*200) / aspect_ratio) ** 0.5) new_width = int(aspect_ratio * new_height) image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/Upscale/{unique_id}" # loading model if not get_model_state_up(): unload_model_all() model_status = load_model_up() print(f"model status =========={model_status}") #image = cv2.imread(output_path+'/input.png') image = image * 1.0 / 255 image = torch.from_numpy(np.transpose(image[:, :, [2, 1, 0]], (2, 0, 1))).float() img_LR = image.unsqueeze(0) img_LR = img_LR.to(device).half() output = model_upscale(img_LR).data.squeeze().float().cpu().clamp_(0, 1).numpy() output = np.transpose(output[[2, 1, 0], :, :], (1, 2, 0)) output = (output * 255.0).round() image = output.astype(np.uint8) output_image_path = output_path+'/output.png' os.makedirs(output_path) cv2.imwrite(output_image_path,image) output_entry_id(unique_id) #test remove response_message = "Upscaled Image Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Upscale/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/ImageDenoise") async def deniose(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) height = image.shape[0] width = image.shape[1] if (height*width)>(1500*1500) : aspect_ratio = width / height new_height = int(((1500*1500) / aspect_ratio) ** 0.5) new_width = int(aspect_ratio * new_height) image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/Denoise/{unique_id}" # loading model if not get_model_state_esrgan(): unload_model_all() model_status = load_model_esrgan() print(f"model status =========={model_status}") #image = cv2.imread(output_path+'/input.png') image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image_pil = Image.fromarray(image_rgb) global model_esrgan result_pil = model_esrgan.predict(image_pil) result_rgb = result_pil.convert('RGB') result_np = np.array(result_rgb) image = cv2.cvtColor(result_np, cv2.COLOR_RGB2BGR) output_image_path = output_path+'/output.png' os.makedirs(output_path) cv2.imwrite(output_image_path,image) output_entry_id(unique_id) #test remove response_message = "Upscaled Image Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Denoise/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) video_url = [{'id': '6a025d6c-5cf9-4de0-b323-6a628563327f', 'url': 'https://api.quantumgrove.tech:8001/Videos/d13.mp4', 'title': '6a025d6c-5cf9-4de0-b323-6a628563327f', 'info': '6a025d6c-5cf9-4de0-b323-6a628563327f', 'path': './subprocess/LivePortrait/assets/examples/driving/d13.mp4', "cat" : "Trending"}, {'id': '70153a58-0b49-44f2-8c81-816aff6ee2fe', 'url': 'https://api.quantumgrove.tech:8001/Videos/d10.mp4', 'title': '70153a58-0b49-44f2-8c81-816aff6ee2fe', 'info': '70153a58-0b49-44f2-8c81-816aff6ee2fe', 'path': './subprocess/LivePortrait/assets/examples/driving/d10.mp4', "cat" : "Kids"}, {'id': '5df41b72-f7e6-4aba-aa19-fc970d310dc7', 'url': 'https://api.quantumgrove.tech:8001/Videos/d19.mp4', 'title': '5df41b72-f7e6-4aba-aa19-fc970d310dc7', 'info': '5df41b72-f7e6-4aba-aa19-fc970d310dc7', 'path': './subprocess/LivePortrait/assets/examples/driving/d19.mp4', "cat" : "Kids"}, {'id': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea', 'url': 'https://api.quantumgrove.tech:8001/Videos/d9.mp4', 'title': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea', 'info': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea', 'path': './subprocess/LivePortrait/assets/examples/driving/d9.mp4', "cat" : "Trending"}, {'id': '93116106-7c25-41fa-8343-35c866ce9954', 'url': 'https://api.quantumgrove.tech:8001/Videos/d3.mp4', 'title': '93116106-7c25-41fa-8343-35c866ce9954', 'info': '93116106-7c25-41fa-8343-35c866ce9954', 'path': './subprocess/LivePortrait/assets/examples/driving/d3.mp4', "cat" : "Trending"}, {'id': '15576fab-3125-4bf2-a26b-2a3f69120516', 'url': 'https://api.quantumgrove.tech:8001/Videos/d14.mp4', 'title': '15576fab-3125-4bf2-a26b-2a3f69120516', 'info': '15576fab-3125-4bf2-a26b-2a3f69120516', 'path': './subprocess/LivePortrait/assets/examples/driving/d14.mp4', "cat" : "Trending"}, {'id': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e', 'url': 'https://api.quantumgrove.tech:8001/Videos/d11.mp4', 'title': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e', 'info': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e', 'path': './subprocess/LivePortrait/assets/examples/driving/d11.mp4', "cat" : "Top"}, {'id': 'd528d624-546a-4a98-8a8d-810c604178de', 'url': 'https://api.quantumgrove.tech:8001/Videos/d6.mp4', 'title': 'd528d624-546a-4a98-8a8d-810c604178de', 'info': 'd528d624-546a-4a98-8a8d-810c604178de', 'path': './subprocess/LivePortrait/assets/examples/driving/d6.mp4', "cat" : "Top"}, {'id': 'fd8785af-deed-40cd-89b2-978c3658d1eb', 'url': 'https://api.quantumgrove.tech:8001/Videos/d0.mp4', 'title': 'fd8785af-deed-40cd-89b2-978c3658d1eb', 'info': 'fd8785af-deed-40cd-89b2-978c3658d1eb', 'path': './subprocess/LivePortrait/assets/examples/driving/d0.mp4', "cat" : "Trending"}, {'id': '946bf5ad-6612-4aab-8551-22e77762df1c', 'url': 'https://api.quantumgrove.tech:8001/Videos/d12.mp4', 'title': '946bf5ad-6612-4aab-8551-22e77762df1c', 'info': '946bf5ad-6612-4aab-8551-22e77762df1c', 'path': './subprocess/LivePortrait/assets/examples/driving/d12.mp4', "cat" : "Trending"}, {'id': '13286ebb-8bda-4560-8521-9455a5f7188a', 'url': 'https://api.quantumgrove.tech:8001/Videos/d18.mp4', 'title': '13286ebb-8bda-4560-8521-9455a5f7188a', 'info': '13286ebb-8bda-4560-8521-9455a5f7188a', 'path': './subprocess/LivePortrait/assets/examples/driving/d18.mp4', "cat" : "Top"}] @app.post("/LivePortrait/{video_id}") async def lp(video_id: str,image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() print(image_format) print(video_id) # if image_format not in ALLOWED_IMAGE_FORMATS: # response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" # response_status = "Failure" # return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) height = image.shape[0] width = image.shape[1] if (height*width)>(1500*1500) : aspect_ratio = width / height new_height = int(((500*500) / aspect_ratio) ** 0.5) new_width = int(aspect_ratio * new_height) image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) app_face = FaceAnalysis(name="buffalo_l") app_face.prepare(ctx_id=0,det_size=(640,640)) faces = app_face.get(image) if len(faces)==0: response_message = "Error no face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) elif len(faces)!=1: response_message = "Error more than 1 face detected" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### unique_id = str(uuid.uuid4()) output_path = f"./outputs/LP/{unique_id}" os.makedirs(output_path) image_input_path = output_path+'/input.png' cv2.imwrite(image_input_path,image) # import glob # directory_path = "./subprocess/LivePortrait/assets/examples/driving" # mp4_files = glob.glob(os.path.join(directory_path, "*.mp4")) # loading model if not get_model_port_lp(): unload_model_all() model_status = load_port_lp() print(f"model status =========={model_status}") con = 0 for i in video_url: #id = "7aaa2b05-5207-4c84-b2f9-926b0219cae1" video_path = f"./outputs/videos/{video_id}.mp4" if os.path.exists(video_path): increment_video_hits(video_id) # if video_id == i['id']: # video_path = i['path'] con = 1 break if con == 0: return generate_response(f"Incorrect Video ID" , "Failure" ,'' , '' , '' , []) global model_port_lp client = Client(f"http://127.0.0.1:{model_port_lp['port']}/") result = client.predict( param_0=handle_file(os.path.abspath(output_path+'/input.png')), param_1={"video": handle_file(video_path)}, param_2=True, param_3=True, param_4=True, param_5=False, api_name="/gpu_wrapped_execute_video" ) shutil.copy(result[0]['video'], output_path+'/output.mp4') for i in result: root = os.path.dirname(i['video']) if os.path.isdir(root): shutil.rmtree(root) os.remove(output_path+'/input.png') output_entry_id(unique_id) #test remove response_message = "LP genrated Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/LP/{unique_id}/output.mp4"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) @app.post("/ImageEnhancer") async def imageenhance(image: UploadFile = File(...)): ################# ################# #get time global time_last_call time_last_call = datetime.now() ################## try: ################################################################################################################################################################################### ############################################################################### Test Input Image ################################################################################## ################################################################################################################################################################################### image_format = image.filename.split(".")[-1].lower() if image_format not in ALLOWED_IMAGE_FORMATS: response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) contents = await image.read() nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: response_message = "Image is empty" response_status = "Failure" return generate_response(response_message , response_status ,'' , '' , '' , []) ################################################################################################################################################################################### ############################################################################### Image Processing ################################################################################## ################################################################################################################################################################################### # loading model upscale_image = True unique_id = str(uuid.uuid4()) output_path = f"./outputs/Enhance/{unique_id}" height = image.shape[0] width = image.shape[1] if (height*width)>(1500*1500) : aspect_ratio = width / height new_height = int(((1500*1500) / aspect_ratio) ** 0.5) new_width = int(aspect_ratio * new_height) image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) # loading model if not get_model_state_cf(): unload_model_all() model_status = load_model_cf() print(f"model status =========={model_status}") if upscale_image: has_aligned = False only_center_face = False draw_box = False detection_model = "retinaface_resnet50" background_enhance = True face_upsample = True upscale = 2 codeformer_fidelity = 0.5 img = image#cv2.imread(str(img_path), cv2.IMREAD_COLOR) upscale = int(upscale) # convert type to int if upscale > 4: # avoid memory exceeded due to too large upscale upscale = 4 if upscale > 2 and max(img.shape[:2])>1000: # avoid memory exceeded due to too large img resolution upscale = 2 if max(img.shape[:2]) > 1500: # avoid memory exceeded due to too large img resolution upscale = 1 background_enhance = False face_upsample = False face_helper = FaceRestoreHelper( upscale, face_size=512, crop_ratio=(1, 1), det_model=detection_model, save_ext="png", use_parse=True, device=device, ) bg_upsampler = model_upsampler if background_enhance else None face_upsampler = model_upsampler if face_upsample else None if has_aligned: # the input faces are already cropped and aligned img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR) face_helper.is_gray = is_gray(img, threshold=5) if face_helper.is_gray: print('\tgrayscale input: True') face_helper.cropped_faces = [img] else: face_helper.read_image(img) # get face landmarks for each face num_det_faces = face_helper.get_face_landmarks_5( only_center_face=only_center_face, resize=640, eye_dist_threshold=5 ) print(f'\tdetect {num_det_faces} faces') # align and warp each face face_helper.align_warp_face() # face restoration for each cropped face for idx, cropped_face in enumerate(face_helper.cropped_faces): # prepare data cropped_face_t = img2tensor( cropped_face / 255.0, bgr2rgb=True, float32=True ) normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True) cropped_face_t = cropped_face_t.unsqueeze(0).to(device) try: with torch.no_grad(): output = model_code_former(cropped_face_t, w=codeformer_fidelity, adain=True)[0] restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1)) del output torch.cuda.empty_cache() except RuntimeError as error: print(f"Failed inference for CodeFormer: {error}") restored_face = tensor2img( cropped_face_t, rgb2bgr=True, min_max=(-1, 1) ) restored_face = restored_face.astype("uint8") face_helper.add_restored_face(restored_face) # paste_back if not has_aligned: # upsample the background if bg_upsampler is not None: # Now only support RealESRGAN for upsampling background bg_img = bg_upsampler.enhance(img, outscale=upscale)[0] else: bg_img = None face_helper.get_inverse_affine(None) # paste each restored face to the input image if face_upsample and face_upsampler is not None: restored_img = face_helper.paste_faces_to_input_image( upsample_img=bg_img, draw_box=draw_box, face_upsampler=face_upsampler, ) else: restored_img = face_helper.paste_faces_to_input_image( upsample_img=bg_img, draw_box=draw_box ) output_image_path = output_path+'/output.png' os.makedirs(output_path) cv2.imwrite(output_image_path,restored_img) #cv2.imwrite(output_path+'/output.png',restored_img) output_entry_id(unique_id) #test remove response_message = "CodeFormer Image Sucessfully" response_status = "Successful" return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Enhance/{unique_id}/output.png"]) except Exception as e: shutil.rmtree(output_path) return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , []) # if __name__ == '__main__': # import uvicorn # try: # http_file_server_process = start_http_file_server() # uvicorn.run("main:app", host='0.0.0.0', port=8080, workers=1 , limit_max_requests=20) # except KeyboardInterrupt: # print("Received KeyboardInterrupt, shutting down gracefully...") # http_file_server_process.kill() # shutdown_event() if __name__ == '__main__': from cf_api import start_cf_api import subprocess api_process = subprocess.Popen(["python", "cf_api.py"]) get_server = subprocess.Popen(["/home/qtech/miniconda3/envs/api/bin/python","./get_req.py"]) report_server = subprocess.Popen(["/home/qtech/miniconda3/envs/api/bin/python","./report/report.py"]) print("on") import uvicorn import os try: if not os.path.exists('api.db'): create_database() if not os.path.exists('./subprocess/port.db'): os.remove('./subprocess/port.db') create_database_port() http_file_server_process = start_http_file_server() uvicorn.run("main:app", host='0.0.0.0', port=8080, workers=4 , limit_max_requests=200 )#, timeout_keep_alive=5) print("Received KeyboardInterrupt, shutting down gracefully...") http_file_server_process.kill() shutdown_event() except KeyboardInterrupt: print("off") print("Received KeyboardInterrupt, shutting down gracefully...") http_file_server_process.kill() shutdown_event() api_process.kill() get_server.kill() report_server.kill()