Spaces:
Sleeping
Sleeping
| import os, uuid, csv, random | |
| from datetime import datetime | |
| from PIL import Image | |
| import gradio as gr | |
| # —— 1. 环境 & 文件准备 —— | |
| os.environ["GRADIO_SSR_MODE"] = "False" # 关掉 SSR | |
| # 确保 data 目录及子目录存在 | |
| os.makedirs("data/images/task0/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task0/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task1/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task1/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task2/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task2/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task3/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task3/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task4/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task4/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task5/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task5/processed_imgs", exist_ok=True) | |
| os.makedirs("data/images/task6/orig_imgs", exist_ok=True) | |
| os.makedirs("data/images/task6/processed_imgs", exist_ok=True) | |
| # 在文件开头添加必要的目录创建 | |
| os.makedirs("data/evaluations", exist_ok=True) | |
| os.makedirs("data/metadatas", exist_ok=True) | |
| meta0 = "data/metadatas/meta0.csv" | |
| meta1 = "data/metadatas/meta1.csv" | |
| meta2 = "data/metadatas/meta2.csv" | |
| meta3 = "data/metadatas/meta3.csv" | |
| meta4 = "data/metadatas/meta4.csv" | |
| meta5 = "data/metadatas/meta5.csv" | |
| meta6 = "data/metadatas/meta6.csv" | |
| if not os.path.exists(meta0): | |
| with open(meta0, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta1): | |
| with open(meta1, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta2): | |
| with open(meta2, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta3): | |
| with open(meta3, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta4): | |
| with open(meta4, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta5): | |
| with open(meta5, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(meta6): | |
| with open(meta6, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path","prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writeheader() | |
| eval0 = "data/evaluations/eval0.csv" | |
| eval1 = "data/evaluations/eval1.csv" | |
| eval2 = "data/evaluations/eval2.csv" | |
| eval3 = "data/evaluations/eval3.csv" | |
| eval4 = "data/evaluations/eval4.csv" | |
| eval5 = "data/evaluations/eval5.csv" | |
| eval6 = "data/evaluations/eval6.csv" | |
| if not os.path.exists(eval0): | |
| with open(eval0, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval1): | |
| with open(eval1, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval2): | |
| with open(eval2, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval3): | |
| with open(eval3, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval4): | |
| with open(eval4, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval5): | |
| with open(eval5, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| if not os.path.exists(eval6): | |
| with open(eval6, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writeheader() | |
| def run_agent_on_image(original_img: Image.Image, prompt: str, agent_name: str) -> Image.Image: | |
| if original_img is None: | |
| raise ValueError("Input image cannot be None") | |
| if not prompt or prompt.strip() == "": | |
| raise ValueError("Prompt cannot be empty") | |
| return original_img # TODO: implement actual agent processing | |
| def save_to_library(task_id, orig_img, prompt, a1, a2, img1, img2): | |
| try: | |
| if any(img is None for img in [orig_img, img1, img2]): | |
| raise ValueError("All images must be valid") | |
| if not prompt or prompt.strip() == "": | |
| raise ValueError("Prompt cannot be empty") | |
| orig_id = uuid.uuid4().hex | |
| orig_path = f"data/images/task{task_id}/orig_imgs/{orig_id}.png" | |
| img1_path = f"data/images/task{task_id}/processed_imgs/{orig_id}_a1.png" | |
| img2_path = f"data/images/task{task_id}/processed_imgs/{orig_id}_a2.png" | |
| # 使用 try-except 处理图片保存 | |
| try: | |
| orig_img.save(orig_path) | |
| img1.save(img1_path) | |
| img2.save(img2_path) | |
| except Exception as e: | |
| raise IOError(f"Failed to save images: {str(e)}") | |
| # 使用 try-except 处理 CSV 写入 | |
| try: | |
| with open(f"data/metadatas/meta{task_id}.csv", "a", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "id","original_path", "prompt", | |
| "agent1","img1_path","agent2","img2_path" | |
| ]) | |
| writer.writerow({ | |
| "id": orig_id, | |
| "original_path": orig_path, | |
| "prompt": prompt, | |
| "agent1": a1, | |
| "img1_path": img1_path, | |
| "agent2": a2, | |
| "img2_path": img2_path | |
| }) | |
| except Exception as e: | |
| # 如果写入CSV失败,清理已保存的图片 | |
| for path in [orig_path, img1_path, img2_path]: | |
| if os.path.exists(path): | |
| os.remove(path) | |
| raise IOError(f"Failed to write metadata: {str(e)}") | |
| except Exception as e: | |
| raise Exception(f"Error in save_to_library: {str(e)}") | |
| def generate_and_store(task_id, orig_img, prompt, a1, a2): | |
| try: | |
| if orig_img is None: | |
| return None, None | |
| if not prompt or prompt.strip() == "": | |
| return None, None | |
| if a1 == a2: | |
| return None, None # 不允许选择相同的Agent | |
| out1 = run_agent_on_image(orig_img, prompt, a1) | |
| out2 = run_agent_on_image(orig_img, prompt, a2) | |
| save_to_library(task_id, orig_img, prompt, a1, a2, out1, out2) | |
| return out1, out2 | |
| except Exception as e: | |
| print(f"Error in generate_and_store: {str(e)}") | |
| return None, None | |
| def load_random_record(task_id): | |
| try: | |
| # 检查文件是否存在 | |
| meta_file = f"data/metadatas/meta{task_id}.csv" | |
| if not os.path.exists(meta_file): | |
| return "", None, "Metadata file not found", None, None | |
| # 读取所有记录 | |
| with open(meta_file, "r", encoding="utf-8") as f: | |
| all_records = list(csv.DictReader(f)) | |
| if not all_records: | |
| return "", None, "No records in library", None, None | |
| # 读取最近5分钟内的评测记录 | |
| recent_evaluated_ids = set() | |
| current_time = datetime.now() | |
| eval_file = f"data/evaluations/eval{task_id}.csv" | |
| if os.path.exists(eval_file): | |
| try: | |
| with open(eval_file, "r", encoding="utf-8") as f: | |
| eval_records = list(csv.DictReader(f)) | |
| for record in eval_records: | |
| try: | |
| eval_time = datetime.fromisoformat(record["timestamp"]) | |
| time_diff = (current_time - eval_time).total_seconds() / 60 | |
| if time_diff <= 5: | |
| recent_evaluated_ids.add(record["record_id"]) | |
| except ValueError: | |
| # 跳过无效的时间戳 | |
| continue | |
| except Exception as e: | |
| print(f"Error reading evaluation file: {str(e)}") | |
| available_records = [r for r in all_records if r["id"] not in recent_evaluated_ids] | |
| if not available_records: | |
| return "", None, "All available records have been recently evaluated", None, None | |
| rec = random.choice(available_records) | |
| # 验证图片文件是否存在 | |
| for path in [rec["original_path"], rec["img1_path"], rec["img2_path"]]: | |
| if not os.path.exists(path): | |
| return "", None, f"Image file not found: {path}", None, None | |
| return ( | |
| rec["id"], | |
| rec["original_path"], | |
| rec["prompt"], | |
| rec["img1_path"], | |
| rec["img2_path"] | |
| ) | |
| except Exception as e: | |
| return "", None, f"Error loading record: {str(e)}", None, None | |
| def save_evaluation(task_id, record_id, | |
| a1_follow, a1_creativity, a1_finesse, | |
| a2_follow, a2_creativity, a2_finesse): | |
| try: | |
| # 验证输入 | |
| if not record_id: | |
| return "❌ Invalid record ID", *load_random_record(task_id) | |
| # 验证评分 | |
| scores = [a1_follow, a1_creativity, a1_finesse, | |
| a2_follow, a2_creativity, a2_finesse] | |
| if any(score is None for score in scores): | |
| return "❌ Please complete all evaluations", *load_random_record(task_id) | |
| with open(f"data/evaluations/eval{task_id}.csv", "a", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=[ | |
| "timestamp", "record_id", | |
| "a1_follow","a1_creativity","a1_finesse", | |
| "a2_follow","a2_creativity","a2_finesse" | |
| ]) | |
| writer.writerow({ | |
| "timestamp": datetime.now().isoformat(), | |
| "record_id": record_id, | |
| "a1_follow": a1_follow, | |
| "a1_creativity": a1_creativity, | |
| "a1_finesse": a1_finesse, | |
| "a2_follow": a2_follow, | |
| "a2_creativity": a2_creativity, | |
| "a2_finesse": a2_finesse | |
| }) | |
| return "✅ Evaluation submitted!", *load_random_record(task_id) | |
| except Exception as e: | |
| return f"❌ Error saving evaluation: {str(e)}", *load_random_record(task_id) | |
| MODEL_CHOICES = ["Model A", "Model B", "Model C"] | |
| TASK_CHOICES = [ | |
| "Image Restoration", | |
| "Image Enhancement", | |
| "Domain & Style Transfer", | |
| "Semantic-Aware Editing", | |
| "Image Composition & Expansion", | |
| "Face & Appeal Editing", | |
| "Steganography & Security Handling" | |
| ] | |
| with gr.Blocks() as demo: | |
| with gr.Tabs(): | |
| # ——— Tab 1: Agent Arena ——— | |
| with gr.TabItem("Agent Arena"): | |
| gr.Markdown("## CV Agent Arena 🎨🤖") | |
| with gr.Row(): | |
| with gr.Column(): | |
| task_dropdown = gr.Dropdown(choices=TASK_CHOICES, label="Task Category", type="index") | |
| original = gr.Image(type="pil", label="Upload Original Image") | |
| prompt = gr.Textbox(lines=2, label="Prompt", | |
| placeholder="e.g. Make it look like a sunny day") | |
| with gr.Column(): | |
| agent1 = gr.Dropdown(choices=MODEL_CHOICES, label="Select Agent 1") | |
| agent2 = gr.Dropdown(choices=MODEL_CHOICES, label="Select Agent 2") | |
| run_btn = gr.Button("Run Agents") | |
| with gr.Row(): | |
| out1 = gr.Image(type="pil", label="Agent 1 Output") | |
| out2 = gr.Image(type="pil", label="Agent 2 Output") | |
| run_btn.click( | |
| fn=generate_and_store, | |
| inputs=[task_dropdown, original, prompt, agent1, agent2], | |
| outputs=[out1, out2], | |
| show_api=False | |
| ) | |
| # ——— Tab 2: Human as Judge ——— | |
| with gr.TabItem("Human as Judge"): | |
| record_id_state = gr.State("") | |
| task_dropdown = gr.Dropdown(choices=TASK_CHOICES, label="Task Category", type="index") | |
| # 原图与 Prompt 并排 | |
| with gr.Row(): | |
| judge_orig = gr.Image(label="Original Image") | |
| judge_prompt = gr.Textbox(label="Prompt", interactive=False) | |
| # 两张结果图并排 | |
| with gr.Row(): | |
| judge_out1 = gr.Image(label="Agent 1 Result") | |
| judge_out2 = gr.Image(label="Agent 2 Result") | |
| # 当选 Task 时加载随机样本 | |
| task_dropdown.change( | |
| fn=load_random_record, | |
| inputs=[task_dropdown], | |
| outputs=[record_id_state, judge_orig, judge_prompt, judge_out1, judge_out2], | |
| show_api=False | |
| ) | |
| with gr.Row(): | |
| gr.Markdown( | |
| "## Please Evaluate the Processed Images from 3 Aspects", | |
| elem_classes=["center-text"] | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| a1_follow = gr.Radio([0,1,2,3,4,5], label="Follow Prompt") | |
| a1_creativity = gr.Radio([0,1,2,3,4,5], label="Creativity") | |
| a1_finesse = gr.Radio([0,1,2,3,4,5], label="Finesse/Detail") | |
| with gr.Column(): | |
| a2_follow = gr.Radio([0,1,2,3,4,5], label="Follow Prompt") | |
| a2_creativity = gr.Radio([0,1,2,3,4,5], label="Creativity") | |
| a2_finesse = gr.Radio([0,1,2,3,4,5], label="Finesse/Detail") | |
| submit_btn = gr.Button("Submit Evaluation") | |
| submit_status = gr.Textbox(label="Status", interactive=False) | |
| submit_btn.click( | |
| fn=save_evaluation, | |
| inputs=[ | |
| task_dropdown, record_id_state, | |
| a1_follow, a1_creativity, a1_finesse, | |
| a2_follow, a2_creativity, a2_finesse | |
| ], | |
| outputs=[ | |
| submit_status, | |
| record_id_state, judge_orig, judge_prompt, judge_out1, judge_out2 | |
| ], | |
| show_api=False | |
| ) | |
| demo.queue() | |
| demo.launch( | |
| share=False, | |
| show_api=False, | |
| ssr_mode=False | |
| ) |