|
|
import os |
|
|
import json |
|
|
import hashlib |
|
|
import random |
|
|
import threading |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
import gradio as gr |
|
|
from PIL import Image |
|
|
from huggingface_hub import HfApi, CommitOperationAdd |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Hugging Face results repository ---------------------------------------
# Repo id and token come from the environment; uploads are skipped when
# either one is unset.
HF_RESULTS_REPO = os.getenv("HF_RESULTS_REPO")
HF_RESULTS_REPO_TYPE = "dataset"
HF_TOKEN = os.getenv("HF_TOKEN")
# Shared Hub client used for the repo lookup/creation and result commits.
_hf_api = HfApi(token=HF_TOKEN)

# --- Study parameters --------------------------------------------------------
# Number of images each reviewer is asked to rate (capped by dataset size).
TARGET_PER_PERSON = 30
# Contact address interpolated into the intro and thank-you texts.
CONTACT_EMAIL = "ffallah@asu.edu"

# --- Image folders -----------------------------------------------------------
# One folder per image variant; a sample needs the same filename in all five.
# The variants are displayed as Image 1..5 in the order listed here.
GT_MASKED_DIR = "data/gt_b"
GT_UNMASKED_DIR = "data/adc_b"
SR_DIR = "data/sr_b"
ORIGINAL_DIR = "data/lr_b"
IMAGE_5_DIR = "data/see_b"

# --- Result files ------------------------------------------------------------
RESULTS_DIR = "results"
# JSON map: user id -> {"seen": [sample ids already rated]}.
PROGRESS_PATH = os.path.join(RESULTS_DIR, "progress.json")
# Combined log of every submitted record, one JSON object per line.
ALL_RESULTS_JSONL = os.path.join(RESULTS_DIR, "all_results.jsonl")
# When False, name and email are stored as null in the saved records.
SAVE_PII = True

# Serializes writes to the progress file and the JSONL logs.
WRITE_LOCK = threading.Lock()
# When True, load_dataset re-checks every file path and skips incomplete samples.
STRICT_ENFORCEMENT = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class Sample:
    """One evaluation item: an id plus the file paths of its five image variants."""

    # Filename without its extension; used as the key in per-user progress.
    sample_id: str
    # File in the masked ground-truth folder (displayed as Image 1).
    masked_gt_path: str
    # File in the unmasked ground-truth folder (displayed as Image 2).
    unmasked_gt_path: str
    # File in the SR folder (displayed as Image 3).
    sr_path: str
    # File in the original folder (displayed as Image 4).
    original_path: str
    # File in the fifth image folder (displayed as Image 5).
    image_5_path: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def user_target_count(samples: List[Sample]) -> int:
    """Return how many images one reviewer is expected to rate.

    This is ``TARGET_PER_PERSON``, capped at the number of available samples.
    """
    available = len(samples)
    if available < TARGET_PER_PERSON:
        return available
    return TARGET_PER_PERSON
|
|
|
|
|
def user_left_count(user_seen: List[str], samples: List[Sample]) -> int:
    """Return how many more images the user must rate to reach the target.

    Only ids that belong to the current ``samples`` count as rated; the
    result never goes below zero.
    """
    known_ids = {sample.sample_id for sample in samples}
    rated_ids = set(user_seen or []) & known_ids
    outstanding = user_target_count(samples) - len(rated_ids)
    return outstanding if outstanding > 0 else 0
|
|
|
|
|
def _ensure_private_repo(repo_id: str):
    """Create ``repo_id`` as a private repo when looking it up fails.

    NOTE(review): any exception from the lookup (not only "repo not found")
    leads to the creation attempt; errors from the creation call propagate.
    """
    repo_kind = HF_RESULTS_REPO_TYPE
    try:
        _hf_api.repo_info(repo_id, repo_type=repo_kind)
    except Exception:
        _hf_api.create_repo(
            repo_id=repo_id,
            repo_type=repo_kind,
            private=True,
        )
