"""
Detection Function Wrappers

Provides unified detection function signatures for different backends:
- Direct service access (for HF Spaces / local)
- API client access (for production service-oriented architecture)

This eliminates duplication of detection logic across app.py and ui/gradio_interface.py
"""

import os
import requests
import base64
import io
from PIL import Image
from typing import Tuple, Optional
import traceback

from detection.service_factory import get_detection_service
from detection import ocr_handler, response_builder

# Single source of truth for the HTTP timeout: used both for the request
# itself and for the user-facing timeout message.
_API_TIMEOUT_SECONDS = 120

_NO_IMAGE_MESSAGE = "❌ Please upload an image first."


def _resolve_blip_scope(blip_scope_choice: Optional[str]) -> str:
    """Map the UI BLIP scope label to the internal value ("all" or "icons")."""
    return "all" if (blip_scope_choice or "").lower().startswith("all") else "icons"


def _resolve_preprocess_mode(preprocess_mode_choice: Optional[str]) -> str:
    """Map the UI preprocessing label to the internal value ("rfdetr" or "generic")."""
    # `or ""` keeps a missing/None choice from raising TypeError on `in`.
    return "rfdetr" if "RF-DETR" in (preprocess_mode_choice or "") else "generic"


def _format_exception_message(exc: Exception, api_url: Optional[str] = None) -> str:
    """Build the Markdown error message for an unexpected exception.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` can see the active exception.
    """
    message = f"""❌ **Error during detection:**

```
{str(exc)}

{traceback.format_exc()}
```
"""
    if api_url is not None:
        message += f"""
**API URL:** {api_url}
"""
    return message


def _connection_error_message(api_url: str) -> str:
    """Build the Markdown message shown when the API server is unreachable."""
    return f"""❌ **Connection Error**

Cannot connect to API server at `{api_url}`

**To fix this:**
1. Make sure the API server is running:
   ```bash
   python app_api.py
   ```
2. The API should be accessible at http://localhost:8000
3. Check that no firewall is blocking the connection

**Current API URL:** {api_url}

You can change this by setting the `CU1_API_URL` environment variable.
"""


def _timeout_error_message() -> str:
    """Build the Markdown message shown when the API request times out."""
    return f"""❌ **Timeout Error**

The API request timed out after {_API_TIMEOUT_SECONDS} seconds.

This might happen with:
- Very large images
- First run (models need to download)
- CPU-only processing (slower than GPU)

**Try:**
- Using a smaller image
- Waiting for model downloads to complete
- Checking API server logs for errors
"""


def _extract_error_detail(response, error: Exception):
    """Return the ``detail`` field of a JSON error body, else ``str(error)``."""
    try:
        body = response.json()
    except ValueError:
        # Body is not JSON (e.g. an HTML error page from a proxy).
        return str(error)
    if isinstance(body, dict):
        return body.get("detail", str(error))
    # Valid JSON but not an object: there is no "detail" key to read.
    return str(error)


