Spaces:
Sleeping
Sleeping
File size: 16,641 Bytes
dcacd98 57e272c a1ce00e af3b648 96f48d3 57e272c dcacd98 96f48d3 dcacd98 96f48d3 dcacd98 96f48d3 57e272c dcacd98 57e272c dcacd98 b2a92f6 cec3c59 6f431ac dcacd98 57e272c cec3c59 96f48d3 cec3c59 af3b648 cec3c59 dcacd98 57e272c dcacd98 57e272c b86ade6 57e272c 76c8895 57e272c 37f4955 57e272c b19faf8 57e272c b19faf8 57e272c dcacd98 57e272c dcacd98 57e272c dcacd98 57e272c dcacd98 57e272c dcacd98 57e272c 76c8895 37f4955 dcacd98 96f48d3 57e272c 116fb8a ecaf4ed 116fb8a ecaf4ed 116fb8a 76c8895 5f26962 0b648dd 5f26962 0b648dd 76c8895 57e272c 76c8895 57e272c 76c8895 57e272c dcacd98 76c8895 a40e443 76c8895 9e2364d 76c8895 d23f3e4 cf3c695 d23f3e4 cf3c695 d23f3e4 cf3c695 76c8895 96f48d3 682a93b b9b0cec 96f48d3 1c1b680 b9b0cec b19faf8 cf3c695 682a93b cf3c695 682a93b cf3c695 682a93b 4d8fd2b 1c1b680 76c8895 a40e443 76c8895 dcacd98 57e272c dcacd98 76c8895 96f48d3 dcacd98 57e272c dcacd98 76c8895 96f48d3 dcacd98 57e272c dcacd98 57e272c 76c8895 96f48d3 dcacd98 57e272c 76c8895 96f48d3 76c8895 d03e673 76c8895 dcacd98 57e272c dcacd98 cebd7b7 96f48d3 e384ed7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 | import gradio as gr
import os
import random
import json
from typing import List, Dict, Tuple
from datetime import datetime
import pandas as pd
from filelock import FileLock, Timeout
from pathlib import Path
import tempfile
import argparse
class VideoArenaManager:
def __init__(self, base_dir: str = "videos", data_dir: str = "/data"):
self.base_dir = base_dir
self.data_dir = data_dir
self.models = self._load_models()
self.videos = self._load_videos()
self.data_file = os.path.join(self.data_dir, "arena_data_new.json")
self.data_lock = FileLock(os.path.join(self.data_dir, "arena_data_new.lock"))
self.data = self._load_data()
self.usernames = set()
def _load_models(self) -> List[Dict[str, str]]:
"""Load available models from directories."""
return [
{"name": "Model 1", "directory": "model_1"},
{"name": "Model 2", "directory": "model_2"},
{"name": "Model 3", "directory": "model_3"},
{"name": "Model 4", "directory": "model_4"},
{"name": "Model 5", "directory": "model_5"},
{"name": "Model 6", "directory": "model_6"},
# Add more models as needed
]
def get_data_files(self) -> List[Dict[str, str]]:
"""Retrieve a list of data files in the data directory."""
data_path = Path(self.data_dir)
files = [
{"name": file.name, "size": file.stat().st_size, "path": str(file)}
for file in data_path.glob("*")
if file.is_file()
]
return files
def read_data_file(self, file_path: str) -> bytes:
"""Read the content of a data file."""
with open(file_path, "rb") as f:
return f.read()
def _load_videos(self) -> List[str]:
"""Load available video files."""
base_path = os.path.join(self.base_dir, self.models[0]["directory"])
return [f for f in os.listdir(base_path) if f.endswith((".mp4", ".avi", ".mov"))]
def _load_data(self) -> Dict:
"""Load existing Elo ratings and comparison results."""
try:
with self.data_lock.acquire(timeout=10):
if os.path.exists(self.data_file):
with open(self.data_file, "r") as f:
return json.load(f)
else:
# Initialize data if file does not exist
default_rating = 1000.0
elo_ratings = {model["name"]: default_rating for model in self.models}
results = {"comparisons": []}
data = {"elo_ratings": elo_ratings, "results": results}
with open(self.data_file, "w") as f:
json.dump(data, f, indent=2)
return data
except (Timeout, KeyError):
print(f"Could not acquire lock on {self.data_file}")
# Handle timeout (e.g., return default data or raise an error)
default_rating = 1000.0
elo_ratings = {model["name"]: default_rating for model in self.models}
results = {"comparisons": []}
return {"elo_ratings": elo_ratings, "results": results}
def save_comparison(self, video_name: str, winner: str, loser: str, username: str):
"""Save a comparison result and update Elo ratings."""
try:
with self.data_lock.acquire(timeout=10):
# Reload data to get the latest information
if os.path.exists(self.data_file):
with open(self.data_file, "r") as f:
self.data = json.load(f)
else:
# Initialize data if file does not exist
default_rating = 1000.0
elo_ratings = {model["name"]: default_rating for model in self.models}
results = {"comparisons": []}
self.data = {"elo_ratings": elo_ratings, "results": results}
# Update comparison results with username
comparison = {
"timestamp": datetime.now().isoformat(),
"video": video_name,
"winner": winner,
"loser": loser,
"username": username, # Add username to comparison data
}
self.data["results"]["comparisons"].append(comparison)
# Update Elo ratings
self.update_elo_ratings(winner, loser)
# Save updated data
with open(self.data_file, "w") as f:
json.dump(self.data, f, indent=2)
except Timeout:
print(f"Could not acquire lock on {self.data_file}")
# Handle timeout (e.g., retry or raise an error)
def update_elo_ratings(self, winner: str, loser: str, k: float = 32):
"""Update the Elo ratings of the models."""
# Assume the lock on data_file is already held
elo_ratings = self.data["elo_ratings"]
winner_rating = elo_ratings[winner]
loser_rating = elo_ratings[loser]
# Calculate expected scores
expected_winner = 1 / (1 + 10 ** ((loser_rating - winner_rating) / 400))
# Update ratings
elo_ratings[winner] += k * (1 - expected_winner)
elo_ratings[loser] += k * (0 - (1 - expected_winner))
# Update the data
self.data["elo_ratings"] = elo_ratings
def get_random_pair(self) -> Tuple[Dict[str, str], Dict[str, str]]:
"""Get a random pair of models to compare."""
return tuple(random.sample(self.models, 2))
def get_rankings(self) -> pd.DataFrame:
"""Generate current rankings based on Elo ratings."""
elo_ratings = self.data["elo_ratings"]
rankings = [{"Model": model["name"], "Elo Rating": elo_ratings[model["name"]]} for model in self.models]
df = pd.DataFrame(rankings)
return df.sort_values(by="Elo Rating", ascending=False).reset_index(drop=True)
def get_video_paths(self, video_name: str, model_pair: Tuple[Dict[str, str]]) -> List[str]:
"""Get video paths for the given pair of models."""
return [os.path.join(self.base_dir, model["directory"], video_name) for model in model_pair]
def generate_username(self) -> str:
"""Generate a unique random username."""
adjectives = ["Happy", "Quick", "Clever", "Brave", "Wise", "Kind", "Swift"]
animals = ["Panda", "Tiger", "Eagle", "Dolphin", "Fox", "Owl", "Bear"]
while True:
username = f"{random.choice(adjectives)}{random.choice(animals)}{random.randint(100, 999)}"
if username not in self.usernames:
self.usernames.add(username)
return username
def create_arena_interface(data_dir: str = None):
# Instantiate the manager with the provided data_dir if set
if data_dir is not None:
manager = VideoArenaManager(data_dir=data_dir)
else:
manager = VideoArenaManager()
with gr.Blocks(
title="Video Model Ranking Arena",
css="""
.invisible-textbox {
position: fixed;
opacity: 0.1;
pointer-events: auto;
width: 100px;
height: 20px;
padding: 5px;
margin: 5px;
background: #f0f0f0;
border: 1px solid #ddd;
}
""",
) as demo:
gr.Markdown(
"""### Welcome to the Dubbing Evaluation Arena!
In this study, the models modify only the lip region of the characters to better match the new dubbed audio, while the rest of the video remains unchanged.
Please compare the two videos and vote for the one you prefer based on the following criteria:
- **Lip Synchronization with Audio**: How well the character's lip movements align with the new speech.
- **Overall Coherence**: How seamlessly the modified lip movements integrate with the rest of the video.
- **Image Quality**: Clarity and visual appeal of the video.
Select either the left or right video as your preference. Thank you for your feedback!
(**Note**: If you are on a mobile phone, try turning the screen landscape for a better experience)"""
)
# State variables
current_video = gr.State()
current_models = gr.State()
# Add username state
username = gr.State(manager.generate_username())
with gr.Row():
# Display two videos side by side
video_left = gr.Video(label="Model A", height=400)
video_right = gr.Video(label="Model B", height=400)
with gr.Row():
# Buttons for voting
left_button = gr.Button("👈 Left video looks better", size="lg")
right_button = gr.Button("Right video looks better 👉", size="lg")
# Add a hidden passkey input
with gr.Row():
with gr.Column(scale=1):
passkey_input = gr.Textbox(
label="",
placeholder="",
show_label=False,
container=False,
scale=0.15,
min_width=100,
elem_classes="invisible-textbox",
)
# Rankings section (hidden by default)
rankings_section = gr.Row(visible=False)
with rankings_section:
rankings_table = gr.DataFrame(
manager.get_rankings(),
label="Current Model Rankings",
headers=["Model", "Elo Rating"],
interactive=False,
)
# Hidden download section with file management
with gr.Row(visible=False) as download_section:
with gr.Column():
gr.Markdown("## Download Results File")
files = manager.get_data_files()
file_names = [file["name"] for file in files]
file_select = gr.Dropdown(
choices=file_names, label="Select Results File to Download", interactive=True
)
download_button = gr.Button("Download Selected File", size="sm")
download_output = gr.File(label="Download", visible=False)
gr.Markdown("## Reset Data")
with gr.Row():
reset_button = gr.Button("Reset All Data", size="sm", variant="stop")
reset_confirm = gr.Button("Confirm Reset", size="sm", variant="stop", visible=False)
reset_warning = gr.Markdown(
visible=False,
value="⚠️ **WARNING**: This will permanently delete all rankings and comparison data. Click 'Confirm Reset' to proceed.",
)
def show_reset_warning():
return {
reset_warning: gr.update(visible=True),
reset_confirm: gr.update(visible=True),
reset_button: gr.update(visible=False),
}
def reset_data():
try:
with manager.data_lock.acquire(timeout=10):
# Initialize fresh data
default_rating = 1000.0
elo_ratings = {model["name"]: default_rating for model in manager.models}
results = {"comparisons": []}
fresh_data = {"elo_ratings": elo_ratings, "results": results}
# Write fresh data to file
with open(manager.data_file, "w") as f:
json.dump(fresh_data, f, indent=2)
# Reset manager's data
manager.data = fresh_data
return {
reset_warning: gr.update(visible=False),
reset_confirm: gr.update(visible=False),
reset_button: gr.update(visible=True),
rankings_table: manager.get_rankings(),
}
except Timeout:
return {
reset_warning: gr.update(value="⚠️ Error: Could not acquire lock to reset data", visible=True),
reset_confirm: gr.update(visible=False),
reset_button: gr.update(visible=True),
}
reset_button.click(fn=show_reset_warning, inputs=[], outputs=[reset_warning, reset_confirm, reset_button])
reset_confirm.click(
fn=reset_data, inputs=[], outputs=[reset_warning, reset_confirm, reset_button, rankings_table]
)
def check_passkey(passkey: str):
"""Check if the entered passkey is correct and show/hide sections."""
correct_passkey = os.environ.get("PASSKEY", "")
is_visible = passkey == correct_passkey
return [
gr.Row(visible=is_visible), # download_section
gr.File(visible=is_visible), # download_output
gr.Row(visible=is_visible), # rankings_section
]
passkey_input.change(
fn=check_passkey,
inputs=[passkey_input],
outputs=[download_section, download_output, rankings_section],
)
def load_new_comparison():
"""Load a new random comparison."""
video_name = random.choice(manager.videos)
model_pair = manager.get_random_pair()
video_paths = manager.get_video_paths(video_name, model_pair)
# Update video labels
video_left.label = model_pair[0]["name"]
video_right.label = model_pair[1]["name"]
current_video_value = video_name
current_models_value = [model["name"] for model in model_pair]
return (
video_paths[0], # video_left
video_paths[1], # video_right
current_video_value, # current_video
current_models_value, # current_models
manager.get_rankings(), # rankings_table
)
def handle_choice(choice_index: int, video_name: str, current_models_value: List[str], current_username: str):
"""Handle the user's choice and update rankings."""
if not current_models_value or len(current_models_value) < 2:
print("Error: current_models is invalid:", current_models_value)
return gr.update(visible=False), gr.update(visible=False), "", [], manager.get_rankings()
winner = current_models_value[choice_index]
loser = current_models_value[1 - choice_index]
# Save the comparison result with username
manager.save_comparison(video_name, winner, loser, current_username)
# Load new comparison
return load_new_comparison()
left_button.click(
fn=lambda vid, models, user: handle_choice(0, vid, models, user),
inputs=[current_video, current_models, username],
outputs=[video_left, video_right, current_video, current_models, rankings_table],
)
right_button.click(
fn=lambda vid, models, user: handle_choice(1, vid, models, user),
inputs=[current_video, current_models, username],
outputs=[video_left, video_right, current_video, current_models, rankings_table],
)
demo.load(
fn=load_new_comparison,
inputs=[],
outputs=[video_left, video_right, current_video, current_models, rankings_table],
)
def download_file(file_name: str):
"""Prepare the selected file for download."""
if not file_name:
return None
file_path = os.path.join(manager.data_dir, file_name)
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file_name)[1]) as tmp_file:
with open(file_path, "rb") as f:
tmp_file.write(f.read())
return tmp_file.name
except Exception as e:
print(f"Error preparing download: {e}")
return None
download_button.click(
fn=download_file,
inputs=[file_select],
outputs=download_output,
)
return demo
if __name__ == "__main__":
local = True
data_dir = "./" if local else "/data"
demo = create_arena_interface(data_dir=data_dir)
demo.launch(share=True)
|