|
|
|
|
|
def push_results_to_private_repo(uid: str):
    """Upload the current result files to the results repo (best effort).

    Commits the combined results log, the given user's results log and the
    progress file in a single commit. Does nothing when ``HF_TOKEN`` or
    ``HF_RESULTS_REPO`` is unset. Failures are reported with a warning and
    never raised.

    Args:
        uid: User id whose ``<uid>.jsonl`` file is uploaded.
    """
    if not HF_TOKEN or not HF_RESULTS_REPO:
        return
    try:
        os.makedirs(RESULTS_DIR, exist_ok=True)
        user_file = os.path.join(RESULTS_DIR, f"{uid}.jsonl")
        uploads = [
            ("results/all_results.jsonl", ALL_RESULTS_JSONL),
            (f"results/users/{uid}.jsonl", user_file),
            ("results/progress.json", PROGRESS_PATH),
        ]

        # Snapshot the files under the writer lock so a concurrent append or
        # rewrite cannot put a half-written file into the commit. Files that
        # do not exist yet are skipped instead of aborting the whole upload.
        ops = []
        with WRITE_LOCK:
            for path_in_repo, local_path in uploads:
                if not os.path.isfile(local_path):
                    continue
                with open(local_path, "rb") as fh:
                    ops.append(
                        CommitOperationAdd(
                            path_in_repo=path_in_repo,
                            path_or_fileobj=fh.read(),
                        )
                    )
        if not ops:
            return

        _hf_api.create_commit(
            repo_id=HF_RESULTS_REPO,
            repo_type=HF_RESULTS_REPO_TYPE,
            operations=ops,
            commit_message="Update RTS eval results"
        )
    except Exception as e:
        # Deliberately broad: an upload problem must never break rating.
        print("[WARN] push_results_to_private_repo failed:", e)
|
|
|
|
|
def ensure_paths():
    """Create the results folder and warn about each missing image folder."""
    os.makedirs(RESULTS_DIR, exist_ok=True)
    expected_dirs = {
        "GT_MASKED_DIR": GT_MASKED_DIR,
        "GT_UNMASKED_DIR": GT_UNMASKED_DIR,
        "SR_DIR": SR_DIR,
        "ORIGINAL_DIR": ORIGINAL_DIR,
        "IMAGE_5_DIR": IMAGE_5_DIR,
    }
    for label, folder in expected_dirs.items():
        if os.path.isdir(folder):
            continue
        print(f"Warning: Directory '{folder}' for {label} not found.")
|
|
|
|
|
def load_image(path: str) -> Image.Image:
    """Load an image as RGB, falling back to a gray 256x256 placeholder.

    The placeholder is returned when ``path`` is empty, does not exist, or
    the file cannot be opened or converted.

    Args:
        path: Path of the image file to load.

    Returns:
        An RGB ``PIL.Image.Image`` that does not keep the file open.
    """

    def _gray_placeholder() -> Image.Image:
        # Stand-in picture used whenever the real one is unavailable.
        return Image.new("RGB", (256, 256), color="gray")

    if not path or not os.path.exists(path):
        return _gray_placeholder()
    try:
        # Image.open keeps the file handle open; the context manager closes
        # it once convert() has produced an independent in-memory image.
        with Image.open(path) as img:
            return img.convert("RGB")
    except Exception:
        return _gray_placeholder()
