# File: CU1-X / ui/detection_wrapper.py
# Hosting-page metadata (not part of the code): uploaded by AI-DrivenTesting,
# commit "init" (77da9e2), file size 9.3 kB.
"""
Detection Function Wrappers
Provides unified detection function signatures for different backends:
- Direct service access (for HF Spaces / local)
- API client access (for production service-oriented architecture)
This eliminates duplication of detection logic across app.py and ui/gradio_interface.py
"""
import os
import requests
import base64
import io
from PIL import Image
from typing import Tuple, Optional
import traceback
from detection.service_factory import get_detection_service
from detection import ocr_handler, response_builder
def detect_with_service(
    image: Image.Image,
    confidence_threshold: float,
    line_thickness: int,
    enable_clip: bool,
    enable_ocr: bool,
    enable_blip: bool,
    ocr_only: bool,
    blip_scope_choice: str,
    preprocess: bool = False,
    preprocess_mode_choice: str = "RF-DETR Optimized (Recommended)",
    preprocess_preset: str = "standard"
) -> Tuple[Optional[Image.Image], str, Optional[dict]]:
    """
    Detect UI elements using detection service directly (no API)

    Used by: app.py (HF Spaces / local mode)

    Args:
        image: Input image; ``None`` yields a "please upload" message.
        confidence_threshold: Forwarded to the detection service and echoed
            in the returned parameters.
        line_thickness: Box thickness used when drawing the annotated image.
        enable_clip: Forwarded as ``use_clip``; also controls whether
            ``type_distribution`` is filled in the payload.
        enable_ocr: Forwarded as ``extract_text``.
        enable_blip: Forwarded as ``use_blip``.
        ocr_only: When true, skip the detection service and run only the
            OCR handler.
        blip_scope_choice: UI label; maps to ``"all"`` when it starts with
            "all" (case-insensitive), otherwise ``"icons"``.
        preprocess: Forwarded to the detection service.
        preprocess_mode_choice: UI label; maps to ``"rfdetr"`` when it
            contains "RF-DETR", otherwise ``"generic"``.
        preprocess_preset: Forwarded to the detection service.

    Returns:
        Tuple of (annotated_image, summary_text, json_payload). On any
        failure the image and payload are ``None`` and the text carries the
        error message (with traceback).
    """
    try:
        if image is None:
            return None, "❌ Please upload an image first.", None

        # Map UI labels to internal values. Both choices may arrive as None
        # (e.g. a cleared dropdown), so normalise to "" before inspecting.
        scope_value = "all" if (blip_scope_choice or "").lower().startswith("all") else "icons"
        preprocess_mode = "rfdetr" if "RF-DETR" in (preprocess_mode_choice or "") else "generic"

        # OCR-only path: bypasses the detection service entirely.
        if ocr_only:
            detections = ocr_handler.process_ocr_only(image)
            annotated = ocr_handler.annotate_ocr_detections(
                image,
                detections,
                thickness=line_thickness,
                return_format="pil"
            )
            json_payload = response_builder.build_ocr_only_response(
                detections=detections,
                image_width=image.width,
                image_height=image.height,
                annotated_image=None,
                confidence_threshold=confidence_threshold,
                line_thickness=line_thickness
            )
            summary_text = response_builder.format_summary_text(
                detections=detections,
                parameters=json_payload["parameters"],
                ocr_only=True
            )
            return annotated, summary_text, json_payload

        # Standard detection path
        service = get_detection_service()

        # Run analysis (pass parameters directly to avoid race conditions)
        analysis = service.analyze(
            image,
            confidence_threshold=confidence_threshold,
            extract_text=enable_ocr,
            use_clip=enable_clip,
            use_blip=enable_blip,
            merge_global_ocr=True,
            blip_scope=scope_value,
            preprocess=preprocess,
            preprocess_mode=preprocess_mode,
            preprocess_preset=preprocess_preset
        )

        # Generate annotated image from the analysis computed above.
        annotated = service.get_prediction_image(
            image,
            confidence_threshold=confidence_threshold,
            extract_content=True,
            thickness=line_thickness,
            return_format="pil",
            analysis=analysis
        )

        detections = analysis["detections"]

        # Build JSON response
        json_payload = {
            "success": True,
            "detections": detections,
            "total_detections": len(detections),
            "image_size": analysis["image_size"],
            "parameters": {
                "confidence_threshold": confidence_threshold,
                "enable_clip": enable_clip,
                "enable_ocr": enable_ocr,
                "enable_blip": enable_blip,
                "blip_scope": scope_value if enable_blip else None,
                "ocr_only": False,
                "line_thickness": line_thickness
            },
            "type_distribution": response_builder.build_type_distribution(detections) if enable_clip else None
        }

        # Build summary text
        summary_text = response_builder.format_summary_text(
            detections=detections,
            parameters=json_payload["parameters"],
            ocr_only=False
        )
        return annotated, summary_text, json_payload
    except Exception as e:
        # Top-level UI boundary: report the failure as text instead of raising.
        error_msg = f"""❌ **Error during detection:**
```
{str(e)}
{traceback.format_exc()}
```
"""
        print(error_msg)
        return None, error_msg, None