def detect_with_service(
    image: Image.Image,
    confidence_threshold: float,
    line_thickness: int,
    enable_clip: bool,
    enable_ocr: bool,
    enable_blip: bool,
    ocr_only: bool,
    blip_scope_choice: str,
    preprocess: bool = False,
    preprocess_mode_choice: str = "RF-DETR Optimized (Recommended)",
    preprocess_preset: str = "standard"
) -> Tuple[Optional[Image.Image], str, Optional[dict]]:
    """
    Detect UI elements using detection service directly (no API)

    Used by: app.py (HF Spaces / local mode)

    Args:
        image: Input image; ``None`` yields a "please upload" message.
        confidence_threshold: Passed through to the detection service.
        line_thickness: Box thickness for the annotated image.
        enable_clip: Forwarded as ``use_clip``; also gates ``type_distribution``.
        enable_ocr: Forwarded as ``extract_text``.
        enable_blip: Forwarded as ``use_blip``.
        ocr_only: When true, run only the OCR handler and skip the service.
        blip_scope_choice: UI label; anything starting with "all"
            (case-insensitive) maps to "all", everything else to "icons".
        preprocess: Forwarded to the service.
        preprocess_mode_choice: UI label; labels containing "RF-DETR" map to
            "rfdetr", everything else to "generic".
        preprocess_preset: Forwarded to the service.

    Returns:
        Tuple of (annotated_image, summary_text, json_payload). On failure the
        image and payload are ``None`` and the text carries the error message.
    """
    try:
        if image is None:
            return None, _NO_IMAGE_MESSAGE, None

        scope_value = _resolve_blip_scope(blip_scope_choice)
        preprocess_mode = _resolve_preprocess_mode(preprocess_mode_choice)

        # OCR-only path
        if ocr_only:
            detections = ocr_handler.process_ocr_only(image)
            annotated = ocr_handler.annotate_ocr_detections(
                image,
                detections,
                thickness=line_thickness,
                return_format="pil"
            )
            json_payload = response_builder.build_ocr_only_response(
                detections=detections,
                image_width=image.width,
                image_height=image.height,
                annotated_image=None,
                confidence_threshold=confidence_threshold,
                line_thickness=line_thickness
            )
            summary_text = response_builder.format_summary_text(
                detections=detections,
                parameters=json_payload["parameters"],
                ocr_only=True
            )
            return annotated, summary_text, json_payload

        # Standard detection path
        service = get_detection_service()

        # Run analysis (pass parameters directly to avoid race conditions)
        analysis = service.analyze(
            image,
            confidence_threshold=confidence_threshold,
            extract_text=enable_ocr,
            use_clip=enable_clip,
            use_blip=enable_blip,
            merge_global_ocr=True,
            blip_scope=scope_value,
            preprocess=preprocess,
            preprocess_mode=preprocess_mode,
            preprocess_preset=preprocess_preset
        )

        # Generate annotated image from the analysis computed above
        annotated = service.get_prediction_image(
            image,
            confidence_threshold=confidence_threshold,
            extract_content=True,
            thickness=line_thickness,
            return_format="pil",
            analysis=analysis
        )

        detections = analysis["detections"]

        # Build JSON response
        json_payload = {
            "success": True,
            "detections": detections,
            "total_detections": len(detections),
            "image_size": analysis["image_size"],
            "parameters": {
                "confidence_threshold": confidence_threshold,
                "enable_clip": enable_clip,
                "enable_ocr": enable_ocr,
                "enable_blip": enable_blip,
                "blip_scope": scope_value if enable_blip else None,
                "ocr_only": False,
                "line_thickness": line_thickness
            },
            "type_distribution": (
                response_builder.build_type_distribution(detections)
                if enable_clip else None
            )
        }

        # Build summary text
        summary_text = response_builder.format_summary_text(
            detections=detections,
            parameters=json_payload["parameters"],
            ocr_only=False
        )

        return annotated, summary_text, json_payload

    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of raising.
        error_msg = _format_exception_message(e)
        print(error_msg)
        return None, error_msg, None