|
|
|
|
|
def load_dataset(
    gt_masked_dir: str,
    gt_unmasked_dir: str,
    sr_dir: str,
    original_dir: str,
    image_5_dir: str,
) -> List[Sample]:
    """
    Build the sample list from the five image folders only.

    A sample is created for every image filename that is present in all
    five folders, in sorted filename order. Example layout:
        data/gt_b/xxx.png
        data/adc_b/xxx.png
        data/sr_b/xxx.png
        data/lr_b/xxx.png
        data/see_b/xxx.png

    Returns an empty list when no filename is shared by all folders.
    """
    image_suffixes = (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")

    def list_images(dir_path: str) -> set:
        """Return the image filenames found directly inside ``dir_path``."""
        if os.path.isdir(dir_path):
            return {
                entry
                for entry in os.listdir(dir_path)
                if entry.lower().endswith(image_suffixes)
            }
        print(f"Warning: directory not found: {dir_path}")
        return set()

    # Role name -> folder, in the order the folders are scanned.
    folders = {
        "masked": gt_masked_dir,
        "unmasked": gt_unmasked_dir,
        "sr": sr_dir,
        "original": original_dir,
        "img5": image_5_dir,
    }
    listings = [list_images(folder) for folder in folders.values()]
    shared_names = set.intersection(*listings)

    if not shared_names:
        print("No common image files found in all 5 folders.")
        return []
    print(f"Found {len(shared_names)} common images.")

    samples: List[Sample] = []
    for filename in sorted(shared_names):
        located = {
            role: os.path.join(folder, filename)
            for role, folder in folders.items()
        }

        if STRICT_ENFORCEMENT:
            # Re-check on disk in case a file vanished after the listing.
            missing = [role for role, p in located.items() if not os.path.exists(p)]
            if missing:
                print(f"Skipping {filename}: missing in folders {missing}")
                continue

        samples.append(
            Sample(
                sample_id=os.path.splitext(filename)[0],
                masked_gt_path=located["masked"],
                unmasked_gt_path=located["unmasked"],
                sr_path=located["sr"],
                original_path=located["original"],
                image_5_path=located["img5"],
            )
        )

    return samples
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hash_user_id(name: str, email: str) -> str:
    """Return a 16-hex-character id derived from the name and email.

    Both values are stripped and lower-cased first (``None`` counts as an
    empty string), so case and surrounding whitespace do not change the id.
    """
    clean_name = (name or "").strip().lower()
    clean_email = (email or "").strip().lower()
    identity = f"{clean_name}|{clean_email}"
    digest = hashlib.sha256(identity.encode("utf-8")).hexdigest()
    return digest[:16]
|
|
|
|
|
def load_progress() -> Dict[str, Dict[str, Any]]:
    """Read the per-user progress map from ``PROGRESS_PATH``.

    Returns:
        The mapping of user id to that user's progress entry, or ``{}`` when
        the file is missing, unreadable, not valid JSON, or not a JSON
        object. Read problems are reported with a warning.
    """
    if not os.path.exists(PROGRESS_PATH):
        return {}
    try:
        # Hold the writer lock so the file is never read while it is being
        # rewritten; a torn read would look like "no progress" and the next
        # save would then drop every other user's entries.
        with WRITE_LOCK:
            with open(PROGRESS_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[WARN] Could not read {PROGRESS_PATH}: {e}")
        return {}
    if not isinstance(data, dict):
        print(f"[WARN] Ignoring {PROGRESS_PATH}: expected a JSON object.")
        return {}
    return data
|
|
|
|
|
def save_progress(progress: Dict[str, Dict[str, Any]]):
    """Write the progress map to ``PROGRESS_PATH`` as indented JSON.

    The data is written to a temporary file next to the target and then
    moved into place with ``os.replace``, so an interrupted or failed write
    cannot leave a truncated progress file behind.

    Args:
        progress: Mapping of user id to that user's progress entry.
    """
    tmp_path = PROGRESS_PATH + ".tmp"
    with WRITE_LOCK:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(progress, f, ensure_ascii=False, indent=2)
        # Atomic swap: the target holds either the old or the new content.
        os.replace(tmp_path, PROGRESS_PATH)
|
|
|
|
|
def append_jsonl(path: str, record: Dict[str, Any]):
    """Append ``record`` to ``path`` as a single JSON line.

    The record is serialized first; only the file write happens while
    holding ``WRITE_LOCK``.
    """
    serialized = json.dumps(record, ensure_ascii=False) + "\n"
    with WRITE_LOCK, open(path, "a", encoding="utf-8") as out:
        out.write(serialized)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_scores_to_rank(s1, s2, s3, s4, s5) -> Dict[str, int]:
    """Turn five scores into ranks 1-5 keyed ``image_1`` .. ``image_5``.

    The highest score gets rank 1. Ties do not share a rank: among equal
    scores the lower-numbered image is ranked first. Keys are inserted in
    rank order.
    """
    labelled = {
        f"image_{slot}": score
        for slot, score in enumerate((s1, s2, s3, s4, s5), start=1)
    }
    # sorted() is stable even with reverse=True, which gives the tie order.
    by_score_desc = sorted(labelled, key=labelled.get, reverse=True)
    return {label: place for place, label in enumerate(by_score_desc, start=1)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pick_next_index(user_seen: List[str], samples: List[Sample]) -> int:
    """Pick a random index of a sample the user has not rated yet.

    Returns ``-1`` when every sample id is already in ``user_seen``.
    """
    already_rated = set(user_seen or [])
    candidates = []
    for position, sample in enumerate(samples):
        if sample.sample_id in already_rated:
            continue
        candidates.append(position)
    return random.choice(candidates) if candidates else -1
|
|
|
|
|
def start_or_resume(name: str, email: str):
    """Start a new review session or resume an earlier one.

    Returns a 14-tuple for the Start button outputs: user id, sample list,
    seen ids, current index (``-1`` when no image is shown), the five
    images, the status text, the path of the user's results file, and
    visibility updates for the evaluation panel, the intro text and the
    start form.

    Raises:
        gr.Error: if the name or email is empty, or no samples were found.
    """
    if not (name and email):
        raise gr.Error("Please enter your name and email to begin.")

    ensure_paths()
    samples = load_dataset(GT_MASKED_DIR, GT_UNMASKED_DIR, SR_DIR, ORIGINAL_DIR, IMAGE_5_DIR)
    if not samples:
        raise gr.Error("No images found. Please check dataset configuration.")

    uid = hash_user_id(name, email)
    progress = load_progress()
    if uid not in progress:
        # First visit: register the user right away.
        progress[uid] = {"seen": []}
        save_progress(progress)

    user_seen: List[str] = progress[uid].get("seen", [])
    left = user_left_count(user_seen, samples)
    target = user_target_count(samples)
    blank = Image.new("RGB", (256, 256), color="gray")

    def idle_screen(message, file_path):
        # Outputs for "nothing to rate": placeholders, evaluation panel
        # hidden, intro text and start form visible.
        return (
            uid,
            samples,
            user_seen,
            -1,
            blank, blank, blank, blank, blank,
            message,
            file_path,
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=True),
        )

    if left == 0 and len(user_seen) >= target:
        results_file = os.path.join(RESULTS_DIR, f"{uid}.jsonl")
        done_message = (
            f"Welcome back, {name}. You’ve completed all {target} images. 🎉\n"
            f"Your personal results file: {results_file}"
        )
        return idle_screen(done_message, results_file)

    idx = pick_next_index(user_seen, samples)
    if idx == -1:
        return idle_screen("No more new images available.", "")

    sample = samples[idx]
    status = (
        f"Welcome, {name}. Personal progress — images left: {left} of {target}.\n"
        f"Current sample: {sample.sample_id}"
    )

    os.makedirs(RESULTS_DIR, exist_ok=True)
    user_file_path = os.path.join(RESULTS_DIR, f"{uid}.jsonl")

    variant_paths = (
        sample.masked_gt_path,
        sample.unmasked_gt_path,
        sample.sr_path,
        sample.original_path,
        sample.image_5_path,
    )
    pictures = [load_image(p) for p in variant_paths]

    return (
        uid,
        samples,
        user_seen,
        idx,
        *pictures,
        status,
        user_file_path,
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
    )
|
|
|
|
|
|
|
|
def _save_record_and_progress(
    name: str,
    email: str,
    uid: str,
    samples: List[Sample],
    user_seen: List[str],
    idx: int,
    score_1: float,
    score_2: float,
    score_3: float,
    score_4: float,
    score_5: float,
    q1_notes: str,
):
    """Persist one rating record and mark the sample as seen for the user.

    The record is appended to the user's JSONL file and to the combined
    results file, the progress file is updated, and then a background
    upload of the result files is started.

    Args:
        name, email: Reviewer identity; stored in the record only when
            ``SAVE_PII`` is true.
        uid: User id used as the progress key and results filename.
        samples: Sample list the index refers to.
        user_seen: Not used here; the stored progress is the source of truth.
        idx: Index of the rated sample in ``samples``.
        score_1 .. score_5: Scores for Image 1..5.
        q1_notes: Optional free-text notes.

    Returns:
        The progress map after the update. When ``idx`` is not a valid
        index, or the sample is already recorded for this user, the current
        progress is returned and nothing is written.

    Raises:
        gr.Error: if the name or email is empty.
    """
    if not name or not email:
        raise gr.Error("Please enter your name and email.")

    if idx is None or idx < 0 or idx >= len(samples):
        return load_progress()

    rank_dict = convert_scores_to_rank(score_1, score_2, score_3, score_4, score_5)

    sample = samples[idx]
    progress = load_progress()
    progress.setdefault(uid, {"seen": []})
    seen = set(progress[uid].get("seen", []))

    # Guard against double submission of the same sample.
    if sample.sample_id in seen:
        return progress

    record: Dict[str, Any] = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "user_id": uid,
        "name": name if SAVE_PII else None,
        "email": email if SAVE_PII else None,
        "sample_id": sample.sample_id,
        "raw_scores": {
            "image_1": score_1,
            "image_2": score_2,
            "image_3": score_3,
            "image_4": score_4,
            "image_5": score_5,
        },
        "responses": {
            "notes": q1_notes or "",
            "image_ranking": rank_dict,
        },
    }

    os.makedirs(RESULTS_DIR, exist_ok=True)
    append_jsonl(os.path.join(RESULTS_DIR, f"{uid}.jsonl"), record)
    append_jsonl(ALL_RESULTS_JSONL, record)

    seen.add(sample.sample_id)
    progress[uid]["seen"] = sorted(seen)
    save_progress(progress)

    # Start the upload only after the progress file is written, so the
    # pushed progress.json already contains this submission.
    try:
        threading.Thread(
            target=push_results_to_private_repo,
            args=(uid,),
            daemon=True,
        ).start()
    except Exception as e:
        # The upload is best effort; the local files are already saved.
        print("[WARN] could not start results upload thread:", e)

    return progress
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def submit_finish(
    name: str,
    email: str,
    uid: str,
    samples: List[Sample],
    user_seen: List[str],
    idx: int,
    s1: float, s2: float, s3: float, s4: float, s5: float,
    q1_notes: str
):
    """Save the current ratings and reset the evaluation widgets.

    Returns a 15-tuple on every path: ``user_seen``, ``idx`` and thirteen
    component updates. On success these clear the five images, empty one
    text field, set another to "Finished!", reset the five sliders to 5 and
    clear one more component. When saving raises ``gr.Error`` the same
    number of no-op updates is returned, so both paths fit the same
    outputs list.

    NOTE(review): no event in the visible UI is wired to this function, so
    the output layout follows the success path — confirm against the
    intended outputs list.
    """
    try:
        _save_record_and_progress(
            name, email, uid, samples, user_seen, idx,
            s1, s2, s3, s4, s5,
            q1_notes
        )
    except gr.Error:
        # Leave every component untouched; the arity matches the success path.
        return (user_seen, idx, *(gr.update() for _ in range(13)))

    return (
        user_seen, idx,
        gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None),
        gr.update(value=""),
        gr.update(value="Finished!"),
        gr.update(value=5), gr.update(value=5), gr.update(value=5), gr.update(value=5), gr.update(value=5),
        gr.update(value=None),
    )
