| import gradio as gr |
| import json |
| from datetime import datetime |
| import random |
| import os |
| |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import EntryNotFoundError |
|
|
# Fixed seed so every participant sees the same (pre-shuffled) video pairings
# and the before/after blinding is reproducible across restarts.
random.seed(20240128)

# Hugging Face token is injected via the environment (e.g. Space secrets);
# only log whether it is present, never the token itself.
hf_token = os.getenv("HF_TOKEN")
print("HF Token is none?", hf_token is None)

# Dataset repo on the Hub that accumulates all user submissions, and the
# local mirror path the app appends to before each upload.
REPO_ID = "peterw2333/test"
LOCAL_DIR = "user_responses"
SUBMISSIONS_FILE = "response_202507.jsonl"
LOCAL_FILE_PATH = os.path.join(LOCAL_DIR, SUBMISSIONS_FILE)

# Single API client reused for every upload.
api = HfApi(token=hf_token)
|
|
| |
# Sync any existing submissions from the Hub so new responses append to them
# rather than overwriting the dataset file on the next upload.
os.makedirs(LOCAL_DIR, exist_ok=True)
try:
    print("Downloading existing data from Hub...")
    hf_hub_download(
        repo_id=REPO_ID,
        repo_type="dataset",
        filename=SUBMISSIONS_FILE,
        local_dir=LOCAL_DIR,
    )
except EntryNotFoundError:
    # Expected on the very first run: the submissions file does not exist yet.
    print(f"{SUBMISSIONS_FILE} not found on the Hub; a new file will be created.")
except Exception as e:
    # Original caught `(EntryNotFoundError, Exception)` — the first member was
    # redundant. Network/auth failures are best-effort: continue with an empty
    # local file so the app still starts.
    print(f"Error downloading existing data (continuing with empty file): {e}")
|
|
def prepare_test_cases():
    """Load before/after video pairs and blind them as 'Video 1'/'Video 2'.

    Reads the manifest at videos/rir.json, then assigns each entry's
    'before' and 'after' video lists to the keys 'Video 1' and 'Video 2'
    in a uniformly random order so raters cannot tell which condition is
    which.

    Returns:
        dict: video_id -> entry dict, each augmented with 'Video 1' and
        'Video 2' keys. Empty dict if the manifest file is missing.
    """
    json_path = "videos/rir.json"
    if not os.path.exists(json_path):
        print(f"Warning: {json_path} not found.")
        return {}

    with open(json_path, "r") as f:
        video_dict = json.load(f)

    for entry in video_dict.values():
        # A single shuffle already yields an unbiased random order; the
        # original coin-flip followed by a second shuffle was redundant
        # (the shuffle discarded whatever order the coin flip produced).
        video_list = [entry['before'], entry['after']]
        random.shuffle(video_list)
        entry['Video 1'] = video_list[0]
        entry['Video 2'] = video_list[1]

    return video_dict
|
|
# Build the blinded test cases once at startup, then randomize the order in
# which the pairs are presented to participants.
video_dict = prepare_test_cases()
video_ids = list(video_dict.keys())
random.shuffle(video_ids)

# NOTE(review): dumps the full manifest (including blinding) to the log —
# fine for a dataset repo, but remove before any public deployment.
print("DEBUG: video_dict", video_dict)
|
|
# Labels for the two per-pair rating questions: prompt accuracy and
# interaction quality. Order matters — save_responses slices answers in
# groups of len(questions) and maps index 0 -> accuracy, 1 -> quality.
questions = [
    "Between Video 1 (left) and Video 2 (right), which one's results are more accurate according to the text prompt?",
    # Fixed broken grammar in the user-facing label ("which one's results
    # are more has a higher quality ...").
    "Between Video 1 (left) and Video 2 (right), which one has a higher quality of human-human interaction?"
]
|
|
def has_already_submitted(user_id, path=None):
    """Return True if *user_id* already appears in the submissions JSONL file.

    Args:
        user_id: Identifier stored under the "u_id" key of each record.
        path: Optional path to the submissions file; defaults to
            LOCAL_FILE_PATH, so existing callers are unaffected.

    Returns:
        bool: True if any well-formed line has a matching "u_id".
    """
    if path is None:
        path = LOCAL_FILE_PATH
    if not os.path.exists(path):
        return False
    with open(path, "r") as f:
        for line in f:
            try:
                if json.loads(line).get("u_id") == user_id:
                    return True
            except json.JSONDecodeError:
                # Tolerate corrupt/partial lines (e.g. interrupted writes).
                continue
    return False
