Spaces:
Runtime error
Runtime error
| from argparse import ArgumentParser | |
| import os, threading, time, yaml | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pprint import pprint | |
| import numpy as np | |
| import gradio as gr | |
| from huggingface_hub import HfApi | |
| class VideoCfg: | |
| value: str | |
| label: str | |
| visible: bool | |
| interactive: bool = False | |
| autoplay: bool = True | |
| show_share_button: bool = False | |
| show_download_button: bool = False | |
| loop: bool = False | |
| show_label: bool = True | |
| width: int | str | None = None | |
def time_now():
    """Return the current UTC time as ``YYYY-MM-DD_HH-MM-SS``.

    The result is used both for log lines and for file names, so it contains
    no characters that are unsafe in paths.
    """
    # %S is seconds-of-minute; lowercase %s is a non-portable glibc extension
    # that expands to seconds since the epoch.
    return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
class SurveyEngine:
    """Pairwise video-preference survey served as a Gradio app.

    On construction the engine downloads the survey dataset from the Hugging
    Face Hub, creates one response CSV and one free-text feedback file named
    after the start-up time, enumerates the available videos and, unless
    ``args.no_sync`` is set, starts background threads that periodically
    upload both files and squash the dataset history.
    """

    def __init__(self, args):
        """Prepare data, output files and theme.

        Args:
            args: namespace providing ``no_sync``, ``period_upload``,
                ``period_squash``, ``method_filter``, ``debug`` and ``share``.
        """
        self.args = args
        self.api = HfApi()

        # pull videos from hf dataset to hf space
        self.repo_id = "zirui-wang/survey-hf-dataset"
        self.local_dir = "./survey-hf-dataset"
        self.api.snapshot_download(self.repo_id, local_dir=self.local_dir, repo_type="dataset")

        # create a file to record the user study
        response_file_name = f"{time_now()}.csv"
        self.response_file_path_local = f"{self.local_dir}/response/{response_file_name}"
        self.response_file_path_remote = f"response/{response_file_name}"
        os.makedirs(f"{self.local_dir}/response", exist_ok=True)
        self._update_local_file("timestamp,path1,path2,selection\n", self.response_file_path_local)

        # create a file to record optional feedback
        feedback_file_name = f"{time_now()}.txt"
        self.feedback_file_path_local = f"{self.local_dir}/optional_feedback/{feedback_file_name}"
        self.feedback_file_path_remote = f"optional_feedback/{feedback_file_name}"
        os.makedirs(f"{self.local_dir}/optional_feedback", exist_ok=True)
        self._update_local_file("", self.feedback_file_path_local)

        self.video_paths, self.N_prompts, self.N_methods = self._get_all_video_paths()

        self.theme = gr.themes.Default(
            text_size=gr.themes.sizes.text_lg,
            spacing_size=gr.themes.sizes.spacing_sm,
            radius_size=gr.themes.sizes.radius_md,
        )

        if not args.no_sync:
            self._start_periodic_sync()

    def _start_periodic_sync(self):
        """Start daemon threads that upload results and squash history forever.

        The periods come from ``self.args`` (not a module-level ``args``) so
        the engine also works when it is imported instead of run as a script.
        """

        def _upload_periodically(path_local, path_remote):
            while True:
                print(time_now())
                print(f"Uploading {path_local}.")
                try:
                    self._update_remote_file(path_local, path_remote)
                except Exception as e:
                    # best effort: a failed upload is retried after the next sleep
                    print(e)
                time.sleep(self.args.period_upload)

        def _squash_commits_periodically():
            while True:
                print(time_now())
                print("Squashing commits.")
                try:
                    # NOTE(review): this repo name differs from self.repo_id, the
                    # repo the uploads go to -- confirm which one should be squashed.
                    self.api.super_squash_history("zirui-wang/survey-videos", repo_type="dataset")
                except Exception as e:
                    print(e)
                time.sleep(self.args.period_squash)

        workers = [
            threading.Thread(
                target=_upload_periodically,
                args=(self.response_file_path_local, self.response_file_path_remote),
                daemon=True,
            ),
            threading.Thread(
                target=_upload_periodically,
                args=(self.feedback_file_path_local, self.feedback_file_path_remote),
                daemon=True,
            ),
            threading.Thread(target=_squash_commits_periodically, daemon=True),
        ]
        for worker in workers:
            worker.start()

    def _get_all_video_paths(self):
        """Enumerate videos under ``<local_dir>/video/<method>/<video_name>``.

        Video names are taken from the first (sorted) method directory; every
        method is assumed to contain the same file names.

        Returns:
            ``(video_paths, N_prompts, N_methods)`` where ``video_paths`` is a
            numpy array of path strings with one row per video name and one
            column per method.

        Raises:
            ValueError: if no method directory remains after filtering.
        """
        video_dir = f"{self.local_dir}/video"
        method_list = sorted(os.listdir(video_dir))

        # filter methods; a missing or empty filter keeps every method
        method_filter = getattr(self.args, "method_filter", None)
        if method_filter is not None and len(method_filter) > 0:
            method_list = np.intersect1d(method_list, np.array(method_filter))

        if len(method_list) == 0:
            raise ValueError(
                f"No method directory left in {video_dir!r} after applying "
                f"method_filter={method_filter!r}."
            )

        video_name_list = sorted(os.listdir(f"{video_dir}/{method_list[0]}"))
        N_prompts = len(video_name_list)
        N_methods = len(method_list)

        # (N_prompts, N_methods)
        video_paths = np.array(
            [
                [os.path.join(video_dir, method, video_name) for method in method_list]
                for video_name in video_name_list
            ]
        )
        return video_paths, N_prompts, N_methods

    def _sample_video_pair(self, videos_left):
        """Draw two distinct prompts and shuffle the methods within each.

        Args:
            videos_left: (N_prompts, N_methods) array of remaining video paths.
                Must hold at least two rows, otherwise ``np.random.choice``
                raises ``ValueError``.

        Returns:
            ``(video_pair1, video_pair2, radio_select_to_path_lut, videos_left)``
            where the two sampled rows have been removed from ``videos_left``
            and the lookup table maps each radio label to the path it denotes.
        """
        # random choose two prompts
        N_videos_left = len(videos_left)
        prompt_ids = np.random.choice(N_videos_left, 2, replace=False)
        # shuffle so a method is not always shown on the same side
        video_pair1 = np.random.permutation(videos_left[prompt_ids[0]])
        video_pair2 = np.random.permutation(videos_left[prompt_ids[1]])

        # update videos_left
        videos_left = np.delete(videos_left, prompt_ids, axis=0)

        radio_select_to_path_lut = {
            "Video 1 👍": str(video_pair1[0]),
            "Video 2 👍": str(video_pair1[1]),
            "Video 3 👍": str(video_pair2[0]),
            "Video 4 👍": str(video_pair2[1]),
            "Similar 🤔": "Similar",
        }
        print("---------------")
        print(time_now())
        pprint(radio_select_to_path_lut)
        return video_pair1, video_pair2, radio_select_to_path_lut, videos_left

    def _update_local_file(self, message, file_path_local):
        """Append ``message`` to ``file_path_local``, creating it if needed."""
        # explicit UTF-8: free-text feedback may contain non-ASCII characters
        with open(file_path_local, "a", encoding="utf-8") as f:
            f.write(message)

    def _update_remote_file(self, file_path_local, file_path_remote):
        """Upload one local file to ``file_path_remote`` in the dataset repo."""
        self.api.upload_file(
            path_or_fileobj=file_path_local,
            path_in_repo=file_path_remote,
            repo_id=self.repo_id,
            repo_type="dataset",
        )

    def _setup_video(self, path_a, path_b, label_a, label_b, visible):
        """Create two ``gr.Video`` components sharing the ``VideoCfg`` defaults."""
        cfg_a = VideoCfg(value=path_a, label=label_a, visible=visible)
        cfg_b = VideoCfg(value=path_b, label=label_b, visible=visible)
        video_a = gr.Video(**vars(cfg_a))
        video_b = gr.Video(**vars(cfg_b))
        return video_a, video_b

    def _load_callback(self, videos_left):
        """Page-load handler: sample the first two pairs and fill the players.

        Returns nine values in the order of the ``demo.load`` outputs: the
        four session states, four video updates and the hidden run-out notice.
        """
        (
            video_pair1,
            video_pair2,
            radio_select_to_path_lut,
            videos_left,
        ) = self._sample_video_pair(videos_left)
        return (
            video_pair1,
            video_pair2,
            radio_select_to_path_lut,
            videos_left,
            gr.update(value=video_pair1[0]),
            gr.update(value=video_pair1[1]),
            gr.update(value=video_pair2[0]),
            gr.update(value=video_pair2[1]),
            gr.update(visible=False),  # md_run_out_videos
        )

    def _click_select_radio(self):
        """Reveal the confirm button once a radio option has been selected."""
        return gr.update(visible=True)

    def _record_selection(self, radio_select, radio_select_to_path_lut, label_a, label_b):
        """Append one CSV row: timestamp, both shown paths and the chosen one."""
        # store the actual file paths rather than the on-screen labels
        selected_path = radio_select_to_path_lut[radio_select]
        path1 = radio_select_to_path_lut[label_a]
        path2 = radio_select_to_path_lut[label_b]
        message = f"{time_now()},{path1},{path2},{selected_path}\n"
        self._update_local_file(message, self.response_file_path_local)

    def _click_button_confirm1(self, radio_select, radio_select_to_path_lut):
        """Record the answer for pair 1 and reveal pair 2.

        Returns seven updates in the order of the ``button_confirm1`` outputs.
        """
        self._record_selection(radio_select, radio_select_to_path_lut, "Video 1 👍", "Video 2 👍")

        confirm_message = (
            "\nYour selection was:\n"
            f'<span style="font-size:20px; color:orange "> {radio_select} </span> \n\n\n'
        )

        return (
            gr.update(visible=True, value=confirm_message),  # md_confirm1
            gr.update(visible=False),  # button_confirm1
            gr.update(visible=False),  # radio_select1
            gr.update(visible=True),  # md_pair_34
            gr.update(visible=True),  # video3
            gr.update(visible=True),  # video4
            gr.update(visible=True),  # radio_select2
        )

    def _click_button_confirm2(self, radio_select, radio_select_to_path_lut):
        """Record the answer for pair 2 and show the end-of-round controls.

        Returns six updates in the order of the ``button_confirm2`` outputs.
        """
        self._record_selection(radio_select, radio_select_to_path_lut, "Video 3 👍", "Video 4 👍")

        confirm_message = (
            "\nYour selection was:\n"
            f'<span style="font-size:20px; color:orange "> {radio_select} </span> \n\n\n'
            "## Study Done!\n"
            "Click the button below 🎲 if you'd like to evaluate another set. \n\n\n"
            "You can exit this study by closing this page.\n"
            "For more details about this study, click\n"
            "[here](https://huggingface.co/spaces/zirui-wang/video_quality_study/blob/main/detail.md).\n"
        )

        return (
            gr.update(visible=True, value=confirm_message),  # md_confirm2
            gr.update(visible=False),  # button_confirm2
            gr.update(visible=False),  # radio_select2
            gr.update(visible=True),  # button_new
            gr.update(visible=True),  # textbox_optional_feedback
            gr.update(visible=True),  # button_submit_optional_feedback
        )

    def _click_button_new(self, videos_left):
        """Start another round, or show the run-out notice when none is possible.

        Returns twenty values in the order of the ``button_new`` outputs.
        """
        # A round needs two prompts; with an odd prompt count one row is left
        # over at the end and sampling from it would raise.
        if len(videos_left) < 2:
            return [None] * 3 + [[]] + [gr.update(visible=False)] * 15 + [gr.update(visible=True)]

        print("---------------")
        print(f"N_video_left before: {len(videos_left)}")
        (
            video_pair1,
            video_pair2,
            radio_select_to_path_lut,
            videos_left,
        ) = self._sample_video_pair(videos_left)

        return (
            video_pair1,
            video_pair2,
            radio_select_to_path_lut,
            videos_left,
            gr.update(value=video_pair1[0]),  # video1
            gr.update(value=video_pair1[1]),  # video2
            gr.update(visible=True, value=None),  # radio_select1
            gr.update(visible=False),  # button_confirm1
            gr.update(visible=False),  # md_confirm1
            gr.update(visible=False),  # md_pair_34
            gr.update(value=video_pair2[0], visible=False),  # video3
            gr.update(value=video_pair2[1], visible=False),  # video4
            gr.update(visible=False, value=None),  # radio_select2
            gr.update(visible=False),  # button_confirm2
            gr.update(visible=False),  # md_confirm2
            gr.update(visible=False),  # button_new
            gr.update(visible=False, value=None),  # textbox_optional_feedback
            gr.update(visible=False),  # button_submit_optional_feedback
            gr.update(visible=False),  # md_optional_feedback
            gr.update(visible=False),  # md_run_out_videos
        )

    def _click_button_optional_feedback(self, textbox_optional_feedback):
        """Store a non-empty comment and swap the submit button for a thank-you."""
        # The textbox is reset to None between rounds; treat that like "" so
        # the literal text "None" is never recorded as feedback.
        if not textbox_optional_feedback:
            return gr.skip(), gr.skip()
        message = f"{time_now()}\n{textbox_optional_feedback}\n\n"
        self._update_local_file(message, self.feedback_file_path_local)
        update_md_optional_feedback = gr.update(visible=True)
        update_button_submit_optional = gr.update(visible=False)
        return update_md_optional_feedback, update_button_submit_optional

    def main(self):
        """Build the Gradio UI, wire the callbacks and launch the app."""
        # read in md file
        with open("start.md", "r", encoding="utf-8") as f:
            md_start = f.read()

        with gr.Blocks(theme=self.theme, title="Video Quality User Study") as demo:
            # set up session states
            # random pop videos from this list to get video pairs
            videos_left = gr.State(value=self.video_paths, time_to_live=900)
            # placeholder values; the load callback replaces them with real paths
            video_pair1 = gr.State(value=["path1", "path2"], time_to_live=900)
            video_pair2 = gr.State(value=["path2", "path4"], time_to_live=900)
            radio_select_to_path_lut = gr.State(value={}, time_to_live=900)  # hold a dict

            # set up layout
            with gr.Column():
                gr.Markdown(md_start)

                # a debug button
                if self.args.debug:

                    def _click_button_debug(
                        video_pair1,
                        video_pair2,
                        radio_select_to_path_lut,
                        videos_left,
                    ):
                        print(f"video_pair1: {video_pair1}")
                        print(f"video_pair2: {video_pair2}")
                        print(f"radio_select_to_path_lut: {radio_select_to_path_lut}")
                        print(f"N videos_left: {len(videos_left)}")

                    button_debug = gr.Button("debug", variant="primary", scale=1)
                    button_debug.click(
                        _click_button_debug,
                        inputs=[video_pair1, video_pair2, radio_select_to_path_lut, videos_left],
                    )

                # ---------------- video 1-2 ----------------
                gr.Markdown("## Video Pair 1")
                with gr.Row():
                    video1, video2 = self._setup_video(
                        path_a=video_pair1.value[0],
                        path_b=video_pair1.value[1],
                        label_a="Video 1",
                        label_b="Video 2",
                        visible=True,
                    )
                with gr.Row():
                    radio_select1 = gr.Radio(
                        choices=["Video 1 👍", "Video 2 👍", "Similar 🤔"],
                        label="Your Preference:",
                        scale=2,
                    )
                    button_confirm1 = gr.Button(
                        value="Confirm",
                        variant="primary",
                        scale=1,
                        visible=False,
                    )
                md_confirm1 = gr.Markdown(visible=False, inputs=button_confirm1)

                # ---------------- video 3-4 ----------------
                md_pair_34 = gr.Markdown("## Video Pair 2", visible=False)
                with gr.Row():
                    video3, video4 = self._setup_video(
                        path_a=video_pair2.value[0],
                        path_b=video_pair2.value[1],
                        label_a="Video 3",
                        label_b="Video 4",
                        visible=False,
                    )
                with gr.Row():
                    radio_select2 = gr.Radio(
                        choices=["Video 3 👍", "Video 4 👍", "Similar 🤔"],
                        label="Your Preference:",
                        scale=2,
                        visible=False,
                    )
                    button_confirm2 = gr.Button(
                        value="Confirm",
                        variant="primary",
                        scale=1,
                        visible=False,
                    )
                md_confirm2 = gr.Markdown(visible=False, inputs=button_confirm2)

                # ---------------- new button ----------------
                button_new = gr.Button(
                    value="New One 🎲",
                    variant="primary",
                    visible=False,
                )
                md_run_out_videos = gr.Markdown(
                    "You've evaluated all video pairs. Thank you for your participation! 🙏",
                    visible=False,
                )

                # ---------------- optional feedback ----------------
                with gr.Row():
                    textbox_optional_feedback = gr.Textbox(
                        label="Optional Comments:",
                        placeholder="Type here...",
                        lines=1,
                        scale=4,
                        interactive=True,
                        visible=False,
                    )
                    button_submit_optional_feedback = gr.Button(
                        value="Submit Comments",
                        variant="secondary",
                        scale=1,
                        visible=False,
                    )
                md_optional_feedback = gr.Markdown(
                    "Thank you for providing additional comments! 🙏",
                    visible=False,
                )

            # set up callbacks
            radio_select1.select(
                self._click_select_radio,
                trigger_mode="once",
                outputs=button_confirm1,
            )
            button_confirm1.click(
                self._click_button_confirm1,
                trigger_mode="once",
                inputs=[radio_select1, radio_select_to_path_lut],
                outputs=[
                    md_confirm1,
                    button_confirm1,
                    radio_select1,
                    md_pair_34,
                    video3,
                    video4,
                    radio_select2,
                ],
            )
            radio_select2.select(
                self._click_select_radio,
                trigger_mode="once",
                outputs=button_confirm2,
            )
            button_confirm2.click(
                self._click_button_confirm2,
                trigger_mode="once",
                inputs=[radio_select2, radio_select_to_path_lut],
                outputs=[
                    md_confirm2,
                    button_confirm2,
                    radio_select2,
                    button_new,
                    textbox_optional_feedback,
                    button_submit_optional_feedback,
                ],
            )
            button_new.click(
                self._click_button_new,
                trigger_mode="once",
                inputs=videos_left,
                outputs=[
                    video_pair1,
                    video_pair2,
                    radio_select_to_path_lut,
                    videos_left,
                    video1,
                    video2,
                    radio_select1,
                    button_confirm1,
                    md_confirm1,
                    md_pair_34,
                    video3,
                    video4,
                    radio_select2,
                    button_confirm2,
                    md_confirm2,
                    button_new,
                    textbox_optional_feedback,
                    button_submit_optional_feedback,
                    md_optional_feedback,
                    md_run_out_videos,
                ],
            )
            button_submit_optional_feedback.click(
                self._click_button_optional_feedback,
                trigger_mode="once",
                inputs=textbox_optional_feedback,
                outputs=[md_optional_feedback, button_submit_optional_feedback],
            )
            demo.load(
                self._load_callback,
                inputs=videos_left,
                outputs=[
                    video_pair1,
                    video_pair2,
                    radio_select_to_path_lut,
                    videos_left,
                    video1,
                    video2,
                    video3,
                    video4,
                    md_run_out_videos,
                ],
            )

        demo.launch(share=self.args.share, show_api=False)