|
|
|
|
|
def pause_exit(user_seen, samples):
    """Return both state values unchanged as a ``(user_seen, samples)`` pair."""
    unchanged_state = (user_seen, samples)
    return unchanged_state
|
|
|
|
|
def submit_next_image(
    name: str,
    email: str,
    uid: str,
    samples: List[Sample],
    user_seen: List[str],
    idx: int,
    s1: float, s2: float, s3: float, s4: float, s5: float,
    q1_notes: str
):
    """Save the current ratings and move on to the next unseen image.

    Returns a 14-tuple for the Submit button outputs: the updated seen ids,
    the next index (``-1`` when there is nothing more to show), the five
    images, the status text, the notes box content and the five slider
    values reset to 5.

    Raises:
        gr.Error: propagated unchanged from ``_save_record_and_progress``
            when the name or email is empty.
    """
    # No try/except here: a gr.Error has to reach Gradio as-is.
    progress = _save_record_and_progress(
        name, email, uid, samples, user_seen, idx,
        s1, s2, s3, s4, s5,
        q1_notes
    )

    seen_list = progress.get(uid, {}).get("seen", [])
    left_after = user_left_count(seen_list, samples)
    target = user_target_count(samples)

    placeholder_img = Image.new("RGB", (256, 256), color="gray")

    if left_after == 0:
        # Target reached: blank the images and tell the user how to leave.
        status = (
            f"Saved! You’ve completed all {target} images. 🎉 "
            f"Click **Exit** to close this session."
        )
        return (
            seen_list, -1,
            placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img,
            gr.update(value=status),
            gr.update(value=""),
            5, 5, 5, 5, 5,
        )

    idx_next = pick_next_index(seen_list, samples)
    if idx_next == -1:
        # Every sample has been rated although the target is not reached.
        return (
            seen_list, -1,
            placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img,
            "No more images.",
            "",
            5, 5, 5, 5, 5,
        )

    sample_next = samples[idx_next]

    return (
        seen_list, idx_next,
        load_image(sample_next.masked_gt_path),
        load_image(sample_next.unmasked_gt_path),
        load_image(sample_next.sr_path),
        load_image(sample_next.original_path),
        load_image(sample_next.image_5_path),
        gr.update(value=""),
        gr.update(value=""),
        5, 5, 5, 5, 5,
    )
