Spaces:
Sleeping
Sleeping
| import datetime | |
| from pathlib import Path | |
| import random | |
| import gradio as gr | |
| import os | |
| import json | |
| import firebase_admin | |
| from firebase_admin import db, credentials | |
| ################################################################## | |
| # Constants | |
| ################################################################## | |
| # Constants for video display | |
| VIDEO_FOLDERS = { | |
| "ours": "./userstudy_200_ours/", | |
| "omni": "./userstudy_200_omni/" | |
| } | |
| NUMBER_OF_EXPERIMENTS = 200 # Assuming sample0.mp4 to sample199.mp4 | |
| ################################################################################################################################################# | |
| # Authentication | |
| ################################################################################################################################################# | |
| # Read secret API key and other configurations from environment variables | |
| FIREBASE_API_KEY = os.environ.get('FirebaseSecret') | |
| FIREBASE_URL = os.environ.get('FirebaseURL') | |
| DATASET = os.environ.get('Dataset') | |
| # Initialize Firebase service | |
| try: | |
| firebase_creds = credentials.Certificate(json.loads(FIREBASE_API_KEY)) | |
| firebase_app = firebase_admin.initialize_app(firebase_creds, {'databaseURL': FIREBASE_URL}) | |
| firebase_data_ref = db.reference("data") | |
| print("Firebase initialized successfully.") | |
| except Exception as e: | |
| print(f"Failed to initialize Firebase: {e}") | |
| raise e | |
| ################################################################## | |
| # Data Layer | |
| ################################################################## | |
| class Experiment(dict): | |
| """ | |
| Represents an experiment consisting of two videos and the user's selection. | |
| """ | |
| def __init__(self, dataset, video_left, video_right, selected_video=None): | |
| super().__init__( | |
| dataset=dataset, | |
| video_left=video_left, | |
| video_right=video_right, | |
| selected_video=selected_video, | |
| ) | |
| def experiment_to_dict(experiment, skip=False): | |
| """ | |
| Converts the Experiment object to a dictionary for Firebase storage. | |
| Includes whether the selected video is from 'omni' or 'ours' folder. | |
| """ | |
| info = { | |
| "dataset": experiment["dataset"], | |
| "video_left": experiment["video_left"], | |
| "video_right": experiment["video_right"], | |
| } | |
| if skip: | |
| info["selected_video"] = "None" | |
| info["Control"] = "None" | |
| else: | |
| info["selected_video"] = experiment["selected_video"] | |
| # Determine "Control" based on which video was selected | |
| if experiment["selected_video"] == "left": | |
| selected_video_path = Path(experiment["video_left"]) | |
| elif experiment["selected_video"] == "right": | |
| selected_video_path = Path(experiment["video_right"]) | |
| else: | |
| selected_video_path = None | |
| if selected_video_path: | |
| # Check the parent folder name to determine the source | |
| if "userstudy_200_omni" in selected_video_path.parts: | |
| info["Control"] = "OmniControl" | |
| elif "userstudy_200_ours" in selected_video_path.parts: | |
| info["Control"] = "Ours" | |
| else: | |
| info["Control"] = "Unknown" | |
| else: | |
| info["Control"] = "Unknown" | |
| info["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| return info | |
| def generate_new_experiment() -> Experiment: | |
| """ | |
| Generates a new experiment by randomly selecting a sample number and fetching corresponding videos from both folders. | |
| The positions of the videos (left/right) are randomized. | |
| """ | |
| # Randomly select an integer between 0 and 199 inclusive | |
| random_int = random.randint(0, NUMBER_OF_EXPERIMENTS - 1) | |
| video_name = f"sample{random_int}.mp4" | |
| # Paths to the two videos | |
| video_ours_path = Path(VIDEO_FOLDERS["ours"]) / video_name | |
| video_omni_path = Path(VIDEO_FOLDERS["omni"]) / video_name | |
| # Ensure that both videos exist | |
| if not video_ours_path.exists() or not video_omni_path.exists(): | |
| raise FileNotFoundError(f"One or both video files not found: {video_ours_path}, {video_omni_path}") | |
| # Randomly decide the order (left/right) | |
| if random.choice([True, False]): | |
| video_left, video_right = video_ours_path, video_omni_path | |
| else: | |
| video_left, video_right = video_omni_path, video_ours_path | |
| print(f"Generated new experiment with left video: {video_left}, right video: {video_right}") | |
| return Experiment( | |
| DATASET, | |
| str(video_left), | |
| str(video_right), | |
| ) | |
| def load_initial(): | |
| """ | |
| Initializes the first experiment on app load. | |
| """ | |
| try: | |
| new_experiment = generate_new_experiment() | |
| print("Initial experiment loaded.") | |
| return [ | |
| new_experiment, | |
| gr.update(value=new_experiment["video_left"]), | |
| gr.update(value=new_experiment["video_right"]), | |
| "" # Empty message | |
| ] | |
| except Exception as e: | |
| print(f"Error loading initial experiment: {e}") | |
| return [None, gr.update(value=""), gr.update(value=""), "❌ Failed to load initial videos ❌"] | |
| def select_and_load(selected_label, experiment, message_component): | |
| """ | |
| Handles the selection of a video by the user. | |
| Saves the selection and loads the next experiment. | |
| """ | |
| try: | |
| # Update the experiment's selected video | |
| if selected_label.lower() == "left": | |
| experiment["selected_video"] = "left" | |
| elif selected_label.lower() == "right": | |
| experiment["selected_video"] = "right" | |
| else: | |
| experiment["selected_video"] = None # For safety | |
| # Save the current selection to Firebase | |
| dict_to_save = experiment_to_dict(experiment, skip=False) | |
| firebase_data_ref.push(dict_to_save) | |
| print("=====================") | |
| print(dict_to_save) | |
| print("=====================") | |
| # Set success message | |
| new_message = "✅ Your choice has been saved to Firebase ✅" | |
| except Exception as e: | |
| # Set error message | |
| new_message = f"❌ Failed to save your choice: {e} ❌" | |
| print(f"Error saving to Firebase: {e}") | |
| # Generate the next experiment | |
| try: | |
| new_experiment = generate_new_experiment() | |
| # Update the video components with new video paths | |
| return [ | |
| new_experiment, | |
| gr.update(value=new_experiment["video_left"]), | |
| gr.update(value=new_experiment["video_right"]), | |
| new_message | |
| ] | |
| except Exception as e: | |
| # If generating a new experiment fails, notify the user | |
| new_message = f"❌ Failed to load new videos: {e} ❌" | |
| print(f"Error generating new experiment: {e}") | |
| # Keep the current experiment and videos | |
| return [ | |
| experiment, | |
| gr.update(value=experiment["video_left"]), | |
| gr.update(value=experiment["video_right"]), | |
| new_message | |
| ] | |
| def skip_experiment(experiment, message_component): | |
| """ | |
| Handles the skipping of an experiment. | |
| Saves the skip and loads the next experiment. | |
| """ | |
| try: | |
| # Set selected_video to "None" to indicate skip | |
| experiment["selected_video"] = "None" | |
| # Save the skip to Firebase | |
| dict_to_save = experiment_to_dict(experiment, skip=True) | |
| firebase_data_ref.push(dict_to_save) | |
| print("=====================") | |
| print(dict_to_save) | |
| print("=====================") | |
| # Set success message | |
| new_message = "✅ Your skip has been recorded ✅" | |
| except Exception as e: | |
| # Set error message | |
| new_message = f"❌ Failed to record your skip: {e} ❌" | |
| print(f"Error saving skip to Firebase: {e}") | |
| # Generate the next experiment | |
| try: | |
| new_experiment = generate_new_experiment() | |
| # Update the video components with new video paths | |
| return [ | |
| new_experiment, | |
| gr.update(value=new_experiment["video_left"]), | |
| gr.update(value=new_experiment["video_right"]), | |
| new_message | |
| ] | |
| except Exception as e: | |
| # If generating a new experiment fails, notify the user | |
| new_message = f"❌ Failed to load new videos: {e} ❌" | |
| print(f"Error generating new experiment: {e}") | |
| # Keep the current experiment and videos | |
| return [ | |
| experiment, | |
| gr.update(value=experiment["video_left"]), | |
| gr.update(value=experiment["video_right"]), | |
| new_message | |
| ] | |
| ################################################################## | |
| # UI Layer | |
| ################################################################## | |
| css = """ | |
| #padded { | |
| padding-left: 2%; | |
| padding-right: 2%; | |
| } | |
| .select-button { | |
| margin-top: 10px; | |
| width: 100%; | |
| } | |
| .select-button:hover { | |
| background-color: #00c0ff; | |
| color: white; | |
| } | |
| .video-container { | |
| display: flex; | |
| justify-content: center; | |
| align-items: center; | |
| } | |
| video { | |
| width: 100%; | |
| height: auto; | |
| } | |
| """ | |
| with gr.Blocks(title="Unsupervised Video Editing", css=css) as demo: | |
| # Initialize the state | |
| experiment = gr.State() | |
| with gr.Row(elem_id="padded"): | |
| with gr.Column(scale=3, elem_id="padded"): | |
| gr.Markdown("<div style='width: 100%'><h1 style='text-align: center;'>Choose Your Preferred Video</h1></div>") | |
| gr.Markdown("<div style='width: 100%'><h3 style='text-align: center;'>Select the video you prefer.<br/>⚠️Consider fidelity and quality⚠️</h3></div>") | |
| btn_skip = gr.Button("I have no preference") | |
| message_component = gr.Markdown("") # For displaying messages | |
| with gr.Row(): | |
| # Define both video components first | |
| with gr.Column(scale=1): | |
| video_left_component = gr.Video( | |
| label="left", | |
| elem_id="video-left", | |
| show_label=False, | |
| show_download_button=False, | |
| show_share_button=False, | |
| interactive=True | |
| ) | |
| with gr.Column(scale=1): | |
| video_right_component = gr.Video( | |
| label="right", | |
| elem_id="video-right", | |
| show_label=False, | |
| show_download_button=False, | |
| show_share_button=False, | |
| interactive=True | |
| ) | |
| with gr.Row(): | |
| # Add Select buttons below each video | |
| with gr.Column(scale=1): | |
| select_left = gr.Button("Select Left Video", elem_id="select-button-left", variant="primary") | |
| with gr.Column(scale=1): | |
| select_right = gr.Button("Select Right Video", elem_id="select-button-right", variant="primary") | |
| with gr.Row(): | |
| # Configure the skip button | |
| btn_skip.click( | |
| fn=skip_experiment, | |
| inputs=[experiment, message_component], | |
| outputs=[experiment, video_left_component, video_right_component, message_component], | |
| queue=True, | |
| show_progress=True | |
| ) | |
| # Configure the select buttons to handle selection and load next experiment | |
| select_left.click( | |
| fn=lambda exp, msg: select_and_load("left", exp, msg), | |
| inputs=[experiment, message_component], | |
| outputs=[experiment, video_left_component, video_right_component, message_component], | |
| queue=True, | |
| show_progress=True | |
| ) | |
| select_right.click( | |
| fn=lambda exp, msg: select_and_load("right", exp, msg), | |
| inputs=[experiment, message_component], | |
| outputs=[experiment, video_left_component, video_right_component, message_component], | |
| queue=True, | |
| show_progress=True | |
| ) | |
| # Load the first experiment on app startup | |
| demo.load( | |
| fn=load_initial, | |
| inputs=None, | |
| outputs=[experiment, video_left_component, video_right_component, message_component] | |
| ) | |
| # Launch the app with share=True to create a public link | |
| demo.launch(share=True) | |