def detect_with_api(
    image: Image.Image,
    confidence_threshold: float,
    line_thickness: int,
    enable_clip: bool,
    enable_ocr: bool,
    enable_blip: bool,
    ocr_only: bool,
    blip_scope_choice: str,
    preprocess: bool = False,
    preprocess_mode_choice: str = "RF-DETR Optimized (Recommended)",
    preprocess_preset: str = "standard",
    api_url: str = "http://localhost:8000"
) -> Tuple[Optional[Image.Image], str, Optional[dict]]:
    """
    Detect UI elements by calling the API

    Used by: app_ui.py (service-oriented mode)

    The image is PNG-encoded and POSTed as multipart form data to
    ``{api_url}/detect`` together with the detection options. Boolean
    options are sent as the strings "true"/"false".

    Args:
        image: Input image; ``None`` yields a "please upload" message.
        confidence_threshold: Sent as the ``confidence_threshold`` form field.
        line_thickness: Sent as the ``line_thickness`` form field.
        enable_clip: Sent as the ``enable_clip`` form field.
        enable_ocr: Sent as the ``enable_ocr`` form field.
        enable_blip: Sent as the ``enable_blip`` form field.
        ocr_only: Sent as the ``ocr_only`` form field.
        blip_scope_choice: UI label; maps to ``"all"`` when it starts with
            "all" (case-insensitive), otherwise ``"icons"``.
        preprocess: Sent as the ``preprocess`` form field.
        preprocess_mode_choice: UI label; maps to ``"rfdetr"`` when it
            contains "RF-DETR", otherwise ``"generic"``.
        preprocess_preset: Sent as the ``preprocess_preset`` form field.
        api_url: Base URL of the API server; a trailing slash is tolerated.

    Returns:
        Tuple of (annotated_image, summary_text, json_payload). Connection,
        timeout and HTTP errors are reported through ``summary_text`` with
        the image and payload set to ``None``; a response with a falsy
        ``success`` flag returns the payload alongside the failure text.
    """
    try:
        if image is None:
            return None, "❌ Please upload an image first.", None

        # Map UI labels to internal values. Both choices may arrive as None
        # (e.g. a cleared dropdown), so normalise to "" before inspecting.
        scope_value = "all" if (blip_scope_choice or "").lower().startswith("all") else "icons"
        preprocess_mode = "rfdetr" if "RF-DETR" in (preprocess_mode_choice or "") else "generic"

        # Prepare image for upload
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        img_byte_arr.seek(0)

        # Prepare form data
        files = {
            'image': ('image.png', img_byte_arr, 'image/png')
        }
        data = {
            'confidence_threshold': confidence_threshold,
            'line_thickness': line_thickness,
            'enable_clip': str(enable_clip).lower(),
            'enable_ocr': str(enable_ocr).lower(),
            'enable_blip': str(enable_blip).lower(),
            'blip_scope': scope_value,
            'ocr_only': str(ocr_only).lower(),
            'preprocess': str(preprocess).lower(),
            'preprocess_mode': preprocess_mode,
            'preprocess_preset': preprocess_preset
        }

        # Strip a trailing slash so "http://host/" does not become
        # "http://host//detect". Messages below still show api_url as given.
        endpoint = f"{api_url.rstrip('/')}/detect"

        # Call API
        try:
            response = requests.post(
                endpoint,
                files=files,
                data=data,
                timeout=120
            )
            response.raise_for_status()
        except requests.exceptions.ConnectionError:
            return None, f"""❌ **Connection Error**
Cannot connect to API server at `{api_url}`
**To fix this:**
1. Make sure the API server is running:
```bash
python app_api.py
```
2. The API should be accessible at http://localhost:8000
3. Check that no firewall is blocking the connection
**Current API URL:** {api_url}
You can change this by setting the `CU1_API_URL` environment variable.
""", None
        except requests.exceptions.Timeout:
            return None, f"""❌ **Timeout Error**
The API request timed out after 120 seconds.
This might happen with:
- Very large images
- First run (models need to download)
- CPU-only processing (slower than GPU)
**Try:**
- Using a smaller image
- Waiting for model downloads to complete
- Checking API server logs for errors
""", None
        except requests.exceptions.HTTPError as e:
            # Prefer the server-provided "detail" field; fall back to the
            # HTTP error text when the body is not a JSON object.
            error_detail = str(e)
            try:
                error_json = response.json()
            except ValueError:
                # Body was not valid JSON; keep the HTTP error text.
                pass
            else:
                if isinstance(error_json, dict):
                    error_detail = error_json.get("detail", str(e))
            return None, f"""❌ **API Error ({response.status_code})**
{error_detail}
**API URL:** {api_url}
""", None

        # Parse response
        json_payload = response.json()
        if not json_payload.get("success", False):
            return None, f"❌ Detection failed: {json_payload.get('error', 'Unknown error')}", json_payload

        # Decode annotated image (best effort: a bad image must not discard
        # the detection results).
        annotated_image = None
        if "annotated_image" in json_payload and json_payload["annotated_image"]:
            try:
                img_data = base64.b64decode(json_payload["annotated_image"]["base64"])
                annotated_image = Image.open(io.BytesIO(img_data))
            except Exception as e:
                print(f"Failed to decode annotated image: {e}")

        # Build summary text using response_builder. "parameters" may be
        # missing or null in the payload; treat both as empty.
        parameters = json_payload.get("parameters") or {}
        summary_text = response_builder.format_summary_text(
            detections=json_payload.get("detections", []),
            parameters=parameters,
            ocr_only=parameters.get("ocr_only", False)
        )
        return annotated_image, summary_text, json_payload
    except Exception as e:
        # Top-level UI boundary: report the failure as text instead of raising.
        error_msg = f"""❌ **Error during detection:**
```
{str(e)}
{traceback.format_exc()}
```
**API URL:** {api_url}
"""
        print(error_msg)
        return None, error_msg, None