"""Streamlit app: apply InstructPix2Pix edits to every frame of an uploaded video."""

import math
import os
import subprocess
import sys


def install(package):
    """Install *package* into the current interpreter's environment via pip."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


# Map pip distribution names to their importable module names.  The original
# code did __import__("Pillow") / __import__("opencv-python-headless"), which
# always raises ImportError (the modules are named PIL / cv2), so every launch
# re-ran pip install.
required_packages = {
    "streamlit": "streamlit",
    "moviepy": "moviepy",
    "diffusers": "diffusers",
    "torch": "torch",
    "Pillow": "PIL",
    "opencv-python-headless": "cv2",
}

for package, module_name in required_packages.items():
    try:
        __import__(module_name)
    except ImportError:
        install(package)

# Import the now-installed packages.
import streamlit as st
import cv2
import numpy as np
from moviepy.editor import VideoFileClip, ImageSequenceClip
from diffusers import StableDiffusionInstructPix2PixPipeline
import torch
from PIL import Image, ImageOps

# Initialize the model.  float16 weights are only usable on CUDA; on CPU the
# pipeline must run in float32 or inference fails / produces NaNs.
device = "cuda" if torch.cuda.is_available() else "cpu"
_dtype = torch.float16 if device == "cuda" else torch.float32
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
    "timbrooks/instruct-pix2pix", torch_dtype=_dtype, safety_checker=None
)
pipe = pipe.to(device)


def pix2pix(input_image, instruction, steps, seed, text_cfg_scale, image_cfg_scale):
    """Run one InstructPix2Pix edit on a PIL image.

    Args:
        input_image: PIL.Image to edit.
        instruction: text edit instruction; empty string returns the
            (resized) input unchanged.
        steps: number of diffusion inference steps.
        seed: RNG seed for reproducibility.
        text_cfg_scale: classifier-free guidance scale for the text prompt.
        image_cfg_scale: guidance scale for the conditioning image.

    Returns:
        The edited PIL.Image (or the resized input if instruction is empty).
    """
    width, height = input_image.size
    # Scale so the long side is ~512 px and both sides are multiples of 64,
    # as required by the diffusion UNet.
    factor = 512 / max(width, height)
    factor = math.ceil(min(width, height) * factor / 64) * 64 / min(width, height)
    width = int((width * factor) // 64) * 64
    height = int((height * factor) // 64) * 64
    input_image = ImageOps.fit(
        input_image, (width, height), method=Image.Resampling.LANCZOS
    )

    if instruction == "":
        return input_image

    generator = torch.manual_seed(seed)
    edited_image = pipe(
        instruction,
        image=input_image,
        guidance_scale=text_cfg_scale,
        image_guidance_scale=image_cfg_scale,
        num_inference_steps=steps,
        generator=generator,
    ).images[0]
    return edited_image


def get_frames(video_path):
    """Resize a video to height 512, cap its fps at 30, and dump frames as JPEGs.

    Args:
        video_path: path to the input video file.

    Returns:
        (frames, fps): list of frame image paths, and the fps reported by
        OpenCV for the resized video.
    """
    clip = VideoFileClip(video_path)
    # The original if/else branches resized identically; only the output fps
    # differed, which is simply min(source fps, 30).
    clip_resized = clip.resize(height=512)
    clip_resized.write_videofile("video_resized.mp4", fps=min(clip.fps, 30))
    # Release moviepy's ffmpeg reader handles.
    clip_resized.close()
    clip.close()

    frames = []
    cap = cv2.VideoCapture("video_resized.mp4")
    fps = cap.get(cv2.CAP_PROP_FPS)
    i = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_path = f'frame_{i}.jpg'
        cv2.imwrite(frame_path, frame)
        frames.append(frame_path)
        i += 1
    cap.release()
    cv2.destroyAllWindows()  # no-op in headless builds; kept for parity
    return frames, fps


def create_video(frames, fps):
    """Assemble the listed frame images into output_video.mp4 at *fps*."""
    clip = ImageSequenceClip(frames, fps=fps)
    clip.write_videofile("output_video.mp4", fps=fps)
    return "output_video.mp4"


def main():
    """Streamlit UI: upload a video and edit its first *trim_value* seconds."""
    st.title("Pix2Pix Video")
    video_file = st.file_uploader("Upload a video", type=["mp4", "mov", "avi", "mkv"])
    prompt = st.text_input("Enter your prompt")
    seed = st.slider("Seed", 0, 2147483647, 123456)
    trim_value = st.slider("Cut video at (s)", 1, 5, 1)

    if st.button("Generate Pix2Pix video") and video_file:
        with st.spinner("Processing video..."):
            with open("input_video.mp4", "wb") as f:
                f.write(video_file.getbuffer())

            frames, fps = get_frames("input_video.mp4")
            # Never index past the end of the extracted frame list.
            n_frame = min(int(trim_value * fps), len(frames))

            result_frames = []
            for frame_path in frames[:n_frame]:
                pil_image = Image.open(frame_path).convert("RGB")
                edited = pix2pix(pil_image, prompt, 50, seed, 7.5, 1.5)
                result_frame_path = f'result_{os.path.basename(frame_path)}'
                edited.save(result_frame_path)
                result_frames.append(result_frame_path)

            final_video_path = create_video(result_frames, fps)
            st.video(final_video_path)
            st.success("Video generated successfully!")


if __name__ == "__main__":
    main()