Spaces:
Sleeping
Sleeping
File size: 8,077 Bytes
3cb1c14 e9a7e97 63ed029 261a0f6 3cb1c14 63ed029 22986f6 5c36249 261a0f6 63ed029 261a0f6 63ed029 261a0f6 63ed029 261a0f6 3cb1c14 261a0f6 ab3e998 63ed029 261a0f6 63ed029 ab3e998 261a0f6 ab3e998 261a0f6 63ed029 ab3e998 22986f6 e9a7e97 ab3e998 3cb1c14 261a0f6 ab3e998 261a0f6 3cb1c14 261a0f6 ab3e998 261a0f6 ab3e998 5c36249 261a0f6 5c36249 718ff3c 63ed029 5c36249 63ed029 261a0f6 5c36249 261a0f6 5c36249 ac5bab9 e9a7e97 5c36249 261a0f6 e9a7e97 5c36249 e9a7e97 5c36249 261a0f6 e9a7e97 261a0f6 e9a7e97 718ff3c 63ed029 5c36249 718ff3c 5c36249 718ff3c 3cb1c14 ab3e998 261a0f6 ab3e998 261a0f6 ab3e998 5c36249 32ce1f1 5c36249 32ce1f1 ac5bab9 ab34cf3 32ce1f1 ab3e998 32ce1f1 718ff3c 5c36249 261a0f6 6dde46d 9884dcf 5c36249 261a0f6 5c36249 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | import gradio as gr
import cv2
import numpy as np
import os
import zipfile
from PIL import Image
import tempfile
import shutil
from tqdm import tqdm
import concurrent.futures
import logging
import asyncio
from pathlib import Path
# λ‘κΉ
μ€μ
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# GPU μ¬μ© κ°λ₯ μ¬λΆ νμΈ
USE_GPU = cv2.cuda.getCudaEnabledDeviceCount() > 0
logging.info(f"GPU μ¬μ© κ°λ₯: {USE_GPU}")
def get_video_info(video_path):
try:
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return {"duration": duration, "fps": fps, "total_frames": total_frames, "width": width, "height": height}
except Exception as e:
logging.error(f"λΉλμ€ μ 보 νλ μ€ μ€λ₯ λ°μ: {str(e)}")
return None
async def extract_frames_async(video_path, start_time, end_time, max_frames=10000, progress=gr.Progress()):
logging.info(f"νλ μ μΆμΆ μμ: {video_path}, μμ μκ°: {start_time}, μ’
λ£ μκ°: {end_time}")
try:
video_info = get_video_info(video_path)
if not video_info:
return []
fps = video_info["fps"]
total_frames = video_info["total_frames"]
start_frame = int(start_time * fps)
end_frame = int(end_time * fps) if end_time else total_frames
frame_interval = max(1, (end_frame - start_frame) // max_frames)
frames = []
cap = cv2.VideoCapture(video_path)
for i in progress.tqdm(range(start_frame, min(end_frame, total_frames), frame_interval)):
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
if len(frames) >= max_frames:
break
await asyncio.sleep(0) # λ€λ₯Έ μμ
μ΄ μ€νλ μ μλλ‘ μ보
cap.release()
logging.info(f"μ΄ {len(frames)}κ°μ νλ μμ΄ μΆμΆλμμ΅λλ€.")
return frames
except Exception as e:
logging.error(f"νλ μ μΆμΆ μ€ μ€λ₯ λ°μ: {str(e)}")
return []
def create_grid(frames, grid_size=(10, 10)):
rows, cols = grid_size
n_frames = len(frames)
frames = frames[:rows*cols] # Limit to grid size
frame_height, frame_width = frames[0].shape[:2]
grid = np.zeros((frame_height * rows, frame_width * cols, 3), dtype=np.uint8)
for i, frame in enumerate(frames):
row = i // cols
col = i % cols
grid[row*frame_height:(row+1)*frame_height, col*frame_width:(col+1)*frame_width] = frame
return grid
async def save_frames_async(frames, output_dir, progress=gr.Progress()):
os.makedirs(output_dir, exist_ok=True)
for i, frame in progress.tqdm(enumerate(frames), desc="νλ μ μ μ₯ μ€", total=len(frames)):
cv2.imwrite(os.path.join(output_dir, f"frame_{i:04d}.jpg"), frame)
await asyncio.sleep(0) # λ€λ₯Έ μμ
μ΄ μ€νλ μ μλλ‘ μ보
async def create_zip_async(output_dir, progress=gr.Progress()):
zip_path = os.path.join(output_dir, "frames.zip")
with zipfile.ZipFile(zip_path, 'w') as zipf:
files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
for file in progress.tqdm(files, desc="ZIP νμΌ μμ± μ€"):
zipf.write(os.path.join(output_dir, file), file)
await asyncio.sleep(0) # λ€λ₯Έ μμ
μ΄ μ€νλ μ μλλ‘ μ보
return zip_path
async def process_video_async(video, start_time, end_time, progress=gr.Progress()):
logging.info("process_video ν¨μ μμ")
try:
if video is None:
logging.warning("λΉλμ€ νμΌμ΄ μμ΅λλ€.")
return None, None, "λΉλμ€ νμΌμ μ
λ‘λν΄μ£ΌμΈμ."
video_path = Path(video)
logging.info(f"λΉλμ€ νμΌ: {video_path}, μμ μκ°: {start_time}, μ’
λ£ μκ°: {end_time}")
file_size = os.path.getsize(video_path)
if file_size > 2 * 1024 * 1024 * 1024: # 2GB
logging.warning("νμΌ ν¬κΈ°κ° 2GBλ₯Ό μ΄κ³Όν©λλ€.")
return None, None, "νμΌ ν¬κΈ°κ° 2GBλ₯Ό μ΄κ³Όν©λλ€. λ μμ νμΌμ μ
λ‘λν΄μ£ΌμΈμ."
with tempfile.TemporaryDirectory() as temp_dir:
logging.info("νλ μ μΆμΆ μμ")
frames = await extract_frames_async(str(video_path), start_time, end_time, max_frames=10000, progress=progress)
if not frames:
logging.error("νλ μ μΆμΆ μ€ν¨")
return None, None, "νλ μ μΆμΆμ μ€ν¨νμ΅λλ€. λΉλμ€ νμΌμ νμΈν΄μ£ΌμΈμ."
logging.info(f"μΆμΆλ νλ μ μ: {len(frames)}")
await save_frames_async(frames, temp_dir, progress=progress)
grid = create_grid(frames)
zip_path = await create_zip_async(temp_dir, progress=progress)
final_zip_path = "frames.zip"
shutil.copy(zip_path, final_zip_path)
logging.info("λΉλμ€ μ²λ¦¬ μλ£")
return Image.fromarray(cv2.cvtColor(grid, cv2.COLOR_BGR2RGB)), final_zip_path, f"νλ μ μΆμΆμ΄ μλ£λμμ΅λλ€. μ΄ {len(frames)}κ°μ νλ μμ΄ μΆμΆλμμ΅λλ€. ({'GPU' if USE_GPU else 'CPU'} μ¬μ©)"
except Exception as e:
logging.error(f"λΉλμ€ μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}", exc_info=True)
return None, None, f"μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}"
def on_video_change(video):
logging.info("λΉλμ€ νμΌμ΄ λ³κ²½λμμ΅λλ€.")
if video is None:
return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1)
try:
video_info = get_video_info(video)
if video_info:
duration = video_info["duration"]
logging.info(f"λΉλμ€ κΈΈμ΄: {duration}μ΄")
return gr.Slider(minimum=0, maximum=duration, value=0, step=0.1), gr.Slider(minimum=0, maximum=duration, value=duration, step=0.1)
except Exception as e:
logging.error(f"λΉλμ€ μ 보 νλ μ€ μ€λ₯ λ°μ: {str(e)}")
return gr.Slider(minimum=0, maximum=100, value=0, step=0.1), gr.Slider(minimum=0, maximum=100, value=100, step=0.1)
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as iface:
gr.Markdown("# λΉλμ€ to μ΄λ―Έμ§(νλ μ)")
gr.Markdown("μνλ ꡬκ°μ νλ μμ μΆμΆνμ¬ κ·Έλ¦¬λλ‘ νμνκ³ , κ°λ³ μ΄λ―Έμ§λ₯Ό ZIP νμΌλ‘ λ€μ΄λ‘λν μ μμ΅λλ€.")
gr.Markdown(f"μ΅λ 2GB ν¬κΈ°μ λΉλμ€ νμΌμ μ
λ‘λν μ μμΌλ©°, μ΅λ 10000κ°μ νλ μμ μΆμΆν©λλ€. ({'GPU' if USE_GPU else 'CPU'} μ¬μ©)")
with gr.Row():
video_input = gr.Video(label="Upload and Play Video")
with gr.Row():
start_slider = gr.Slider(minimum=0, maximum=100, value=0, step=0.1, label="CUT μμ μκ° (μ΄)")
end_slider = gr.Slider(minimum=0, maximum=100, value=100, step=0.1, label="CUT μ’
λ£ μκ° (μ΄)")
with gr.Row():
process_button = gr.Button("νλ μ μΆμΆ")
with gr.Row():
image_output = gr.Image(label="Frame Grid")
file_output = gr.File(label="Download All Frames (ZIP)")
message_output = gr.Textbox(label="λ©μμ§")
video_input.change(on_video_change, inputs=[video_input], outputs=[start_slider, end_slider])
process_button.click(process_video_async, inputs=[video_input, start_slider, end_slider], outputs=[image_output, file_output, message_output])
if __name__ == "__main__":
logging.info("μ ν리μΌμ΄μ
μμ")
iface.queue().launch(share=True)
logging.info("μ ν리μΌμ΄μ
μ’
λ£") |