# Emmanuel Durand
# Adding Latentsync
# commit 21aaf52
import gradio as gr
import os
import random
import sys
from typing import Sequence, Mapping, Any, Union
import torch
import spaces
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return the value at the given index of a sequence or mapping.

    If the object is a sequence (like a list or string), returns the value at
    the given index.  If the object is a mapping, ``obj[index]`` is tried
    first; when that raises ``KeyError``, the lookup falls back to the
    "result" key (ComfyUI nodes commonly return ``{"result": (...)}``-shaped
    dicts).  Note: the fallback key is "result" (singular), not "results".

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the
            object is not a mapping.
        KeyError: If the object is a mapping with neither ``index`` nor
            "result" as a key.
    """
    try:
        return obj[index]
    except KeyError:
        # Mapping without an `index` key: unwrap the conventional "result"
        # container and index into that instead.
        return obj["result"][index]
def find_path(name: str, path: str = None) -> str:
    """Search *path* and its parent folders for an entry called *name*.

    Recursively walks upward from *path* (defaulting to the current working
    directory) until a directory containing *name* is found.

    Args:
        name: File or directory name to look for.
        path: Directory to start the search from; defaults to ``os.getcwd()``.

    Returns:
        str: The full path to *name* if found, or ``None`` otherwise.
            (The original docstring claimed a ``Path`` object was returned;
            the function actually returns a plain ``str`` from
            ``os.path.join``.)
    """
    # If no path is given, use the current working directory.
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name.
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # If the parent directory equals the current one we have reached the
    # filesystem root, so stop searching.
    parent_directory = os.path.dirname(path)
    if parent_directory == path:
        return None

    # Recurse into the parent directory.
    return find_path(name, parent_directory)
def add_comfyui_directory_to_sys_path() -> None:
    """Locate the 'ComfyUI' directory via find_path and append it to sys.path."""
    located = find_path("ComfyUI")
    # Only extend sys.path when the search succeeded and the hit is a directory.
    if located is None or not os.path.isdir(located):
        return
    sys.path.append(located)
    print(f"'{located}' added to sys.path")
def add_extra_model_paths() -> None:
    """Parse the optional extra_model_paths.yaml file and register the paths it lists."""
    # Newer ComfyUI versions moved the loader out of main.py; fall back to
    # utils.extra_config when the old location is gone.
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    config_file = find_path("extra_model_paths.yaml")
    if config_file is None:
        print("Could not find the extra_model_paths config file.")
    else:
        load_extra_path_config(config_file)
# Run at import time: make ComfyUI importable and register any extra model
# paths before the node modules below are loaded.
add_comfyui_directory_to_sys_path()
add_extra_model_paths()
def import_custom_nodes() -> None:
    """Register every custom node from the custom_nodes folder in NODE_CLASS_MAPPINGS.

    Sets up a new asyncio event loop, instantiates the PromptServer with it,
    creates a PromptQueue bound to that server, then initializes the extra
    (custom) nodes.
    """
    import asyncio

    import execution
    import server
    from nodes import init_extra_nodes

    # PromptServer needs an event loop at construction time.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    prompt_server = server.PromptServer(event_loop)
    execution.PromptQueue(prompt_server)

    # Loads the nodes shipped under custom_nodes/ into NODE_CLASS_MAPPINGS.
    init_extra_nodes()
from nodes import NODE_CLASS_MAPPINGS
#@spaces.GPU(duration=15)
def generate_image(video, audio):
    """Run the LatentSync lip-sync workflow: re-voice *video* with *audio*.

    Args:
        video: Path to the source video file (as supplied by gr.Video).
        audio: Path to the driving audio file (as supplied by gr.Audio).

    Returns:
        The path of the rendered output video so Gradio can display it.
        (The original version returned nothing, which left the Gradio
        output component permanently empty.)
    """
    import_custom_nodes()
    with torch.inference_mode():
        loadaudio = NODE_CLASS_MAPPINGS["LoadAudio"]()
        loadaudio_37 = loadaudio.load(audio=audio)

        vhs_loadvideo = NODE_CLASS_MAPPINGS["VHS_LoadVideo"]()
        vhs_loadvideo_40 = vhs_loadvideo.load_video(
            video=video,
            force_rate=25,
            custom_width=0,
            custom_height=768,
            frame_load_cap=0,
            skip_first_frames=0,
            select_every_nth=1,
            format="AnimateDiff",
        )

        videolengthadjuster = NODE_CLASS_MAPPINGS["VideoLengthAdjuster"]()
        latentsyncnode = NODE_CLASS_MAPPINGS["LatentSyncNode"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()

        # Pad/loop the frames ("pingpong") so the clip length matches the audio.
        videolengthadjuster_55 = videolengthadjuster.adjust(
            mode="pingpong",
            fps=25,
            silent_padding_sec=0.5,
            images=get_value_at_index(vhs_loadvideo_40, 0),
            audio=get_value_at_index(loadaudio_37, 0),
        )
        latentsyncnode_54 = latentsyncnode.inference(
            # Seeds are unsigned 64-bit; randint's upper bound is inclusive,
            # so 2**64 itself would overflow — use 2**64 - 1.
            seed=random.randint(1, 2**64 - 1),
            lips_expression=1.5,
            inference_steps=20,
            images=get_value_at_index(videolengthadjuster_55, 0),
            audio=get_value_at_index(videolengthadjuster_55, 1),
        )
        vhs_videocombine_41 = vhs_videocombine.combine_video(
            frame_rate=25,
            loop_count=0,
            filename_prefix="latentsync",
            format="video/h264-mp4",
            pix_fmt="yuv420p",
            crf=19,
            save_metadata=True,
            trim_to_audio=False,
            pingpong=False,
            save_output=True,
            images=get_value_at_index(latentsyncnode_54, 0),
            audio=get_value_at_index(latentsyncnode_54, 1),
        )

        # BUG FIX: return the rendered file so the Gradio output gets a value.
        # NOTE(review): VHS_VideoCombine's result is assumed to be
        # (save_flag, [output_file, ...]) with the final video last — confirm
        # against the installed VideoHelperSuite version.
        filenames = get_value_at_index(vhs_videocombine_41, 0)
        return filenames[1][-1]
if __name__ == "__main__":
    # Build and launch the Gradio UI; generate_image does the actual work.
    with gr.Blocks() as app:
        # Title fixed: this app performs LatentSync lip-syncing, not SD prompting.
        gr.Markdown("# LatentSync lip sync")
        with gr.Row():
            with gr.Column():
                # Inputs: a source video and a driving audio file.
                with gr.Row():
                    with gr.Group():
                        source_video = gr.Video(label="Source video")
                    with gr.Group():
                        source_audio = gr.Audio(label="Microphone", type="filepath")
                # The generate button.
                generate_btn = gr.Button("Generate")
            with gr.Column():
                # Output label fixed: the result is a video, not an image.
                output_video = gr.Video(label="Generated video")

        # Clicking the button runs generate_image(video, audio) and shows the
        # returned video path in output_video.
        generate_btn.click(
            fn=generate_image,
            inputs=[source_video, source_audio],
            outputs=[output_video],
        )

    app.launch(share=True)