# Hugging Face Space status: Paused
"""
FunCaptcha Solver API - Hugging Face Spaces deployment.

Optimized for speed, memory efficiency, and scalability.

Features:
- FastAPI async operations
- API key authentication via HF secrets
- Fuzzy label matching
- Memory-efficient model loading
- ONNX CPU optimization
- NO RESPONSE CACHING for fresh/accurate predictions
- Model caching only (for performance)

IMPORTANT: response caching is deliberately DISABLED so that every
prediction is computed fresh and stays accurate.
"""
| import os | |
| import io | |
| import base64 | |
| import hashlib | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List, Union | |
| import logging | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import yaml | |
| import difflib | |
# Try to import ML backends with multiple fallbacks.
# ONNX Runtime is the only backend the model loader actually supports
# (see ModelManager); Torch/TF flags are reported for diagnostics.
ONNX_AVAILABLE = False
TORCH_AVAILABLE = False
TF_AVAILABLE = False
ort = None

# Try ONNX Runtime first (preferred backend for the .onnx models)
try:
    import onnxruntime as ort
    ONNX_AVAILABLE = True
    print("β ONNX Runtime imported successfully")
except ImportError as e:
    print(f"β ONNX Runtime import failed: {e}")
    ort = None  # Set to None when import fails

# Try PyTorch as fallback
try:
    import torch
    TORCH_AVAILABLE = True
    print("β PyTorch imported as ONNX Runtime alternative")
except ImportError:
    print("β PyTorch not available")

# Try TensorFlow as final fallback
try:
    import tensorflow as tf
    TF_AVAILABLE = True
    print("β TensorFlow imported as ONNX Runtime alternative")
except ImportError:
    print("β TensorFlow not available")
    # BUGFIX: only report "no ML backend" when none of the backends
    # imported; previously this printed whenever TensorFlow was missing,
    # even though ONNX Runtime or PyTorch had loaded successfully.
    if not (ONNX_AVAILABLE or TORCH_AVAILABLE):
        print("β οΈ Running without ML backend - model inference will be disabled")

ML_BACKEND_AVAILABLE = ONNX_AVAILABLE or TORCH_AVAILABLE or TF_AVAILABLE
| from fastapi import FastAPI, HTTPException, Depends, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| import uvicorn | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ================================================================= | |
| # CONFIGURATION & MODELS | |
| # ================================================================= | |
class FunCaptchaRequest(BaseModel):
    """Request payload for FunCaptcha solving.

    `target_label` is required only for 'pick_the' challenges; that rule is
    enforced in the /solve handler, not by this model.
    """
    challenge_type: str = Field(..., description="Type of challenge (pick_the, upright)")
    image_b64: str = Field(..., description="Base64 encoded image")
    target_label: Optional[str] = Field(None, description="Target label untuk pick_the challenges")
class FunCaptchaResponse(BaseModel):
    """Response payload for FunCaptcha solving.

    Exactly one of `box` (pick_the) or `button_index` (upright) is populated
    on success; all fields except `status` are optional.
    """
    status: str = Field(..., description="Status: success, not_found, error")
    box: Optional[List[float]] = Field(None, description="Bounding box coordinates [x, y, w, h]")
    button_index: Optional[int] = Field(None, description="Button index untuk upright challenges")
    confidence: Optional[float] = Field(None, description="Detection confidence")
    message: Optional[str] = Field(None, description="Additional message")
    processing_time: Optional[float] = Field(None, description="Processing time in seconds")
| # ================================================================= | |
| # AUTHENTICATION | |
| # ================================================================= | |
security = HTTPBearer()

def get_api_key_from_secrets() -> str:
    """Return the API key stored in the Hugging Face Space secrets.

    Raises:
        ValueError: if the FUNCAPTCHA_API_KEY secret is not configured.
    """
    api_key = os.getenv("FUNCAPTCHA_API_KEY")
    if not api_key:
        logger.error("FUNCAPTCHA_API_KEY not found in environment variables")
        raise ValueError("API key tidak ditemukan dalam HF Secrets")
    return api_key

def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)) -> bool:
    """Validate the bearer token supplied in the Authorization header.

    Raises:
        HTTPException: 401 when the presented key does not match.
    """
    import secrets  # stdlib; local import keeps the fix self-contained

    expected_key = get_api_key_from_secrets()
    # BUGFIX: `!=` short-circuits at the first differing byte, making the
    # comparison timing-dependent; compare_digest is constant-time.
    if not secrets.compare_digest(credentials.credentials, expected_key):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"}
        )
    return True
