# peterw2333 — commit 8c69271: "fix response before after logic"
import gradio as gr
import json
from datetime import datetime
import random
import os
# CHANGED: Import HfApi and download utility instead of Repository
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
# Fixed seed so the before/after slot assignment and the pair order are
# reproducible across restarts of the Space.
random.seed(20240128)

# HfApi authenticates with the token directly over HTTP — no git config needed.
hf_token = os.getenv("HF_TOKEN")
print("HF Token is none?", hf_token is None)

# Hub dataset repo that stores the submissions file, mirrored locally.
REPO_ID = "peterw2333/test"  # Clean repo ID instead of full URL
LOCAL_DIR = "user_responses"
SUBMISSIONS_FILE = "response_202507.jsonl"
LOCAL_FILE_PATH = os.path.join(LOCAL_DIR, SUBMISSIONS_FILE)

api = HfApi(token=hf_token)

# Download the existing file to local disk so we can check for duplicate
# submissions in has_already_submitted().
os.makedirs(LOCAL_DIR, exist_ok=True)
try:
    print("Downloading existing data from Hub...")
    hf_hub_download(
        repo_id=REPO_ID,
        repo_type="dataset",
        filename=SUBMISSIONS_FILE,
        local_dir=LOCAL_DIR
    )
# FIXED: "except (EntryNotFoundError, Exception)" was redundant — Exception
# already subsumes EntryNotFoundError, so the tuple was misleading. This is a
# deliberate best-effort catch: a missing file just means this is the first run.
except Exception as e:
    print(f"File not found or error downloading (creates new if first time): {e}")
def prepare_test_cases():
    """Load the video pairs and randomly assign the before/after clips to
    the "Video 1" / "Video 2" display slots.

    Returns:
        The dict parsed from videos/rir.json, where each entry gains two
        extra keys, 'Video 1' and 'Video 2', holding the clip lists in a
        random order; or {} when the JSON file is missing.
    """
    json_path = "videos/rir.json"
    # Simple check to prevent a crash if the file is missing during local dev.
    if not os.path.exists(json_path):
        print(f"Warning: {json_path} not found.")
        return {}
    with open(json_path, "r") as f:
        video_dict = json.load(f)
    for entry in video_dict.values():
        # FIXED: the original drew random.random() to choose an order and then
        # immediately re-shuffled the list, making the first draw dead code.
        # A single shuffle already assigns before/after to the two display
        # slots uniformly at random. NOTE: this changes how many values are
        # consumed from the seeded RNG, so the (deterministic) layout differs
        # from earlier deployments — acceptable since no layout was promised.
        slots = [entry['before'], entry['after']]
        random.shuffle(slots)
        entry['Video 1'] = slots[0]
        entry['Video 2'] = slots[1]
    return video_dict
# Build the randomized test cases once at import time so every visitor sees
# the same (seeded) arrangement of video pairs.
video_dict = prepare_test_cases()
video_ids = list(video_dict.keys())
random.shuffle(video_ids)
print("DEBUG: video_dict", video_dict)

# Survey questions shown for each video pair; their order maps onto the
# 'accuracy' and 'quality' fields recorded by save_responses().
questions = [
    "Between Video 1 (left) and Video 2 (right), which one's results are more accurate according to the text prompt?",
    # FIXED: original wording was garbled ("are more has a higher quality").
    "Between Video 1 (left) and Video 2 (right), which one's results have a higher quality of human-human interaction?"
]
def has_already_submitted(user_id):
    """Return True if a record with this user id already exists in the
    local submissions file; lines that are not valid JSON are skipped."""
    if not os.path.exists(LOCAL_FILE_PATH):
        return False
    with open(LOCAL_FILE_PATH, "r") as f:
        for raw_line in f:
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if record.get("u_id") == user_id:
                return True
    return False
