haraberget's picture
Upload app.py
9b79705 verified
import gradio as gr
import cv2
import numpy as np
import os
import io
import base64
import tempfile # Make sure tempfile is imported
import math # Import math for ceiling function
# --- Helper Function ---
# (Keep the get_frame_from_video function exactly as it was)
def get_frame_from_video(video_path, frame_number):
"""Reads a specific frame from a video file."""
if not video_path or not os.path.exists(video_path):
return None, "Error: Video file not found or path is invalid."
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
try: cap.release()
except Exception: pass
return None, f"Error: Could not open video file: {video_path}"
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_number = int(frame_number)
if total_frames <= 0:
cap.release()
return None, f"Error: Video file seems to have no frames or is corrupted: {os.path.basename(video_path)}"
max_frame = total_frames - 1
if frame_number < 0 or frame_number > max_frame:
cap.release()
return None, f"Error: Frame number {frame_number} is out of bounds (0-{max_frame})."
# Ensure frame number doesn't exceed max_frame even after calculation
frame_number = min(frame_number, max_frame)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = cap.read()
cap.release()
if not ret:
# Attempt to read the last valid frame if the target failed (might happen near the end)
if frame_number > 0:
cap = cv2.VideoCapture(video_path)
cap.set(cv2.CAP_PROP_POS_FRAMES, max_frame)
ret_last, frame_last = cap.read()
cap.release()
if ret_last:
return cv2.cvtColor(frame_last, cv2.COLOR_BGR2RGB), f"Could not read frame {frame_number}. Showing last valid frame {max_frame}/{max_frame}."
return None, f"Error: Could not read frame {frame_number}."
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
status_message = f"Showing frame {frame_number}/{max_frame}." # Basic status
return frame_rgb, status_message
# --- Gradio Callback Functions ---
# 1. Function triggered when a video file is uploaded
def load_video(video_path):
"""Loads the video, gets its properties, updates the slider, displays the first frame, and hides download links."""
if video_path is None:
# Reset UI elements, including FPS state
return {
video_path_state: None,
fps_state: 0.0, # Reset FPS
frame_slider: gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False),
frame_display: None,
info_text: "Please upload a video file (e.g., MP4).",
jpg_download_output: gr.File(value=None, visible=False),
png_download_output: gr.File(value=None, visible=False)
}
cap = cv2.VideoCapture(video_path)
fps = 0.0 # Initialize fps
if not cap.isOpened():
try: cap.release();
except: pass
return {
video_path_state: None,
fps_state: 0.0, # Reset FPS
frame_slider: gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False),
frame_display: None,
info_text: f"Error: Could not open video file: {os.path.basename(video_path)}",
jpg_download_output: gr.File(value=None, visible=False),
png_download_output: gr.File(value=None, visible=False)
}
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) # Get FPS here
# Validate total_frames and fps
if total_frames <= 0 or not fps or fps <= 0:
cap.release()
error_msg = f"Error: Could not read valid metadata (frames={total_frames}, fps={fps}) from {os.path.basename(video_path)}."
return {
video_path_state: None,
fps_state: 0.0, # Reset FPS
frame_slider: gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False),
frame_display: None,
info_text: error_msg,
jpg_download_output: gr.File(value=None, visible=False),
png_download_output: gr.File(value=None, visible=False)
}
duration = total_frames / fps
cap.release()
first_frame, status = get_frame_from_video(video_path, 0)
if first_frame is None:
# FPS is valid here, but first frame failed
return {
video_path_state: None,
fps_state: 0.0, # Reset FPS as we can't proceed
frame_slider: gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False),
frame_display: None,
info_text: f"Error reading first frame: {status}",
jpg_download_output: gr.File(value=None, visible=False),
png_download_output: gr.File(value=None, visible=False)
}
info = (
f"Video: {os.path.basename(video_path)}\n"
f"Total Frames: {total_frames}\n"
f"FPS: {fps:.2f}\n"
f"Duration: {duration:.2f} seconds\n"
f"Use slider or video controls." # Updated instruction
)
slider_max = max(0, total_frames - 1)
return {
video_path_state: video_path,
fps_state: fps, # Store the valid FPS
frame_slider: gr.Slider(minimum=0, maximum=slider_max, value=0, step=1, interactive=True),
frame_display: first_frame,
info_text: info + "\n" + status,
jpg_download_output: gr.File(value=None, visible=False),
png_download_output: gr.File(value=None, visible=False)
}
# 2. Function triggered when the slider value changes (on release)
def update_frame_on_slider(video_path, frame_number):
"""Gets and displays the frame, updates status, and hides download links."""
if video_path is None:
return None, "No video loaded.", gr.File(value=None, visible=False), gr.File(value=None, visible=False)
frame_number = int(frame_number)
frame, status = get_frame_from_video(video_path, frame_number)
return frame, status, gr.File(value=None, visible=False), gr.File(value=None, visible=False)
# 3. Function to prepare and save the frame temporarily
# (Keep the save_frame_temporarily function exactly as it was)
def save_frame_temporarily(video_path, frame_number, image_format):
"""Saves the frame temporarily and returns the file path."""
if video_path is None:
return None, "No video loaded to download from."
frame_number = int(frame_number)
frame, status = get_frame_from_video(video_path, frame_number)
if frame is None:
return None, status # Return error status from get_frame
# Ensure frame is uint8
if frame.dtype != np.uint8:
if frame.max() <= 1.0:
frame = (frame * 255).astype(np.uint8)
else:
frame = frame.astype(np.uint8) # Potential clipping
# Choose suffix and encode
suffix = f".{image_format}"
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # Convert for OpenCV saving
if image_format == "jpeg":
ret, buffer = cv2.imencode('.jpg', frame_bgr)
elif image_format == "png":
ret, buffer = cv2.imencode('.png', frame_bgr)
else:
return None, f"Unsupported format: {image_format}"
if not ret:
return None, f"Error encoding frame {frame_number} to {image_format}."
try:
temp_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
temp_file.write(buffer)
temp_file.close()
file_path = temp_file.name
status = f"Frame {frame_number} ready. Click link below to download {image_format.upper()}."
return file_path, status
except Exception as e:
if 'temp_file' in locals() and temp_file:
try: os.unlink(temp_file.name)
except OSError: pass
return None, f"Error creating temporary file: {e}"
# 4. NEW Function: Triggered when video is paused
def sync_slider_on_pause(video_path, fps, pause_time: gr.Number):
"""Updates the slider value based on the video pause time."""
print(f"Pause event: video_path={video_path}, fps={fps}, pause_time={pause_time}") # Debug print
if video_path is None or not fps or fps <= 0 or pause_time is None:
print("Skipping slider sync due to invalid inputs.")
# Return an update that does nothing if inputs are invalid
return gr.Slider() # No change update
# Calculate frame number - use math.ceil to lean towards the frame *after* the pause time
# Or use int() for the frame *at* or *before* the pause time. Let's use int() for simplicity.
calculated_frame = int(pause_time * fps)
# Get the current max of the slider to prevent going out of bounds
# This requires the slider component itself to be passed, which is complex.
# Alternative: Re-read total frames here (less efficient but simpler state management)
cap = cv2.VideoCapture(video_path)
if cap.isOpened():
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if total_frames > 0:
max_frame = total_frames - 1
# Clamp the calculated frame to the valid range
target_frame = max(0, min(calculated_frame, max_frame))
print(f"Calculated frame: {calculated_frame}, Target frame: {target_frame}")
return gr.Slider(value=target_frame)
else:
print("Skipping slider sync because total_frames <= 0.")
return gr.Slider() # No change
else:
print("Skipping slider sync because video couldn't be opened.")
return gr.Slider() # No change
# --- Gradio Interface ---
with gr.Blocks() as demo:
gr.Markdown("# Video Frame Explorer and Downloader")
gr.Markdown("Upload a video file (like MP4), use the slider OR pause the video to select a frame. Click a 'Prepare' button, then click the link that appears to download.")
video_path_state = gr.State(None)
fps_state = gr.State(0.0) # NEW state for FPS
with gr.Row():
with gr.Column(scale=1):
# Assign elem_id for potential JS interaction if needed later
video_input = gr.Video(label="Upload Video", sources=["upload"], format="mp4", elem_id="video_player")
info_text = gr.Textbox(label="Video Info / Status", interactive=False, lines=5)
jpg_download_output = gr.File(label="JPG Download Link", visible=False, interactive=False)
png_download_output = gr.File(label="PNG Download Link", visible=False, interactive=False)
with gr.Column(scale=2):
frame_display = gr.Image(label="Selected Frame", type="numpy", interactive=False)
# Assign elem_id for potential JS interaction
frame_slider = gr.Slider(
minimum=0, maximum=0, value=0, step=1,
label="Frame Number", interactive=False, elem_id="frame_slider"
)
with gr.Row():
prepare_jpg_button = gr.Button("Prepare JPG Download")
prepare_png_button = gr.Button("Prepare PNG Download")
# --- Component Interactions ---
# 1. Video Upload/Clear
video_input.upload(
fn=load_video,
inputs=video_input,
# Include fps_state in outputs
outputs=[video_path_state, fps_state, frame_slider, frame_display, info_text, jpg_download_output, png_download_output]
)
video_input.clear(
fn=load_video,
inputs=video_input,
# Include fps_state in outputs
outputs=[video_path_state, fps_state, frame_slider, frame_display, info_text, jpg_download_output, png_download_output]
)
# 2. Slider Release (hides download links)
frame_slider.release(
fn=update_frame_on_slider,
inputs=[video_path_state, frame_slider],
outputs=[frame_display, info_text, jpg_download_output, png_download_output]
)
# 3. Prepare JPG Download Button Click
prepare_jpg_button.click(
fn=save_frame_temporarily,
inputs=[video_path_state, frame_slider, gr.Textbox("jpeg", visible=False)],
outputs=[jpg_download_output, info_text]
).then(
lambda: {jpg_download_output: gr.File(visible=True), png_download_output: gr.File(visible=False)},
outputs=[jpg_download_output, png_download_output]
)
# 4. Prepare PNG Download Button Click
prepare_png_button.click(
fn=save_frame_temporarily,
inputs=[video_path_state, frame_slider, gr.Textbox("png", visible=False)],
outputs=[png_download_output, info_text]
).then(
lambda: {png_download_output: gr.File(visible=True), jpg_download_output: gr.File(visible=False)},
outputs=[png_download_output, jpg_download_output]
)
# 5. NEW: Video Pause Event
# The 'pause_time' argument to sync_slider_on_pause comes from the event data
video_input.pause(
fn=sync_slider_on_pause,
inputs=[video_path_state, fps_state], # Pass needed state
outputs=[frame_slider] # Update the slider value
)
# Optional: Trigger frame update immediately after slider syncs from pause
# video_input.pause(...).then(
# fn=update_frame_on_slider,
# inputs=[video_path_state, frame_slider], # Use the NEW slider value
# outputs=[frame_display, info_text, jpg_download_output, png_download_output]
# )
# --- Launch the App ---
if __name__ == "__main__":
demo.launch(debug=True) # Add debug=True for easier troubleshooting