Spaces:
Runtime error
Runtime error
| import os | |
| import tempfile | |
| import numpy as np | |
| from PIL import Image | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Any | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Set environment variables BEFORE importing feat | |
| os.environ['FEAT_MODELS_DIR'] = '/tmp/feat_models' | |
| os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib' | |
| os.environ['TORCH_HOME'] = '/tmp/torch' | |
| os.environ['HF_HOME'] = '/tmp/huggingface' | |
| os.environ['TRANSFORMERS_CACHE'] = '/tmp/transformers' | |
| # Create necessary directories | |
| for dir_path in ['/tmp/feat_models', '/tmp/matplotlib', '/tmp/torch', '/tmp/huggingface', '/tmp/transformers']: | |
| os.makedirs(dir_path, exist_ok=True) | |
| # Import py-feat after setting environment variables | |
| try: | |
| import feat | |
| print("Py-feat imported.") | |
| logger.info("Py-feat imported successfully") | |
| except Exception as e: | |
| print(f"Warning: py-feat import failed at app start: {e}") | |
| logger.error(f"Py-feat import failed: {e}") | |
| app = FastAPI(title="Face Detection API") | |
| # Global variable to store the detector instance | |
| global_face_detector = None | |
| async def load_model(): | |
| global global_face_detector | |
| try: | |
| logger.info("Starting model loading process...") | |
| print("Using CPU for inference.") | |
| # Define a writable cache directory for models | |
| model_cache_dir = "/tmp/feat_models_cache" | |
| os.makedirs(model_cache_dir, exist_ok=True) | |
| print(f"Set model cache directory to: {model_cache_dir}") | |
| logger.info(f"Model cache directory: {model_cache_dir}") | |
| # Try to monkey patch the resource path function if needed | |
| try: | |
| import feat.utils | |
| # Override the get_resource_path function to use /tmp | |
| def custom_get_resource_path(): | |
| cache_dir = "/tmp/feat_models" | |
| os.makedirs(cache_dir, exist_ok=True) | |
| return cache_dir | |
| # Patch the function if it exists | |
| if hasattr(feat.utils, 'get_resource_path'): | |
| feat.utils.get_resource_path = custom_get_resource_path | |
| logger.info("Patched feat.utils.get_resource_path") | |
| except Exception as patch_error: | |
| logger.warning(f"Could not patch resource path function: {patch_error}") | |
| # Set additional environment variables that py-feat might use | |
| os.environ['HF_HOME'] = model_cache_dir | |
| os.environ['FEAT_CACHE_DIR'] = model_cache_dir | |
| # Try different model configurations based on what's available | |
| model_configs = [ | |
| # Configuration 1: Your original setup | |
| { | |
| "face_model": "retinaface", | |
| "landmark_model": "mobilefacenet", | |
| "au_model": "xgb", | |
| "emotion_model": "resmasknet", | |
| "facebox_model": "retinaface", | |
| "device": "cpu" | |
| }, | |
| # Configuration 2: Simpler setup | |
| { | |
| "face_model": "retinaface", | |
| "landmark_model": "mobilenet", | |
| "au_model": "svm", | |
| "emotion_model": "resmasknet", | |
| "device": "cpu" | |
| }, | |
| # Configuration 3: Minimal setup | |
| { | |
| "face_model": "retinaface", | |
| "device": "cpu" | |
| } | |
| ] | |
| for i, config in enumerate(model_configs): | |
| try: | |
| logger.info(f"Trying model configuration {i+1}: {config}") | |
| global_face_detector = feat.detector.Detector(**config, verbose=True) | |
| logger.info(f"Model configuration {i+1} loaded successfully!") | |
| print("RetinaFace model loaded successfully.") | |
| break | |
| except Exception as config_error: | |
| logger.warning(f"Model configuration {i+1} failed: {config_error}") | |
| if i == len(model_configs) - 1: # Last configuration | |
| raise config_error | |
| continue | |
| except PermissionError as pe: | |
| error_msg = f"Permission error during model loading: {pe}" | |
| logger.error(error_msg) | |
| print(f"Failed to load RetinaFace model during startup: {pe}") | |
| raise RuntimeError(f"Could not load model due to permissions: {pe}") | |
| except Exception as e: | |
| error_msg = f"Failed to load RetinaFace model during startup: {e}" | |
| logger.error(error_msg) | |
| print(error_msg) | |
| # Print additional debugging information | |
| logger.info("Environment variables:") | |
| for key in ['FEAT_MODELS_DIR', 'MPLCONFIGDIR', 'TORCH_HOME', 'HF_HOME']: | |
| logger.info(f" {key}: {os.environ.get(key, 'Not set')}") | |
| # Check directory permissions | |
| for dir_path in ['/tmp/feat_models', '/tmp/matplotlib', '/tmp/torch']: | |
| if os.path.exists(dir_path): | |
| logger.info(f"Directory {dir_path} exists, writable: {os.access(dir_path, os.W_OK)}") | |
| else: | |
| logger.warning(f"Directory {dir_path} does not exist") | |
| raise RuntimeError(f"Could not load model: {e}") | |
| # Define a Pydantic model for the response expected by your client script | |
| class FaceDetectionResult(BaseModel): | |
| box: List[float] | |
| landmarks: Dict[str, List[float]] | |
| async def extract_faces_api(file: UploadFile = File(...)): | |
| if not global_face_detector: | |
| raise HTTPException(status_code=500, detail="Face detector not initialized.") | |
| # Create a temporary file to save the uploaded image in a writable location | |
| temp_dir = "/tmp/" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| temp_image_path = os.path.join(temp_dir, file.filename) | |
| try: | |
| # Read the uploaded file content and save it | |
| contents = await file.read() | |
| with open(temp_image_path, "wb") as f: | |
| f.write(contents) | |
| logger.info(f"Image saved to temporary path: {temp_image_path}") | |
| print(f"Server: Image saved to temporary path: {temp_image_path}") | |
| # --- THIS IS THE CRITICAL SECTION FOR ERROR LOGGING --- | |
| try: | |
| # Use the global detector to detect faces | |
| logger.info("Starting face detection...") | |
| detected_faces_df = global_face_detector.detect_faces( | |
| input_path=temp_image_path | |
| ) | |
| results = [] | |
| if detected_faces_df is not None and not detected_faces_df.empty: | |
| logger.info(f"Found {len(detected_faces_df)} faces in DataFrame") | |
| for idx, row in detected_faces_df.iterrows(): | |
| try: | |
| # Extract bounding box - try different column name formats | |
| box_coords = None | |
| # Try different possible column names for bounding box | |
| if all(col in row for col in ['FaceRectX', 'FaceRectY', 'FaceRectWidth', 'FaceRectHeight']): | |
| x, y, w, h = row['FaceRectX'], row['FaceRectY'], row['FaceRectWidth'], row['FaceRectHeight'] | |
| box_coords = [float(x), float(y), float(x + w), float(y + h)] | |
| elif all(col in row for col in ['x', 'y', 'w', 'h']): | |
| x, y, w, h = row['x'], row['y'], row['w'], row['h'] | |
| box_coords = [float(x), float(y), float(x + w), float(y + h)] | |
| else: | |
| # Log available columns for debugging | |
| logger.warning(f"Unknown bounding box format. Available columns: {list(row.index)}") | |
| # Try to find any numeric columns that might be coordinates | |
| numeric_cols = [col for col in row.index if isinstance(row[col], (int, float))] | |
| logger.info(f"Numeric columns: {numeric_cols}") | |
| continue | |
| # Extract landmarks | |
| current_landmarks = {} | |
| # Try different landmark naming conventions | |
| for k in range(10): # Check up to 10 landmarks | |
| # Try different naming patterns | |
| patterns = [ | |
| (f'Landmark{k}_x', f'Landmark{k}_y'), | |
| (f'landmark_{k}_x', f'landmark_{k}_y'), | |
| (f'lm{k}_x', f'lm{k}_y'), | |
| (f'point{k}_x', f'point{k}_y') | |
| ] | |
| for x_col, y_col in patterns: | |
| if x_col in row and y_col in row: | |
| lm_x, lm_y = row[x_col], row[y_col] | |
| if lm_x is not None and lm_y is not None: | |
| current_landmarks[f'point{k}'] = [float(lm_x), float(lm_y)] | |
| break | |
| if box_coords: | |
| results.append(FaceDetectionResult(box=box_coords, landmarks=current_landmarks)) | |
| except Exception as row_error: | |
| logger.error(f"Error processing face {idx}: {row_error}") | |
| continue | |
| else: | |
| logger.info("No faces detected or empty DataFrame returned") | |
| logger.info(f"Successfully processed {len(results)} faces") | |
| print(f"Server: Detected {len(results)} faces.") | |
| return results | |
| except Exception as e: | |
| # THIS IS WHERE THE ERROR WILL BE PRINTED | |
| error_msg = f"Exception during face extraction: {e}" | |
| logger.error(f"Server ERROR: {error_msg}") | |
| print(f"Server ERROR: {error_msg}") | |
| import traceback | |
| traceback.print_exc() # Prints the full traceback to logs | |
| raise HTTPException(status_code=500, detail=f"Error processing image: {e}") | |
| finally: | |
| # Clean up the temporary file | |
| if os.path.exists(temp_image_path): | |
| os.remove(temp_image_path) | |
| logger.info(f"Cleaned up temporary file: {temp_image_path}") | |
| print(f"Server: Cleaned up temporary file: {temp_image_path}") | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "model_loaded": global_face_detector is not None, | |
| "cache_dirs": { | |
| "feat_models": os.environ.get('FEAT_MODELS_DIR'), | |
| "matplotlib": os.environ.get('MPLCONFIGDIR'), | |
| "torch": os.environ.get('TORCH_HOME'), | |
| "hf_home": os.environ.get('HF_HOME') | |
| } | |
| } | |
| async def root(): | |
| """Root endpoint""" | |
| return { | |
| "message": "Face Detection API is running", | |
| "endpoints": ["/extract-faces/", "/health"], | |
| "model": "RetinaFace via py-feat" | |
| } |