# Save responses
def save_responses(unique_submission, *responses):
    """Validate and persist one participant's survey answers.

    Args:
        unique_submission: when True, reject a second submission from the
            same session id.
        *responses: the Radio values (len(questions) per video pair, in
            video_ids order), followed by the hidden info dict produced by
            predict().

    Returns:
        A status message string shown in the UI textbox.
    """
    timestamp = datetime.now().isoformat()
    info = responses[-1]        # hidden JSON payload appended by the click handler
    responses = responses[:-1]  # the remaining entries are the radio answers
    unique_id = info["session_id"]
    user_id = f"{unique_id}"
    # Check for unique submission
    if unique_submission and has_already_submitted(user_id):
        return "You have already submitted responses. Thank you for participating!"
    # Initialize the result dictionary
    result = {
        "u_id": user_id,
        "timestamp": timestamp,
        "responses": []
    }
    print("DEBUG responses:", responses)
    for index in range(len(video_ids)):
        start_idx = index * len(questions)
        end_idx = start_idx + len(questions)
        response = responses[start_idx:end_idx]
        if any(r is None for r in response):
            return "Please answer all questions before submitting."
        video_id = video_ids[index]
        # Map the chosen display slot ("Video 1"/"Video 2") back to the
        # underlying clip; clips under videos/before are "before" runs.
        pair_response = {
            video_id: {
                'accuracy': "before" if "videos/before" in video_dict[video_id][response[0]][0] else "after",
                'quality': "before" if "videos/before" in video_dict[video_id][response[1]][0] else "after",
            }
        }
        print("DEBUG pair_response:", video_id, pair_response)
        result["responses"].append(pair_response)
    # FIXED: the original used key=lambda x: x.keys(); comparing dict_keys
    # views is *subset* testing, not an ordering, so disjoint singleton key
    # sets compare "not less than" both ways and the sort was a stable no-op.
    # Each pair_response has exactly one key (the video id) — sort by it.
    result["responses"].sort(key=lambda pair: next(iter(pair)))
    # Save response locally
    with open(LOCAL_FILE_PATH, "a") as f:
        f.write(json.dumps(result) + "\n")
    # Push the updated submissions file to the Hub via HfApi
    try:
        print("Uploading to Hub...")
        api.upload_file(
            path_or_fileobj=LOCAL_FILE_PATH,
            path_in_repo=SUBMISSIONS_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"New submission from {user_id}"
        )
    except Exception as e:
        return f"Error uploading to Hugging Face: {str(e)}"
    return "All responses saved! Thank you for participating!"
def create_interface(unique_submission=False):
    """Build the Gradio survey UI.

    Args:
        unique_submission: forwarded to save_responses(); when True a
            session id may only submit once.

    Returns:
        The gr.Blocks demo; the caller is responsible for launching it.
    """
    # NOTE: component creation order defines the page layout — do not reorder.
    with gr.Blocks() as demo:
        gr.Markdown("# Human Preference Study: Two Person Interaction")
        gr.Markdown("""
## In each of the following pairs, you will be presented with two videos.\n
## For each pair, please first read the text prompt carefully, then examine the videos and answer the following questions.
""")
        # One Radio per question per video pair, appended in video_ids order.
        # save_responses() relies on exactly this flat ordering when slicing.
        responses = []
        for index, video_id in enumerate(video_ids):
            video_prompt = video_dict[video_id]['prompt']
            video1_list = video_dict[video_id]['Video 1']
            video2_list = video_dict[video_id]['Video 2']
            gr.Markdown(f"# Video Pair {index + 1}")
            gr.Markdown(f"## Text Prompt: {video_prompt}")
            with gr.Row():
                # Side-by-side players: slot 1 on the left, slot 2 on the right.
                for video in video1_list:
                    gr.Video(video, label="Video 1")
                for video in video2_list:
                    gr.Video(video, label="Video 2")
            with gr.Row():
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[0], value=None))
            with gr.Row():
                responses.append(gr.Radio(["Video 1", "Video 2"], label=questions[1], value=None))
            gr.Markdown("---")
        # Hidden component that receives the per-session metadata from
        # predict() on page load; appended as the last submit input.
        info = gr.JSON(visible=False)
        demo.load(predict, None, info)
        submit_btn = gr.Button("Submit")
        result_message = gr.Textbox(label="Message (please only submit once)", interactive=False)
        submit_btn.click(
            fn=lambda *args: save_responses(unique_submission, *args),
            inputs=responses+[info],
            outputs=result_message
        )
    return demo
def predict(request: gr.Request):
    """Collect per-session client metadata from the incoming request.

    The returned dict is stored in the hidden JSON component on page load;
    its "session_id" later identifies the submission in save_responses().
    """
    return {
        "ip": request.client.host,
        "user_agent": request.headers["user-agent"],
        "headers": request.headers,
        "session_id": request.session_hash,
    }
if __name__ == "__main__":
    # unique_submission=True blocks repeat submissions from the same session id.
    interface = create_interface(unique_submission=True)
    interface.launch(share=True)