bhacquin's picture
Update demo.py
7fb1eb0 verified
import datetime
from pathlib import Path
import random
import gradio as gr
import os
import json
import firebase_admin
from firebase_admin import db, credentials
##################################################################
# Constants
##################################################################
# Constants for video display
VIDEO_FOLDERS = {
"ours": "./userstudy_200_ours/",
"omni": "./userstudy_200_omni/"
}
NUMBER_OF_EXPERIMENTS = 200 # Assuming sample0.mp4 to sample199.mp4
#################################################################################################################################################
# Authentication
#################################################################################################################################################
# Read secret API key and other configurations from environment variables
FIREBASE_API_KEY = os.environ.get('FirebaseSecret')
FIREBASE_URL = os.environ.get('FirebaseURL')
DATASET = os.environ.get('Dataset')
# Initialize Firebase service
try:
firebase_creds = credentials.Certificate(json.loads(FIREBASE_API_KEY))
firebase_app = firebase_admin.initialize_app(firebase_creds, {'databaseURL': FIREBASE_URL})
firebase_data_ref = db.reference("data")
print("Firebase initialized successfully.")
except Exception as e:
print(f"Failed to initialize Firebase: {e}")
raise e
##################################################################
# Data Layer
##################################################################
class Experiment(dict):
"""
Represents an experiment consisting of two videos and the user's selection.
"""
def __init__(self, dataset, video_left, video_right, selected_video=None):
super().__init__(
dataset=dataset,
video_left=video_left,
video_right=video_right,
selected_video=selected_video,
)
def experiment_to_dict(experiment, skip=False):
"""
Converts the Experiment object to a dictionary for Firebase storage.
Includes whether the selected video is from 'omni' or 'ours' folder.
"""
info = {
"dataset": experiment["dataset"],
"video_left": experiment["video_left"],
"video_right": experiment["video_right"],
}
if skip:
info["selected_video"] = "None"
info["Control"] = "None"
else:
info["selected_video"] = experiment["selected_video"]
# Determine "Control" based on which video was selected
if experiment["selected_video"] == "left":
selected_video_path = Path(experiment["video_left"])
elif experiment["selected_video"] == "right":
selected_video_path = Path(experiment["video_right"])
else:
selected_video_path = None
if selected_video_path:
# Check the parent folder name to determine the source
if "userstudy_200_omni" in selected_video_path.parts:
info["Control"] = "OmniControl"
elif "userstudy_200_ours" in selected_video_path.parts:
info["Control"] = "Ours"
else:
info["Control"] = "Unknown"
else:
info["Control"] = "Unknown"
info["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return info
def generate_new_experiment() -> Experiment:
"""
Generates a new experiment by randomly selecting a sample number and fetching corresponding videos from both folders.
The positions of the videos (left/right) are randomized.
"""
# Randomly select an integer between 0 and 199 inclusive
random_int = random.randint(0, NUMBER_OF_EXPERIMENTS - 1)
video_name = f"sample{random_int}.mp4"
# Paths to the two videos
video_ours_path = Path(VIDEO_FOLDERS["ours"]) / video_name
video_omni_path = Path(VIDEO_FOLDERS["omni"]) / video_name
# Ensure that both videos exist
if not video_ours_path.exists() or not video_omni_path.exists():
raise FileNotFoundError(f"One or both video files not found: {video_ours_path}, {video_omni_path}")
# Randomly decide the order (left/right)
if random.choice([True, False]):
video_left, video_right = video_ours_path, video_omni_path
else:
video_left, video_right = video_omni_path, video_ours_path
print(f"Generated new experiment with left video: {video_left}, right video: {video_right}")
return Experiment(
DATASET,
str(video_left),
str(video_right),
)
def load_initial():
"""
Initializes the first experiment on app load.
"""
try:
new_experiment = generate_new_experiment()
print("Initial experiment loaded.")
return [
new_experiment,
gr.update(value=new_experiment["video_left"]),
gr.update(value=new_experiment["video_right"]),
"" # Empty message
]
except Exception as e:
print(f"Error loading initial experiment: {e}")
return [None, gr.update(value=""), gr.update(value=""), "❌ Failed to load initial videos ❌"]
def select_and_load(selected_label, experiment, message_component):
"""
Handles the selection of a video by the user.
Saves the selection and loads the next experiment.
"""
try:
# Update the experiment's selected video
if selected_label.lower() == "left":
experiment["selected_video"] = "left"
elif selected_label.lower() == "right":
experiment["selected_video"] = "right"
else:
experiment["selected_video"] = None # For safety
# Save the current selection to Firebase
dict_to_save = experiment_to_dict(experiment, skip=False)
firebase_data_ref.push(dict_to_save)
print("=====================")
print(dict_to_save)
print("=====================")
# Set success message
new_message = "✅ Your choice has been saved to Firebase ✅"
except Exception as e:
# Set error message
new_message = f"❌ Failed to save your choice: {e} ❌"
print(f"Error saving to Firebase: {e}")
# Generate the next experiment
try:
new_experiment = generate_new_experiment()
# Update the video components with new video paths
return [
new_experiment,
gr.update(value=new_experiment["video_left"]),
gr.update(value=new_experiment["video_right"]),
new_message
]
except Exception as e:
# If generating a new experiment fails, notify the user
new_message = f"❌ Failed to load new videos: {e} ❌"
print(f"Error generating new experiment: {e}")
# Keep the current experiment and videos
return [
experiment,
gr.update(value=experiment["video_left"]),
gr.update(value=experiment["video_right"]),
new_message
]
def skip_experiment(experiment, message_component):
"""
Handles the skipping of an experiment.
Saves the skip and loads the next experiment.
"""
try:
# Set selected_video to "None" to indicate skip
experiment["selected_video"] = "None"
# Save the skip to Firebase
dict_to_save = experiment_to_dict(experiment, skip=True)
firebase_data_ref.push(dict_to_save)
print("=====================")
print(dict_to_save)
print("=====================")
# Set success message
new_message = "✅ Your skip has been recorded ✅"
except Exception as e:
# Set error message
new_message = f"❌ Failed to record your skip: {e} ❌"
print(f"Error saving skip to Firebase: {e}")
# Generate the next experiment
try:
new_experiment = generate_new_experiment()
# Update the video components with new video paths
return [
new_experiment,
gr.update(value=new_experiment["video_left"]),
gr.update(value=new_experiment["video_right"]),
new_message
]
except Exception as e:
# If generating a new experiment fails, notify the user
new_message = f"❌ Failed to load new videos: {e} ❌"
print(f"Error generating new experiment: {e}")
# Keep the current experiment and videos
return [
experiment,
gr.update(value=experiment["video_left"]),
gr.update(value=experiment["video_right"]),
new_message
]
##################################################################
# UI Layer
##################################################################
css = """
#padded {
padding-left: 2%;
padding-right: 2%;
}
.select-button {
margin-top: 10px;
width: 100%;
}
.select-button:hover {
background-color: #00c0ff;
color: white;
}
.video-container {
display: flex;
justify-content: center;
align-items: center;
}
video {
width: 100%;
height: auto;
}
"""
with gr.Blocks(title="Unsupervised Video Editing", css=css) as demo:
# Initialize the state
experiment = gr.State()
with gr.Row(elem_id="padded"):
with gr.Column(scale=3, elem_id="padded"):
gr.Markdown("<div style='width: 100%'><h1 style='text-align: center;'>Choose Your Preferred Video</h1></div>")
gr.Markdown("<div style='width: 100%'><h3 style='text-align: center;'>Select the video you prefer.<br/>⚠️Consider fidelity and quality⚠️</h3></div>")
btn_skip = gr.Button("I have no preference")
message_component = gr.Markdown("") # For displaying messages
with gr.Row():
# Define both video components first
with gr.Column(scale=1):
video_left_component = gr.Video(
label="left",
elem_id="video-left",
show_label=False,
show_download_button=False,
show_share_button=False,
interactive=True
)
with gr.Column(scale=1):
video_right_component = gr.Video(
label="right",
elem_id="video-right",
show_label=False,
show_download_button=False,
show_share_button=False,
interactive=True
)
with gr.Row():
# Add Select buttons below each video
with gr.Column(scale=1):
select_left = gr.Button("Select Left Video", elem_id="select-button-left", variant="primary")
with gr.Column(scale=1):
select_right = gr.Button("Select Right Video", elem_id="select-button-right", variant="primary")
with gr.Row():
# Configure the skip button
btn_skip.click(
fn=skip_experiment,
inputs=[experiment, message_component],
outputs=[experiment, video_left_component, video_right_component, message_component],
queue=True,
show_progress=True
)
# Configure the select buttons to handle selection and load next experiment
select_left.click(
fn=lambda exp, msg: select_and_load("left", exp, msg),
inputs=[experiment, message_component],
outputs=[experiment, video_left_component, video_right_component, message_component],
queue=True,
show_progress=True
)
select_right.click(
fn=lambda exp, msg: select_and_load("right", exp, msg),
inputs=[experiment, message_component],
outputs=[experiment, video_left_component, video_right_component, message_component],
queue=True,
show_progress=True
)
# Load the first experiment on app startup
demo.load(
fn=load_initial,
inputs=None,
outputs=[experiment, video_left_component, video_right_component, message_component]
)
# Launch the app with share=True to create a public link
demo.launch(share=True)