| # ================================================================= | |
| # MODEL CONFIGURATION & MANAGEMENT | |
| # ================================================================= | |
# Per-challenge model configurations: ONNX model file, the YOLO data yaml
# holding its class names, the square input size, and detection thresholds.
CONFIGS = {
    'default': {
        'model_path': 'best.onnx',
        'yaml_path': 'data.yaml',
        'input_size': 640,
        'confidence_threshold': 0.4,
        'nms_threshold': 0.2
    },
    'spiral_galaxy': {
        'model_path': 'bestspiral.onnx',
        'yaml_path': 'dataspiral.yaml',
        'input_size': 416,
        'confidence_threshold': 0.30,
        'nms_threshold': 0.45
    },
    'upright': {
        'model_path': 'best_upright.onnx',
        'yaml_path': 'data_upright.yaml',
        'input_size': 640,
        'confidence_threshold': 0.5,  # Match test script confidence (was 0.25)
        'nms_threshold': 0.45
    }
}
# Keyword routing: when any keyword appears in the target label, the mapped
# specialized config is used instead of 'default'.
MODEL_ROUTING = [
    (['spiral', 'galaxy'], 'spiral_galaxy')
]
# Global cache for loaded models only; response caching is intentionally
# DISABLED so every prediction is computed fresh.
LOADED_MODELS: Dict[str, Dict[str, Any]] = {}
# RESPONSE_CACHE: Dict[str, Dict[str, Any]] = {}  # DISABLED - No response caching
# CACHE_MAX_SIZE = 100  # DISABLED
class ModelManager:
    """Loads ONNX inference sessions on demand and caches them in LOADED_MODELS.

    Only model objects are cached; responses are never cached, so every
    prediction stays fresh.
    """

    @staticmethod
    async def get_model(config_key: str) -> Optional[Dict[str, Any]]:
        """Return the cached model bundle for `config_key`, loading it if needed.

        BUGFIX: declared as @staticmethod — it was previously a bare function
        in the class body, so an instance call would have received the
        instance as `config_key`. Class-level calls
        (ModelManager.get_model(...)) behave exactly as before.

        Returns:
            dict with keys session / class_names / input_name / input_size /
            confidence / nms, or None when no backend is available, a file is
            missing, or loading fails.
        """
        # Bail out early when no ML backend could be imported.
        if not ML_BACKEND_AVAILABLE:
            logger.error("β No ML backend available - cannot load models")
            return None
        if config_key not in LOADED_MODELS:
            logger.info(f"Loading model: {config_key}")
            try:
                config = CONFIGS[config_key]
                # Both the ONNX file and its class-name yaml must exist.
                if not os.path.exists(config['model_path']):
                    logger.warning(f"Model file not found: {config['model_path']}")
                    return None
                if not os.path.exists(config['yaml_path']):
                    logger.warning(f"YAML file not found: {config['yaml_path']}")
                    return None
                session = None
                actual_input_size = config['input_size']  # fallback for dynamic shapes
                if ONNX_AVAILABLE and ort is not None:
                    # CPU-only session tuned for small HF Spaces instances.
                    providers = ['CPUExecutionProvider']
                    session_options = ort.SessionOptions()
                    session_options.intra_op_num_threads = 2
                    session_options.inter_op_num_threads = 2
                    session_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
                    session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                    session = ort.InferenceSession(
                        config['model_path'],
                        providers=providers,
                        sess_options=session_options
                    )
                    # Auto-detect the input size from the model's declared input
                    # shape; keep the configured size when the shape is dynamic.
                    try:
                        input_shape = session.get_inputs()[0].shape
                        if isinstance(input_shape, (list, tuple)) and len(input_shape) >= 4:
                            h, w = input_shape[2], input_shape[3]
                            if isinstance(h, int) and isinstance(w, int) and h > 0 and w > 0:
                                actual_input_size = h  # square input: height == width
                                logger.info(f"π§ AUTO-DETECTED input size untuk {config_key}: {actual_input_size} (was {config['input_size']})")
                    except Exception as e:
                        logger.warning(f"β οΈ Failed to auto-detect input size for {config_key}: {e}")
                else:
                    # Only ONNX Runtime can execute .onnx files directly;
                    # Torch/TF would require a model conversion step.
                    logger.error("β ONNX models require ONNX Runtime - other backends not yet implemented")
                    return None
                # Class names come from the YOLO data yaml ('names' key).
                with open(config['yaml_path'], 'r', encoding='utf-8') as file:
                    class_names = yaml.safe_load(file)['names']
                LOADED_MODELS[config_key] = {
                    'session': session,
                    'class_names': class_names,
                    'input_name': session.get_inputs()[0].name,
                    'input_size': actual_input_size,
                    'confidence': config['confidence_threshold'],
                    'nms': config.get('nms_threshold', 0.45)
                }
                logger.info(f"β Model loaded successfully: {config_key}")
            except Exception as e:
                logger.error(f"β Error loading model {config_key}: {e}")
                return None
        return LOADED_MODELS[config_key]
