Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| import os | |
| import json | |
| import base64 | |
| import requests | |
| import gradio as gr | |
| from PIL import Image | |
| from io import BytesIO | |
| import pypdfium2 as pdfium | |
# Backend settings are taken from the environment (VLLM_ENDPOINT / VLLM_MODEL).
ENDPOINT = os.environ.get("VLLM_ENDPOINT")
MODEL = os.environ.get("VLLM_MODEL")

# Fail fast at import time when either setting is missing or empty.
if not (ENDPOINT and MODEL):
    raise ValueError("VLLM_ENDPOINT and VLLM_MODEL environment variables must be set.")
def image_to_base64(image):
    """Encode an image as a base64 PNG string.

    Args:
        image: A PIL-style image exposing ``mode``, ``convert`` and ``save``.

    Returns:
        The PNG-encoded bytes of the image, base64-encoded, as a ``str``.

    Images whose mode is not in the pass-through set below are converted to
    RGB before saving. This keeps the existing RGBA -> RGB flattening and
    additionally covers modes the PNG writer rejects (for example CMYK JPEG
    uploads), which previously made ``save`` raise.
    """
    # Modes written to PNG unchanged; anything else (RGBA, CMYK, YCbCr, ...)
    # goes through RGB first.
    png_passthrough_modes = {"1", "L", "LA", "I", "I;16", "P", "RGB"}
    if image.mode not in png_passthrough_modes:
        image = image.convert("RGB")

    buffered = BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")
def render_pdf_page(page, max_resolution=1540, scale=2.77):
    """Render a PDF page, shrinking the scale so neither side exceeds a cap.

    Args:
        page: Page object providing ``get_size()`` and ``render(...)``.
        max_resolution: Upper bound, in pixels, for the rendered width and height.
        scale: Render scale applied when the page already fits within the cap.

    Returns:
        Whatever ``to_pil()`` returns for the rendered bitmap.
    """
    page_w, page_h = page.get_size()
    # Never enlarge: the factor is at most 1 and drops below it only when a
    # scaled side would exceed max_resolution.
    shrink = min(
        1,
        max_resolution / (page_w * scale),
        max_resolution / (page_h * scale),
    )
    bitmap = page.render(scale=scale * shrink, rev_byteorder=True)
    return bitmap.to_pil()
def process_pdf(pdf_path, page_num=1):
    """Render one page of a PDF file to an image.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: 1-based page number; values outside the document are clamped
            to the first or last page.

    Returns:
        A tuple ``(image, total_pages, page_number)`` where ``page_number`` is
        the 1-based page actually rendered after clamping.

    Raises:
        ValueError: If the document contains no pages.
    """
    pdf = pdfium.PdfDocument(pdf_path)
    try:
        total_pages = len(pdf)
        if total_pages == 0:
            # Without this guard the clamp below would produce index -1.
            raise ValueError("PDF has no pages.")
        page_idx = min(max(int(page_num) - 1, 0), total_pages - 1)
        img = render_pdf_page(pdf[page_idx])
    finally:
        # Close the document even when opening the page or rendering fails.
        pdf.close()
    return img, total_pages, page_idx + 1
# (connect, read) timeouts in seconds. The read timeout bounds the wait for
# each piece of the streamed response, not the whole generation.
_REQUEST_TIMEOUT = (10, 600)


def _iter_content_deltas(response):
    """Yield the non-empty text fragments of a streamed chat-completion response."""
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        line = raw_line.decode("utf-8")
        if line.startswith("data: "):
            line = line[6:]
        if line.strip() == "[DONE]":
            return
        try:
            chunk = json.loads(line)
        except json.JSONDecodeError:
            # Lines that are not valid JSON are skipped.
            continue
        choices = chunk.get("choices") if isinstance(chunk, dict) else None
        if not choices:
            continue
        content_delta = (choices[0].get("delta") or {}).get("content")
        if content_delta:
            yield content_delta


