Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import random | |
| import firebase_admin | |
| from firebase_admin import credentials | |
| from firebase_admin import firestore | |
| import uuid | |
| import json | |
| import os | |
| from dotenv import load_dotenv | |
| import re | |
| import pandas as pd | |
| load_dotenv() | |
| video_pairs = pd.read_csv('file_pairs.csv')[['file_name', 'vista_id', 'gem_id', 'rgb_id']].values.tolist() | |
| random.seed(42) | |
| random.shuffle(video_pairs) | |
| my_credentials = { | |
| "type": "service_account", | |
| "project_id": "human-eval-c4f83", | |
| "private_key_id": os.environ.get("PRIVATE_KEY_ID"), | |
| "private_key": os.environ.get("PRIVATE_KEY").replace(r'\n', '\n'), | |
| "client_email": os.environ.get("CLIENT_EMAIL"), | |
| "client_id": os.environ.get("CLIENT_ID"), | |
| "auth_uri": "https://accounts.google.com/o/oauth2/auth", | |
| "token_uri": "https://oauth2.googleapis.com/token", | |
| "auth_provider_x509_cert_url": os.environ.get("AUTH_PROVIDER_X509_CERT_URL"), | |
| "client_x509_cert_url": os.environ.get("CLIENT_X509_CERT_URL") | |
| } | |
| # Initialize Firebase | |
| if not firebase_admin._apps: | |
| cred = credentials.Certificate(my_credentials) | |
| firebase_admin.initialize_app(cred) | |
| db = firestore.client() | |
| def get_embed_link(file_id): | |
| link = f"https://player.vimeo.com/video/{file_id}?title=0&byline=0&portrait=0&sidedock=0&badge=0&autopause=0&player_id=0&app_id=58479&quality=540p" | |
| return link | |
| def get_video_pair(state): | |
| pair_index = state['pair_index'] | |
| shuffled_pairs = state['shuffled_pairs'] | |
| user_votes = state['user_votes'] | |
| while pair_index < len(shuffled_pairs): | |
| video_name, vista_id, gem_id, rgb_id = shuffled_pairs[pair_index] | |
| pair_key = f"{vista_id}_{gem_id}" | |
| state['rgb_id'] = rgb_id | |
| if pair_key not in user_votes: | |
| # Randomize left-right positions | |
| if random.choice([True, False]): | |
| video1_id, video2_id = vista_id, gem_id | |
| else: | |
| video1_id, video2_id = gem_id, vista_id | |
| left_video_url = get_embed_link(video1_id) | |
| right_video_url = get_embed_link(video2_id) | |
| state['video_name'] = video_name | |
| state['left_video_url'] = left_video_url | |
| state['right_video_url'] = right_video_url | |
| state['video1_id'] = video1_id | |
| state['video2_id'] = video2_id | |
| state['vista_id'] = vista_id | |
| state['gem_id'] = gem_id | |
| return left_video_url, right_video_url | |
| else: | |
| pair_index += 1 | |
| state['pair_index'] = pair_index | |
| return None, None # No more pairs | |
| def generate_video_html(url): | |
| return f'<div style="padding:56.25% 0 0 0;position:relative;"><iframe src="{url}" frameborder="0" allow="autoplay; fullscreen; picture-in-picture; clipboard-write" style="position:absolute;top:0;left:0;width:100%;height:100%;" title="GenericTitle"></iframe></div><script src="https://player.vimeo.com/api/player.js"></script>' | |
| def save_vote(video_name, email, vista_id, gem_id, video1_id, video2_id, responses): | |
| vote_data = { | |
| 'video_name': video_name, | |
| 'email': email, | |
| 'vista_id': vista_id, | |
| 'gem_id': gem_id, | |
| 'video1_id': video1_id, | |
| 'video2_id': video2_id, | |
| 'q1': responses['q1'], | |
| 'q2': responses['q2'], | |
| } | |
| db.collection('votes').add(vote_data) | |
| # Update user's vote history | |
| user_ref = db.collection('users').document(email) | |
| user_doc = user_ref.get() | |
| if user_doc.exists: | |
| user_votes = set(user_doc.to_dict().get('votes', [])) | |
| else: | |
| user_votes = set() | |
| pair_key = f"{vista_id}_{gem_id}" | |
| user_votes.add(pair_key) | |
| user_ref.set({'votes': list(user_votes)}, merge=True) | |
| def update_interface(responses, state): | |
| email = state['email'] | |
| user_votes = state['user_votes'] | |
| pair_index = state['pair_index'] | |
| video1_id = state['video1_id'] | |
| video2_id = state['video2_id'] | |
| vista_id = state['vista_id'] | |
| gem_id = state['gem_id'] | |
| video_name = state['video_name'] | |
| # Save the user's responses | |
| save_vote(video_name, email, vista_id, gem_id, video1_id, video2_id, responses) | |
| # Update state | |
| pair_index += 1 | |
| state['pair_index'] = pair_index | |
| # Update user_votes in state | |
| pair_key = f"{vista_id}_{gem_id}" | |
| user_votes.add(pair_key) | |
| state['user_votes'] = user_votes | |
| video1_url, video2_url = get_video_pair(state) | |
| if video1_url is None: | |
| # No more pairs | |
| output_message = "Thank you for participating! No more videos." | |
| return ( | |
| gr.update(visible=False), # video_column | |
| gr.update(visible=False), # video_column | |
| gr.update(value=""), # video1 | |
| gr.update(value=""), # video2 | |
| gr.update(value=""), # rgb_video | |
| gr.update(visible=False), # question_column | |
| gr.update(visible=False), # button_row | |
| output_message, # output | |
| gr.update(value=None), # q1 | |
| gr.update(value=None), # q2 | |
| gr.update(interactive=False), # next_btn | |
| state | |
| ) | |
| else: | |
| video1_html = generate_video_html(video1_url) | |
| video2_html = generate_video_html(video2_url) | |
| rgb_html = generate_video_html(get_embed_link(state['rgb_id'])) | |
| # Update videos and reset questions | |
| return ( | |
| gr.update(visible=True), # video_column | |
| gr.update(visible=True), # video_column | |
| gr.update(value=video1_html), # video1 | |
| gr.update(value=video2_html), # video2 | |
| gr.update(value=rgb_html), # rgb_video | |
| gr.update(visible=True), # question_column | |
| gr.update(visible=True), # button_row | |
| "", # output | |
| gr.update(value=None), # q1 | |
| gr.update(value=None), # q2 | |
| gr.update(interactive=False), # next_btn | |
| state | |
| ) | |
| def check_all_answers(q1, q2): | |
| if q1 and q2: | |
| return gr.update(interactive=True) | |
| else: | |
| return gr.update(interactive=False) | |
| def is_valid_email(email): | |
| regex = r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' | |
| return re.match(regex, email) | |
| def authenticate_user(email, state): | |
| email = email.strip() | |
| if not email or not is_valid_email(email): | |
| return ( | |
| gr.update(), # email_input (remains visible) | |
| gr.update(), # submit_email (remains visible) | |
| "Please enter a valid email.", # email_output | |
| gr.update(visible=False), # video_column | |
| gr.update(visible=False), # video_column | |
| gr.update(value=""), # video1 | |
| gr.update(value=""), # video2 | |
| gr.update(value=""), # rgb_video | |
| gr.update(visible=False), # question_column | |
| gr.update(visible=False), # button_row | |
| "", # output | |
| state | |
| ) | |
| else: | |
| # Initialize user state | |
| user_ref = db.collection('users').document(email) | |
| user_doc = user_ref.get() | |
| if user_doc.exists: | |
| user_votes = set(user_doc.to_dict().get('votes', [])) | |
| else: | |
| user_votes = set() | |
| # Shuffle video pairs for this user | |
| shuffled_pairs = video_pairs.copy() | |
| state.update({ | |
| 'email': email, | |
| 'user_votes': user_votes, | |
| 'pair_index': 0, | |
| 'shuffled_pairs': shuffled_pairs | |
| }) | |
| # Load the first pair | |
| video1_url, video2_url = get_video_pair(state) | |
| if video1_url is None: | |
| output_message = "No new videos available for you." | |
| return ( | |
| gr.update(visible=False), # email_input | |
| gr.update(visible=False), # submit_email | |
| "", # email_output | |
| gr.update(visible=False), # video_column | |
| gr.update(visible=False), # video_column | |
| gr.update(value=""), # video1 | |
| gr.update(value=""), # video2 | |
| gr.update(value=""), # rgb_video | |
| gr.update(visible=False), # question_column | |
| gr.update(visible=False), # button_row | |
| output_message, # output | |
| state | |
| ) | |
| else: | |
| video1_html = generate_video_html(video1_url) | |
| video2_html = generate_video_html(video2_url) | |
| rgb_html = generate_video_html(get_embed_link(state['rgb_id'])) | |
| return ( | |
| gr.update(visible=False), # email_input | |
| gr.update(visible=False), # submit_email | |
| "", # email_output | |
| gr.update(visible=True), # video_column | |
| gr.update(visible=True), # video_column | |
| gr.update(value=video1_html), # video1 | |
| gr.update(value=video2_html), # video2 | |
| gr.update(value=rgb_html), # rgb_video | |
| gr.update(visible=True), # question_column | |
| gr.update(visible=True), # button_row | |
| "", # output | |
| state | |
| ) | |
| with gr.Blocks() as demo: | |
| state = gr.State(value={}) | |
| gr.Markdown( | |
| """ | |
| You'll be seeing three videos per question. | |
| At the top, you'll see a video with the RGB view. | |
| Below, you'll see two depth videos. | |
| You'll be asked to select which depth video seems to be better quality with respect to the RGB video. | |
| **There are 30 videos in total.** | |
| **Avoid "No preference" answers as much as possible.** | |
| """ | |
| ) | |
| # Email Input | |
| email_input = gr.Textbox(label="Enter your email to begin:", placeholder="your.email@example.com", type="text") | |
| submit_email = gr.Button("Submit") | |
| email_output = gr.Markdown() | |
| # Video components (initially hidden) | |
| with gr.Column(): | |
| with gr.Column() as video1_column: | |
| gr.Markdown("### RGB Video") | |
| rgb_video = gr.HTML() | |
| with gr.Row(): | |
| with gr.Column(visible=False) as video1_column: | |
| gr.Markdown("### Video 1") | |
| video1 = gr.HTML() | |
| with gr.Column(visible=False) as video2_column: | |
| gr.Markdown("### Video 2") | |
| video2 = gr.HTML() | |
| # Questions (initially hidden) | |
| with gr.Column(visible=False) as question_column: | |
| gr.Markdown("## Please answer the following questions:") | |
| q1 = gr.Radio( | |
| choices=["Video 1", "Video 2", "No preference"], | |
| label="1. Which depth video is more aligned with RGB?", | |
| type="value" | |
| ) | |
| q2 = gr.Radio( | |
| choices=["Video 1", "Video 2", "No preference"], | |
| label="2. Which depth video is more consistent and has less flickering?", | |
| type="value" | |
| ) | |
| # Buttons (initially hidden) | |
| with gr.Row(visible=False) as button_row: | |
| next_btn = gr.Button("Next", interactive=False) | |
| # skip_btn = gr.Button("Skip") # Commented out if not used | |
| output = gr.Markdown() | |
| def on_next(q1, q2, state): | |
| responses = { | |
| 'q1': q1, | |
| 'q2': q2, | |
| } | |
| return update_interface(responses, state) | |
| next_btn.click( | |
| fn=on_next, | |
| inputs=[q1, q2, state], | |
| outputs=[ | |
| video1_column, # Update video_column | |
| video2_column, # Update video_column | |
| video1, # Update video1 | |
| video2, # Update video2 | |
| rgb_video, | |
| question_column, # Update question_column | |
| button_row, # Update button_row | |
| output, # Update output message | |
| q1, q2, # Reset questions | |
| next_btn, # Update next_btn | |
| state | |
| ] | |
| ) | |
| def on_change(q1, q2): | |
| return check_all_answers(q1, q2) | |
| q1.change( | |
| fn=on_change, | |
| inputs=[q1, q2], | |
| outputs=next_btn | |
| ) | |
| q2.change( | |
| fn=on_change, | |
| inputs=[q1, q2], | |
| outputs=next_btn | |
| ) | |
| submit_email.click( | |
| fn=authenticate_user, | |
| inputs=[email_input, state], | |
| outputs=[ | |
| email_input, # Update email_input | |
| submit_email, # Update submit_email | |
| email_output, # Update email_output | |
| video1_column, # Update video_column | |
| video2_column, # Update video_column | |
| video1, # Update video1 | |
| video2, # Update video2 | |
| rgb_video, | |
| question_column, # Update question_column | |
| button_row, # Update button_row | |
| output, # Update output message | |
| state | |
| ] | |
| ) | |
| demo.launch() |