| # ================================================================= | |
| # IMAGE PROCESSING & UTILITIES | |
| # ================================================================= | |
def preprocess_image(image_bytes: bytes, input_size: int) -> np.ndarray:
    """Letterbox an encoded image into a normalized NCHW float32 tensor.

    The image is scaled to fit inside an input_size x input_size square
    (aspect ratio preserved), centered on a gray (114) canvas, rescaled to
    [0, 1], and transposed to shape (1, 3, H, W) for ONNX inference.
    """
    rgb = np.array(Image.open(io.BytesIO(image_bytes)).convert('RGB'))
    src_h, src_w = rgb.shape[:2]

    # Uniform scale that fits the whole image inside the square input
    ratio = min(input_size / src_w, input_size / src_h)
    dst_w, dst_h = int(src_w * ratio), int(src_h * ratio)
    scaled = cv2.resize(rgb, (dst_w, dst_h))

    # Center the scaled image on a gray letterbox canvas
    canvas = np.full((input_size, input_size, 3), 114, dtype=np.uint8)
    top = (input_size - dst_h) // 2
    left = (input_size - dst_w) // 2
    canvas[top:top + dst_h, left:left + dst_w, :] = scaled

    # HWC uint8 -> NCHW float32 in [0, 1]
    tensor = np.transpose(canvas.astype(np.float32) / 255.0, (2, 0, 1))
    return tensor[np.newaxis, ...]
def fuzzy_match_label(target_label: str, class_names: List[str], threshold: float = 0.6) -> Optional[str]:
    """Resolve a possibly-misspelled target label to a known class name.

    Resolution order: exact match, known spelling variants, difflib close
    matches (cutoff = `threshold`), then substring overlap in either
    direction. Returns None when nothing matches.
    """
    query = target_label.lower().strip()

    # Known spelling variants for labels that commonly arrive in different forms
    label_variants = {
        'ice cream': ['ice cream', 'icecream', 'ice'],
        'hotdog': ['hot dog', 'hotdog', 'hot-dog'],
        'hot dog': ['hot dog', 'hotdog', 'hot-dog'],
        'sunglasses': ['sunglasses', 'sun glasses', 'sunglass'],
        'sun glasses': ['sunglasses', 'sun glasses', 'sunglass']
    }

    # 1. Exact match
    if query in class_names:
        return query

    # 2. Known variants
    for canonical, variants in label_variants.items():
        if query in variants and canonical in class_names:
            return canonical

    # 3. Close matches by edit distance (case-insensitive)
    lowered = [name.lower() for name in class_names]
    for candidate in difflib.get_close_matches(query, lowered, n=3, cutoff=threshold):
        for original in class_names:
            if original.lower() == candidate:
                return original

    # 4. Substring overlap in either direction
    for original in class_names:
        name = original.lower()
        if query in name or name in query:
            return original

    return None
def get_config_key_for_label(target_label: str) -> str:
    """Pick the model config key for a target label via keyword routing.

    BUGFIX: matching is now case-insensitive, so labels like "Spiral Galaxy"
    still route to the specialized model. Unmatched labels fall back to
    'default'.
    """
    label = target_label.lower()
    for keywords, config_key in MODEL_ROUTING:
        # Keywords in MODEL_ROUTING are lowercase; compare in lowercase.
        if any(keyword in label for keyword in keywords):
            return config_key
    return 'default'
