import gradio as gr import os import random import json from typing import List, Dict, Tuple from datetime import datetime import pandas as pd from filelock import FileLock, Timeout from pathlib import Path import tempfile import argparse class VideoArenaManager: def __init__(self, base_dir: str = "videos", data_dir: str = "/data"): self.base_dir = base_dir self.data_dir = data_dir self.models = self._load_models() self.videos = self._load_videos() self.data_file = os.path.join(self.data_dir, "arena_data_new.json") self.data_lock = FileLock(os.path.join(self.data_dir, "arena_data_new.lock")) self.data = self._load_data() self.usernames = set() def _load_models(self) -> List[Dict[str, str]]: """Load available models from directories.""" return [ {"name": "Model 1", "directory": "model_1"}, {"name": "Model 2", "directory": "model_2"}, {"name": "Model 3", "directory": "model_3"}, {"name": "Model 4", "directory": "model_4"}, {"name": "Model 5", "directory": "model_5"}, {"name": "Model 6", "directory": "model_6"}, # Add more models as needed ] def get_data_files(self) -> List[Dict[str, str]]: """Retrieve a list of data files in the data directory.""" data_path = Path(self.data_dir) files = [ {"name": file.name, "size": file.stat().st_size, "path": str(file)} for file in data_path.glob("*") if file.is_file() ] return files def read_data_file(self, file_path: str) -> bytes: """Read the content of a data file.""" with open(file_path, "rb") as f: return f.read() def _load_videos(self) -> List[str]: """Load available video files.""" base_path = os.path.join(self.base_dir, self.models[0]["directory"]) return [f for f in os.listdir(base_path) if f.endswith((".mp4", ".avi", ".mov"))] def _load_data(self) -> Dict: """Load existing Elo ratings and comparison results.""" try: with self.data_lock.acquire(timeout=10): if os.path.exists(self.data_file): with open(self.data_file, "r") as f: return json.load(f) else: # Initialize data if file does not exist default_rating = 1000.0 elo_ratings = {model["name"]: default_rating for model in self.models} results = {"comparisons": []} data = {"elo_ratings": elo_ratings, "results": results} with open(self.data_file, "w") as f: json.dump(data, f, indent=2) return data except (Timeout, KeyError): print(f"Could not acquire lock on {self.data_file}") # Handle timeout (e.g., return default data or raise an error) default_rating = 1000.0 elo_ratings = {model["name"]: default_rating for model in self.models} results = {"comparisons": []} return {"elo_ratings": elo_ratings, "results": results} def save_comparison(self, video_name: str, winner: str, loser: str, username: str): """Save a comparison result and update Elo ratings.""" try: with self.data_lock.acquire(timeout=10): # Reload data to get the latest information if os.path.exists(self.data_file): with open(self.data_file, "r") as f: self.data = json.load(f) else: # Initialize data if file does not exist default_rating = 1000.0 elo_ratings = {model["name"]: default_rating for model in self.models} results = {"comparisons": []} self.data = {"elo_ratings": elo_ratings, "results": results} # Update comparison results with username comparison = { "timestamp": datetime.now().isoformat(), "video": video_name, "winner": winner, "loser": loser, "username": username, # Add username to comparison data } self.data["results"]["comparisons"].append(comparison) # Update Elo ratings self.update_elo_ratings(winner, loser) # Save updated data with open(self.data_file, "w") as f: json.dump(self.data, f, indent=2) except Timeout: print(f"Could not acquire lock on {self.data_file}") # Handle timeout (e.g., retry or raise an error) def update_elo_ratings(self, winner: str, loser: str, k: float = 32): """Update the Elo ratings of the models.""" # Assume the lock on data_file is already held elo_ratings = self.data["elo_ratings"] winner_rating = elo_ratings[winner] loser_rating = elo_ratings[loser] # Calculate expected scores expected_winner = 1 / (1 + 10 ** ((loser_rating - winner_rating) / 400)) # Update ratings elo_ratings[winner] += k * (1 - expected_winner) elo_ratings[loser] += k * (0 - (1 - expected_winner)) # Update the data self.data["elo_ratings"] = elo_ratings def get_random_pair(self) -> Tuple[Dict[str, str], Dict[str, str]]: """Get a random pair of models to compare.""" return tuple(random.sample(self.models, 2)) def get_rankings(self) -> pd.DataFrame: """Generate current rankings based on Elo ratings.""" elo_ratings = self.data["elo_ratings"] rankings = [{"Model": model["name"], "Elo Rating": elo_ratings[model["name"]]} for model in self.models] df = pd.DataFrame(rankings) return df.sort_values(by="Elo Rating", ascending=False).reset_index(drop=True) def get_video_paths(self, video_name: str, model_pair: Tuple[Dict[str, str]]) -> List[str]: """Get video paths for the given pair of models.""" return [os.path.join(self.base_dir, model["directory"], video_name) for model in model_pair] def generate_username(self) -> str: """Generate a unique random username.""" adjectives = ["Happy", "Quick", "Clever", "Brave", "Wise", "Kind", "Swift"] animals = ["Panda", "Tiger", "Eagle", "Dolphin", "Fox", "Owl", "Bear"] while True: username = f"{random.choice(adjectives)}{random.choice(animals)}{random.randint(100, 999)}" if username not in self.usernames: self.usernames.add(username) return username def create_arena_interface(data_dir: str = None): # Instantiate the manager with the provided data_dir if set if data_dir is not None: manager = VideoArenaManager(data_dir=data_dir) else: manager = VideoArenaManager() with gr.Blocks( title="Video Model Ranking Arena", css=""" .invisible-textbox { position: fixed; opacity: 0.1; pointer-events: auto; width: 100px; height: 20px; padding: 5px; margin: 5px; background: #f0f0f0; border: 1px solid #ddd; } """, ) as demo: gr.Markdown( """### Welcome to the Dubbing Evaluation Arena! In this study, the models modify only the lip region of the characters to better match the new dubbed audio, while the rest of the video remains unchanged. Please compare the two videos and vote for the one you prefer based on the following criteria: - **Lip Synchronization with Audio**: How well the character's lip movements align with the new speech. - **Overall Coherence**: How seamlessly the modified lip movements integrate with the rest of the video. - **Image Quality**: Clarity and visual appeal of the video. Select either the left or right video as your preference. Thank you for your feedback! (**Note**: If you are on a mobile phone, try turning the screen landscape for a better experience)""" ) # State variables current_video = gr.State() current_models = gr.State() # Add username state username = gr.State(manager.generate_username()) with gr.Row(): # Display two videos side by side video_left = gr.Video(label="Model A", height=400) video_right = gr.Video(label="Model B", height=400) with gr.Row(): # Buttons for voting left_button = gr.Button("👈 Left video looks better", size="lg") right_button = gr.Button("Right video looks better 👉", size="lg") # Add a hidden passkey input with gr.Row(): with gr.Column(scale=1): passkey_input = gr.Textbox( label="", placeholder="", show_label=False, container=False, scale=0.15, min_width=100, elem_classes="invisible-textbox", ) # Rankings section (hidden by default) rankings_section = gr.Row(visible=False) with rankings_section: rankings_table = gr.DataFrame( manager.get_rankings(), label="Current Model Rankings", headers=["Model", "Elo Rating"], interactive=False, ) # Hidden download section with file management with gr.Row(visible=False) as download_section: with gr.Column(): gr.Markdown("## Download Results File") files = manager.get_data_files() file_names = [file["name"] for file in files] file_select = gr.Dropdown( choices=file_names, label="Select Results File to Download", interactive=True ) download_button = gr.Button("Download Selected File", size="sm") download_output = gr.File(label="Download", visible=False) gr.Markdown("## Reset Data") with gr.Row(): reset_button = gr.Button("Reset All Data", size="sm", variant="stop") reset_confirm = gr.Button("Confirm Reset", size="sm", variant="stop", visible=False) reset_warning = gr.Markdown( visible=False, value="⚠️ **WARNING**: This will permanently delete all rankings and comparison data. Click 'Confirm Reset' to proceed.", ) def show_reset_warning(): return { reset_warning: gr.update(visible=True), reset_confirm: gr.update(visible=True), reset_button: gr.update(visible=False), } def reset_data(): try: with manager.data_lock.acquire(timeout=10): # Initialize fresh data default_rating = 1000.0 elo_ratings = {model["name"]: default_rating for model in manager.models} results = {"comparisons": []} fresh_data = {"elo_ratings": elo_ratings, "results": results} # Write fresh data to file with open(manager.data_file, "w") as f: json.dump(fresh_data, f, indent=2) # Reset manager's data manager.data = fresh_data return { reset_warning: gr.update(visible=False), reset_confirm: gr.update(visible=False), reset_button: gr.update(visible=True), rankings_table: manager.get_rankings(), } except Timeout: return { reset_warning: gr.update(value="⚠️ Error: Could not acquire lock to reset data", visible=True), reset_confirm: gr.update(visible=False), reset_button: gr.update(visible=True), } reset_button.click(fn=show_reset_warning, inputs=[], outputs=[reset_warning, reset_confirm, reset_button]) reset_confirm.click( fn=reset_data, inputs=[], outputs=[reset_warning, reset_confirm, reset_button, rankings_table] ) def check_passkey(passkey: str): """Check if the entered passkey is correct and show/hide sections.""" correct_passkey = os.environ.get("PASSKEY", "") is_visible = passkey == correct_passkey return [ gr.Row(visible=is_visible), # download_section gr.File(visible=is_visible), # download_output gr.Row(visible=is_visible), # rankings_section ] passkey_input.change( fn=check_passkey, inputs=[passkey_input], outputs=[download_section, download_output, rankings_section], ) def load_new_comparison(): """Load a new random comparison.""" video_name = random.choice(manager.videos) model_pair = manager.get_random_pair() video_paths = manager.get_video_paths(video_name, model_pair) # Update video labels video_left.label = model_pair[0]["name"] video_right.label = model_pair[1]["name"] current_video_value = video_name current_models_value = [model["name"] for model in model_pair] return ( video_paths[0], # video_left video_paths[1], # video_right current_video_value, # current_video current_models_value, # current_models manager.get_rankings(), # rankings_table ) def handle_choice(choice_index: int, video_name: str, current_models_value: List[str], current_username: str): """Handle the user's choice and update rankings.""" if not current_models_value or len(current_models_value) < 2: print("Error: current_models is invalid:", current_models_value) return gr.update(visible=False), gr.update(visible=False), "", [], manager.get_rankings() winner = current_models_value[choice_index] loser = current_models_value[1 - choice_index] # Save the comparison result with username manager.save_comparison(video_name, winner, loser, current_username) # Load new comparison return load_new_comparison() left_button.click( fn=lambda vid, models, user: handle_choice(0, vid, models, user), inputs=[current_video, current_models, username], outputs=[video_left, video_right, current_video, current_models, rankings_table], ) right_button.click( fn=lambda vid, models, user: handle_choice(1, vid, models, user), inputs=[current_video, current_models, username], outputs=[video_left, video_right, current_video, current_models, rankings_table], ) demo.load( fn=load_new_comparison, inputs=[], outputs=[video_left, video_right, current_video, current_models, rankings_table], ) def download_file(file_name: str): """Prepare the selected file for download.""" if not file_name: return None file_path = os.path.join(manager.data_dir, file_name) try: with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_name)[1]) as tmp_file: with open(file_path, "rb") as f: tmp_file.write(f.read()) return tmp_file.name except Exception as e: print(f"Error preparing download: {e}") return None download_button.click( fn=download_file, inputs=[file_select], outputs=download_output, ) return demo if __name__ == "__main__": local = True data_dir = "./" if local else "/data" demo = create_arena_interface(data_dir=data_dir) demo.launch(share=True)