|
|
|
|
|
|
|
|
def to_thanks(name: str, user_seen: List[str], samples: List[Sample]):
    """Switch from the evaluation panel to the thank-you panel.

    The text is a "session paused" message while the user still has images
    left, and an "all done" message otherwise. Returns updates that hide
    the evaluation panel, show the thank-you panel and set its text.
    """
    left = user_left_count(user_seen, samples)
    target = user_target_count(samples)
    contact_line = (
        f"If you have questions, issues, or suggestions, please email **{CONTACT_EMAIL}**.\n\n"
    )

    if left > 0:
        pieces = [
            "### ⏸️ Session Paused!\n\n",
            f"### ✅ Thanks, {name}! Your progress has been saved.\n\n",
            "We’re grateful for your time and expertise. Our suggested target is ",
            f"{TARGET_PER_PERSON} images per reviewer.\n\n",
            f"You have **{left}** images left.\n\n",
            "You can close this tab and return whenever you like—just use the same Name and Email to **continue where you left off**.\n\n",
            contact_line,
            "Click **Start Again** to evaluate another image.",
        ]
    else:
        pieces = [
            f"### ✅ All Done, {name}!\n\n",
            f"You’ve completed the target of **{target}** images. Your responses are securely saved.\n\n",
            "We’re extremely grateful for your time and expertise. You are welcome to continue with more images if you wish, or you can finish here.\n\n",
            contact_line,
        ]

    msg = "".join(pieces)
    return gr.update(visible=False), gr.update(visible=True), gr.update(value=msg)