def process_input(file_input, temperature, page_num):
    """Send an uploaded image or PDF page to the model and stream back its text.

    Args:
        file_input: Path string of the upload, or an object with a ``name``
            attribute holding the path; ``None`` when nothing was uploaded.
        temperature: Sampling temperature forwarded in the request payload.
        page_num: 1-based PDF page to process (ignored for images).

    Yields:
        5-tuples ``(rendered_text, raw_text, page_info, preview, slider_update)``.
        The preview image is sent with the first yield of a request and left
        unchanged (``gr.update()``) afterwards. On failure a single tuple with
        a generic error message is yielded; the exception detail is written to
        the log rather than shown to the user.
    """
    import logging  # function-scope import keeps this block self-contained

    log = logging.getLogger(__name__)

    if file_input is None:
        yield "Please upload an image or PDF first.", "", "", None, gr.update()
        return

    file_path = file_input if isinstance(file_input, str) else file_input.name
    if file_path.lower().endswith(".pdf"):
        try:
            image_to_process, total_pages, actual_page = process_pdf(file_path, int(page_num))
        except Exception:
            log.exception("Failed to render PDF page")
            yield "Error processing PDF", "", "", None, gr.update()
            return
        page_info = f"Processing page {actual_page} of {total_pages}"
    else:
        try:
            image_to_process = Image.open(file_path)
            # Image.open is lazy; force decoding here so a corrupt file is
            # reported as an open error instead of failing later unhandled.
            image_to_process.load()
        except Exception:
            log.exception("Failed to open image")
            yield "Error opening image", "", "", None, gr.update()
            return
        page_info = "Processing image"

    accumulated_response = ""
    preview_sent = False
    try:
        encoded_image = image_to_base64(image_to_process)
        payload = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": ""},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                        },
                    ],
                }
            ],
            "temperature": temperature,
            "stream": True,
        }
        # The context manager releases the connection on every exit path,
        # including the early stop at "[DONE]" and a closed generator.
        with requests.post(
            ENDPOINT,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            stream=True,
            timeout=_REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            for content_delta in _iter_content_deltas(response):
                accumulated_response += content_delta
                preview = gr.update() if preview_sent else image_to_process
                preview_sent = True
                yield accumulated_response, accumulated_response, page_info, preview, gr.update()
    except Exception:
        log.exception("OCR request failed")
        error_msg = "Error"
        yield error_msg, error_msg, page_info, image_to_process, gr.update()
        return

    if not preview_sent:
        # The stream ended without any text: still publish the preview and
        # page info so the UI reflects what was processed.
        yield "", "", page_info, image_to_process, gr.update()
def update_slider(file_input):
    """Return a slider update matching the page count of the uploaded file.

    Args:
        file_input: Path string of the upload, or an object with a ``name``
            attribute holding the path; ``None`` when the upload was cleared.

    Returns:
        A ``gr.update`` that resets the value to 1 and sets the maximum to the
        PDF's page count, to 1 for non-PDF files, or to 20 when no file is
        present or the PDF cannot be read.
    """
    if file_input is None:
        return gr.update(maximum=20, value=1)

    file_path = file_input if isinstance(file_input, str) else file_input.name
    if not file_path.lower().endswith(".pdf"):
        return gr.update(maximum=1, value=1)

    try:
        pdf = pdfium.PdfDocument(file_path)
        try:
            total_pages = len(pdf)
        finally:
            # Close the document even if reading the page count fails.
            pdf.close()
    except Exception:
        # Unreadable PDF: fall back to the default range instead of failing.
        return gr.update(maximum=20, value=1)

    # Keep maximum >= the slider's minimum of 1 even for a zero-page document.
    return gr.update(maximum=max(total_pages, 1), value=1)
# UI definition. Garbled (mis-encoded) glyphs that preceded the titles and
# labels were removed, and the help text no longer claims a fixed 1-20 page
# range, since the slider maximum follows the uploaded PDF's page count.
with gr.Blocks(title="Image/PDF OCR", theme=gr.themes.Soft()) as demo:
    # Built from adjacent literals so the Markdown does not depend on source
    # indentation; blank lines keep the note out of list item 4.
    gr.Markdown(
        "# Image/PDF to Text Extraction\n"
        "\n"
        "**How to use:**\n"
        "\n"
        "1. Upload an image or PDF\n"
        "2. For PDFs: select which page to extract\n"
        "3. Adjust temperature if needed\n"
        "4. Click \"Extract Text\"\n"
        "\n"
        "**Note:** The Markdown rendering for tables is not always correct, "
        "check the raw output for complex tables!\n"
    )

    with gr.Row():
        # Left column: upload, preview and request controls.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload Image or PDF",
                file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                type="filepath",
            )
            rendered_image = gr.Image(
                label="Preview",
                type="pil",
                height=400,
                interactive=False,
            )
            num_pages = gr.Slider(
                minimum=1,
                maximum=20,
                value=1,
                step=1,
                label="PDF: Page Number",
                info="Select which page to extract",
            )
            page_info = gr.Textbox(
                label="Processing Info",
                value="",
                interactive=False,
            )
            temperature = gr.Slider(
                minimum=0.1,
                maximum=1.0,
                value=0.2,
                step=0.05,
                label="Temperature",
            )
            submit_btn = gr.Button("Extract Text", variant="primary")
            clear_btn = gr.Button("Clear", variant="secondary")

        # Right column: rendered Markdown output.
        with gr.Column(scale=2):
            output_text = gr.Markdown(
                label="Extracted Text (Rendered)",
                value="*Extracted text will appear here...*",
            )

    # NOTE(review): the source lost its indentation; this row is assumed to sit
    # directly under the Blocks container rather than inside the right column.
    # Confirm against the deployed layout.
    with gr.Row():
        with gr.Column():
            raw_output = gr.Textbox(
                label="Raw Markdown Output",
                placeholder="Raw text will appear here...",
                lines=20,
                max_lines=30,
                show_copy_button=True,
            )

    # Stream extraction results into the outputs.
    submit_btn.click(
        fn=process_input,
        inputs=[file_input, temperature, num_pages],
        outputs=[output_text, raw_output, page_info, rendered_image, num_pages],
    )

    # Adjust the page slider whenever the uploaded file changes.
    file_input.change(
        fn=update_slider,
        inputs=[file_input],
        outputs=[num_pages],
    )

    # Reset every component to its initial state.
    clear_btn.click(
        fn=lambda: (None, "*Extracted text will appear here...*", "", "", None, 1),
        outputs=[file_input, output_text, raw_output, page_info, rendered_image, num_pages],
    )

if __name__ == "__main__":
    demo.launch()