Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import tempfile | |
| import zipfile | |
| from datetime import datetime | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| # Program A imports | |
| from utils import MEGABenchEvalDataLoader | |
| from constants import * # This is assumed to define CITATION_BUTTON_TEXT, CITATION_BUTTON_LABEL, TABLE_INTRODUCTION, LEADERBOARD_INTRODUCTION, DATA_INFO, SUBMIT_INTRODUCTION, BASE_MODEL_GROUPS, etc. | |
| # Program B imports | |
| import spaces | |
| from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, Qwen2_5_VLForConditionalGeneration | |
| from qwen_vl_utils import process_vision_info | |
| from gliner import GLiNER | |
| # ---------------------------------------------------------------- | |
| # Combined CSS | |
| # ---------------------------------------------------------------- | |
# Directory containing this file; the stylesheets are looked up relative to it
# rather than relative to the process working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))


def _read_stylesheet(name):
    """Return the text of ``static/css/<name>`` located next to this file."""
    with open(os.path.join(current_dir, "static", "css", name), "r") as f:
        return f.read()


base_css = _read_stylesheet("style.css")
table_css = _read_stylesheet("table.css")
# Extra rules for the image-processing tab. They target the elem_classes /
# elem_id values assigned to the widgets in the UI definition.
css_program_b = """
/* Program B CSS */
.gradio-container {
max-width: 1200px !important;
margin: 0 auto;
padding: 20px;
background-color: #f8f9fa;
}
.tabs {
border-radius: 8px;
background: white;
padding: 20px;
box-shadow: 0 2px 6px rgba(0, 0, 0, 0.1);
}
.input-container, .output-container {
background: white;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05);
}
.submit-btn {
background-color: #2d31fa !important;
border: none !important;
padding: 8px 20px !important;
border-radius: 6px !important;
color: white !important;
transition: all 0.3s ease !important;
}
.submit-btn:hover {
background-color: #1f24c7 !important;
transform: translateY(-1px);
}
#output {
height: 500px;
overflow: auto;
border: 1px solid #e0e0e0;
border-radius: 6px;
padding: 15px;
background: #ffffff;
font-family: 'Arial', sans-serif;
}
.gr-dropdown {
border-radius: 6px !important;
border: 1px solid #e0e0e0 !important;
}
.gr-image-input {
border: 2px dashed #ccc;
border-radius: 8px;
padding: 20px;
transition: all 0.3s ease;
}
.gr-image-input:hover {
border-color: #2d31fa;
}
"""

# Single stylesheet handed to gr.Blocks: the two files read from disk first,
# then the inline rules above, each separated by a newline.
css_global = "\n".join([base_css, table_css, css_program_b])
| # ---------------------------------------------------------------- | |
| # Program A Global Initializations | |
| # ---------------------------------------------------------------- | |
# One loader per result set; both are created once at import time and shared
# by every leaderboard callback.
default_loader, si_loader = (
    MEGABenchEvalDataLoader(f"./static/eval_results/{subset}")
    for subset in ("Default", "SI")
)
| # ---------------------------------------------------------------- | |
| # Program B Global Initializations | |
| # ---------------------------------------------------------------- | |
# Entity-recognition model and the label set offered by default in the UI.
gliner_model = GLiNER.from_pretrained("knowledgator/modern-gliner-bi-large-v1.0")
DEFAULT_NER_LABELS = "person, organization, location, date, event"

# The single vision-language checkpoint served by this app. It is loaded once
# at import time, moved to the GPU and switched to eval mode.
_QWEN_CHECKPOINT = "Qwen/Qwen2.5-VL-7B-Instruct"
_qwen_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    _QWEN_CHECKPOINT, trust_remote_code=True, torch_dtype="auto"
)
_qwen_model = _qwen_model.cuda()

# Both registries are keyed by checkpoint id.
models = {_QWEN_CHECKPOINT: _qwen_model.eval()}
processors = {
    _QWEN_CHECKPOINT: AutoProcessor.from_pretrained(
        _QWEN_CHECKPOINT, trust_remote_code=True
    )
}