def detect_with_api(
    image: Image.Image,
    confidence_threshold: float,
    line_thickness: int,
    enable_clip: bool,
    enable_ocr: bool,
    enable_blip: bool,
    ocr_only: bool,
    blip_scope_choice: str,
    preprocess: bool = False,
    preprocess_mode_choice: str = "RF-DETR Optimized (Recommended)",
    preprocess_preset: str = "standard",
    api_url: str = "http://localhost:8000"
) -> Tuple[Optional[Image.Image], str, Optional[dict]]:
    """
    Detect UI elements by calling the API

    Used by: app_ui.py (service-oriented mode)

    The image is uploaded as PNG to ``{api_url}/detect`` together with the
    detection options as form fields (booleans are sent as "true"/"false").

    Args:
        image: Input image; ``None`` yields a "please upload" message.
        confidence_threshold: Sent as form field ``confidence_threshold``.
        line_thickness: Sent as form field ``line_thickness``.
        enable_clip: Sent as form field ``enable_clip``.
        enable_ocr: Sent as form field ``enable_ocr``.
        enable_blip: Sent as form field ``enable_blip``.
        ocr_only: Sent as form field ``ocr_only``.
        blip_scope_choice: UI label mapped to "all" or "icons".
        preprocess: Sent as form field ``preprocess``.
        preprocess_mode_choice: UI label mapped to "rfdetr" or "generic".
        preprocess_preset: Sent as form field ``preprocess_preset``.
        api_url: Base URL of the API server; a trailing slash is tolerated.

    Returns:
        Tuple of (annotated_image, summary_text, json_payload). On transport
        or HTTP errors the image and payload are ``None`` and the text carries
        a Markdown error message. If the API answers with ``success`` false,
        the payload is still returned alongside the error text.
    """
    try:
        if image is None:
            return None, _NO_IMAGE_MESSAGE, None

        scope_value = _resolve_blip_scope(blip_scope_choice)
        preprocess_mode = _resolve_preprocess_mode(preprocess_mode_choice)

        # Prepare image for upload
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        img_byte_arr.seek(0)

        # Prepare form data
        files = {
            'image': ('image.png', img_byte_arr, 'image/png')
        }
        data = {
            'confidence_threshold': confidence_threshold,
            'line_thickness': line_thickness,
            'enable_clip': str(enable_clip).lower(),
            'enable_ocr': str(enable_ocr).lower(),
            'enable_blip': str(enable_blip).lower(),
            'blip_scope': scope_value,
            'ocr_only': str(ocr_only).lower(),
            'preprocess': str(preprocess).lower(),
            'preprocess_mode': preprocess_mode,
            'preprocess_preset': preprocess_preset
        }

        # Avoid "//detect" when the configured base URL ends with a slash.
        endpoint = f"{api_url.rstrip('/')}/detect"

        # Call API
        try:
            response = requests.post(
                endpoint,
                files=files,
                data=data,
                timeout=_API_TIMEOUT_SECONDS
            )
            response.raise_for_status()
        except requests.exceptions.ConnectionError:
            return None, _connection_error_message(api_url), None
        except requests.exceptions.Timeout:
            return None, _timeout_error_message(), None
        except requests.exceptions.HTTPError as e:
            # raise_for_status() raised, so `response` is bound here.
            error_detail = _extract_error_detail(response, e)
            return None, f"""❌ **API Error ({response.status_code})**

{error_detail}

**API URL:** {api_url}
""", None

        # Parse response
        json_payload = response.json()

        if not json_payload.get("success", False):
            return None, f"❌ Detection failed: {json_payload.get('error', 'Unknown error')}", json_payload

        # Decode annotated image (best effort: a bad image must not fail the
        # whole request, the detections are still useful).
        annotated_image = None
        encoded_image = json_payload.get("annotated_image")
        if encoded_image:
            try:
                img_data = base64.b64decode(encoded_image["base64"])
                annotated_image = Image.open(io.BytesIO(img_data))
                # Image.open is lazy; force the decode now so corrupt pixel
                # data is caught here rather than later in the caller.
                annotated_image.load()
            except Exception as e:
                annotated_image = None
                print(f"Failed to decode annotated image: {e}")

        # `or` also covers keys that are present but null in the response.
        parameters = json_payload.get("parameters") or {}

        # Build summary text using response_builder
        summary_text = response_builder.format_summary_text(
            detections=json_payload.get("detections") or [],
            parameters=parameters,
            ocr_only=parameters.get("ocr_only", False)
        )

        return annotated_image, summary_text, json_payload

    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of raising.
        error_msg = _format_exception_message(e, api_url=api_url)
        print(error_msg)
        return None, error_msg, None