|
|
|
|
|
def hide_thanks():
    """Return an update that hides the component it is applied to."""
    hidden = gr.update(visible=False)
    return hidden
|
|
|
|
|
def maybe_show_thanks(name: str, seen: List[str], samples: List[Sample]):
    """Show the thank-you panel once the user has nothing left to rate.

    Completion is decided by ``user_left_count``, the same measure
    ``to_thanks`` uses to choose its message. This keeps the two in step
    when the dataset holds fewer than ``TARGET_PER_PERSON`` images, and
    when ``seen`` contains ids that are not in ``samples``.

    Returns:
        Updates for the evaluation panel, the thank-you panel and the
        thank-you text. While images remain, the evaluation panel stays
        visible, the thank-you panel stays hidden and the text is untouched.
    """
    if user_left_count(seen, samples) == 0:
        return to_thanks(name, seen, samples)
    return gr.update(visible=True), gr.update(visible=False), gr.update()
|
|
|
|
|
def reset_to_start():
    """Return the six updates that bring the UI back to the start screen.

    In order: two emptied values, two components made visible and two
    components hidden.
    """
    emptied = [gr.update(value="") for _ in range(2)]
    shown = [gr.update(visible=True) for _ in range(2)]
    hidden = [gr.update(visible=False) for _ in range(2)]
    return (*emptied, *shown, *hidden)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# UI layout and event wiring.
