vidslicegpu / app.py
openfree's picture
Update app.py
261a0f6 verified
import gradio as gr
import cv2
import numpy as np
import os
import zipfile
from PIL import Image
import tempfile
import shutil
from tqdm import tqdm
import concurrent.futures
import logging
import asyncio
from pathlib import Path
# λ‘œκΉ… μ„€μ •
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# GPU μ‚¬μš© κ°€λŠ₯ μ—¬λΆ€ 확인
USE_GPU = cv2.cuda.getCudaEnabledDeviceCount() > 0
logging.info(f"GPU μ‚¬μš© κ°€λŠ₯: {USE_GPU}")
def get_video_info(video_path):
try:
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return {"duration": duration, "fps": fps, "total_frames": total_frames, "width": width, "height": height}
except Exception as e:
logging.error(f"λΉ„λ””μ˜€ 정보 νšλ“ 쀑 였λ₯˜ λ°œμƒ: {str(e)}")
return None
async def extract_frames_async(video_path, start_time, end_time, max_frames=10000, progress=gr.Progress()):
logging.info(f"ν”„λ ˆμž„ μΆ”μΆœ μ‹œμž‘: {video_path}, μ‹œμž‘ μ‹œκ°„: {start_time}, μ’…λ£Œ μ‹œκ°„: {end_time}")
try:
video_info = get_video_info(video_path)
if not video_info:
return []
fps = video_info["fps"]
total_frames = video_info["total_frames"]
start_frame = int(start_time * fps)
end_frame = int(end_time * fps) if end_time else total_frames
frame_interval = max(1, (end_frame - start_frame) // max_frames)
frames = []
cap = cv2.VideoCapture(video_path)
for i in progress.tqdm(range(start_frame, min(end_frame, total_frames), frame_interval)):
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
if len(frames) >= max_frames:
break
await asyncio.sleep(0) # λ‹€λ₯Έ μž‘μ—…μ΄ 싀행될 수 μžˆλ„λ‘ 양보
cap.release()
logging.info(f"총 {len(frames)}개의 ν”„λ ˆμž„μ΄ μΆ”μΆœλ˜μ—ˆμŠ΅λ‹ˆλ‹€.")
return frames
except Exception as e:
logging.error(f"ν”„λ ˆμž„ μΆ”μΆœ 쀑 였λ₯˜ λ°œμƒ: {str(e)}")
return []
def create_grid(frames, grid_size=(10, 10)):
rows, cols = grid_size
n_frames = len(frames)
frames = frames[:rows*cols] # Limit to grid size
frame_height, frame_width = frames[0].shape[:2]
grid = np.zeros((frame_height * rows, frame_width * cols, 3), dtype=np.uint8)
for i, frame in enumerate(frames):
row = i // cols
col = i % cols
grid[row*frame_height:(row+1)*frame_height, col*frame_width:(col+1)*frame_width] = frame
return grid
async def save_frames_async(frames, output_dir, progress=gr.Progress()):
os.makedirs(output_dir, exist_ok=True)
for i, frame in progress.tqdm(enumerate(frames), desc="ν”„λ ˆμž„ μ €μž₯ 쀑", total=len(frames)):
cv2.imwrite(os.path.join(output_dir, f"frame_{i:04d}.jpg"), frame)
await asyncio.sleep(0) # λ‹€λ₯Έ μž‘μ—…μ΄ 싀행될 수 μžˆλ„λ‘ 양보
async def create_zip_async(output_dir, progress=gr.Progress()):
zip_path = os.path.join(output_dir, "frames.zip")
with zipfile.ZipFile(zip_path, 'w') as zipf:
files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
for file in progress.tqdm(files, desc="ZIP 파일 생성 쀑"):
zipf.write(os.path.join(output_dir, file), file)
await asyncio.sleep(0) # λ‹€λ₯Έ μž‘μ—…μ΄ 싀행될 수 μžˆλ„λ‘ 양보
return zip_path
async def process_video_async(video, start_time, end_time, progress=gr.Progress()):
logging.info("process_video ν•¨μˆ˜ μ‹œμž‘")
try:
if video is None:
logging.warning("λΉ„λ””μ˜€ 파일이 μ—†μŠ΅λ‹ˆλ‹€.")
return None, None, "λΉ„λ””μ˜€ νŒŒμΌμ„ μ—…λ‘œλ“œν•΄μ£Όμ„Έμš”."
video_path = Path(video)
logging.info(f"λΉ„λ””μ˜€ 파일: {video_path}, μ‹œμž‘ μ‹œκ°„: {start_time}, μ’…λ£Œ μ‹œκ°„: {end_time}")
file_size = os.path.getsize(video_path)
if file_size > 2 * 1024 * 1024 * 1024: # 2GB
logging.warning("파일 크기가 2GBλ₯Ό μ΄ˆκ³Όν•©λ‹ˆλ‹€.")
return None, None, "파일 크기가 2GBλ₯Ό μ΄ˆκ³Όν•©λ‹ˆλ‹€. 더 μž‘μ€ νŒŒμΌμ„ μ—…λ‘œλ“œν•΄μ£Όμ„Έμš”."
with tempfile.TemporaryDirectory() as temp_dir:
logging.info("ν”„λ ˆμž„ μΆ”μΆœ μ‹œμž‘")
frames = await extract_frames_async(str(video_path), start_time, end_time, max_frames=10000, progress=progress)
if not frames:
logging.error("ν”„λ ˆμž„ μΆ”μΆœ μ‹€νŒ¨")
return None, None, "ν”„λ ˆμž„ μΆ”μΆœμ— μ‹€νŒ¨ν–ˆμŠ΅λ‹ˆλ‹€. λΉ„λ””μ˜€ νŒŒμΌμ„ ν™•μΈν•΄μ£Όμ„Έμš”."
logging.info(f"μΆ”μΆœλœ ν”„λ ˆμž„ 수: {len(frames)}")
await save_frames_async(frames, temp_dir, progress=progress)
grid = create_grid(frames)
zip_path = await create_zip_async(temp_dir, progress=progress)
final_zip_path = "frames.zip"
shutil.copy(zip_path, final_zip_path)
logging.info("λΉ„λ””μ˜€ 처리 μ™„λ£Œ")
return Image.fromarray(cv2.cvtColor(grid, cv2.COLOR_BGR2RGB)), final_zip_path, f"ν”„λ ˆμž„ μΆ”μΆœμ΄ μ™„λ£Œλ˜μ—ˆμŠ΅λ‹ˆλ‹€. 총 {len(frames)}개의 ν”„λ ˆμž„μ΄ μΆ”μΆœλ˜μ—ˆμŠ΅λ‹ˆλ‹€. ({'GPU' if USE_GPU else 'CPU'} μ‚¬μš©)"
except Exception as e:
logging.error(f"λΉ„λ””μ˜€ 처리 쀑 였λ₯˜ λ°œμƒ: {str(e)}", exc_info=True)
return None, None, f"였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}"
def on_video_change(video):
logging.info("λΉ„λ””μ˜€ 파일이 λ³€κ²½λ˜μ—ˆμŠ΅λ‹ˆλ‹€.")
if video is None:
return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1)
try:
video_info = get_video_info(video)
if video_info:
duration = video_info["duration"]
logging.info(f"λΉ„λ””μ˜€ 길이: {duration}초")
return gr.Slider(minimum=0, maximum=duration, value=0, step=0.1), gr.Slider(minimum=0, maximum=duration, value=duration, step=0.1)
except Exception as e:
logging.error(f"λΉ„λ””μ˜€ 정보 νšλ“ 쀑 였λ₯˜ λ°œμƒ: {str(e)}")
return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1)
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as iface:
gr.Markdown("# λΉ„λ””μ˜€ to 이미지(ν”„λ ˆμž„)")
gr.Markdown("μ›ν•˜λŠ” κ΅¬κ°„μ˜ ν”„λ ˆμž„μ„ μΆ”μΆœν•˜μ—¬ κ·Έλ¦¬λ“œλ‘œ ν‘œμ‹œν•˜κ³ , κ°œλ³„ 이미지λ₯Ό ZIP 파일둜 λ‹€μš΄λ‘œλ“œν•  수 μžˆμŠ΅λ‹ˆλ‹€.")
gr.Markdown(f"μ΅œλŒ€ 2GB 크기의 λΉ„λ””μ˜€ νŒŒμΌμ„ μ—…λ‘œλ“œν•  수 있으며, μ΅œλŒ€ 10000개의 ν”„λ ˆμž„μ„ μΆ”μΆœν•©λ‹ˆλ‹€. ({'GPU' if USE_GPU else 'CPU'} μ‚¬μš©)")
with gr.Row():
video_input = gr.Video(label="Upload and Play Video")
with gr.Row():
start_slider = gr.Slider(minimum=0, maximum=100, value=0, step=0.1, label="CUT μ‹œμž‘ μ‹œκ°„ (초)")
end_slider = gr.Slider(minimum=0, maximum=100, value=100, step=0.1, label="CUT μ’…λ£Œ μ‹œκ°„ (초)")
with gr.Row():
process_button = gr.Button("ν”„λ ˆμž„ μΆ”μΆœ")
with gr.Row():
image_output = gr.Image(label="Frame Grid")
file_output = gr.File(label="Download All Frames (ZIP)")
message_output = gr.Textbox(label="λ©”μ‹œμ§€")
video_input.change(on_video_change, inputs=[video_input], outputs=[start_slider, end_slider])
process_button.click(process_video_async, inputs=[video_input, start_slider, end_slider], outputs=[image_output, file_output, message_output])
if __name__ == "__main__":
logging.info("μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μ‹œμž‘")
iface.queue().launch(share=True)
logging.info("μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μ’…λ£Œ")