import gradio as gr import cv2 import numpy as np import os import zipfile from PIL import Image import tempfile import shutil from tqdm import tqdm import concurrent.futures import logging import asyncio from pathlib import Path # 로깅 설정 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # GPU 사용 가능 여부 확인 USE_GPU = cv2.cuda.getCudaEnabledDeviceCount() > 0 logging.info(f"GPU 사용 가능: {USE_GPU}") def get_video_info(video_path): try: cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() return {"duration": duration, "fps": fps, "total_frames": total_frames, "width": width, "height": height} except Exception as e: logging.error(f"비디오 정보 획득 중 오류 발생: {str(e)}") return None async def extract_frames_async(video_path, start_time, end_time, max_frames=10000, progress=gr.Progress()): logging.info(f"프레임 추출 시작: {video_path}, 시작 시간: {start_time}, 종료 시간: {end_time}") try: video_info = get_video_info(video_path) if not video_info: return [] fps = video_info["fps"] total_frames = video_info["total_frames"] start_frame = int(start_time * fps) end_frame = int(end_time * fps) if end_time else total_frames frame_interval = max(1, (end_frame - start_frame) // max_frames) frames = [] cap = cv2.VideoCapture(video_path) for i in progress.tqdm(range(start_frame, min(end_frame, total_frames), frame_interval)): cap.set(cv2.CAP_PROP_POS_FRAMES, i) ret, frame = cap.read() if not ret: break frames.append(frame) if len(frames) >= max_frames: break await asyncio.sleep(0) # 다른 작업이 실행될 수 있도록 양보 cap.release() logging.info(f"총 {len(frames)}개의 프레임이 추출되었습니다.") return frames except Exception as e: logging.error(f"프레임 추출 중 오류 발생: {str(e)}") return [] def create_grid(frames, grid_size=(10, 10)): rows, cols = grid_size n_frames = len(frames) frames = frames[:rows*cols] # Limit to grid size frame_height, frame_width = frames[0].shape[:2] grid = np.zeros((frame_height * rows, frame_width * cols, 3), dtype=np.uint8) for i, frame in enumerate(frames): row = i // cols col = i % cols grid[row*frame_height:(row+1)*frame_height, col*frame_width:(col+1)*frame_width] = frame return grid async def save_frames_async(frames, output_dir, progress=gr.Progress()): os.makedirs(output_dir, exist_ok=True) for i, frame in progress.tqdm(enumerate(frames), desc="프레임 저장 중", total=len(frames)): cv2.imwrite(os.path.join(output_dir, f"frame_{i:04d}.jpg"), frame) await asyncio.sleep(0) # 다른 작업이 실행될 수 있도록 양보 async def create_zip_async(output_dir, progress=gr.Progress()): zip_path = os.path.join(output_dir, "frames.zip") with zipfile.ZipFile(zip_path, 'w') as zipf: files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')] for file in progress.tqdm(files, desc="ZIP 파일 생성 중"): zipf.write(os.path.join(output_dir, file), file) await asyncio.sleep(0) # 다른 작업이 실행될 수 있도록 양보 return zip_path async def process_video_async(video, start_time, end_time, progress=gr.Progress()): logging.info("process_video 함수 시작") try: if video is None: logging.warning("비디오 파일이 없습니다.") return None, None, "비디오 파일을 업로드해주세요." video_path = Path(video) logging.info(f"비디오 파일: {video_path}, 시작 시간: {start_time}, 종료 시간: {end_time}") file_size = os.path.getsize(video_path) if file_size > 2 * 1024 * 1024 * 1024: # 2GB logging.warning("파일 크기가 2GB를 초과합니다.") return None, None, "파일 크기가 2GB를 초과합니다. 더 작은 파일을 업로드해주세요." with tempfile.TemporaryDirectory() as temp_dir: logging.info("프레임 추출 시작") frames = await extract_frames_async(str(video_path), start_time, end_time, max_frames=10000, progress=progress) if not frames: logging.error("프레임 추출 실패") return None, None, "프레임 추출에 실패했습니다. 비디오 파일을 확인해주세요." logging.info(f"추출된 프레임 수: {len(frames)}") await save_frames_async(frames, temp_dir, progress=progress) grid = create_grid(frames) zip_path = await create_zip_async(temp_dir, progress=progress) final_zip_path = "frames.zip" shutil.copy(zip_path, final_zip_path) logging.info("비디오 처리 완료") return Image.fromarray(cv2.cvtColor(grid, cv2.COLOR_BGR2RGB)), final_zip_path, f"프레임 추출이 완료되었습니다. 총 {len(frames)}개의 프레임이 추출되었습니다. ({'GPU' if USE_GPU else 'CPU'} 사용)" except Exception as e: logging.error(f"비디오 처리 중 오류 발생: {str(e)}", exc_info=True) return None, None, f"오류가 발생했습니다: {str(e)}" def on_video_change(video): logging.info("비디오 파일이 변경되었습니다.") if video is None: return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1) try: video_info = get_video_info(video) if video_info: duration = video_info["duration"] logging.info(f"비디오 길이: {duration}초") return gr.Slider(minimum=0, maximum=duration, value=0, step=0.1), gr.Slider(minimum=0, maximum=duration, value=duration, step=0.1) except Exception as e: logging.error(f"비디오 정보 획득 중 오류 발생: {str(e)}") return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1) with gr.Blocks(theme="Nymbo/Nymbo_Theme") as iface: gr.Markdown("# 비디오 to 이미지(프레임)") gr.Markdown("원하는 구간의 프레임을 추출하여 그리드로 표시하고, 개별 이미지를 ZIP 파일로 다운로드할 수 있습니다.") gr.Markdown(f"최대 2GB 크기의 비디오 파일을 업로드할 수 있으며, 최대 10000개의 프레임을 추출합니다. ({'GPU' if USE_GPU else 'CPU'} 사용)") with gr.Row(): video_input = gr.Video(label="Upload and Play Video") with gr.Row(): start_slider = gr.Slider(minimum=0, maximum=100, value=0, step=0.1, label="CUT 시작 시간 (초)") end_slider = gr.Slider(minimum=0, maximum=100, value=100, step=0.1, label="CUT 종료 시간 (초)") with gr.Row(): process_button = gr.Button("프레임 추출") with gr.Row(): image_output = gr.Image(label="Frame Grid") file_output = gr.File(label="Download All Frames (ZIP)") message_output = gr.Textbox(label="메시지") video_input.change(on_video_change, inputs=[video_input], outputs=[start_slider, end_slider]) process_button.click(process_video_async, inputs=[video_input, start_slider, end_slider], outputs=[image_output, file_output, message_output]) if __name__ == "__main__": logging.info("애플리케이션 시작") iface.queue().launch(share=True) logging.info("애플리케이션 종료")