# NOTE(review): the nesting below is reconstructed from the statement order —
# confirm which components belong inside each group/row.
with gr.Blocks(title="RTS Human Evaluation", theme=gr.themes.Soft()) as demo:
    # Intro text; its visibility is toggled by start_or_resume and reset_to_start.
    intro_md = gr.Markdown(
        f"""
        # Retrogressive Thaw Slump (RTS) Human Evaluation

        ### 👋 Welcome, and thanks for lending your expertise!
        We’re inviting domain experts to help evaluate satellite image patches for RTS.

        ---

        ### 📋 Instructions
        * **Suggested target:** ~{TARGET_PER_PERSON} images per reviewer.
        * **The Task:** For each set, you will see 5 variations of the same satellite image.
        * **Rating:** Rate each image from **1 (Poor)** to **10 (Excellent)** based on how clearly the RTS feature (indicated by the **Red Box**) is depicted.

        ### ⏸️ Saving & Resuming
        * **Automatic Saving:** Your progress is saved automatically after every "Submit".
        * **Take a Break:** You can close this tab at any time.
        * **How to Resume:** Simply return here and enter the **exact same Name and Email**. The system will pick up exactly where you left off.

        ---
        **Questions or issues?** Email **{CONTACT_EMAIL}** — we appreciate your feedback and suggestions.

        **Ready?** Enter your details below to begin.
        """
    )

    # Session state: user id, sample list, seen sample ids, current index.
    state_uid = gr.State("")
    state_samples = gr.State([])
    state_seen = gr.State([])
    state_idx = gr.State(-1)

    # Start form: reviewer identity and the Start / Resume button.
    with gr.Group() as start_group:
        with gr.Row():
            name = gr.Textbox(label="Full name", placeholder="Jane Doe", autofocus=True)
            email = gr.Textbox(label="Email address", placeholder="jane@example.com")
        start_btn = gr.Button("Start / Resume", variant="primary")
    # NOTE(review): placed outside start_group so status messages stay visible
    # while the start form is hidden — confirm.
    status = gr.Markdown("\n")

    # Evaluation panel: instructions, five image/score columns, notes, buttons.
    eval_panel = gr.Group(visible=False)
    with eval_panel:
        gr.Markdown(
            """
            Focus your attention on the area inside the **Red Box**. This marks the potential location of the RTS. Compare the five images below. Rate how clearly and realistically each image depicts the **RTS** feature.

            **Rating Scale (1 - 10):**
            * **10 (Excellent):** The RTS feature is sharp, distinct, and clearly visible.
            * **1 (Poor):** The RTS feature is blurry, distorted, or impossible to distinguish.
            """
        )

        with gr.Row():
            with gr.Column(scale=1, min_width=150):
                gr.Markdown("<div style='text-align:center; font-weight:600;'>Image 1</div>")
                image_1 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False)
                score_1 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)")

            with gr.Column(scale=1, min_width=150):
                gr.Markdown("<div style='text-align:center; font-weight:600;'>Image 2</div>")
                image_2 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False)
                score_2 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)")

            with gr.Column(scale=1, min_width=150):
                gr.Markdown("<div style='text-align:center; font-weight:600;'>Image 3</div>")
                image_3 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False)
                score_3 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)")

            with gr.Column(scale=1, min_width=150):
                gr.Markdown("<div style='text-align:center; font-weight:600;'>Image 4</div>")
                image_4 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False)
                score_4 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)")

            with gr.Column(scale=1, min_width=150):
                gr.Markdown("<div style='text-align:center; font-weight:600;'>Image 5</div>")
                image_5 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False)
                score_5 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)")

        notes_q1 = gr.Textbox(
            label="Notes (Optional)",
            lines=2,
            placeholder="If there are multiple RTS or ambiguities, please note here."
        )

        with gr.Row():
            submit_next_btn = gr.Button("Submit & Next Image", variant="primary")
            pause_exit_btn = gr.Button("Exit", variant="secondary")

    # Receives the path of the user's results file from start_or_resume.
    your_jsonl_path = gr.State()

    # Thank-you panel shown after Exit or once the target is reached.
    with gr.Group(visible=False) as thanks_group:
        thanks_md = gr.Markdown("### ✅ Thanks! Your responses were saved.\n\nClick **Start Again** to evaluate another image.")
        restart_btn = gr.Button("Start Again", variant="primary")

    # Start / Resume: load the session, then make sure the thank-you panel is hidden.
    start_event = start_btn.click(
        start_or_resume,
        inputs=[name, email],
        outputs=[
            state_uid, state_samples, state_seen, state_idx,
            image_1, image_2, image_3, image_4, image_5,
            status, your_jsonl_path,
            eval_panel, intro_md, start_group
        ],
    )
    start_event.then(hide_thanks, inputs=None, outputs=[thanks_group])

    # Exit: pass the state through unchanged, then show the thank-you panel.
    pause_event = pause_exit_btn.click(
        pause_exit,
        inputs=[state_seen, state_samples],
        outputs=[state_seen, state_samples],
    )

    pause_event.then(
        to_thanks,
        inputs=[name, state_seen, state_samples],
        outputs=[eval_panel, thanks_group, thanks_md],
    )

    # Submit & Next: save the ratings, load the next image, then check
    # whether the thank-you panel should be shown.
    nextimg_event = submit_next_btn.click(
        submit_next_image,
        inputs=[name, email, state_uid, state_samples, state_seen, state_idx,
                score_1, score_2, score_3, score_4, score_5, notes_q1],
        outputs=[state_seen, state_idx,
                 image_1, image_2, image_3, image_4, image_5,
                 status, notes_q1,
                 score_1, score_2, score_3, score_4, score_5],
    )
    nextimg_event.then(
        maybe_show_thanks,
        inputs=[name, state_seen, state_samples],
        outputs=[eval_panel, thanks_group, thanks_md],
    )

    # Start Again: clear the identity fields and return to the start screen.
    restart_event = restart_btn.click(
        reset_to_start,
        inputs=[],
        outputs=[
            name, email,
            start_group, intro_md,
            eval_panel, thanks_group
        ],
    )
|
|
|
|
|
if __name__ == "__main__":
    # When a results repo is configured, fetch its data/ and results/ files
    # into the working directory before starting. A failed download is
    # reported and startup continues.
    if HF_RESULTS_REPO:
        from huggingface_hub import snapshot_download

        download_args = {
            "repo_id": HF_RESULTS_REPO,
            "repo_type": "dataset",
            "local_dir": ".",
            "allow_patterns": ["data/*", "results/*"],
            "token": HF_TOKEN,
        }
        try:
            snapshot_download(**download_args)
        except Exception as err:
            print(f"Error reading from HF: {err}")

    ensure_paths()
    # Startup check only: the result is discarded, the call prints what it found.
    _ = load_dataset(GT_MASKED_DIR, GT_UNMASKED_DIR, SR_DIR, ORIGINAL_DIR, IMAGE_5_DIR)

    print("✅ Launching app.")
    demo.queue()
    demo.launch()
|
|
|