# dineth554's picture
# Update app.py
# ad66e36 verified
import os
import subprocess
import sys
def install(package):
    """Install *package* into the current interpreter's environment via pip.

    Raises subprocess.CalledProcessError if pip exits non-zero.
    """
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
# Map each pip distribution name to the module name it is imported under.
# The original code probed __import__ with the *pip* name, but "Pillow"
# imports as "PIL" and "opencv-python-headless" imports as "cv2", so those
# two packages were reinstalled on every single launch.
required_packages = {
    "streamlit": "streamlit",
    "moviepy": "moviepy",
    "diffusers": "diffusers",
    "torch": "torch",
    "Pillow": "PIL",
    "opencv-python-headless": "cv2",
}

# Install anything that is not already importable.
for pip_name, module_name in required_packages.items():
    try:
        __import__(module_name)
    except ImportError:
        install(pip_name)
# Import the now-installed packages
import streamlit as st
import cv2
import numpy as np
from moviepy.editor import VideoFileClip, ImageSequenceClip
from diffusers import StableDiffusionInstructPix2PixPipeline
import torch
from PIL import Image, ImageOps
import math
# Load the InstructPix2Pix editing pipeline once at module import.
# float16 halves GPU memory use; the safety checker is disabled so edited
# frames are never filtered out.
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
    "timbrooks/instruct-pix2pix",
    torch_dtype=torch.float16,
    safety_checker=None,
)
device = "cuda" if torch.cuda.is_available() else "cpu"
# Move the pipeline to the selected device; the original computed `device`
# but never used it and re-tested CUDA availability a second time.
pipe = pipe.to(device)
def pix2pix(input_image, instruction, steps, seed, text_cfg_scale, image_cfg_scale):
    """Edit a PIL image with the InstructPix2Pix pipeline.

    The image is first rescaled so its longer side is near 512 px and both
    dimensions are multiples of 64 (a requirement of the diffusion model).
    If *instruction* is the empty string, the resized image is returned
    unedited.
    """
    orig_w, orig_h = input_image.size
    # Scale the longer side toward 512, then snap the shorter side to a
    # multiple of 64 and derive the final 64-aligned dimensions.
    scale = 512 / max(orig_w, orig_h)
    scale = math.ceil(min(orig_w, orig_h) * scale / 64) * 64 / min(orig_w, orig_h)
    target_w = int((orig_w * scale) // 64) * 64
    target_h = int((orig_h * scale) // 64) * 64
    input_image = ImageOps.fit(
        input_image, (target_w, target_h), method=Image.Resampling.LANCZOS
    )

    if instruction == "":
        return input_image

    # Seed the RNG so the edit is reproducible for a given seed value.
    generator = torch.manual_seed(seed)
    result = pipe(
        instruction,
        image=input_image,
        guidance_scale=text_cfg_scale,
        image_guidance_scale=image_cfg_scale,
        num_inference_steps=steps,
        generator=generator,
    )
    return result.images[0]
def get_frames(video_path):
    """Resize a video to height 512, cap it at 30 fps, and dump its frames.

    Writes an intermediate "video_resized.mp4" and one "frame_<i>.jpg" per
    frame into the working directory.

    Returns:
        (frames, fps): list of frame image paths and the resized video's fps.
    """
    frames = []
    clip = VideoFileClip(video_path)
    try:
        # Both branches of the original if/else resized identically; only the
        # output frame rate differed, which is just min(fps, 30).
        target_fps = min(clip.fps, 30)
        clip_resized = clip.resize(height=512)
        clip_resized.write_videofile("video_resized.mp4", fps=target_fps)
        clip_resized.close()
    finally:
        # Close the reader so the ffmpeg subprocess/file handle is released.
        clip.close()

    cap = cv2.VideoCapture("video_resized.mp4")
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_index = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = f'frame_{frame_index}.jpg'
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
            frame_index += 1
    finally:
        cap.release()
    # NOTE: cv2.destroyAllWindows() was removed — it is a GUI function that
    # raises "not implemented" under opencv-python-headless (the build this
    # script installs), and no windows are ever created here.
    return frames, fps
def create_video(frames, fps):
    """Assemble the given frame image files into "output_video.mp4".

    Returns the output file path.
    """
    output_path = "output_video.mp4"
    sequence = ImageSequenceClip(frames, fps=fps)
    sequence.write_videofile(output_path, fps=fps)
    return output_path
def main():
    """Streamlit UI: upload a video and edit its first seconds with InstructPix2Pix."""
    st.title("Pix2Pix Video")
    video_file = st.file_uploader("Upload a video", type=["mp4", "mov", "avi", "mkv"])
    prompt = st.text_input("Enter your prompt")
    seed = st.slider("Seed", 0, 2147483647, 123456)
    trim_value = st.slider("Cut video at (s)", 1, 5, 1)

    if st.button("Generate Pix2Pix video"):
        if not video_file:
            # Previously a click with no upload silently did nothing.
            st.warning("Please upload a video first.")
            return
        with st.spinner("Processing video..."):
            # Persist the upload to disk so moviepy/ffmpeg can read it.
            with open("input_video.mp4", "wb") as f:
                f.write(video_file.getbuffer())
            frames, fps = get_frames("input_video.mp4")
            # Frame count covering the requested duration, clamped to the
            # frames actually available.
            n_frame = min(int(trim_value * fps), len(frames))
            result_frames = []
            for frame_path in frames[:n_frame]:
                pil_image = Image.open(frame_path).convert("RGB")
                edited_image = pix2pix(pil_image, prompt, 50, seed, 7.5, 1.5)
                result_frame_path = f'result_{os.path.basename(frame_path)}'
                edited_image.save(result_frame_path)
                result_frames.append(result_frame_path)
            final_video_path = create_video(result_frames, fps)
            st.video(final_video_path)
            st.success("Video generated successfully!")


if __name__ == "__main__":
    main()