import datetime from pathlib import Path import random import gradio as gr import os import json import firebase_admin from firebase_admin import db, credentials ################################################################## # Constants ################################################################## # Constants for video display VIDEO_FOLDERS = { "ours": "./userstudy_200_ours/", "omni": "./userstudy_200_omni/" } NUMBER_OF_EXPERIMENTS = 200 # Assuming sample0.mp4 to sample199.mp4 ################################################################################################################################################# # Authentication ################################################################################################################################################# # Read secret API key and other configurations from environment variables FIREBASE_API_KEY = os.environ.get('FirebaseSecret') FIREBASE_URL = os.environ.get('FirebaseURL') DATASET = os.environ.get('Dataset') # Initialize Firebase service try: firebase_creds = credentials.Certificate(json.loads(FIREBASE_API_KEY)) firebase_app = firebase_admin.initialize_app(firebase_creds, {'databaseURL': FIREBASE_URL}) firebase_data_ref = db.reference("data") print("Firebase initialized successfully.") except Exception as e: print(f"Failed to initialize Firebase: {e}") raise e ################################################################## # Data Layer ################################################################## class Experiment(dict): """ Represents an experiment consisting of two videos and the user's selection. """ def __init__(self, dataset, video_left, video_right, selected_video=None): super().__init__( dataset=dataset, video_left=video_left, video_right=video_right, selected_video=selected_video, ) def experiment_to_dict(experiment, skip=False): """ Converts the Experiment object to a dictionary for Firebase storage. Includes whether the selected video is from 'omni' or 'ours' folder. """ info = { "dataset": experiment["dataset"], "video_left": experiment["video_left"], "video_right": experiment["video_right"], } if skip: info["selected_video"] = "None" info["Control"] = "None" else: info["selected_video"] = experiment["selected_video"] # Determine "Control" based on which video was selected if experiment["selected_video"] == "left": selected_video_path = Path(experiment["video_left"]) elif experiment["selected_video"] == "right": selected_video_path = Path(experiment["video_right"]) else: selected_video_path = None if selected_video_path: # Check the parent folder name to determine the source if "userstudy_200_omni" in selected_video_path.parts: info["Control"] = "OmniControl" elif "userstudy_200_ours" in selected_video_path.parts: info["Control"] = "Ours" else: info["Control"] = "Unknown" else: info["Control"] = "Unknown" info["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") return info def generate_new_experiment() -> Experiment: """ Generates a new experiment by randomly selecting a sample number and fetching corresponding videos from both folders. The positions of the videos (left/right) are randomized. """ # Randomly select an integer between 0 and 199 inclusive random_int = random.randint(0, NUMBER_OF_EXPERIMENTS - 1) video_name = f"sample{random_int}.mp4" # Paths to the two videos video_ours_path = Path(VIDEO_FOLDERS["ours"]) / video_name video_omni_path = Path(VIDEO_FOLDERS["omni"]) / video_name # Ensure that both videos exist if not video_ours_path.exists() or not video_omni_path.exists(): raise FileNotFoundError(f"One or both video files not found: {video_ours_path}, {video_omni_path}") # Randomly decide the order (left/right) if random.choice([True, False]): video_left, video_right = video_ours_path, video_omni_path else: video_left, video_right = video_omni_path, video_ours_path print(f"Generated new experiment with left video: {video_left}, right video: {video_right}") return Experiment( DATASET, str(video_left), str(video_right), ) def load_initial(): """ Initializes the first experiment on app load. """ try: new_experiment = generate_new_experiment() print("Initial experiment loaded.") return [ new_experiment, gr.update(value=new_experiment["video_left"]), gr.update(value=new_experiment["video_right"]), "" # Empty message ] except Exception as e: print(f"Error loading initial experiment: {e}") return [None, gr.update(value=""), gr.update(value=""), "❌ Failed to load initial videos ❌"] def select_and_load(selected_label, experiment, message_component): """ Handles the selection of a video by the user. Saves the selection and loads the next experiment. """ try: # Update the experiment's selected video if selected_label.lower() == "left": experiment["selected_video"] = "left" elif selected_label.lower() == "right": experiment["selected_video"] = "right" else: experiment["selected_video"] = None # For safety # Save the current selection to Firebase dict_to_save = experiment_to_dict(experiment, skip=False) firebase_data_ref.push(dict_to_save) print("=====================") print(dict_to_save) print("=====================") # Set success message new_message = "✅ Your choice has been saved to Firebase ✅" except Exception as e: # Set error message new_message = f"❌ Failed to save your choice: {e} ❌" print(f"Error saving to Firebase: {e}") # Generate the next experiment try: new_experiment = generate_new_experiment() # Update the video components with new video paths return [ new_experiment, gr.update(value=new_experiment["video_left"]), gr.update(value=new_experiment["video_right"]), new_message ] except Exception as e: # If generating a new experiment fails, notify the user new_message = f"❌ Failed to load new videos: {e} ❌" print(f"Error generating new experiment: {e}") # Keep the current experiment and videos return [ experiment, gr.update(value=experiment["video_left"]), gr.update(value=experiment["video_right"]), new_message ] def skip_experiment(experiment, message_component): """ Handles the skipping of an experiment. Saves the skip and loads the next experiment. """ try: # Set selected_video to "None" to indicate skip experiment["selected_video"] = "None" # Save the skip to Firebase dict_to_save = experiment_to_dict(experiment, skip=True) firebase_data_ref.push(dict_to_save) print("=====================") print(dict_to_save) print("=====================") # Set success message new_message = "✅ Your skip has been recorded ✅" except Exception as e: # Set error message new_message = f"❌ Failed to record your skip: {e} ❌" print(f"Error saving skip to Firebase: {e}") # Generate the next experiment try: new_experiment = generate_new_experiment() # Update the video components with new video paths return [ new_experiment, gr.update(value=new_experiment["video_left"]), gr.update(value=new_experiment["video_right"]), new_message ] except Exception as e: # If generating a new experiment fails, notify the user new_message = f"❌ Failed to load new videos: {e} ❌" print(f"Error generating new experiment: {e}") # Keep the current experiment and videos return [ experiment, gr.update(value=experiment["video_left"]), gr.update(value=experiment["video_right"]), new_message ] ################################################################## # UI Layer ################################################################## css = """ #padded { padding-left: 2%; padding-right: 2%; } .select-button { margin-top: 10px; width: 100%; } .select-button:hover { background-color: #00c0ff; color: white; } .video-container { display: flex; justify-content: center; align-items: center; } video { width: 100%; height: auto; } """ with gr.Blocks(title="Unsupervised Video Editing", css=css) as demo: # Initialize the state experiment = gr.State() with gr.Row(elem_id="padded"): with gr.Column(scale=3, elem_id="padded"): gr.Markdown("