# VisualQuality-R1 demo (Hugging Face Space) — GGUF / CPU image quality assessment.
| import gradio as gr | |
| import os | |
| import re | |
| import json | |
| import tempfile | |
| import zipfile | |
| from huggingface_hub import hf_hub_download | |
| from llama_cpp import Llama | |
| from llama_cpp.llama_chat_format import Llava15ChatHandler | |
| import base64 | |
| from PIL import Image | |
| from io import BytesIO | |
# Constants: GGUF-quantized model files hosted on the Hugging Face Hub.
REPO_ID = "mradermacher/VisualQuality-R1-7B-GGUF"
MODEL_FILE = "VisualQuality-R1-7B.Q4_K_M.gguf"
MMPROJ_FILE = "VisualQuality-R1-7B.mmproj-Q8_0.gguf"

# Prompts for the image-quality-assessment task.
PROMPT = (
    "You are doing the image quality assessment task. Here is the question: "
    "What is your overall rating on the quality of this picture? The rating should be a float between 1 and 5, "
    "rounded to two decimal places, with 1 representing very poor quality and 5 representing excellent quality."
)
# Template asking the model to reason inside <think> tags before answering.
QUESTION_TEMPLATE_THINKING = "{Question} First output the thinking process in <think> </think> tags and then output the final answer with only one score in <answer> </answer> tags."
# Template asking for the bare score only (faster, no reasoning trace).
QUESTION_TEMPLATE_NO_THINKING = "{Question} Please only output the final answer with only one score in <answer> </answer> tags."

# Global: lazily-initialized llama.cpp model instance (populated by load_model).
llm = None
def download_models():
    """Fetch the GGUF weights and the vision projector from the HF Hub.

    Returns:
        (model_path, mmproj_path) — local cache paths of the two files.
    """
    print("Downloading model files...")
    weights = hf_hub_download(repo_id=REPO_ID, filename=MODEL_FILE)
    print(f"Model downloaded: {weights}")
    projector = hf_hub_download(repo_id=REPO_ID, filename=MMPROJ_FILE)
    print(f"MMProj downloaded: {projector}")
    return weights, projector
def load_model():
    """Load the llama.cpp model into the global ``llm`` (idempotent)."""
    global llm
    if llm is not None:
        return

    weights_path, projector_path = download_models()
    print("Loading model...")
    # Vision GGUF models need a Llava15ChatHandler wired to the mmproj file.
    handler = Llava15ChatHandler(clip_model_path=projector_path, verbose=False)
    llm = Llama(
        model_path=weights_path,
        chat_handler=handler,
        n_ctx=4096,
        n_threads=4,
        n_gpu_layers=0,  # CPU-only deployment
        verbose=False,
    )
    print("Model loaded!")
def image_to_data_uri(image):
    """Encode a PIL image as a base64 JPEG data URI, downscaled for speed.

    Returns None when *image* is None.
    """
    if image is None:
        return None
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Shrink large images so CPU inference stays tolerably fast.
    max_size = 768
    longest = max(image.size)
    if longest > max_size:
        scale = max_size / longest
        new_dims = (int(image.size[0] * scale), int(image.size[1] * scale))
        image = image.resize(new_dims, Image.LANCZOS)

    buf = BytesIO()
    image.save(buf, format="JPEG", quality=85)
    encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{encoded}"
def extract_score(text):
    """Extract the numeric quality score from model output.

    Prefers the last <answer>...</answer> block; falls back to scanning the
    whole text. The first number found is clamped to the rating scale [1, 5].

    Args:
        text: Raw model output (may contain <think>/<answer> tags).

    Returns:
        float score in [1.0, 5.0], or None if no number could be parsed.
    """
    # Narrowed from a bare `except:` which silently swallowed every error
    # (including KeyboardInterrupt); only non-string input is a real risk here.
    try:
        matches = re.findall(r'<answer>(.*?)</answer>', text, re.DOTALL)
        answer = matches[-1].strip() if matches else text.strip()
        score_match = re.search(r'\d+(\.\d+)?', answer)
    except (TypeError, AttributeError):
        # e.g. text is None — preserve the original's safe None fallback.
        return None
    if score_match is None:
        return None
    # The regex guarantees float() cannot fail; clamp to the prompt's 1..5 range.
    score = float(score_match.group())
    return min(max(score, 1.0), 5.0)
