Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import os | |
| import zipfile | |
| from PIL import Image | |
| import tempfile | |
| import shutil | |
| from tqdm import tqdm | |
| import concurrent.futures | |
| import logging | |
| import asyncio | |
| from pathlib import Path | |
| # λ‘κΉ μ€μ | |
| logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # GPU μ¬μ© κ°λ₯ μ¬λΆ νμΈ | |
| USE_GPU = cv2.cuda.getCudaEnabledDeviceCount() > 0 | |
| logging.info(f"GPU μ¬μ© κ°λ₯: {USE_GPU}") | |
| def get_video_info(video_path): | |
| try: | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration = total_frames / fps | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| cap.release() | |
| return {"duration": duration, "fps": fps, "total_frames": total_frames, "width": width, "height": height} | |
| except Exception as e: | |
| logging.error(f"λΉλμ€ μ 보 νλ μ€ μ€λ₯ λ°μ: {str(e)}") | |
| return None | |
| async def extract_frames_async(video_path, start_time, end_time, max_frames=10000, progress=gr.Progress()): | |
| logging.info(f"νλ μ μΆμΆ μμ: {video_path}, μμ μκ°: {start_time}, μ’ λ£ μκ°: {end_time}") | |
| try: | |
| video_info = get_video_info(video_path) | |
| if not video_info: | |
| return [] | |
| fps = video_info["fps"] | |
| total_frames = video_info["total_frames"] | |
| start_frame = int(start_time * fps) | |
| end_frame = int(end_time * fps) if end_time else total_frames | |
| frame_interval = max(1, (end_frame - start_frame) // max_frames) | |
| frames = [] | |
| cap = cv2.VideoCapture(video_path) | |
| for i in progress.tqdm(range(start_frame, min(end_frame, total_frames), frame_interval)): | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, i) | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frames.append(frame) | |
| if len(frames) >= max_frames: | |
| break | |
| await asyncio.sleep(0) # λ€λ₯Έ μμ μ΄ μ€νλ μ μλλ‘ μ보 | |
| cap.release() | |
| logging.info(f"μ΄ {len(frames)}κ°μ νλ μμ΄ μΆμΆλμμ΅λλ€.") | |
| return frames | |
| except Exception as e: | |
| logging.error(f"νλ μ μΆμΆ μ€ μ€λ₯ λ°μ: {str(e)}") | |
| return [] | |
| def create_grid(frames, grid_size=(10, 10)): | |
| rows, cols = grid_size | |
| n_frames = len(frames) | |
| frames = frames[:rows*cols] # Limit to grid size | |
| frame_height, frame_width = frames[0].shape[:2] | |
| grid = np.zeros((frame_height * rows, frame_width * cols, 3), dtype=np.uint8) | |
| for i, frame in enumerate(frames): | |
| row = i // cols | |
| col = i % cols | |
| grid[row*frame_height:(row+1)*frame_height, col*frame_width:(col+1)*frame_width] = frame | |
| return grid | |
| async def save_frames_async(frames, output_dir, progress=gr.Progress()): | |
| os.makedirs(output_dir, exist_ok=True) | |
| for i, frame in progress.tqdm(enumerate(frames), desc="νλ μ μ μ₯ μ€", total=len(frames)): | |
| cv2.imwrite(os.path.join(output_dir, f"frame_{i:04d}.jpg"), frame) | |
| await asyncio.sleep(0) # λ€λ₯Έ μμ μ΄ μ€νλ μ μλλ‘ μ보 | |
| async def create_zip_async(output_dir, progress=gr.Progress()): | |
| zip_path = os.path.join(output_dir, "frames.zip") | |
| with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')] | |
| for file in progress.tqdm(files, desc="ZIP νμΌ μμ± μ€"): | |
| zipf.write(os.path.join(output_dir, file), file) | |
| await asyncio.sleep(0) # λ€λ₯Έ μμ μ΄ μ€νλ μ μλλ‘ μ보 | |
| return zip_path | |
| async def process_video_async(video, start_time, end_time, progress=gr.Progress()): | |
| logging.info("process_video ν¨μ μμ") | |
| try: | |
| if video is None: | |
| logging.warning("λΉλμ€ νμΌμ΄ μμ΅λλ€.") | |
| return None, None, "λΉλμ€ νμΌμ μ λ‘λν΄μ£ΌμΈμ." | |
| video_path = Path(video) | |
| logging.info(f"λΉλμ€ νμΌ: {video_path}, μμ μκ°: {start_time}, μ’ λ£ μκ°: {end_time}") | |
| file_size = os.path.getsize(video_path) | |
| if file_size > 2 * 1024 * 1024 * 1024: # 2GB | |
| logging.warning("νμΌ ν¬κΈ°κ° 2GBλ₯Ό μ΄κ³Όν©λλ€.") | |
| return None, None, "νμΌ ν¬κΈ°κ° 2GBλ₯Ό μ΄κ³Όν©λλ€. λ μμ νμΌμ μ λ‘λν΄μ£ΌμΈμ." | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| logging.info("νλ μ μΆμΆ μμ") | |
| frames = await extract_frames_async(str(video_path), start_time, end_time, max_frames=10000, progress=progress) | |
| if not frames: | |
| logging.error("νλ μ μΆμΆ μ€ν¨") | |
| return None, None, "νλ μ μΆμΆμ μ€ν¨νμ΅λλ€. λΉλμ€ νμΌμ νμΈν΄μ£ΌμΈμ." | |
| logging.info(f"μΆμΆλ νλ μ μ: {len(frames)}") | |
| await save_frames_async(frames, temp_dir, progress=progress) | |
| grid = create_grid(frames) | |
| zip_path = await create_zip_async(temp_dir, progress=progress) | |
| final_zip_path = "frames.zip" | |
| shutil.copy(zip_path, final_zip_path) | |
| logging.info("λΉλμ€ μ²λ¦¬ μλ£") | |
| return Image.fromarray(cv2.cvtColor(grid, cv2.COLOR_BGR2RGB)), final_zip_path, f"νλ μ μΆμΆμ΄ μλ£λμμ΅λλ€. μ΄ {len(frames)}κ°μ νλ μμ΄ μΆμΆλμμ΅λλ€. ({'GPU' if USE_GPU else 'CPU'} μ¬μ©)" | |
| except Exception as e: | |
| logging.error(f"λΉλμ€ μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}", exc_info=True) | |
| return None, None, f"μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}" | |
| def on_video_change(video): | |
| logging.info("λΉλμ€ νμΌμ΄ λ³κ²½λμμ΅λλ€.") | |
| if video is None: | |
| return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1) | |
| try: | |
| video_info = get_video_info(video) | |
| if video_info: | |
| duration = video_info["duration"] | |
| logging.info(f"λΉλμ€ κΈΈμ΄: {duration}μ΄") | |
| return gr.Slider(minimum=0, maximum=duration, value=0, step=0.1), gr.Slider(minimum=0, maximum=duration, value=duration, step=0.1) | |
| except Exception as e: | |
| logging.error(f"λΉλμ€ μ 보 νλ μ€ μ€λ₯ λ°μ: {str(e)}") | |
| return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1) | |
| with gr.Blocks(theme="Nymbo/Nymbo_Theme") as iface: | |
| gr.Markdown("# λΉλμ€ to μ΄λ―Έμ§(νλ μ)") | |
| gr.Markdown("μνλ ꡬκ°μ νλ μμ μΆμΆνμ¬ κ·Έλ¦¬λλ‘ νμνκ³ , κ°λ³ μ΄λ―Έμ§λ₯Ό ZIP νμΌλ‘ λ€μ΄λ‘λν μ μμ΅λλ€.") | |
| gr.Markdown(f"μ΅λ 2GB ν¬κΈ°μ λΉλμ€ νμΌμ μ λ‘λν μ μμΌλ©°, μ΅λ 10000κ°μ νλ μμ μΆμΆν©λλ€. ({'GPU' if USE_GPU else 'CPU'} μ¬μ©)") | |
| with gr.Row(): | |
| video_input = gr.Video(label="Upload and Play Video") | |
| with gr.Row(): | |
| start_slider = gr.Slider(minimum=0, maximum=100, value=0, step=0.1, label="CUT μμ μκ° (μ΄)") | |
| end_slider = gr.Slider(minimum=0, maximum=100, value=100, step=0.1, label="CUT μ’ λ£ μκ° (μ΄)") | |
| with gr.Row(): | |
| process_button = gr.Button("νλ μ μΆμΆ") | |
| with gr.Row(): | |
| image_output = gr.Image(label="Frame Grid") | |
| file_output = gr.File(label="Download All Frames (ZIP)") | |
| message_output = gr.Textbox(label="λ©μμ§") | |
| video_input.change(on_video_change, inputs=[video_input], outputs=[start_slider, end_slider]) | |
| process_button.click(process_video_async, inputs=[video_input, start_slider, end_slider], outputs=[image_output, file_output, message_output]) | |
| if __name__ == "__main__": | |
| logging.info("μ ν리μΌμ΄μ μμ") | |
| iface.queue().launch(share=True) | |
| logging.info("μ ν리μΌμ΄μ μ’ λ£") |