# Plain-text chat role markers.
user_prompt = '<|user|>\n'
assistant_prompt = '<|assistant|>\n'
prompt_suffix = "<|end|>\n"
| # A simple metadata container for OCR results and entity information. | |
class TextWithMetadata(list):
    """A list that also carries the raw text and the entities it was built from.

    Positional arguments are forwarded to ``list``. Two optional keyword
    arguments are recognised; any other keyword is accepted and ignored.

    Keyword Args:
        original_text: stored on ``self.original_text`` (default ``''``).
        entities: stored on ``self.entities`` (default: a new empty list).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        # Defaults first, caller-supplied values layered on top; the dict is
        # rebuilt per call so the default list is never shared.
        metadata = {"original_text": "", "entities": [], **kwargs}
        self.original_text = metadata["original_text"]
        self.entities = metadata["entities"]
| # ---------------------------------------------------------------- | |
| # UI DEFINITION (placed at the top) | |
| # ---------------------------------------------------------------- | |
def _deferred_handler(handler_name):
    """Return a callback that looks up ``handler_name`` only when it is called.

    The event handlers are defined further down in this module, after the UI
    block has already executed. Passing them by bare name while the block runs
    raises NameError at import time. The returned forwarder resolves the real
    function from the module globals when an event fires, by which point the
    whole module has been executed.
    """

    def _call(*args):
        return globals()[handler_name](*args)

    # Keep the handler's own name so anything derived from fn.__name__ is unchanged.
    _call.__name__ = handler_name
    return _call


with gr.Blocks(css=css_global) as demo:
    with gr.Tabs():
        # -------------------------
        # Tab 1: Dashboard (Program A)
        # -------------------------
        with gr.TabItem("Dashboard"):
            with gr.Tabs(elem_classes="tab-buttons") as dashboard_tabs:
                # --- MEGA-Bench Leaderboard Tab ---
                with gr.TabItem("📊 MEGA-Bench"):
                    # Inject table CSS (will be updated when the table is refreshed)
                    css_style = gr.HTML(f"<style>{base_css}\n{table_css}</style>", visible=False)
                    # Define captions for default vs. single-image tables
                    default_caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
                                       "of each keyword. <br> The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by "
                                       "rule-based metrics, and the Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a "
                                       "VLM judge (we use GPT-4o-0806). <br> Different from the results in our paper, we only use the Core "
                                       "results with CoT prompting here for clarity and compatibility with the released data. <br> "
                                       "$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
                                       "\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$ <br> * indicates self-reported "
                                       "results from the model authors.")
                    single_image_caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
                                            "in each keyword. <br> This subset contains 273 single-image tasks from the Core set and 42 single-image tasks "
                                            "from the Open-ended set. For open-source models, we drop the image input in the 1-shot demonstration example so that "
                                            "the entire query contains a single image only. <br> Compared to the default table, some models with only "
                                            "single-image support are added.")
                    caption_component = gr.Markdown(
                        value=default_caption,
                        elem_classes="table-caption",
                        latex_delimiters=[{"left": "$", "right": "$", "display": False}],
                    )
                    with gr.Row():
                        super_group_selector = gr.Radio(
                            choices=list(default_loader.SUPER_GROUPS.keys()),
                            label="Select a dimension to display breakdown results. We use different column colors to distinguish the overall benchmark scores and breakdown results.",
                            value=list(default_loader.SUPER_GROUPS.keys())[0]
                        )
                        model_group_selector = gr.Radio(
                            choices=list(BASE_MODEL_GROUPS.keys()),
                            label="Select a model group",
                            value="All"
                        )
                    # The table starts on the first super group with every model shown.
                    initial_headers, initial_data = default_loader.get_leaderboard_data(
                        list(default_loader.SUPER_GROUPS.keys())[0], "All"
                    )
                    data_component = gr.Dataframe(
                        value=initial_data,
                        headers=initial_headers,
                        datatype=["number", "html"] + ["number"] * (len(initial_headers) - 2),
                        interactive=True,
                        elem_classes="custom-dataframe",
                        max_height=2400,
                        column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(initial_headers) - 5),
                    )
                    with gr.Row():
                        with gr.Accordion("Citation", open=False):
                            citation_button = gr.Textbox(
                                value=CITATION_BUTTON_TEXT,
                                label=CITATION_BUTTON_LABEL,
                                elem_id="citation-button",
                                lines=10,
                            )
                    gr.Markdown(TABLE_INTRODUCTION)
                    with gr.Row():
                        table_selector = gr.Radio(
                            choices=["Default", "Single Image"],
                            label="Select table to display. Default: all MEGA-Bench tasks; Single Image: single-image tasks only.",
                            value="Default"
                        )
                    refresh_button = gr.Button("Refresh")

                    # Wire up event handlers. The functions are defined below,
                    # hence the deferred lookups instead of bare names.
                    table_inputs = [table_selector, super_group_selector, model_group_selector]
                    table_outputs = [data_component, caption_component, css_style]
                    refresh_table = _deferred_handler("update_table_and_caption")

                    refresh_button.click(fn=refresh_table, inputs=table_inputs, outputs=table_outputs)
                    super_group_selector.change(fn=refresh_table, inputs=table_inputs, outputs=table_outputs)
                    model_group_selector.change(fn=refresh_table, inputs=table_inputs, outputs=table_outputs)
                    # Switching tables first refreshes the radio choices, then redraws the table.
                    table_selector.change(
                        fn=_deferred_handler("update_selectors"),
                        inputs=[table_selector],
                        outputs=[super_group_selector, model_group_selector]
                    ).then(
                        fn=refresh_table,
                        inputs=table_inputs,
                        outputs=table_outputs
                    )
                # --- Introduction Tab ---
                with gr.TabItem("📚 Introduction"):
                    gr.Markdown(LEADERBOARD_INTRODUCTION)
                # --- Data Information Tab ---
                with gr.TabItem("📝 Data Information"):
                    gr.Markdown(DATA_INFO, elem_classes="markdown-text")
                # --- Submit Tab ---
                with gr.TabItem("🚀 Submit"):
                    with gr.Row():
                        gr.Markdown(SUBMIT_INTRODUCTION, elem_classes="markdown-text")
        # -------------------------
        # Tab 2: Image Processing (Program B)
        # -------------------------
        with gr.TabItem("Image Processing"):
            # A default image is shown for context.
            gr.Image("Caracal.jpg", interactive=False)
            # State variable that carries the OCR/NER result from "Submit" to "Download".
            ocr_state = gr.State()
            with gr.Tab(label="Image Input", elem_classes="tabs"):
                with gr.Row():
                    with gr.Column(elem_classes="input-container"):
                        input_img = gr.Image(label="Input Picture", elem_classes="gr-image-input")
                        model_selector = gr.Dropdown(
                            choices=list(models.keys()),
                            label="Model",
                            value="Qwen/Qwen2.5-VL-7B-Instruct",
                            elem_classes="gr-dropdown"
                        )
                        with gr.Row():
                            ner_checkbox = gr.Checkbox(label="Run Named Entity Recognition", value=False)
                            ner_labels = gr.Textbox(
                                label="NER Labels (comma-separated)",
                                value=DEFAULT_NER_LABELS,
                                visible=False
                            )
                        submit_btn = gr.Button(value="Submit", elem_classes="submit-btn")
                    with gr.Column(elem_classes="output-container"):
                        output_text = gr.HighlightedText(label="Output Text", elem_id="output")
                # Toggle visibility of the NER labels textbox.
                ner_checkbox.change(
                    lambda x: gr.update(visible=x),
                    inputs=[ner_checkbox],
                    outputs=[ner_labels]
                )
                submit_btn.click(
                    fn=_deferred_handler("run_example"),
                    inputs=[input_img, model_selector, ner_checkbox, ner_labels],
                    outputs=[output_text, ocr_state]
                )
                with gr.Row():
                    filename = gr.Textbox(label="Save filename (without extension)", placeholder="Enter filename to save")
                    download_btn = gr.Button("Download Image & Text", elem_classes="submit-btn")
                    download_output = gr.File(label="Download")
                download_btn.click(
                    fn=_deferred_handler("create_zip"),
                    inputs=[input_img, filename, ocr_state],
                    outputs=[download_output]
                )
| # ---------------------------------------------------------------- | |
| # FUNCTION DEFINITIONS | |
| # ---------------------------------------------------------------- | |
def update_table_and_caption(table_type, super_group, model_group):
    """Rebuild the leaderboard table, its caption and the style tag.

    Args:
        table_type: "Default" selects the full-results loader and caption;
            any other value selects the single-image ones.
        super_group: forwarded to the loader's ``get_leaderboard_data``.
        model_group: forwarded to the loader's ``get_leaderboard_data``.

    Returns:
        A ``(dataframe, caption, style_html)`` tuple matching the three
        outputs this callback is wired to.
    """
    is_default_table = table_type == "Default"
    loader = default_loader if is_default_table else si_loader
    headers, data = loader.get_leaderboard_data(super_group, model_group)

    if is_default_table:
        caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
                   "of each keyword. <br> The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by rule-based metrics, and the "
                   "Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a VLM judge (we use GPT-4o-0806). <br> "
                   "Different from the results in our paper, we only use the Core results with CoT prompting here for clarity and compatibility "
                   "with the released data. <br> $\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
                   "\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$ <br> * indicates self-reported results from the model authors.")
    else:
        caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
                   "in each keyword. <br> This subset contains 273 single-image tasks from the Core set and 42 single-image tasks from the Open-ended set. "
                   "For open-source models, we drop the image input in the 1-shot demonstration example so that the entire query contains a single image only. <br> "
                   "Compared to the default table, some models with only single-image support are added.")

    # First column is numeric, second is HTML, every remaining column is numeric.
    trailing_columns = len(headers) - 2
    table = gr.Dataframe(
        value=data,
        headers=headers,
        datatype=["number", "html"] + ["number"] * trailing_columns,
        interactive=True,
        column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (trailing_columns - 3),
    )
    return table, caption, f"<style>{base_css}\n{table_css}</style>"
def update_selectors(table_type):
    """Return new choice lists for the two radio selectors.

    Args:
        table_type: "Default" uses the default loader; any other value uses
            the single-image loader.

    Returns:
        A two-item list of updates: the first carries the loader's
        ``SUPER_GROUPS`` keys, the second its ``MODEL_GROUPS`` keys, in the
        order of the outputs this callback is wired to.
    """
    loader = default_loader if table_type == "Default" else si_loader
    # The per-component ``gr.Radio.update`` classmethod was removed in
    # Gradio 4 and raises AttributeError there; ``gr.update`` is the
    # component-agnostic replacement already used elsewhere in this file.
    return [
        gr.update(choices=list(loader.SUPER_GROUPS.keys())),
        gr.update(choices=list(loader.MODEL_GROUPS.keys())),
    ]
def array_to_image_path(image_array):
    """Save an image array as a PNG on disk and return the file's absolute path.

    The array is cast to ``uint8``, converted to a PIL image and shrunk in
    place so that neither side exceeds 1024 pixels.

    Args:
        image_array: array-like image data accepted by ``Image.fromarray``
            after a ``uint8`` cast.

    Returns:
        Absolute path of the written PNG file. The caller owns the file and
        is responsible for deleting it.

    Raises:
        ValueError: if ``image_array`` is ``None``.
    """
    if image_array is None:
        raise ValueError("No image provided.")

    img = Image.fromarray(np.uint8(image_array))
    img.thumbnail((1024, 1024))

    # A timestamp alone only has one-second resolution, so two requests in the
    # same second used to overwrite each other's file in the working
    # directory. mkstemp guarantees a unique name in the temp directory.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fd, path = tempfile.mkstemp(prefix=f"image_{timestamp}_", suffix=".png")
    os.close(fd)  # PIL reopens the path itself
    img.save(path)
    return os.path.abspath(path)
def _parse_ner_labels(raw_labels):
    """Split a comma-separated label string into trimmed, non-empty labels."""
    return [label.strip() for label in (raw_labels or "").split(",") if label.strip()]


def _highlight_entities(text, entities):
    """Convert entity spans into ``(segment, label)`` pairs covering ``text``."""
    segments = []
    cursor = 0
    for entity in sorted(entities, key=lambda item: item["start"]):
        start, end = entity["start"], entity["end"]
        if start < cursor:
            # Overlaps a span that was already emitted. Skipping it keeps the
            # concatenated segments equal to the original text.
            continue
        if cursor < start:
            segments.append((text[cursor:start], None))
        segments.append((text[start:end], entity["label"]))
        cursor = end
    if cursor < len(text):
        segments.append((text[cursor:], None))
    return segments


# NOTE(review): `spaces` is imported by this file but was never used. On
# ZeroGPU Spaces GPU work has to run inside a @spaces.GPU function; the
# decorator is documented as having no effect on other hardware.
@spaces.GPU
def run_example(image, model_id="Qwen/Qwen2.5-VL-7B-Instruct", run_ner=False, ner_labels=DEFAULT_NER_LABELS):
    """Run OCR on an image with the selected model, optionally followed by NER.

    Args:
        image: image array as delivered by the ``gr.Image`` input.
        model_id: key into the module-level ``models`` / ``processors`` dicts.
        run_ner: when true, entities are extracted from the OCR text.
        ner_labels: comma-separated entity labels; surrounding whitespace and
            empty items are ignored.

    Returns:
        The same ``TextWithMetadata`` object twice: once for the highlighted
        text display and once for the state that the download step reads.

    Raises:
        gr.Error: if no image was provided.
    """
    if image is None:
        raise gr.Error("Please upload an image before submitting.")

    text_input = "Convert the image to text."
    model = models[model_id]
    processor = processors[model_id]

    image_path = array_to_image_path(image)
    try:
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image_path},
                    {"type": "text", "text": text_input},
                ],
            }
        ]
        # Prepare text and vision inputs
        text_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text_full],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
    finally:
        # The file exists only to hand the image to the vision pre-processing
        # above; remove it so it does not accumulate on disk per request.
        try:
            os.remove(image_path)
        except OSError:
            pass
    inputs = inputs.to("cuda")

    # Generate, then drop the prompt tokens so only the new tokens are decoded.
    generated_ids = model.generate(**inputs, max_new_tokens=1024)
    generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
    output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    ocr_text = output_text[0]

    labels = _parse_ner_labels(ner_labels) if run_ner else []
    if labels and ocr_text:
        ner_results = gliner_model.predict_entities(ocr_text, labels, threshold=0.3)
        result = TextWithMetadata(
            _highlight_entities(ocr_text, ner_results),
            original_text=ocr_text,
            entities=ner_results,
        )
    else:
        result = TextWithMetadata([(ocr_text, None)], original_text=ocr_text, entities=[])
    return result, result  # one for display, one for state
def create_zip(image, fname, ocr_result):
    """Bundle the image, the OCR text and a JSON summary into a zip archive.

    The archive is written to ``downloads/<name>.zip`` and contains
    ``<name>.png``, ``<name>.txt`` and ``<name>.json``.

    Args:
        image: a NumPy array or a PIL image; any other type yields ``None``.
        fname: user-supplied base name without extension. Only its final path
            component is used.
        ocr_result: object exposing ``original_text`` and ``entities``
            attributes, or ``None``. Missing attributes are treated as empty.

    Returns:
        Path of the created zip file, or ``None`` when there is nothing to
        save, the name is unusable, or writing fails.
    """
    if image is None or not fname:
        return None

    # fname comes from a free-text box: keep only the last path component so
    # that values such as "../../x" cannot escape the output directory.
    safe_name = os.path.basename(str(fname).strip().replace("\\", "/")).strip()
    if not safe_name.strip(". "):
        return None

    # The state may hold a plain value without the metadata attributes.
    original_text = getattr(ocr_result, "original_text", "") or ""
    entities = getattr(ocr_result, "entities", None) or []

    try:
        if isinstance(image, np.ndarray):
            image_pil = Image.fromarray(image)
        elif isinstance(image, Image.Image):
            image_pil = image
        else:
            return None

        # Stage the three members in a scratch directory; it is removed
        # automatically once the archive has been written.
        with tempfile.TemporaryDirectory() as temp_dir:
            img_path = os.path.join(temp_dir, f"{safe_name}.png")
            image_pil.save(img_path)

            txt_path = os.path.join(temp_dir, f"{safe_name}.txt")
            with open(txt_path, 'w', encoding='utf-8') as f:
                f.write(original_text)

            json_data = {
                "text": original_text,
                "entities": entities,
                "image_file": f"{safe_name}.png"
            }
            json_path = os.path.join(temp_dir, f"{safe_name}.json")
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2, ensure_ascii=False)

            output_dir = "downloads"
            os.makedirs(output_dir, exist_ok=True)
            zip_path = os.path.join(output_dir, f"{safe_name}.zip")
            with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
                for member in (img_path, txt_path, json_path):
                    zipf.write(member, os.path.basename(member))
            return zip_path
    except Exception as e:
        # UI callback boundary: report the problem and show no download
        # instead of surfacing a traceback to the user.
        print(f"Error creating zip: {str(e)}")
        return None
| # ---------------------------------------------------------------- | |
| # Launch the merged Gradio app | |
| # ---------------------------------------------------------------- | |
if __name__ == "__main__":
    # Enable the request queue with its API closed, then serve in debug mode.
    demo.queue(api_open=False).launch(debug=True)