def extract_thinking(text):
    """Return the content of the last <think>...</think> block, or ""."""
    found = re.findall(r'<think>(.*?)</think>', text, re.DOTALL)
    return found[-1].strip() if found else ""
def score_single_image(image, use_thinking=True):
    """Stream a quality assessment for a single image.

    Generator used by the Gradio click handler: yields progressive
    (raw_output, thinking, score_markdown) tuples while tokens stream in,
    then one final tuple with the leaderboard-ready score.

    Args:
        image: PIL image to rate; None yields an error message.
        use_thinking: If True, use the slower reasoning prompt.
    """
    if image is None:
        # BUG FIX: this function is a generator, so the original's
        # `return "❌ ...", "", ""` only set StopIteration.value and the UI
        # never saw the message — it must be yielded. Checking before
        # load_model() also avoids downloading the model for an empty click.
        yield "❌ Upload an image first", "", ""
        return
    load_model()

    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)
    image_uri = image_to_data_uri(image)
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_uri}},
                {"type": "text", "text": prompt_text},
            ],
        }
    ]

    generated_text = ""
    try:
        # Stream tokens so the UI updates incrementally.
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=2048 if use_thinking else 256,  # reasoning needs room
            temperature=0.7,
            top_p=0.95,
            stream=True,
        )
        for chunk in response:
            delta = chunk.get("choices", [{}])[0].get("delta", {})
            content = delta.get("content", "")
            if content:
                generated_text += content
                thinking = extract_thinking(generated_text)
                score = extract_score(generated_text)
                if score is not None:
                    score_display = f"⭐ **Score: {score:.2f} / 5.00**"
                else:
                    score_display = "*Analyzing...*"
                yield generated_text, thinking, score_display

        # Final result once the stream is exhausted.
        final_score = extract_score(generated_text)
        final_thinking = extract_thinking(generated_text) if use_thinking else ""
        if final_score is not None:
            score_display = (
                f"⭐ **Quality Score: {final_score:.2f} / 5.00**\n\n"
                f"📊 **For Leaderboard:** `{final_score:.2f}`"
            )
        else:
            score_display = "❌ Could not extract score"
        yield generated_text, final_thinking, score_display
    except Exception as e:
        yield f"❌ Error: {str(e)}", "", ""
def _format_score(score):
    """Render a float score as '3.25'; pass 'N/A'/'ERROR' markers through."""
    return f"{score:.2f}" if isinstance(score, float) else str(score)