def get_button_index(x_center: float, y_center: float, img_width: int, img_height: int,
                     grid_cols: int = 3, grid_rows: int = 2) -> int:
    """Map a point in image coordinates to a 1-based button index on a grid.

    The default 3x2 grid is numbered row-major:
        [1] [2] [3]
        [4] [5] [6]

    Points outside the image are clamped to the nearest edge cell.
    """
    cell_width = img_width / grid_cols
    cell_height = img_height / grid_rows
    # Which cell the center point falls into, clamped to the grid bounds
    col = max(0, min(int(x_center // cell_width), grid_cols - 1))
    row = max(0, min(int(y_center // cell_height), grid_rows - 1))
    button_index = row * grid_cols + col + 1
    # Demoted to DEBUG: this runs once per request and the 12 INFO lines it
    # used to emit drowned out the production logs; lazy %-args avoid the
    # formatting cost when DEBUG is off.
    logging.getLogger(__name__).debug(
        "BUTTON INDEX: (%.2f, %.2f) in %dx%d, grid %dx%d -> col=%d row=%d index=%d",
        x_center, y_center, img_width, img_height, grid_cols, grid_rows, col, row, button_index
    )
    return button_index
| # ================================================================= | |
| # CACHING SYSTEM - DISABLED FOR FRESH PREDICTIONS | |
| # ================================================================= | |
| # β CACHE FUNCTIONS DISABLED - No response caching for fresh predictions | |
| # def get_cache_key(request_data: dict) -> str: | |
| # """Generate cache key dari request data""" | |
| # cache_string = f"{request_data.get('challenge_type')}_{request_data.get('target_label')}_{request_data.get('image_b64', '')[:100]}" | |
| # return hashlib.md5(cache_string.encode()).hexdigest() | |
| # def get_cached_response(cache_key: str) -> Optional[dict]: | |
| # """Get response dari cache jika ada""" | |
| # return RESPONSE_CACHE.get(cache_key) | |
| # def cache_response(cache_key: str, response: dict): | |
| # """Cache response dengan size limit""" | |
| # if len(RESPONSE_CACHE) >= CACHE_MAX_SIZE: | |
| # # Remove oldest entry | |
| # oldest_key = next(iter(RESPONSE_CACHE)) | |
| # del RESPONSE_CACHE[oldest_key] | |
| # | |
| # RESPONSE_CACHE[cache_key] = response | |
| # ================================================================= | |
| # CHALLENGE HANDLERS | |
| # ================================================================= | |
async def handle_pick_the_challenge(data: dict) -> dict:
    """Handle 'pick the' challenges with fuzzy label matching — always fresh.

    Args:
        data: dict with 'target_label' and 'image_b64' (a data URI or raw
            base64 string).

    Returns:
        dict matching FunCaptchaResponse: status success/not_found/error,
        plus the bounding box [x, y, w, h] in original-image coordinates
        on success.
    """
    start_time = datetime.now()
    # Always fresh - no response caching for accurate pick_the predictions
    logger.info(f"π Processing FRESH pick_the prediction (no response cache)")
    target_label_original = data['target_label']
    image_b64 = data['image_b64']
    target_label = target_label_original
    config_key = get_config_key_for_label(target_label)
    if config_key == 'spiral_galaxy':
        # The spiral/galaxy model is single-purpose; its class name is 'spiral'.
        target_label = 'spiral'
    model_data = await ModelManager.get_model(config_key)
    if not model_data:
        if not ML_BACKEND_AVAILABLE:
            return {
                'status': 'error',
                'message': 'No ML backend available - model inference disabled',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        return {
            'status': 'error',
            'message': f'Model {config_key} tidak ditemukan',
            'processing_time': (datetime.now() - start_time).total_seconds()
        }
    try:
        # BUGFIX: split(',')[-1] also accepts raw base64 without a
        # "data:image/...;base64," prefix; [1] raised IndexError for it.
        image_bytes = base64.b64decode(image_b64.split(',')[-1])
        # Fuzzy matching for the requested label
        matched_label = fuzzy_match_label(target_label, model_data['class_names'])
        if not matched_label:
            return {
                'status': 'not_found',
                'message': f'Label "{target_label}" tidak ditemukan dalam model',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        target_label = matched_label
        # Preprocess to the model's square letterboxed input
        input_tensor = preprocess_image(image_bytes, model_data['input_size'])
        # Inference
        outputs = model_data['session'].run(None, {model_data['input_name']: input_tensor})[0]
        predictions = np.squeeze(outputs).T
        # Collect candidate boxes above the confidence threshold
        boxes = []
        confidences = []
        class_ids = []
        for pred in predictions:
            class_scores = pred[4:]
            class_id = np.argmax(class_scores)
            max_confidence = class_scores[class_id]
            if max_confidence > model_data['confidence']:
                confidences.append(float(max_confidence))
                class_ids.append(class_id)
                x_center, y_center, width, height = pred[:4]
                # Convert center-format box to top-left corner format
                boxes.append([int(x_center - width / 2), int(y_center - height / 2),
                              int(width), int(height)])
        if not boxes:
            return {
                'status': 'not_found',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        # Non-Maximum Suppression
        indices = cv2.dnn.NMSBoxes(
            boxes,
            confidences,
            model_data['confidence'],
            model_data['nms']
        )
        if len(indices) == 0:
            return {
                'status': 'not_found',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        # cv2.dnn.NMSBoxes returns an (N,) or (N,1) ndarray depending on the
        # OpenCV version; asarray + reshape(-1) normalizes every variant to a
        # flat list of ints (replaces a 40-line manual flattening routine).
        indices_flat: List[int] = []
        try:
            indices_flat = np.asarray(indices).reshape(-1).astype(int).tolist()
        except (TypeError, ValueError) as e:
            logger.warning(f"Failed to process NMS indices: {e}")
            indices_flat = []
        # Pick the highest-confidence surviving detection of the target class
        target_class_id = model_data['class_names'].index(target_label)
        best_match_box = None
        highest_score = 0
        for i in indices_flat:
            if 0 <= i < len(class_ids) and class_ids[i] == target_class_id:
                if confidences[i] > highest_score:
                    highest_score = confidences[i]
                    best_match_box = boxes[i]
        if best_match_box is not None:
            # Undo the letterbox transform: subtract padding, divide by scale
            img = Image.open(io.BytesIO(image_bytes))
            original_w, original_h = img.size
            scale = min(model_data['input_size'] / original_w, model_data['input_size'] / original_h)
            pad_x = (model_data['input_size'] - original_w * scale) / 2
            pad_y = (model_data['input_size'] - original_h * scale) / 2
            x_orig = (best_match_box[0] - pad_x) / scale
            y_orig = (best_match_box[1] - pad_y) / scale
            w_orig = best_match_box[2] / scale
            h_orig = best_match_box[3] / scale
            return {
                'status': 'success',
                'box': [x_orig, y_orig, w_orig, h_orig],
                'confidence': highest_score,
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
    except Exception as e:
        logger.error(f"Error in handle_pick_the_challenge: {e}")
        return {
            'status': 'error',
            'message': str(e),
            'processing_time': (datetime.now() - start_time).total_seconds()
        }
    return {
        'status': 'not_found',
        'processing_time': (datetime.now() - start_time).total_seconds()
    }
async def handle_upright_challenge(data: dict) -> dict:
    """Handle 'upright' challenges — always fresh predictions.

    Detects the correctly-oriented object with the 'upright' model and maps
    its center point onto a 3x2 button grid.

    Returns:
        dict matching FunCaptchaResponse: on success, 'button_index' is
        1-based row-major ([1,2,3] top row, [4,5,6] bottom row).
    """
    start_time = datetime.now()
    # Always fresh - no response caching for accurate upright predictions
    logger.info(f"π Processing FRESH upright prediction (no response cache)")
    try:
        image_b64 = data['image_b64']
        model_data = await ModelManager.get_model('upright')
        if not model_data:
            if not ML_BACKEND_AVAILABLE:
                return {
                    'status': 'error',
                    'message': 'No ML backend available - model inference disabled',
                    'processing_time': (datetime.now() - start_time).total_seconds()
                }
            return {
                'status': 'error',
                'message': 'Model upright tidak ditemukan',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        # Per-request diagnostics demoted to DEBUG to keep INFO logs usable.
        logger.debug(f"UPRIGHT: model config input_size={model_data['input_size']}, confidence={model_data['confidence']}, nms={model_data['nms']}")
        # BUGFIX: split(',')[-1] also accepts raw base64 without a data-URI
        # prefix; [1] raised IndexError for it.
        image_bytes = base64.b64decode(image_b64.split(',')[-1])
        reconstructed_image_pil = Image.open(io.BytesIO(image_bytes))
        original_w, original_h = reconstructed_image_pil.size
        logger.debug(f"UPRIGHT: original image dimensions: {original_w}x{original_h}")
        # Use the model's configured input size consistently
        input_size = model_data['input_size']
        input_tensor = preprocess_image(image_bytes, input_size)
        outputs = model_data['session'].run(None, {model_data['input_name']: input_tensor})[0]
        predictions = np.squeeze(outputs).T
        # Single-class model: column 4 is the class score.
        confident_preds = predictions[predictions[:, 4] > model_data['confidence']]
        logger.debug(f"UPRIGHT: total predictions: {len(predictions)}, confident: {len(confident_preds)}")
        if len(confident_preds) == 0:
            return {
                'status': 'not_found',
                'message': 'Tidak ada objek terdeteksi',
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
        best_detection = confident_preds[np.argmax(confident_preds[:, 4])]
        box_model = best_detection[:4]
        # Undo the letterbox transform to map the detection center back into
        # original-image coordinates.
        scale = min(input_size / original_w, input_size / original_h)
        pad_x = (input_size - original_w * scale) / 2
        pad_y = (input_size - original_h * scale) / 2
        x_center_orig = (box_model[0] - pad_x) / scale
        y_center_orig = (box_model[1] - pad_y) / scale
        # NOTE: coordinates are deliberately NOT clamped to the image bounds;
        # clamping previously shifted detections into the wrong grid cell
        # (changed button 3 to 1).
        logger.debug(f"UPRIGHT: raw coordinates (no clamping): ({x_center_orig:.2f}, {y_center_orig:.2f})")
        button_to_click = get_button_index(x_center_orig, y_center_orig, original_w, original_h)
        logger.debug(f"UPRIGHT: final button index: {button_to_click}")
        return {
            'status': 'success',
            'button_index': button_to_click,
            'confidence': float(best_detection[4]),
            'processing_time': (datetime.now() - start_time).total_seconds()
        }
    except Exception as e:
        logger.error(f"Error in handle_upright_challenge: {e}")
        return {
            'status': 'error',
            'message': str(e),
            'processing_time': (datetime.now() - start_time).total_seconds()
        }
| # ================================================================= | |
| # FASTAPI APPLICATION | |
| # ================================================================= | |
# FastAPI application instance (interactive docs served at /docs and /redoc)
app = FastAPI(
    title="π§© FunCaptcha Solver API",
    description="High-performance FunCaptcha solver dengan fuzzy matching untuk Hugging Face Spaces",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is a
# permissive setup — browsers reject wildcard origins on credentialed
# requests, and it exposes the API to any site. Confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")  # BUGFIX: route decorator was missing, so "/" returned 404
async def root():
    """Root endpoint describing the API and its available routes."""
    return {
        "service": "FunCaptcha Solver API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "/solve": "POST - Solve FunCaptcha challenges",
            "/health": "GET - Health check",
            "/docs": "GET - API documentation"
        },
        "models_loaded": len(LOADED_MODELS),
        "response_caching": "disabled"  # No response caching for fresh predictions
    }
@app.get("/health")  # BUGFIX: route decorator was missing; path is the one advertised by root()
async def health_check():
    """Health check reporting ML backend availability and loaded models."""
    warnings = []
    if not ONNX_AVAILABLE:
        warnings.append("ONNX Runtime not available")
    if not ML_BACKEND_AVAILABLE:
        warnings.append("No ML backend available - model inference disabled")
    # Report which backend would be used, in preference order
    backend_status = "none"
    if ONNX_AVAILABLE:
        backend_status = "onnxruntime"
    elif TORCH_AVAILABLE:
        backend_status = "pytorch"
    elif TF_AVAILABLE:
        backend_status = "tensorflow"
    return {
        "status": "healthy" if ML_BACKEND_AVAILABLE else "degraded",
        "service": "FunCaptcha Solver",
        "ml_backend": backend_status,
        "onnx_runtime_available": ONNX_AVAILABLE,
        "pytorch_available": TORCH_AVAILABLE,
        "tensorflow_available": TF_AVAILABLE,
        "models_loaded": len(LOADED_MODELS),
        "available_models": list(CONFIGS.keys()),
        "response_caching": "disabled",  # No response caching for fresh predictions
        "cache_entries": 0,  # Always 0 since response cache disabled
        "warnings": warnings
    }
async def clear_cache(authenticated: bool = Depends(verify_api_key)):
    """Clear the in-memory model cache (response caching is disabled by design).

    Requires a valid API key via the bearer-token dependency.

    NOTE(review): no route decorator is attached, so this handler is not
    reachable over HTTP as written — presumably an @app.post(...) line was
    intended; confirm the route path and restore it.
    """
    try:
        models_cleared = len(LOADED_MODELS)
        LOADED_MODELS.clear()
        # RESPONSE_CACHE.clear()  # β DISABLED - No response caching
        logger.info(f"ποΈ Model cache cleared: {models_cleared} models (response cache disabled)")
        return {
            "status": "success",
            "message": "Model cache cleared successfully (response cache disabled for fresh predictions)",
            "models_cleared": models_cleared,
            "response_caching": "disabled"
        }
    except Exception as e:
        logger.error(f"β Error clearing cache: {e}")
        raise HTTPException(status_code=500, detail=f"Error clearing cache: {str(e)}")
@app.post("/solve", response_model=FunCaptchaResponse)  # BUGFIX: route decorator was missing; path advertised by root()
async def solve_funcaptcha(
    request: FunCaptchaRequest,
    authenticated: bool = Depends(verify_api_key)
) -> FunCaptchaResponse:
    """
    Solve FunCaptcha challenges - ALWAYS FRESH PREDICTIONS.

    Supports:
    - pick_the: pick specific objects from images (requires target_label)
    - upright: find the correctly oriented object

    Features:
    - Fuzzy label matching
    - NO response caching (always fresh predictions)
    - Multi-model support
    """
    request_dict = request.dict()
    # No caching - always process fresh for accurate results
    logger.info(f"π Processing FRESH prediction for challenge: {request.challenge_type} (no cache)")
    # Dispatch on challenge type; unknown types are a client error (400)
    if request.challenge_type == 'pick_the':
        if not request.target_label:
            raise HTTPException(status_code=400, detail="target_label required for pick_the challenges")
        result = await handle_pick_the_challenge(request_dict)
    elif request.challenge_type == 'upright':
        result = await handle_upright_challenge(request_dict)
    else:
        raise HTTPException(status_code=400, detail=f"Unsupported challenge type: {request.challenge_type}")
    logger.info(f"β Fresh challenge solved: {request.challenge_type} -> {result['status']}")
    return FunCaptchaResponse(**result)
| # ================================================================= | |
| # APPLICATION STARTUP | |
| # ================================================================= | |
@app.on_event("startup")  # BUGFIX: event decorator was missing, so startup checks never ran
async def startup_event():
    """Validate configuration and preload the default model at startup."""
    logger.info("π Starting FunCaptcha Solver API...")
    # Fail fast when the API-key secret is missing
    try:
        get_api_key_from_secrets()
        logger.info("β API key loaded successfully")
    except ValueError as e:
        logger.error(f"β API key error: {e}")
        raise  # bare raise preserves the original traceback
    # Preload the default model when its files and an ML backend exist
    if ML_BACKEND_AVAILABLE and os.path.exists('best.onnx') and os.path.exists('data.yaml'):
        logger.info("Preloading default model...")
        try:
            await ModelManager.get_model('default')
            logger.info("β Default model preloaded successfully")
        except Exception as e:
            logger.warning(f"β οΈ Failed to preload default model: {e}")
    elif not ML_BACKEND_AVAILABLE:
        logger.warning("β οΈ No ML backend available - skipping model preload")
    else:
        logger.warning("β οΈ Model files (best.onnx, data.yaml) not found - upload them to enable solving")
    if ML_BACKEND_AVAILABLE:
        backend_name = "ONNX Runtime" if ONNX_AVAILABLE else "PyTorch" if TORCH_AVAILABLE else "TensorFlow"
        logger.info(f"β FunCaptcha Solver API started successfully with {backend_name} backend")
    else:
        logger.warning("β οΈ FunCaptcha Solver API started with limited functionality (No ML backend available)")
@app.on_event("shutdown")  # BUGFIX: event decorator was missing, so cleanup never ran
async def shutdown_event():
    """Release cached models on shutdown."""
    logger.info("π Shutting down FunCaptcha Solver API...")
    # Only the model cache exists; response caching is disabled by design
    LOADED_MODELS.clear()
    logger.info("β Cleanup completed (response cache disabled)")
| # ================================================================= | |
| # DEVELOPMENT SERVER | |
| # ================================================================= | |
# Local/dev entry point; HF Spaces serves this file on its standard port 7860.
if __name__ == "__main__":
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,  # Disabled for production
        workers=1  # Single worker for HF Spaces
    )