import gradio as gr
import torch
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from pathlib import Path
import os
import time
from typing import Dict, Any, Tuple, Optional, List
import tempfile
import io

# PDF processing
try:
    from pdf2image import convert_from_bytes, convert_from_path
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False

# Import configuration
from config import *
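# Note: config.py is not shown here. Based on the names used below, it is
# expected to provide settings along these lines (illustrative values only):
#   FLORENCE_MODEL_ID  e.g. "microsoft/Florence-2-large"
#   FORCE_CPU          e.g. False
#   FLORENCE_TASKS     e.g. {"object_detection": {"prompt": "<OD>", "max_tokens": 1024}, ...}
#   MAX_IMAGE_SIZE     e.g. (1024, 1024)
#   BBOX_COLORS, BBOX_WIDTH, PDF_DPI, MAX_PDF_PAGES, SHARE_LINK, SERVER_PORT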
# Global variables to store model (similar to Streamlit's session state)
model_cache = {
    'model': None,
    'processor': None,
    'device': None,
    'loaded': False
}
def load_florence_model():
    """Load Florence-2 model and processor on-demand"""
    if model_cache['loaded']:
        return model_cache['model'], model_cache['processor'], model_cache['device']

    try:
        from transformers import AutoProcessor, AutoModelForCausalLM

        device = "cpu" if FORCE_CPU else ("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Loading Florence-2 model on {device}...")

        # Load model with compatibility fixes
        model = AutoModelForCausalLM.from_pretrained(
            FLORENCE_MODEL_ID,
            torch_dtype=torch.float16 if (torch.cuda.is_available() and not FORCE_CPU) else torch.float32,
            trust_remote_code=True,
            attn_implementation="eager"  # Use eager attention for compatibility
        ).to(device)

        # Fix for transformers compatibility issue
        if hasattr(model, 'config'):
            model.config.use_cache = False

        processor = AutoProcessor.from_pretrained(FLORENCE_MODEL_ID, trust_remote_code=True)

        model_cache['model'] = model
        model_cache['processor'] = processor
        model_cache['device'] = device
        model_cache['loaded'] = True

        print(f"✅ Model loaded successfully on {device}")
        return model, processor, device
    except Exception as e:
        print(f"Failed to load Florence-2 model: {e}")
        return None, None, None
def analyze_image(image: Image.Image, task_type: str) -> Dict[str, Any]:
    """Analyze image with Florence-2 model"""
    # Load model if not already loaded
    model, processor, device = load_florence_model()
    if not model or not processor:
        return {"error": "Model not loaded", "success": False}

    try:
        task_config = FLORENCE_TASKS.get(task_type, FLORENCE_TASKS["detailed_caption"])
        task_prompt = task_config["prompt"]

        # Resize image if too large
        if image.size[0] > MAX_IMAGE_SIZE[0] or image.size[1] > MAX_IMAGE_SIZE[1]:
            image.thumbnail(MAX_IMAGE_SIZE, Image.Resampling.LANCZOS)

        inputs = processor(text=task_prompt, images=image, return_tensors="pt").to(device)
        # Match pixel_values to the model's weight dtype (float16 on GPU, float32 on CPU)
        # to avoid a dtype mismatch during generation
        inputs["pixel_values"] = inputs["pixel_values"].to(next(model.parameters()).dtype)

        generated_ids = model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=task_config["max_tokens"],
            num_beams=3,
            do_sample=False
        )
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        parsed_answer = processor.post_process_generation(
            generated_text,
            task=task_prompt,
            image_size=(image.width, image.height)
        )

        # Florence-2 post-processing keys the result by the task prompt;
        # unwrap it so downstream code can read bboxes/labels/text directly
        if isinstance(parsed_answer, dict) and task_prompt in parsed_answer:
            parsed_answer = parsed_answer[task_prompt]

        return {
            "parsed_results": parsed_answer,
            "success": True
        }
    except Exception as e:
        return {"error": f"Analysis failed: {str(e)}", "success": False}
def draw_bounding_boxes(image: Image.Image, results: Dict[str, Any]) -> Image.Image:
    """Draw bounding boxes and labels on image"""
    if not results.get("success", False):
        return image

    annotated_image = image.copy()
    draw = ImageDraw.Draw(annotated_image)

    try:
        font = ImageFont.load_default()
        parsed_results = results.get("parsed_results", {})

        if "bboxes" in parsed_results and "labels" in parsed_results:
            bboxes = parsed_results["bboxes"]
            labels = parsed_results["labels"]

            for i, (bbox, label) in enumerate(zip(bboxes, labels)):
                color = BBOX_COLORS[i % len(BBOX_COLORS)]
                x1, y1, x2, y2 = bbox
                draw.rectangle([x1, y1, x2, y2], outline=color, width=BBOX_WIDTH)
                draw.text((x1, max(y1 - 20, 0)), label[:30], fill=color, font=font)
    except Exception as e:
        print(f"Error drawing annotations: {e}")

    return annotated_image
def process_pdf(pdf_file) -> List[Image.Image]:
    """Convert PDF to images"""
    if not PDF_AVAILABLE:
        raise ValueError("PDF processing not available. Please install pdf2image.")

    try:
        # Convert PDF to images
        if hasattr(pdf_file, 'read'):
            # File object
            pdf_bytes = pdf_file.read()
            images = convert_from_bytes(pdf_bytes, dpi=PDF_DPI)
        else:
            # File path
            images = convert_from_path(pdf_file, dpi=PDF_DPI)

        # Limit number of pages
        if len(images) > MAX_PDF_PAGES:
            images = images[:MAX_PDF_PAGES]

        return images
    except Exception as e:
        raise ValueError(f"Failed to process PDF: {str(e)}")
def format_results_text(results: Dict[str, Any], task_type: str) -> str:
    """Format analysis results as text"""
    if not results.get("success", False):
        return f"❌ Analysis failed: {results.get('error', 'Unknown error')}"

    parsed = results.get("parsed_results", {})

    if task_type == "detailed_caption":
        if isinstance(parsed, dict) and "detailed_caption" in parsed:
            return f"📝 **Caption:** {parsed['detailed_caption']}"
        elif isinstance(parsed, str):
            return f"📝 **Caption:** {parsed}"
    elif task_type == "object_detection":
        if "labels" in parsed and parsed["labels"]:
            labels = parsed["labels"]
            bbox_count = len(labels)
            labels_text = ', '.join(labels[:10])
            if len(labels) > 10:
                labels_text += f" ...and {len(labels) - 10} more"
            return f"🎯 **Detected Objects ({bbox_count}):** {labels_text}"
    elif task_type == "ocr":
        # OCR output may be a plain string or a dict with a "text" field
        ocr_text = parsed if isinstance(parsed, str) else parsed.get("text", "")
        if ocr_text:
            return f"🔤 **Extracted Text:**\n{ocr_text}"
        else:
            return "🔤 **OCR Result:** No text detected in the image"
    elif task_type == "dense_captioning":
        if "labels" in parsed and parsed["labels"]:
            captions = parsed["labels"]
            return "📍 **Region Captions:**\n" + '\n'.join([f"• {cap}" for cap in captions[:5]])

    return "✅ Analysis completed successfully!"
def process_uploaded_file(file_path: str) -> Tuple[Image.Image, str]:
    """Process uploaded file (image or PDF) and return first image"""
    if file_path is None:
        return None, "Please upload a file first."

    try:
        file_extension = Path(file_path).suffix.lower()

        if file_extension == '.pdf':
            if not PDF_AVAILABLE:
                return None, "PDF processing not available. Please upload an image instead."
            # Convert PDF to images
            images = process_pdf(file_path)
            if not images:
                return None, "No images found in PDF."
            # Use the first page for now
            image = images[0]
            status = f"✅ PDF processed successfully. Showing page 1 of {len(images)}."
        elif file_extension in ['.png', '.jpg', '.jpeg']:
            # Load image
            image = Image.open(file_path).convert("RGB")
            status = "✅ Image loaded successfully."
        else:
            return None, "Unsupported file format. Please upload PNG, JPG, JPEG, or PDF files."

        return image, status
    except Exception as e:
        return None, f"❌ Error processing file: {str(e)}"
def process_image(image: Image.Image, task_type: str) -> Tuple[Image.Image, str, str]:
    """Process uploaded image and return results"""
    if image is None:
        return None, "Please upload an image first.", ""

    # Convert to RGB if needed
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Analyze the image
    results = analyze_image(image, task_type)

    # Create annotated image
    annotated_image = draw_bounding_boxes(image, results)

    # Format results text
    results_text = format_results_text(results, task_type)

    # Create status message
    if results.get("success", False):
        status = f"✅ Analysis completed successfully using Florence-2 on {model_cache.get('device', 'unknown device')}"
    else:
        status = f"❌ Analysis failed: {results.get('error', 'Unknown error')}"

    return annotated_image, results_text, status
def create_interface():
    """Create the Gradio interface"""
    # Custom CSS for better styling
    custom_css = """
    .gradio-container {
        font-family: 'Arial', sans-serif;
    }
    .analysis-results {
        background-color: #f0f2f6;
        padding: 1rem;
        border-radius: 0.5rem;
        margin: 1rem 0;
    }
    """
    with gr.Blocks(title="Florence-2 Document & Image Analyzer", css=custom_css, theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 📄 Florence-2 Document & Image Analyzer

        Upload an image or PDF and analyze it with Microsoft's Florence-2 vision model.

        **Note:** The model is loaded automatically on first use (~5GB download, takes 2-3 minutes).
        """)
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload Image or PDF",
                    file_types=[".png", ".jpg", ".jpeg", ".pdf"],
                    type="filepath"
                )
                image_input = gr.Image(
                    type="pil",
                    label="Current Image",
                    height=400,
                    interactive=False
                )
                task_dropdown = gr.Dropdown(
                    choices=[
                        ("Object Detection", "object_detection"),
                        ("Detailed Caption", "detailed_caption"),
                        ("OCR (Text Extraction)", "ocr"),
                        ("Dense Captioning", "dense_captioning")
                    ],
                    value="object_detection",
                    label="Analysis Type",
                    info="Choose the type of analysis to perform"
                )
                analyze_btn = gr.Button("🔍 Analyze Image", variant="primary", size="lg")

            with gr.Column():
                annotated_output = gr.Image(
                    label="Analysis Results",
                    height=400
                )
                results_text = gr.Markdown(
                    label="Analysis Details",
                    value="Upload an image and click 'Analyze Image' to get started!"
                )
                status_text = gr.Markdown(
                    value="ℹ️ Ready to analyze images"
                )
        # Event handlers
        def handle_file_upload(file_path):
            if file_path is None:
                return None, "Please upload a file first."
            image, status = process_uploaded_file(file_path)
            return image, status

        def handle_analyze(image, task_type):
            return process_image(image, task_type)

        file_input.change(
            fn=handle_file_upload,
            inputs=[file_input],
            outputs=[image_input, status_text],
            show_progress=True
        )

        analyze_btn.click(
            fn=handle_analyze,
            inputs=[image_input, task_dropdown],
            outputs=[annotated_output, results_text, status_text],
            show_progress=True
        )
        # Information sections
        with gr.Row():
            with gr.Column():
                gr.Markdown("""
                ## ℹ️ About Florence-2

                **Florence-2** is Microsoft's foundation vision model capable of:

                - **🎯 Object Detection**: Identifies and locates objects with bounding boxes
                - **📝 Detailed Caption**: Generates comprehensive descriptions of image content
                - **🔤 OCR**: Extracts and locates text in images
                - **📍 Dense Captioning**: Provides detailed captions for different regions

                The model downloads automatically on first use (~5GB) and is cached for subsequent uses.
                """)
            with gr.Column():
                gr.Markdown("""
                ## ⚡ Performance Notes

                - **First run**: Model download may take 2-3 minutes
                - **GPU**: Faster inference when available
                - **CPU**: Works, but processing is slower
                - **Model size**: ~5GB (cached after first download)
                - **Supported formats**: PNG, JPG, JPEG, PDF
                """)
        # Usage instructions
        gr.Markdown("""
        ## 📋 How to Use

        1. **Upload a file**: Click "Upload Image or PDF" and choose your file
        2. **Select analysis type**: Choose from the dropdown menu
        3. **Click Analyze**: Run the selected analysis on the displayed image
        4. **View results**: See the annotated image and detailed analysis

        **Good examples to try:**
        - Photos with objects (cars, people, animals)
        - Screenshots with text for OCR
        - Documents or diagrams for analysis
        - Multi-object scenes for detection
        """)

    return demo
def main():
    """Main function to launch the Gradio app"""
    demo = create_interface()

    # Launch the app
    demo.launch(
        share=SHARE_LINK,
        server_port=SERVER_PORT,
        show_error=True,
        quiet=False
    )


if __name__ == "__main__":
    main()
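# Dependencies assumed by this app (illustrative requirements, not pinned here):
#   gradio, torch, transformers, pillow, numpy, pdf2image
#   pdf2image additionally needs the poppler system package;
#   Florence-2's remote code typically also requires einops and timm.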