def process_batch(files, use_thinking=True, progress=gr.Progress()):
    """Score every uploaded image and package the results for download.

    Args:
        files: Gradio file objects (or plain paths) of the images to rate.
        use_thinking: If True, use the slower reasoning prompt.
        progress: Gradio progress tracker (injected by the UI).

    Returns:
        (markdown_summary, path_to_results_zip); the zip holds a leaderboard
        TXT, a CSV and a JSON with full raw outputs.
    """
    # Check input before load_model() so an empty click doesn't download the model.
    if not files:
        return "❌ No files", None
    load_model()

    results = []
    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)

    for i, file in enumerate(files):
        # Pre-set a fallback so the except-branch never reads an unbound
        # local (the original probed `'filename' in dir()`, which is fragile).
        filename = f"image_{i+1}"
        try:
            if hasattr(file, 'name'):
                image = Image.open(file.name)
                filename = os.path.basename(file.name)
            else:
                image = Image.open(file)
                filename = f"image_{i+1}.jpg"

            image_uri = image_to_data_uri(image)
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": image_uri}},
                        {"type": "text", "text": prompt_text},
                    ],
                }
            ]
            response = llm.create_chat_completion(
                messages=messages,
                max_tokens=2048 if use_thinking else 256,
                temperature=0.7,
                top_p=0.95,
            )
            generated_text = response["choices"][0]["message"]["content"]
            score = extract_score(generated_text)
            results.append({
                "filename": filename,
                # `is not None`, not truthiness, so a parsed score can never
                # be misreported as missing.
                "score": score if score is not None else "N/A",
                "thinking": extract_thinking(generated_text) if use_thinking else "",
                "raw_output": generated_text,
            })
            progress((i + 1) / len(files), desc=f"Processed {i+1}/{len(files)}")
        except Exception as e:
            results.append({
                "filename": filename,
                "score": "ERROR",
                "thinking": "",
                "raw_output": str(e),
            })

    # Write the TXT/JSON/CSV outputs and zip them up.
    with tempfile.TemporaryDirectory() as tmpdir:
        # TXT for the leaderboard
        txt_file = os.path.join(tmpdir, "scores.txt")
        with open(txt_file, "w") as f:
            for r in results:
                f.write(f"{r['filename']}\t{_format_score(r['score'])}\n")
        # JSON with full raw model outputs
        json_file = os.path.join(tmpdir, "results.json")
        with open(json_file, "w") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        # CSV
        csv_file = os.path.join(tmpdir, "scores.csv")
        with open(csv_file, "w") as f:
            f.write("filename,score\n")
            for r in results:
                f.write(f"{r['filename']},{_format_score(r['score'])}\n")
        # ZIP
        zip_path = os.path.join(tmpdir, "results.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.write(txt_file, "scores.txt")
            zipf.write(json_file, "results.json")
            zipf.write(csv_file, "scores.csv")
        # Copy the zip out before TemporaryDirectory deletes everything.
        final_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
        with open(zip_path, 'rb') as f:
            final_zip.write(f.read())
        final_zip.close()

    # Summary
    valid_scores = [r['score'] for r in results if isinstance(r['score'], float)]
    avg = sum(valid_scores) / len(valid_scores) if valid_scores else 0
    # BUG FIX: the original wrote `{min(valid_scores):.2f if valid_scores
    # else 'N/A'}` — the conditional landed inside the f-string FORMAT SPEC,
    # which is invalid and raises ValueError at runtime (same for Max and the
    # preview rows). Compute the strings before interpolating.
    min_str = f"{min(valid_scores):.2f}" if valid_scores else "N/A"
    max_str = f"{max(valid_scores):.2f}" if valid_scores else "N/A"
    preview_rows = "\n".join(
        f"| {r['filename'][:30]} | {_format_score(r['score'])} |" for r in results[:10]
    )
    summary = f"""## ✅ Done!
**Processed:** {len(results)} images
**Success:** {len(valid_scores)}
**Failed:** {len(results) - len(valid_scores)}
**Average:** {avg:.2f}
**Min:** {min_str}
**Max:** {max_str}
### Preview:
| File | Score |
|------|-------|
""" + preview_rows
    return summary, final_zip.name
# Interface: two tabs — single-image streaming analysis and batch scoring.
with gr.Blocks(title="VisualQuality-R1") as demo:
    gr.Markdown("""
# 🎨 VisualQuality-R1 (GGUF/CPU)
Image Quality Assessment | CPU Mode (~30-60 sec/image)
[](https://arxiv.org/abs/2505.14460)
""")
    with gr.Tabs():
        with gr.TabItem("📷 Single Image"):
            with gr.Row():
                with gr.Column():
                    img_input = gr.Image(label="Upload", type="pil", height=350)
                    thinking_cb = gr.Checkbox(label="🧠 Thinking Mode", value=True)
                    btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
                with gr.Column():
                    score_out = gr.Markdown("*Upload image*")
                    thinking_out = gr.Textbox(label="Thinking", lines=6)
                    raw_out = gr.Textbox(label="Output", lines=8)
            # score_single_image is a generator, so these outputs stream live.
            btn.click(score_single_image, [img_input, thinking_cb], [raw_out, thinking_out, score_out])
        with gr.TabItem("📁 Batch (1000+ images)"):
            gr.Markdown("### Upload multiple images for leaderboard submission")
            with gr.Row():
                with gr.Column():
                    batch_files = gr.File(label="Images", file_count="multiple", file_types=["image"])
                    batch_thinking = gr.Checkbox(label="🧠 Thinking (slower)", value=False)
                    batch_btn = gr.Button("🚀 Process All", variant="primary", size="lg")
                with gr.Column():
                    batch_summary = gr.Markdown("*Upload and click Process*")
                    batch_download = gr.File(label="📥 Download Results")
            batch_btn.click(process_batch, [batch_files, batch_thinking], [batch_summary, batch_download])

if __name__ == "__main__":
    # Small queue: CPU inference is slow, so cap concurrent requests.
    demo.queue(max_size=5)
    demo.launch(server_name="0.0.0.0", server_port=7860)