def parse_args():
    """Parse CLI flags and overlay the values from the YAML config file.

    Every key of the config mapping is set as an attribute on the returned
    namespace, so a key that is also a CLI flag overrides the flag.

    Returns:
        The argparse namespace extended with the config values.

    Raises:
        TypeError: if the config file does not contain a YAML mapping.
    """
    parser = ArgumentParser()
    # use this config as HF space does not take args
    parser.add_argument("--config", type=str, default="config.yaml")
    # these args are useful for local debugging
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--share", action="store_true")
    parser.add_argument("--no_sync", action="store_true")
    args = parser.parse_args()

    with open(args.config, "r", encoding="utf-8") as f:
        # an empty YAML document loads as None; treat it as "no overrides"
        config = yaml.safe_load(f) or {}
    if not isinstance(config, dict):
        raise TypeError(
            f"{args.config} must contain a YAML mapping, got {type(config).__name__}"
        )

    for key, value in config.items():
        setattr(args, key, value)
    pprint(vars(args))
    return args
if __name__ == "__main__":
    # Script entry point: load the configuration, build the engine (its
    # constructor is called with the parsed namespace) and serve the app.
    # NOTE(review): `args` is bound at module scope here; other code in this
    # file may read it as a global, so check before renaming or inlining it.
    args = parse_args()
    survey_engine = SurveyEngine(args)
    survey_engine.main()