|
|
| |
def save_responses(unique_submission, *responses):
    """Validate, persist locally, and upload one participant's answers.

    Args:
        unique_submission: If True, reject a second submission from the
            same session id.
        *responses: One radio value ("Video 1"/"Video 2" or None) per
            question per pair, followed by the session-info dict produced
            by `predict` as the final element.

    Returns:
        str: Status message shown in the UI (success, validation error,
        or upload failure).
    """
    timestamp = datetime.now().isoformat()
    info = responses[-1]          # session metadata appended by the click handler
    responses = responses[:-1]    # the actual radio selections
    user_id = f"{info['session_id']}"

    if unique_submission and has_already_submitted(user_id):
        return "You have already submitted responses. Thank you for participating!"

    result = {
        "u_id": user_id,
        "timestamp": timestamp,
        "responses": []
    }

    print("DEBUG responses:", responses)

    for index in range(len(video_ids)):
        # Answers arrive flat, grouped per pair: [pair0_q0, pair0_q1, pair1_q0, ...]
        start_idx = index * len(questions)
        end_idx = start_idx + len(questions)

        response = responses[start_idx:end_idx]
        if any(r is None for r in response):
            return "Please answer all questions before submitting."

        video_id = video_ids[index]
        # Un-blind the "Video 1"/"Video 2" choice by inspecting the selected
        # video's file path (before-condition clips live under videos/before).
        pair_response = {
            video_id: {
                'accuracy': "before" if "videos/before" in video_dict[video_id][response[0]][0] else "after",
                'quality': "before" if "videos/before" in video_dict[video_id][response[1]][0] else "after",
            }
        }
        print("DEBUG pair_response:", video_id, pair_response)
        result["responses"].append(pair_response)

    # Sort entries by their single video-id key. The original key
    # (lambda x: x.keys()) compared dict_keys views, whose `<` is *subset*
    # comparison — a partial order — so the sort was arbitrary.
    result["responses"].sort(key=lambda x: next(iter(x)))

    with open(LOCAL_FILE_PATH, "a") as f:
        f.write(json.dumps(result) + "\n")

    try:
        print("Uploading to Hub...")
        api.upload_file(
            path_or_fileobj=LOCAL_FILE_PATH,
            path_in_repo=SUBMISSIONS_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"New submission from {user_id}"
        )
    except Exception as e:
        return f"Error uploading to Hugging Face: {str(e)}"

    return "All responses saved! Thank you for participating!"
|
|
def create_interface(unique_submission=False):
    """Assemble the Gradio Blocks UI for the pairwise video study.

    Args:
        unique_submission: Forwarded to `save_responses`; when True each
            session may submit only once.

    Returns:
        The constructed `gr.Blocks` demo (not yet launched).
    """
    with gr.Blocks() as demo:
        gr.Markdown("# Human Preference Study: Two Person Interaction")
        gr.Markdown("""
        ## In each of the following pairs, you will be presented with two videos.\n
        ## For each pair, please first read the text prompt carefully, then examine the videos and answer the following questions.
        """)

        answer_widgets = []
        for pair_number, vid in enumerate(video_ids, start=1):
            entry = video_dict[vid]

            gr.Markdown(f"# Video Pair {pair_number}")
            gr.Markdown(f"## Text Prompt: {entry['prompt']}")

            # Show every clip of the blinded pair side by side in one row.
            with gr.Row():
                for clip in entry['Video 1']:
                    gr.Video(clip, label="Video 1")
                for clip in entry['Video 2']:
                    gr.Video(clip, label="Video 2")

            # One radio group per question, each on its own row; widget order
            # must match the per-pair slicing done in save_responses.
            for question in questions:
                with gr.Row():
                    answer_widgets.append(
                        gr.Radio(["Video 1", "Video 2"], label=question, value=None)
                    )

            gr.Markdown("---")

        # Hidden JSON component holding per-session metadata from `predict`;
        # it is appended as the final submit input.
        info = gr.JSON(visible=False)
        demo.load(predict, None, info)

        submit_btn = gr.Button("Submit")
        result_message = gr.Textbox(label="Message (please only submit once)", interactive=False)

        submit_btn.click(
            fn=lambda *args: save_responses(unique_submission, *args),
            inputs=answer_widgets + [info],
            outputs=result_message
        )

    return demo
|
|
def predict(request: gr.Request):
    """Extract per-session metadata from the incoming HTTP request.

    The returned dict is stored in a hidden JSON component and later passed
    as the final submit input so responses can be keyed by session id.
    """
    return {
        "ip": request.client.host,
        "user_agent": request.headers["user-agent"],
        "headers": request.headers,
        "session_id": request.session_hash,
    }
|
|
if __name__ == "__main__":
    # Enforce one submission per session; share=True creates a public
    # Gradio link for remote participants.
    study_app = create_interface(unique_submission=True)
    study_app.launch(share=True)