killbill007's picture
Upload 864 files
d248b55 verified
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
web_url ="https://api.quantumgrove.tech"
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
import sqlite3
def increment_video_hits(video_id):
conn = sqlite3.connect('content.db')
cursor = conn.cursor()
cursor.execute('''
UPDATE videos
SET hits = hits + 1
WHERE id = ?
''', (video_id,))
conn.commit()
conn.close()
def create_database():
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS ban_list (
id INTEGER PRIMARY KEY,
ip TEXT NOT NULL,
reason TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS request_log (
id INTEGER PRIMARY KEY,
ip TEXT NOT NULL,
url TEXT NOT NULL,
method TEXT NOT NULL,
endpoint TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS output (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL,
timestamp DATETIME NOT NULL
)''')
conn.commit()
conn.close()
def output_entry_id(output_id):
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
timestamp = datetime.now()
cursor.execute("INSERT INTO output (uuid, timestamp) VALUES (?, ?)", (output_id, timestamp))
conn.commit()
conn.close()
from datetime import datetime, timedelta
def delete_uuid(uuid_to_delete):
path = "./outputs"
directories = []
for root, dirs, files in os.walk(path):
for dir in dirs:
directories.append(os.path.join(root, dir))
for i in directories:
temp = i.split('/')
if temp[-1] == uuid_to_delete:
shutil.rmtree(i)
return True
return False
def get_old_out_ids():
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
ten_minutes_ago = datetime.now() - timedelta(minutes=5)
cursor.execute("SELECT uuid FROM output WHERE timestamp < ?", (ten_minutes_ago,))
rows = cursor.fetchall()
if len(rows) == 0:
return None
for entry in rows:
if delete_uuid(entry[0]):
conn.execute("DELETE FROM output WHERE uuid = ?", (entry[0],))
conn.commit()
conn.close()
return True
def insert_ban(ip, reason):
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
cursor.execute('''INSERT INTO ban_list (ip, reason) VALUES (?, ?)''', (ip, reason))
conn.commit()
conn.close()
def search_ban(ip):
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
cursor.execute('''SELECT * FROM ban_list WHERE ip = ?''', (ip,))
rows = cursor.fetchall()
conn.close()
return rows
def log_request(ip, url, method, endpoint):
conn = sqlite3.connect('api.db')
cursor = conn.cursor()
cursor.execute('''INSERT INTO request_log (ip, url, method, endpoint) VALUES (?, ?, ?, ?)''', (ip, url, method, endpoint))
conn.commit()
conn.close()
def create_database_port():
conn = sqlite3.connect('./subprocess/port.db', check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''DROP TABLE IF EXISTS ports''')
conn.commit()
cursor.execute('''CREATE TABLE IF NOT EXISTS ports
(port INTEGER)''')
conn.commit()
conn.close()
def search_port(port):
conn = sqlite3.connect('./subprocess/port.db')
cursor = conn.cursor()
cursor.execute('''SELECT * FROM ports WHERE port = ?''', (port,))
rows = cursor.fetchall()
conn.close()
if len(rows) > 0:
return True
return False
def is_table_empty():
return False #test remove
conn = sqlite3.connect('./subprocess/port.db')
cursor = conn.cursor()
cursor.execute('''SELECT COUNT(*) FROM ports''')
count = cursor.fetchone()[0]
conn.close()
if count == 0:
for item in os.listdir('/tmp/gradio'):
item_path = os.path.join('/tmp/gradio', item)
if os.path.isdir(item_path):
shutil.rmtree(item_path)
def remove_port(port):
if(search_port(port)):
conn = sqlite3.connect('./subprocess/port.db')
cursor = conn.cursor()
cursor.execute('''DELETE FROM ports WHERE port = ?''', (port,))
conn.commit()
conn.close()
return True
return False
def insert_port(port):
if( not search_port(port)):
conn = sqlite3.connect('./subprocess/port.db')
cursor = conn.cursor()
cursor.execute('''INSERT INTO ports (port) VALUES (?)''', (port,))
conn.commit()
conn.close()
return True
else:
return False
def generate_response(response_message , response_status ,uuid_code , age , gender , metadata):
response_dict = {
"response_message" : response_message,
"response_status" : response_status,
"data" :{
"UUID" : uuid_code,
"info" : {"age" : age , "gender" : gender},
"metadata" : metadata
}
}
return response_dict
import numpy as np
import cv2
import time
import io
import gc
import shutil
import uuid
import torch
import subprocess
import torchvision.transforms as transforms
from scripts.psp import pSp
from argparse import Namespace
import dlib
from scripts.align_all_parallel import align_face
from scripts.augmentations import AgeTransformer
from scripts.common import tensor2im
from torchvision.transforms.functional import normalize
from scripts.basicsr.utils import img2tensor, tensor2img
from scripts.basicsr.utils.misc import get_device
from scripts.facelib.utils.face_restoration_helper import FaceRestoreHelper
from scripts.basicsr.utils.registry import ARCH_REGISTRY
from scripts.basicsr.archs.rrdbnet_arch import RRDBNet
from scripts.basicsr.utils.realesrgan_utils import RealESRGANer
from scripts.facelib.utils.misc import is_gray
import PIL.Image
from scripts.erasescratches.models import Pix2PixHDModel_Mapping
from scripts.erasescratches.options import Options
from scripts.maskscratches import ScratchesDetector
from scripts.util import irregular_hole_synthesize, tensor_to_ndarray
import insightface
from insightface.app import FaceAnalysis
from pydub import AudioSegment
import scripts.RRDBNet_arch as arch
from RealESRGAN import RealESRGAN
from PIL import Image
import socket
import subprocess
from gradio_client import Client, handle_file
device = ("cuda" if torch.cuda.is_available() else "cpu")
#age model
EXPERIMENT_TYPE = 'ffhq_aging'
model_path = "./models/sam_ffhq_aging.pt"
model_age_slider = None
EXPERIMENT_DATA_ARGS = {
"ffhq_aging": {
"transform": transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
}
}
EXPERIMENT_ARGS = EXPERIMENT_DATA_ARGS[EXPERIMENT_TYPE]
def load_model_age():
try:
if not get_model_state_age():
ckpt = torch.load(model_path, map_location='cpu')
opts = ckpt['opts']
del ckpt
torch.cuda.empty_cache()
opts['checkpoint_path'] = model_path
opts = Namespace(**opts)
global model_age_slider
model_age_slider = pSp(opts)
del opts
torch.cuda.empty_cache()
model_age_slider.eval()
model_age_slider.cuda()
torch.cuda.empty_cache()
return True
except:
return False
def unload_model_age():
try:
global model_age_slider
if model_age_slider is not None:
model_age_slider = None
torch.cuda.empty_cache()
return True
except:
return False
def get_model_state_age():
global model_age_slider
if model_age_slider is None:
return False
return True
#model bg
from scripts.BG import BG
bg_model = None
def load_model_bg():
try:
global bg_model
if bg_model is None:
bg_model = BG()
return True
except:
return False
def unload_model_bg():
try:
global bg_model
if bg_model is not None:
bg_model = None
return True
except:
return False
def get_model_state_bg():
global bg_model
if bg_model is None:
return False
return True
#model code_former
model_upsampler = None
model_code_former = None
def set_realesrgan_cf():
use_half = True if torch.cuda.is_available() else False
model = RRDBNet(
num_in_ch=3,
num_out_ch=3,
num_feat=64,
num_block=23,
num_grow_ch=32,
scale=2,
)
upsampler = RealESRGANer(
scale=2,
model_path="./models/realesrgan/RealESRGAN_x2plus.pth",
model=model,
tile=400,
tile_pad=40,
pre_pad=0,
half=use_half
)
return upsampler
def load_model_cf():
try:
device = get_device()
global model_code_former
model_code_former = ARCH_REGISTRY.get('CodeFormer')(dim_embd=512, codebook_size=1024, n_head=8, n_layers=9,
connect_list=['32', '64', '128', '256']).to(device)
ckpt_path = 'models/CodeFormer/codeformer.pth'
checkpoint = torch.load(ckpt_path)['params_ema']
model_code_former.load_state_dict(checkpoint)
model_code_former.eval()
global model_upsampler
model_upsampler = set_realesrgan_cf()
return True
except:
return False
def get_model_state_cf():
global model_code_former
if model_code_former is None:
return False
else:
return True
def unload_model_cf():
if get_model_state_cf() == False:
return True
try:
global model_code_former
model_code_former = None
torch.cuda.empty_cache()
global model_upsampler
model_upsampler = None
torch.cuda.empty_cache()
return True
except:
return False
#model swap
swap_model = None
app_model = None
def load_model_swap():
try:
global swap_model
global app_model
if swap_model is None:
swap_model = insightface.model_zoo.get_model("./models/inswapper_128.onnx",download=False,download_zip=False)
app_model = FaceAnalysis(name="buffalo_l")
app_model.prepare(ctx_id=0,det_size=(640,640))
return True
except:
return False
def unload_model_swap():
try:
global swap_model
global app_model
if swap_model is not None:
swap_model = None
if app_model is not None:
app_model = None
return True
except:
return False
def get_model_state_swap():
global swap_model
if swap_model is None:
return False
return True
#model rest
model_scratches_remove = None
model_scratches_remove_detector = None
model_scratches_remove_options = None
def load_model_rest():
try:
model_path = "./models/zeroscratches/restoration"
global model_scratches_remove_detector
model_scratches_remove_detector = ScratchesDetector('./models/zeroscratches')
gpu_ids = []
if torch.cuda.is_available():
gpu_ids = [d for d in range(torch.cuda.device_count())]
global model_scratches_remove_options
model_scratches_remove_options = Options(model_path, gpu_ids)
model_scratches = Pix2PixHDModel_Mapping()
global model_scratches_remove
model_scratches_remove = Pix2PixHDModel_Mapping()
model_scratches_remove.initialize(model_scratches_remove_options)
model_scratches_remove.eval()
return True
except:
return False
def get_model_state_rest():
global model_scratches_remove
if model_scratches_remove is None:
return False
else:
return True
def unload_model_rest():
if get_model_state_rest() == False:
return True
try:
global model_scratches_remove
model_scratches_remove = None
global model_scratches_remove_detector
model_scratches_remove_detector = None
global model_scratches_remove_options
model_scratches_remove_options = None
torch.cuda.empty_cache()
return True
except:
return False
#model talk
from scripts.talker import sad_talker
sad_talker_model = None
def load_model_talk():
try:
global sad_talker_model
if sad_talker_model is None:
sad_talker_model = sad_talker()
return True
except:
return False
def unload_model_talk():
try:
global sad_talker_model
if sad_talker_model is not None:
sad_talker_model = None
return True
except:
return False
def get_model_state_talk():
global sad_talker_model
if sad_talker_model is None:
return False
return True
#model sky
from scripts.SKY import SKY
sky_model = None
def load_model_sky():
try:
global sky_model
if sky_model is None:
sky_model = SKY()
return True
except:
return False
def unload_model_sky():
try:
global sky_model
if sky_model is not None:
sky_model = None
return True
except:
return False
def get_model_state_sky():
global sky_model
if sky_model is None:
return False
return True
#model up
model_upscale = None
def load_model_up():
try:
model_path_upscale = './models/RRDB_ESRGAN_x4.pth'
global model_upscale
model_upscale = arch.RRDBNet(3, 3, 64, 23, gc=32)
model_upscale.load_state_dict(torch.load(model_path_upscale), strict=True)
model_upscale = model_upscale.half()
model_upscale.eval()
model_upscale = model_upscale.to(device)
return True
except:
return False
def get_model_state_up():
global model_upscale
if model_upscale is None:
return False
else:
return True
def unload_model_up():
if get_model_state_up() == False:
return True
try:
global model_upscale
model_upscale = None
torch.cuda.empty_cache()
return True
except:
return False
#agr slider helper
def run_alignment(image_path):
predictor = dlib.shape_predictor("./models/shape_predictor_68_face_landmarks.dat")
aligned_image = align_face(filepath=image_path, predictor=predictor)
return aligned_image
def run_on_batch(inputs, model_age_slider):
result_batch = model_age_slider(inputs.to("cuda").float(), randomize_noise=False, resize=False)
return result_batch
#model model_esrgan
model_esrgan = None
def load_model_esrgan():
try:
global model_esrgan
model_esrgan = RealESRGAN( torch.device(device), scale=2)
model_esrgan.load_weights('./models/realesrgan/RealESRGAN_x2plus.pth', download=False)
return True
except:
return False
def get_model_state_esrgan():
global model_esrgan
if model_esrgan is None:
return False
else:
return True
def unload_model_esrgan():
if get_model_state_esrgan() == False:
return True
try:
global model_esrgan
model_esrgan = None
torch.cuda.empty_cache()
return True
except:
return False
#model lp
def is_port_in_use(port, host='localhost'):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
try:
s.bind((host, port))
except socket.error:
return True
return False
import requests
def is_server_ready(url, timeout=30):
start_time = time.time()
while True:
try:
response = requests.get(url)
if response.status_code == 200:
return True
except requests.RequestException:
pass
if time.time() - start_time > timeout:
return False
time.sleep(1)
model_port_lp = None
def load_port_lp():
try:
if not get_model_port_lp():
port_to_check = 60000
con_count = 0
while(True):
if con_count > 100:
break
if is_port_in_use(port_to_check) or (search_port(port_to_check)):
port_to_check = port_to_check + 1
con_count = con_count + 1
else:
break
port_to_check = str(port_to_check)
script_dir = "./subprocess/LivePortrait"
script_dir = os.path.abspath(script_dir)
command = ["/home/qtech/miniconda3/envs/LivePortrait/bin/python", script_dir+"/app.py", "--server_port", port_to_check]
process = subprocess.Popen(command, cwd=script_dir)
global model_port_lp
model_port_lp = {"port":port_to_check , "pid":process}
insert_port(port_to_check)
is_server_ready(f'http://127.0.0.1:{port_to_check}/')
return True
except:
return False
def unload_port_lp():
try:
if get_model_port_lp():
global model_port_lp
port = model_port_lp['port']
remove_port(port)
pid = model_port_lp['pid']
pid.terminate()
pid.kill()
model_port_lp = None
return True
else:
return True
except:
return False
def get_model_port_lp():
global model_port_lp
if model_port_lp is None:
return False
return True
def unload_model_all():
try:
unload_model_age()
unload_model_bg()
unload_model_cf()
unload_model_up()
unload_model_swap()
unload_model_rest()
unload_model_talk()
unload_model_sky()
unload_model_esrgan()
unload_port_lp()
try:
is_table_empty()
except:
pass
return True
except:
return False
def start_http_file_server():
directory = './outputs'
port = 8010
if not os.path.exists(directory):
os.makedirs(directory)
cmd = f"python -m http.server {port} --directory {directory}"
return subprocess.Popen(cmd, shell=True)
import asyncio
from datetime import datetime
time_last_call = None
async def continuous_function():
while True:
if get_old_out_ids():#test remove
print(f"Old Data Deleted")
if torch.cuda.is_available():
torch.cuda.empty_cache()
global time_last_call
if time_last_call is not None:
current_time = datetime.now()
time_difference = current_time - time_last_call
time_elapsed_minutes = time_difference.total_seconds() / 60
if time_elapsed_minutes > 1:
gc.collect()
if time_elapsed_minutes > 5:
model_delete_status = unload_model_all()
print(f"model delete status = {model_delete_status}")
time_last_call = None
await asyncio.sleep(60*1)
async def start_continuous_function():
await continuous_function()
app = FastAPI()
from fastapi.responses import HTMLResponse
from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from api_analytics.fastapi import Analytics
app.add_middleware(Analytics, api_key="87686a97-20e4-42c9-99ab-9410c2ef2e65") #test remove
# async def custom_middleware(request: Request, call_next):
# client_ip = request.client.host
# request_url = str(request.url)
# request_method = request.method
# endpoint = request.url.path
# log_request(client_ip, request_url, request_method, endpoint)
# print("\n\n\n\n\n")
# print("request:")
# headers = request.headers
# query_params = request.query_params
# path_params = request.path_params
# method = request.method
# print("\n\n\n\n\n")
# print("Request Method:", method)
# print("Request Path Params:", path_params)
# print("Request Query Params:", query_params)
# print("Request Headers:", headers)
# print("\n\n\n\n\n")
# print("\n\n\n\n\n")
# if len(search_ban(client_ip)) > 0:
# return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
# # user_agent = request.headers.get("user-agent", "").lower()
# # print(user_agent)
# # if "android" not in user_agent:
# # insert_data(client_ip, "Access using unapproved device type")
# # return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
# response = await call_next(request)
# if (response.status_code == 404) or (response.status_code == 405):
# insert_ban(client_ip, "Access using unapproved method/endpoint")
# return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
# return response
from fastapi import Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from fastapi import Request, Response
from fastapi.responses import StreamingResponse
import io
async def custom_middleware(request: Request, call_next):
client_ip = request.client.host
request_url = str(request.url)
request_method = request.method
endpoint = request.url.path
log_request(client_ip, request_url, request_method, endpoint)
print("\n\n\n\n\n")
print("Request Details:")
headers = request.headers
query_params = request.query_params
path_params = request.path_params
method = request.method
print("Request Method:", method)
print("Request Path Params:", path_params)
print("Request Query Params:", query_params)
print("Request Headers:", headers)
print("\n\n\n\n\n")
if len(search_ban(client_ip)) > 0:
return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
response = await call_next(request)
if isinstance(response, StreamingResponse):
# Buffer the streaming content for inspection (be cautious of performance impacts)
content = io.BytesIO()
async for chunk in response.body_iterator:
content.write(chunk)
content.seek(0)
response = StreamingResponse(content, media_type=response.media_type)
# Now you can log the content if needed
print("Response Content:", content.getvalue().decode('utf-8', errors='replace'))
else:
# Handle non-streaming responses as before
response_body = b"".join([chunk async for chunk in response.body_iterator]).decode('utf-8', errors='replace')
print("Response Content:", response_body)
if (response.status_code == 404) or (response.status_code == 405):
insert_ban(client_ip, "Access using unapproved method/endpoint")
return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
return response
app.middleware('http')(custom_middleware)
@app.get("/", response_class=HTMLResponse)
def read_index():
index_path = './assests/index.html'
with open(index_path, 'r') as file:
content = file.read()
return HTMLResponse(content=content)
from fastapi.responses import FileResponse
@app.get("/favicon.ico")
async def get_image():
image_path = "./assests/logo.png"
return FileResponse(image_path, media_type="image/png")
@app.on_event("startup")
async def startup_event():
asyncio.create_task(start_continuous_function())
@app.on_event("shutdown")
async def shutdown_event():
print("Clearing Models")
model_status = unload_model_all()
print("Model clear status = "+str(model_status))
asyncio.get_event_loop().stop()
print("Stopping Server")
ALLOWED_IMAGE_FORMATS = ["jpg", "jpeg", "png"]
ALLOWED_AUDIO_FORMATS = ["wav", "mp3"]
def delete_uuid(uuid_to_delete):
path = "./outputs"
directories = []
for root, dirs, files in os.walk(path):
for dir in dirs:
directories.append(os.path.join(root, dir))
for i in directories:
temp = i.split('/')
if temp[-1] == uuid_to_delete:
shutil.rmtree(i)
return True
return False
@app.delete("/Delete/{uuid}")
async def delete_item(uuid: str):
gc.collect()
try:
result = delete_uuid(uuid)
if result:
return {"error": f"UUID '{uuid}' does not exist"}
return {"message": f"UUID '{uuid}' deleted"}
except :
return {"error": "unexpected error"}
@app.put("/ClearModel_ALL")
async def clearModel_ALL():
try:
model_delete_status = unload_model_all()
if model_delete_status:
global time_last_call
time_last_call = None
return {"message": "vram cleared"}
else :
return {"message": "vram not cleared"}
except :
return {"error": "unexpected error"}
@app.post("/AgeSlider")
async def age_slider(background_tasks: BackgroundTasks , image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
app_face = FaceAnalysis(name="buffalo_l")
app_face.prepare(ctx_id=0,det_size=(640,640))
faces = app_face.get(image)
if len(faces)==0:
response_message = "Error no face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
elif len(faces)!=1:
response_message = "Error more than 1 face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
info_age = faces[0]['age']
info_gender = faces[0]['gender']
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/AGE/{unique_id}"
os.makedirs(output_path)
# loading model
if not get_model_state_age():
unload_model_all()
model_status = load_model_age()
print(f"model status =========={model_status}")
cv2.imwrite(output_path+'/input.png',image)
aligned_image = run_alignment(output_path+'/input.png')
#aligned_image.save(output_path+"/input_aligned_cropped.png")
copy_aligned_image = aligned_image.copy()
aligned_image.resize((256, 256))
img_transforms = EXPERIMENT_ARGS['transform']
input_image = img_transforms(aligned_image)
target_ages = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
age_transformers = [AgeTransformer(target_age=age) for age in target_ages]
images = []
for age_transformer in age_transformers:
with torch.no_grad():
input_image_age = [age_transformer(input_image.cpu()).to('cuda')]
input_image_age = torch.stack(input_image_age)
result_tensor = run_on_batch(input_image_age, model_age_slider)[0]
result_image = tensor2im(result_tensor)
images.append(result_image)
image_paths_temp = []
result_paths_list = []
for idx,i in enumerate(images):
image_temp = np.array(i)
image_temp = cv2.cvtColor(np.array(image_temp), cv2.COLOR_RGB2BGR)
result_path_temp = output_path+"/output_"+str(idx)+".png"
cv2.imwrite(result_path_temp,image_temp)
image_url_temp = '/'.join(result_path_temp.split('/')[-3:])
image_url_temp = f"{web_url}:8001/{image_url_temp}"
result_paths_list.append({"age" :str(target_ages[idx]), "image_url" : image_url_temp})
image_paths_temp.append(result_path_temp)
closest_age = min(target_ages, key=lambda x: abs(x - info_age))
closest_age = target_ages.index(closest_age)
os.remove(f"{output_path}/output_{closest_age}.png")
copy_aligned_image = copy_aligned_image.resize((1024, 1024))
copy_aligned_image.save(f"{output_path}/output_{closest_age}.png")
background_tasks.add_task(Age_Slider_Video,unique_id , image_paths_temp)
os.remove(output_path+'/input.png')
response_message = "AgeSlider Ran Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , info_age , ["Female","Male"][info_gender] , result_paths_list)
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
def Age_Slider_Video(uuid_input , image_files):
temp_file_uuid = str(uuid.uuid4())
directory_path = f'./outputs/{temp_file_uuid}'
if os.path.exists(directory_path):
shutil.rmtree(directory_path)
os.mkdir(directory_path)
from scripts.morph_video import doMorphing
doMorphing(image_files, 0.3, 20, f"{directory_path}/output")
video_path = f"{directory_path}/output_combined.mp4"
gif_path = f"{directory_path}/output.gif"
shutil.copy(video_path, f"./outputs/AGE/{uuid_input}/output_combined.mp4")
shutil.copy(gif_path, f"./outputs/AGE/{uuid_input}/output.gif")
shutil.rmtree(directory_path)
output_entry_id(uuid_input)
@app.post("/AgeSliderVideo/{uuid_input}")
async def ageslidervideo(uuid_input: str):
##################
try:
input_path = f"./outputs/AGE/{uuid_input}"
###################################################################################################################################################################################
############################################################################### Test Input UUID ##################################################################################
###################################################################################################################################################################################
if not os.path.exists(input_path):
response_message = f"Invalid UUID , file does not exist"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Processing ##################################################################################
###################################################################################################################################################################################
output_video_path = ''
output_gif_path = ''
duration = 120
start_time = time.time()
while time.time() - start_time < duration:
if output_video_path == '':
if os.path.exists(input_path+"/output_combined.mp4"):
output_video_path = f'{web_url}:8001/AGE/{uuid_input}'+"/output_combined.mp4"
if output_gif_path == '':
if os.path.exists(input_path+"/output.gif"):
output_gif_path = f'{web_url}:8001/AGE/{uuid_input}'+"/output.gif"
if output_video_path != '':
if output_gif_path != '':
#output_entry_id(uuid_input) #test remove
response_message = "Video Genrated Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}])
if output_video_path != '':
if output_gif_path == '':
response_message = "Only Video Genrated"
response_status = "Failure"
return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}])
if output_gif_path != '':
if output_video_path == '':
response_message = "Only GIF Genrated"
response_status = "Failure"
return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}])
if output_gif_path == '':
if output_video_path == '':
response_message = "Timeout Reached , Retry Later"
response_status = "Failure"
return generate_response(response_message , response_status ,uuid_input , '' , '', [{"video":output_video_path , "gif":output_gif_path}])
except Exception as e:
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/BG_remove")
async def bg(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/BG/{unique_id}"
# loading model
if not get_model_state_bg():
unload_model_all()
model_status = load_model_bg()
print(f"model status =========={model_status}")
output_image = bg_model.BG_remove(image)
os.makedirs(output_path)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,output_image)
output_entry_id(unique_id) #test remove
response_message = "BG Removed Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/BG/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/CodeFormer")
async def codeformer(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
height = image.shape[0]
width = image.shape[1]
if (height*width)>(500*500) :
aspect_ratio = width / height
new_height = int(((500*500) / aspect_ratio) ** 0.5)
new_width = int(aspect_ratio * new_height)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/CF/{unique_id}"
# loading model
if not get_model_state_cf():
unload_model_all()
model_status = load_model_cf()
print(f"model status =========={model_status}")
#image = cv2.imread(output_path+'/input.png')
has_aligned = False
only_center_face = False
draw_box = False
detection_model = "retinaface_resnet50"
background_enhance = True
face_upsample = True
upscale = 2
codeformer_fidelity = 0.5
img = image#cv2.imread(str(img_path), cv2.IMREAD_COLOR)
upscale = int(upscale) # convert type to int
if upscale > 4: # avoid memory exceeded due to too large upscale
upscale = 4
if upscale > 2 and max(img.shape[:2])>1000: # avoid memory exceeded due to too large img resolution
upscale = 2
if max(img.shape[:2]) > 1500: # avoid memory exceeded due to too large img resolution
upscale = 1
background_enhance = False
face_upsample = False
face_helper = FaceRestoreHelper(
upscale,
face_size=512,
crop_ratio=(1, 1),
det_model=detection_model,
save_ext="png",
use_parse=True,
device=device,
)
bg_upsampler = model_upsampler if background_enhance else None
face_upsampler = model_upsampler if face_upsample else None
if has_aligned:
# the input faces are already cropped and aligned
img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR)
face_helper.is_gray = is_gray(img, threshold=5)
if face_helper.is_gray:
print('\tgrayscale input: True')
face_helper.cropped_faces = [img]
else:
face_helper.read_image(img)
# get face landmarks for each face
num_det_faces = face_helper.get_face_landmarks_5(
only_center_face=only_center_face, resize=640, eye_dist_threshold=5
)
print(f'\tdetect {num_det_faces} faces')
# align and warp each face
face_helper.align_warp_face()
# face restoration for each cropped face
for idx, cropped_face in enumerate(face_helper.cropped_faces):
# prepare data
cropped_face_t = img2tensor(
cropped_face / 255.0, bgr2rgb=True, float32=True
)
normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
cropped_face_t = cropped_face_t.unsqueeze(0).to(device)
try:
with torch.no_grad():
output = model_code_former(cropped_face_t, w=codeformer_fidelity, adain=True)[0]
restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
del output
torch.cuda.empty_cache()
except RuntimeError as error:
print(f"Failed inference for CodeFormer: {error}")
restored_face = tensor2img(
cropped_face_t, rgb2bgr=True, min_max=(-1, 1)
)
restored_face = restored_face.astype("uint8")
face_helper.add_restored_face(restored_face)
# paste_back
if not has_aligned:
# upsample the background
if bg_upsampler is not None:
# Now only support RealESRGAN for upsampling background
bg_img = bg_upsampler.enhance(img, outscale=upscale)[0]
else:
bg_img = None
face_helper.get_inverse_affine(None)
# paste each restored face to the input image
if face_upsample and face_upsampler is not None:
restored_img = face_helper.paste_faces_to_input_image(
upsample_img=bg_img,
draw_box=draw_box,
face_upsampler=face_upsampler,
)
else:
restored_img = face_helper.paste_faces_to_input_image(
upsample_img=bg_img, draw_box=draw_box
)
output_image_path = output_path+'/output.png'
os.makedirs(output_path)
cv2.imwrite(output_image_path,restored_img)
#cv2.imwrite(output_path+'/output.png',restored_img)
output_entry_id(unique_id) #test remove
response_message = "CodeFormer Image Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/CF/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/FaceSwap_single_image")
async def Swap_single_image(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/SWAP/{unique_id}"
# loading model
if not get_model_state_swap():
unload_model_all()
model_status = load_model_swap()
print(f"model status =========={model_status}")
faces = app_model.get(image)
if len(faces)==0:
response_message = "Error no face detected"
response_status = "Failure"
return generate_response(response_message,response_status,'' , '' , '' , [])
elif len(faces)!=2:
response_message = "Error more than 2 face detected"
response_status = "Failure"
return generate_response(response_message,response_status,'' , '' , '' , [])
face1 = faces[0]
face2 = faces[1]
image = swap_model.get(image,face1,face2,paste_back=True)
image = swap_model.get(image,face2,face1,paste_back=True)
os.makedirs(output_path)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,image)
output_entry_id(unique_id) #test remove
response_message = "Face Swapped Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SWAP/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/FaceSwap_two_images")
async def swap_two_image(image1: UploadFile = File(...), image2: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
images = []
for idx ,image in enumerate([image1,image2]):
image_format = image.filename.split(".")[-1].lower()
print(image.filename,image.filename.split(".")[-1].lower())
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image{str(idx+1)} format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
images.append(image)
if image is None:
response_message = f"Image{str(idx+1)} is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/SWAP/{unique_id}"
# loading model
if not get_model_state_swap():
unload_model_all()
model_status = load_model_swap()
print(f"model status =========={model_status}")
faces = []
faces.append(app_model.get(images[0]))
faces.append(app_model.get(images[1]))
for idx ,face in enumerate(faces):
if len(face)==0:
response_message = f"Error no face detected in image{idx}"
response_status = "Failure"
return generate_response(response_message,response_status,'' , '' , '' , [])
elif len(face) >1:
response_message = f"Error more than 1 face detected in image{idx}"
response_status = "Failure"
return generate_response(response_message,response_status,'' , '' , '' , [])
face1 = faces[0][0]
face2 = faces[1][0]
image = swap_model.get(images[0],face1,face2,paste_back=True)
os.makedirs(output_path)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,image)
output_entry_id(unique_id) #test remove
response_message = "Face Swapped Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SWAP/{unique_id}/output.png"])
except Exception as e:
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/Restore_Images")
async def restore(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/Restore/{unique_id}"
os.makedirs(output_path)
cv2.imwrite(output_path+'/input.png',image)
# loading model
if not get_model_state_rest():
unload_model_all()
model_status = load_model_rest()
print(f"model status =========={model_status}")
image = PIL.Image.open(output_path+'/input.png')
os.remove(output_path+'/input.png')
transformed, mask = model_scratches_remove_detector.process(image)
img_transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
mask_transform = transforms.ToTensor()
if model_scratches_remove_options.mask_dilation != 0:
kernel = np.ones((3, 3), np.uint8)
mask = np.array(mask)
mask = cv2.dilate(mask, kernel, iterations=model_scratches_remove_options.mask_dilation)
mask = PIL.Image.fromarray(mask.astype('uint8'))
transformed = irregular_hole_synthesize(transformed, mask)
mask = mask_transform(mask)
mask = mask[:1, :, :]
mask = mask.unsqueeze(0)
transformed = img_transform(transformed)
transformed = transformed.unsqueeze(0)
generated = model_scratches_remove.inference(transformed, mask)
tensor_restored = (generated.data.cpu() + 1.0) / 2.0
image_to_show = tensor_restored.squeeze().cpu().numpy().transpose((1, 2, 0))
image_to_show = (image_to_show * 255).astype(np.uint8)[:, :, ::-1]
from scripts.colorizer import colorize_image
image_to_show = colorize_image(image_to_show)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,image_to_show)
output_entry_id(unique_id) #test remove
response_message = "Restred Image Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Restore/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/Sad_Talker")
async def sadtalker(image: UploadFile = File(...), audio: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Test Input Audio ##################################################################################
###################################################################################################################################################################################
audio_format = audio.filename.split(".")[-1].lower()
if audio_format not in ALLOWED_AUDIO_FORMATS:
response_message = f"Unsupported audio format. Supported formats: {', '.join(ALLOWED_AUDIO_FORMATS)}"
response_status = "Failure"
return generate_response(response_message, response_status, '', '', '', [])
audio_contents = await audio.read()
if len(audio_contents) == 0:
response_message = "Audio file is empty"
response_status = "Failure"
return generate_response(response_message, response_status, '', '', '', [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/Sad_Talker/{unique_id}"
os.makedirs(output_path)
image_input_path = output_path+'/input.png'
cv2.imwrite(image_input_path,image)
audio_segment = AudioSegment.from_file(io.BytesIO(audio_contents), format=audio_format)
audio_input_path = os.path.join(output_path, f"audio.{audio_format}")
audio_segment.export(audio_input_path, format=audio_format)
app_face = FaceAnalysis(name="buffalo_l")
app_face.prepare(ctx_id=0,det_size=(640,640))
faces = app_face.get(image)
if len(faces)==0:
response_message = "Error no face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
elif len(faces)!=1:
response_message = "Error more than 1 face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
info_age = faces[0]['age']
info_gender = faces[0]['gender']
# loading model
if not get_model_state_talk():
unload_model_all()
model_status = load_model_talk()
print(f"model status =========={model_status}")
response = sad_talker_model.genrate_video(image_input_path,audio_input_path,output_path,True)
# input_file = f"{output_path}/output.mp4 "
# subprocess.run(['ffmpeg', '-i', input_file, '-c:v', 'libx264', '-preset', 'slow', '-crf', '23', '-c:a', 'aac', '-b:a', '192k', '-y', input_file], check=True)
os.remove(image_input_path)
os.remove(audio_input_path)
if response:
input_file = f"outputs/Sad_Talker/{unique_id}/output.mp4"
output_file = f"outputs/Sad_Talker/{unique_id}/final.mp4"
subprocess.run(['ffmpeg', '-i', input_file, '-c:v', 'libx264', '-preset', 'slow', '-crf', '23', '-c:a', 'aac', '-b:a', '192k', output_file], check=True)
os.remove(input_file)
output_entry_id(unique_id) #test remove
response_message = "Video Generated Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , info_age , ["Female","Male"][info_gender] , [f"{web_url}:8001/Sad_Talker/{unique_id}/final.mp4"])
else:
response_message = "Error Genrating Video, try another image"
response_status = "Failure"
#shutil.rmtree(output_path)
return generate_response(response_message , response_status ,'','','',[])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/SKY_remove")
async def sky(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/SKY/{unique_id}"
# loading model
if not get_model_state_sky():
unload_model_all()
model_status = load_model_sky()
print(f"model status =========={model_status}")
output_image = sky_model.SKY_remove(image)
os.makedirs(output_path)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,output_image)
output_entry_id(unique_id) #test remove
response_message = "SKY Removed Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SKY/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/SKY_replace_image")
async def sky_replace_image(image1: UploadFile = File(...), image2: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
images = []
for idx ,image in enumerate([image1,image2]):
image_format = image.filename.split(".")[-1].lower()
print(image.filename,image.filename.split(".")[-1].lower())
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image{str(idx+1)} format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
images.append(image)
if image is None:
response_message = f"Image{str(idx+1)} is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/SKY/{unique_id}"
# loading model
if not get_model_state_sky():
unload_model_all()
model_status = load_model_sky()
print(f"model status =========={model_status}")
output_image = sky_model.SKY_replace_image(images[0],images[1])
os.makedirs(output_path)
output_image_path = output_path+'/output.png'
cv2.imwrite(output_image_path,output_image)
output_entry_id(unique_id) #test remove
response_message = "SKY Removed Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/SKY/{unique_id}/output.png"])
except Exception as e:
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/ImageUpscale")
async def upscale(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
height = image.shape[0]
width = image.shape[1]
if (height*width)>(200*200) :
aspect_ratio = width / height
new_height = int(((200*200) / aspect_ratio) ** 0.5)
new_width = int(aspect_ratio * new_height)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/Upscale/{unique_id}"
# loading model
if not get_model_state_up():
unload_model_all()
model_status = load_model_up()
print(f"model status =========={model_status}")
#image = cv2.imread(output_path+'/input.png')
image = image * 1.0 / 255
image = torch.from_numpy(np.transpose(image[:, :, [2, 1, 0]], (2, 0, 1))).float()
img_LR = image.unsqueeze(0)
img_LR = img_LR.to(device).half()
output = model_upscale(img_LR).data.squeeze().float().cpu().clamp_(0, 1).numpy()
output = np.transpose(output[[2, 1, 0], :, :], (1, 2, 0))
output = (output * 255.0).round()
image = output.astype(np.uint8)
output_image_path = output_path+'/output.png'
os.makedirs(output_path)
cv2.imwrite(output_image_path,image)
output_entry_id(unique_id) #test remove
response_message = "Upscaled Image Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Upscale/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/ImageDenoise")
async def deniose(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
height = image.shape[0]
width = image.shape[1]
if (height*width)>(1500*1500) :
aspect_ratio = width / height
new_height = int(((1500*1500) / aspect_ratio) ** 0.5)
new_width = int(aspect_ratio * new_height)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/Denoise/{unique_id}"
# loading model
if not get_model_state_esrgan():
unload_model_all()
model_status = load_model_esrgan()
print(f"model status =========={model_status}")
#image = cv2.imread(output_path+'/input.png')
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image_pil = Image.fromarray(image_rgb)
global model_esrgan
result_pil = model_esrgan.predict(image_pil)
result_rgb = result_pil.convert('RGB')
result_np = np.array(result_rgb)
image = cv2.cvtColor(result_np, cv2.COLOR_RGB2BGR)
output_image_path = output_path+'/output.png'
os.makedirs(output_path)
cv2.imwrite(output_image_path,image)
output_entry_id(unique_id) #test remove
response_message = "Upscaled Image Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Denoise/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
video_url = [{'id': '6a025d6c-5cf9-4de0-b323-6a628563327f',
'url': 'https://api.quantumgrove.tech:8001/Videos/d13.mp4',
'title': '6a025d6c-5cf9-4de0-b323-6a628563327f',
'info': '6a025d6c-5cf9-4de0-b323-6a628563327f',
'path': './subprocess/LivePortrait/assets/examples/driving/d13.mp4',
"cat" : "Trending"},
{'id': '70153a58-0b49-44f2-8c81-816aff6ee2fe',
'url': 'https://api.quantumgrove.tech:8001/Videos/d10.mp4',
'title': '70153a58-0b49-44f2-8c81-816aff6ee2fe',
'info': '70153a58-0b49-44f2-8c81-816aff6ee2fe',
'path': './subprocess/LivePortrait/assets/examples/driving/d10.mp4',
"cat" : "Kids"},
{'id': '5df41b72-f7e6-4aba-aa19-fc970d310dc7',
'url': 'https://api.quantumgrove.tech:8001/Videos/d19.mp4',
'title': '5df41b72-f7e6-4aba-aa19-fc970d310dc7',
'info': '5df41b72-f7e6-4aba-aa19-fc970d310dc7',
'path': './subprocess/LivePortrait/assets/examples/driving/d19.mp4',
"cat" : "Kids"},
{'id': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea',
'url': 'https://api.quantumgrove.tech:8001/Videos/d9.mp4',
'title': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea',
'info': '3ca89a2a-145d-4d7c-88c3-f7e0bc3c2dea',
'path': './subprocess/LivePortrait/assets/examples/driving/d9.mp4',
"cat" : "Trending"},
{'id': '93116106-7c25-41fa-8343-35c866ce9954',
'url': 'https://api.quantumgrove.tech:8001/Videos/d3.mp4',
'title': '93116106-7c25-41fa-8343-35c866ce9954',
'info': '93116106-7c25-41fa-8343-35c866ce9954',
'path': './subprocess/LivePortrait/assets/examples/driving/d3.mp4',
"cat" : "Trending"},
{'id': '15576fab-3125-4bf2-a26b-2a3f69120516',
'url': 'https://api.quantumgrove.tech:8001/Videos/d14.mp4',
'title': '15576fab-3125-4bf2-a26b-2a3f69120516',
'info': '15576fab-3125-4bf2-a26b-2a3f69120516',
'path': './subprocess/LivePortrait/assets/examples/driving/d14.mp4',
"cat" : "Trending"},
{'id': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e',
'url': 'https://api.quantumgrove.tech:8001/Videos/d11.mp4',
'title': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e',
'info': '8ff558b3-5437-4b0c-a1f0-a02758eb7c7e',
'path': './subprocess/LivePortrait/assets/examples/driving/d11.mp4',
"cat" : "Top"},
{'id': 'd528d624-546a-4a98-8a8d-810c604178de',
'url': 'https://api.quantumgrove.tech:8001/Videos/d6.mp4',
'title': 'd528d624-546a-4a98-8a8d-810c604178de',
'info': 'd528d624-546a-4a98-8a8d-810c604178de',
'path': './subprocess/LivePortrait/assets/examples/driving/d6.mp4',
"cat" : "Top"},
{'id': 'fd8785af-deed-40cd-89b2-978c3658d1eb',
'url': 'https://api.quantumgrove.tech:8001/Videos/d0.mp4',
'title': 'fd8785af-deed-40cd-89b2-978c3658d1eb',
'info': 'fd8785af-deed-40cd-89b2-978c3658d1eb',
'path': './subprocess/LivePortrait/assets/examples/driving/d0.mp4',
"cat" : "Trending"},
{'id': '946bf5ad-6612-4aab-8551-22e77762df1c',
'url': 'https://api.quantumgrove.tech:8001/Videos/d12.mp4',
'title': '946bf5ad-6612-4aab-8551-22e77762df1c',
'info': '946bf5ad-6612-4aab-8551-22e77762df1c',
'path': './subprocess/LivePortrait/assets/examples/driving/d12.mp4',
"cat" : "Trending"},
{'id': '13286ebb-8bda-4560-8521-9455a5f7188a',
'url': 'https://api.quantumgrove.tech:8001/Videos/d18.mp4',
'title': '13286ebb-8bda-4560-8521-9455a5f7188a',
'info': '13286ebb-8bda-4560-8521-9455a5f7188a',
'path': './subprocess/LivePortrait/assets/examples/driving/d18.mp4',
"cat" : "Top"}]
@app.post("/LivePortrait/{video_id}")
async def lp(video_id: str,image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
print(image_format)
print(video_id)
# if image_format not in ALLOWED_IMAGE_FORMATS:
# response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
# response_status = "Failure"
# return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
height = image.shape[0]
width = image.shape[1]
if (height*width)>(1500*1500) :
aspect_ratio = width / height
new_height = int(((500*500) / aspect_ratio) ** 0.5)
new_width = int(aspect_ratio * new_height)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
app_face = FaceAnalysis(name="buffalo_l")
app_face.prepare(ctx_id=0,det_size=(640,640))
faces = app_face.get(image)
if len(faces)==0:
response_message = "Error no face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
elif len(faces)!=1:
response_message = "Error more than 1 face detected"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
unique_id = str(uuid.uuid4())
output_path = f"./outputs/LP/{unique_id}"
os.makedirs(output_path)
image_input_path = output_path+'/input.png'
cv2.imwrite(image_input_path,image)
# import glob
# directory_path = "./subprocess/LivePortrait/assets/examples/driving"
# mp4_files = glob.glob(os.path.join(directory_path, "*.mp4"))
# loading model
if not get_model_port_lp():
unload_model_all()
model_status = load_port_lp()
print(f"model status =========={model_status}")
con = 0
for i in video_url:
#id = "7aaa2b05-5207-4c84-b2f9-926b0219cae1"
video_path = f"./outputs/videos/{video_id}.mp4"
if os.path.exists(video_path):
increment_video_hits(video_id)
# if video_id == i['id']:
# video_path = i['path']
con = 1
break
if con == 0:
return generate_response(f"Incorrect Video ID" , "Failure" ,'' , '' , '' , [])
global model_port_lp
client = Client(f"http://127.0.0.1:{model_port_lp['port']}/")
result = client.predict(
param_0=handle_file(os.path.abspath(output_path+'/input.png')),
param_1={"video": handle_file(video_path)},
param_2=True,
param_3=True,
param_4=True,
param_5=False,
api_name="/gpu_wrapped_execute_video"
)
shutil.copy(result[0]['video'], output_path+'/output.mp4')
for i in result:
root = os.path.dirname(i['video'])
if os.path.isdir(root):
shutil.rmtree(root)
os.remove(output_path+'/input.png')
output_entry_id(unique_id) #test remove
response_message = "LP genrated Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/LP/{unique_id}/output.mp4"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
@app.post("/ImageEnhancer")
async def imageenhance(image: UploadFile = File(...)):
#################
#################
#get time
global time_last_call
time_last_call = datetime.now()
##################
try:
###################################################################################################################################################################################
############################################################################### Test Input Image ##################################################################################
###################################################################################################################################################################################
image_format = image.filename.split(".")[-1].lower()
if image_format not in ALLOWED_IMAGE_FORMATS:
response_message = f"Unsupported image format. Supported formats: {', '.join(ALLOWED_IMAGE_FORMATS)}"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
contents = await image.read()
nparr = np.frombuffer(contents, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
response_message = "Image is empty"
response_status = "Failure"
return generate_response(response_message , response_status ,'' , '' , '' , [])
###################################################################################################################################################################################
############################################################################### Image Processing ##################################################################################
###################################################################################################################################################################################
# loading model
upscale_image = True
unique_id = str(uuid.uuid4())
output_path = f"./outputs/Enhance/{unique_id}"
height = image.shape[0]
width = image.shape[1]
if (height*width)>(1500*1500) :
aspect_ratio = width / height
new_height = int(((1500*1500) / aspect_ratio) ** 0.5)
new_width = int(aspect_ratio * new_height)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
# loading model
if not get_model_state_cf():
unload_model_all()
model_status = load_model_cf()
print(f"model status =========={model_status}")
if upscale_image:
has_aligned = False
only_center_face = False
draw_box = False
detection_model = "retinaface_resnet50"
background_enhance = True
face_upsample = True
upscale = 2
codeformer_fidelity = 0.5
img = image#cv2.imread(str(img_path), cv2.IMREAD_COLOR)
upscale = int(upscale) # convert type to int
if upscale > 4: # avoid memory exceeded due to too large upscale
upscale = 4
if upscale > 2 and max(img.shape[:2])>1000: # avoid memory exceeded due to too large img resolution
upscale = 2
if max(img.shape[:2]) > 1500: # avoid memory exceeded due to too large img resolution
upscale = 1
background_enhance = False
face_upsample = False
face_helper = FaceRestoreHelper(
upscale,
face_size=512,
crop_ratio=(1, 1),
det_model=detection_model,
save_ext="png",
use_parse=True,
device=device,
)
bg_upsampler = model_upsampler if background_enhance else None
face_upsampler = model_upsampler if face_upsample else None
if has_aligned:
# the input faces are already cropped and aligned
img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR)
face_helper.is_gray = is_gray(img, threshold=5)
if face_helper.is_gray:
print('\tgrayscale input: True')
face_helper.cropped_faces = [img]
else:
face_helper.read_image(img)
# get face landmarks for each face
num_det_faces = face_helper.get_face_landmarks_5(
only_center_face=only_center_face, resize=640, eye_dist_threshold=5
)
print(f'\tdetect {num_det_faces} faces')
# align and warp each face
face_helper.align_warp_face()
# face restoration for each cropped face
for idx, cropped_face in enumerate(face_helper.cropped_faces):
# prepare data
cropped_face_t = img2tensor(
cropped_face / 255.0, bgr2rgb=True, float32=True
)
normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
cropped_face_t = cropped_face_t.unsqueeze(0).to(device)
try:
with torch.no_grad():
output = model_code_former(cropped_face_t, w=codeformer_fidelity, adain=True)[0]
restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
del output
torch.cuda.empty_cache()
except RuntimeError as error:
print(f"Failed inference for CodeFormer: {error}")
restored_face = tensor2img(
cropped_face_t, rgb2bgr=True, min_max=(-1, 1)
)
restored_face = restored_face.astype("uint8")
face_helper.add_restored_face(restored_face)
# paste_back
if not has_aligned:
# upsample the background
if bg_upsampler is not None:
# Now only support RealESRGAN for upsampling background
bg_img = bg_upsampler.enhance(img, outscale=upscale)[0]
else:
bg_img = None
face_helper.get_inverse_affine(None)
# paste each restored face to the input image
if face_upsample and face_upsampler is not None:
restored_img = face_helper.paste_faces_to_input_image(
upsample_img=bg_img,
draw_box=draw_box,
face_upsampler=face_upsampler,
)
else:
restored_img = face_helper.paste_faces_to_input_image(
upsample_img=bg_img, draw_box=draw_box
)
output_image_path = output_path+'/output.png'
os.makedirs(output_path)
cv2.imwrite(output_image_path,restored_img)
#cv2.imwrite(output_path+'/output.png',restored_img)
output_entry_id(unique_id) #test remove
response_message = "CodeFormer Image Sucessfully"
response_status = "Successful"
return generate_response(response_message , response_status ,unique_id , '' , '', [f"{web_url}:8001/Enhance/{unique_id}/output.png"])
except Exception as e:
shutil.rmtree(output_path)
return generate_response(f"Unknown Internal Error : {e}" , "Failure" ,'' , '' , '' , [])
# if __name__ == '__main__':
# import uvicorn
# try:
# http_file_server_process = start_http_file_server()
# uvicorn.run("main:app", host='0.0.0.0', port=8080, workers=1 , limit_max_requests=20)
# except KeyboardInterrupt:
# print("Received KeyboardInterrupt, shutting down gracefully...")
# http_file_server_process.kill()
# shutdown_event()
if __name__ == '__main__':
from cf_api import start_cf_api
import subprocess
api_process = subprocess.Popen(["python", "cf_api.py"])
get_server = subprocess.Popen(["/home/qtech/miniconda3/envs/api/bin/python","./get_req.py"])
report_server = subprocess.Popen(["/home/qtech/miniconda3/envs/api/bin/python","./report/report.py"])
print("on")
import uvicorn
import os
try:
if not os.path.exists('api.db'):
create_database()
if not os.path.exists('./subprocess/port.db'):
os.remove('./subprocess/port.db')
create_database_port()
http_file_server_process = start_http_file_server()
uvicorn.run("main:app", host='0.0.0.0', port=8080, workers=4 , limit_max_requests=200 )#, timeout_keep_alive=5)
print("Received KeyboardInterrupt, shutting down gracefully...")
http_file_server_process.kill()
shutdown_event()
except KeyboardInterrupt:
print("off")
print("Received KeyboardInterrupt, shutting down gracefully...")
http_file_server_process.kill()
shutdown_event()
api_process.kill()
get_server.kill()
report_server.kill()