| | import os |
| | import subprocess |
| | import sys |
| |
|
| | |
def install(package):
    """Install *package* with pip into the environment of the running interpreter."""
    # Invoking pip via sys.executable guarantees the install lands in the
    # same environment this script is executing under.
    cmd = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(cmd)
| |
|
| | |
# Pip distribution names whose importable module name differs.  The original
# loop passed the pip name straight to __import__, so "Pillow" and
# "opencv-python-headless" always raised ImportError (they import as PIL and
# cv2) and were needlessly reinstalled on every launch.
_IMPORT_NAMES = {
    "Pillow": "PIL",
    "opencv-python-headless": "cv2",
}

required_packages = [
    "streamlit",
    "moviepy",
    "diffusers",
    "torch",
    "Pillow",
    "opencv-python-headless",
]

# Install anything that is missing; probe with the module name, install with
# the pip name.
for package in required_packages:
    module_name = _IMPORT_NAMES.get(package, package)
    try:
        __import__(module_name)
    except ImportError:
        install(package)
| |
|
| | |
| | import streamlit as st |
| | import cv2 |
| | import numpy as np |
| | from moviepy.editor import VideoFileClip, ImageSequenceClip |
| | from diffusers import StableDiffusionInstructPix2PixPipeline |
| | import torch |
| | from PIL import Image, ImageOps |
| | import math |
| |
|
| | |
# Decide the compute device once; the original code called
# torch.cuda.is_available() twice for the same decision.
device = "cuda" if torch.cuda.is_available() else "cpu"

# NOTE(review): float16 weights typically cannot run inference on CPU with
# this pipeline — presumably the app is meant for GPU hosts; confirm.
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
    "timbrooks/instruct-pix2pix",
    torch_dtype=torch.float16,
    safety_checker=None,  # safety checker deliberately disabled in the original
)

if device == "cuda":
    pipe = pipe.to(device)
| |
|
| |
|
def pix2pix(input_image, instruction, steps, seed, text_cfg_scale, image_cfg_scale):
    """Run one InstructPix2Pix edit on *input_image* and return the edited image.

    The image is first rescaled so its long side is near 512 px and both
    dimensions land on multiples of 64 (the model's size constraint).  An
    empty instruction short-circuits the model and returns the rescaled
    input unchanged.
    """
    w, h = input_image.size
    # Scale the long side toward 512, then adjust so the short side rounds
    # up to a multiple of 64; finally snap both dimensions down to 64s.
    scale = 512 / max(w, h)
    scale = math.ceil(min(w, h) * scale / 64) * 64 / min(w, h)
    target_w = int((w * scale) // 64) * 64
    target_h = int((h * scale) // 64) * 64
    input_image = ImageOps.fit(
        input_image, (target_w, target_h), method=Image.Resampling.LANCZOS
    )

    if instruction == "":
        return input_image

    # Seed the global RNG so a fixed seed reproduces the same edit.
    generator = torch.manual_seed(seed)
    result = pipe(
        instruction,
        image=input_image,
        guidance_scale=text_cfg_scale,
        image_guidance_scale=image_cfg_scale,
        num_inference_steps=steps,
        generator=generator,
    )
    return result.images[0]
| |
|
| |
|
def get_frames(video_path):
    """Rescale *video_path* to 512 px tall, dump every frame to disk as JPEG,
    and return ``(frame_paths, fps)``.

    The intermediate video is written to ``video_resized.mp4`` with its
    frame rate capped at 30 fps; *fps* is read back from that file.
    """
    clip = VideoFileClip(video_path)
    try:
        # The original if/else resized identically in both branches and
        # differed only in output fps, so it collapses to a single min().
        clip.resize(height=512).write_videofile(
            "video_resized.mp4", fps=min(clip.fps, 30)
        )
    finally:
        # Release ffmpeg readers even if the write fails.
        clip.close()

    frames = []
    cap = cv2.VideoCapture("video_resized.mp4")
    fps = cap.get(cv2.CAP_PROP_FPS)
    try:
        index = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = f'frame_{index}.jpg'
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
            index += 1
    finally:
        cap.release()
    cv2.destroyAllWindows()
    return frames, fps
| |
|
| |
|
def create_video(frames, fps):
    """Stitch the image files in *frames* into ``output_video.mp4`` at *fps*
    and return that path."""
    output_path = "output_video.mp4"
    sequence = ImageSequenceClip(frames, fps=fps)
    sequence.write_videofile(output_path, fps=fps)
    return output_path
| |
|
| |
|
def main():
    """Streamlit entry point: upload a video, edit its frames with
    InstructPix2Pix, and show the resulting clip."""
    st.title("Pix2Pix Video")

    video_file = st.file_uploader("Upload a video", type=["mp4", "mov", "avi", "mkv"])
    prompt = st.text_input("Enter your prompt")
    seed = st.slider("Seed", 0, 2147483647, 123456)
    trim_value = st.slider("Cut video at (s)", 1, 5, 1)

    # Guard clause: do nothing until the button is pressed with a file present.
    if not (st.button("Generate Pix2Pix video") and video_file):
        return

    with st.spinner("Processing video..."):
        # Persist the upload so moviepy/OpenCV can read it from disk.
        with open("input_video.mp4", "wb") as upload:
            upload.write(video_file.getbuffer())

        frames, fps = get_frames("input_video.mp4")
        # Process only the first trim_value seconds worth of frames.
        frame_limit = min(int(trim_value * fps), len(frames))

        edited_paths = []
        for frame_path in frames[:frame_limit]:
            source = Image.open(frame_path).convert("RGB")
            edited = pix2pix(source, prompt, 50, seed, 7.5, 1.5)
            out_path = f'result_{os.path.basename(frame_path)}'
            edited.save(out_path)
            edited_paths.append(out_path)

        final_video_path = create_video(edited_paths, fps)

        st.video(final_video_path)
        st.success("Video generated successfully!")
| |
|
# Launch the Streamlit app when the file is executed as a script.
if __name__ == "__main__":
    main()
| |
|
| |
|