"""HTTP API exposing the BacSense v2 classifier through FastAPI.

Routes defined here:
    GET  /               landing page
    GET  /health         liveness probe
    GET  /debug_model    diagnostics about the model package and its loading
    POST /predict_batch  classify one or more uploaded images
    GET  /{path}         catch-all that serves the landing page (SPA support)
"""
import io
import os
import sys
import tempfile
import traceback
from typing import List

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse

# Add the parent directory to sys.path to import bacsense_v2_package.
# This has to run before the package import right below it.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from bacsense_v2_package.inference import BacSense  # noqa: E402

app = FastAPI(title="Bacsense 2.0 API")

# Setup CORS to allow requests from the React frontend.
# NOTE(review): a wildcard origin combined with allow_credentials=True lets any
# site issue credentialed requests; consider listing the frontend origin(s)
# explicitly. Left unchanged here so existing clients keep working.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Path prefixes that the SPA catch-all must not answer with the landing page.
_API_PREFIXES = (
    "docs",
    "redoc",
    "openapi.json",
    "health",
    "debug_model",
    "predict_batch",
    "api",
)

# Global classifier instance, created on first use by get_classifier().
CLASSIFIER = None


def get_classifier() -> BacSense:
    """Return the shared BacSense instance, creating it on the first call.

    The model directory is the ``bacsense_v2_package`` folder located next to
    this file's parent directory.
    """
    global CLASSIFIER
    if CLASSIFIER is None:
        model_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', 'bacsense_v2_package')
        )
        CLASSIFIER = BacSense(model_dir=model_dir)
        # Note: we skip explicit warmup here to save time; first call will be slightly slower
    return CLASSIFIER


# Root landing page for Hugging Face Space visibility
@app.get("/", response_class=HTMLResponse)
async def root():
    """Return the static landing page body."""
    return """ BacSense v2 | Precision API

🦠 BacSense v2 API

The microbial classification engine is online and ready.

View API Documentation
"""


# Health check and Debugging endpoint
@app.get("/health")
async def health():
    """Report that the service is up."""
    return {"status": "ok", "backend": "Hugging Face Space"}


@app.get("/debug_model")
async def debug_model():
    """Diagnostic endpoint to see why model loading might be failing.

    Returns a dict describing the package directory, the TensorFlow version
    (or the import error), and whether the classifier could be initialised.
    Any unexpected failure is reported as ``{"error": <message>}``.

    NOTE(review): the payload includes sys.path and full tracebacks; consider
    restricting access to this endpoint in production.
    """
    try:
        # Check if directories exist
        root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
        pkg = os.path.join(root_dir, 'bacsense_v2_package')
        exists = os.path.isdir(pkg)

        # Check files
        files = os.listdir(pkg) if exists else []

        # Try a dummy import
        try:
            import tensorflow as tf
            tf_version = tf.__version__
        except Exception as e:
            tf_version = f"Error: {e}"

        # Try initializing (this might fail if deps are missing)
        try:
            get_classifier()
            status = "Initialized OK"
            error = None
        except Exception as e:
            status = "FAILED"
            error = str(e) + "\n" + traceback.format_exc()

        return {
            "root_dir": root_dir,
            "pkg_dir": pkg,
            "pkg_exists": exists,
            "pkg_files": files,
            "tensorflow": tf_version,
            "model_status": status,
            "error_detail": error,
            "sys_path": sys.path,
        }
    except Exception as e:
        return {"error": str(e)}


@app.post("/predict_batch")
async def predict_batch(files: List[UploadFile] = File(...)):
    """Classify every uploaded image and return one result entry per file.

    Each upload is written to a temporary file because the classifier is
    called with a file path. A failure on one file is recorded in that file's
    entry (``success: False``) and does not abort the rest of the batch.

    Raises:
        HTTPException: 400 when no files were uploaded.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    results = []

    for file in files:
        temp_path = None
        try:
            # Read the uploaded file into memory
            contents = await file.read()

            # BacSense uses cv2.imread and PIL.Image.open with a file path,
            # so we save it to disk temporarily
            fd, temp_path = tempfile.mkstemp(suffix=".png")
            with os.fdopen(fd, 'wb') as f:
                f.write(contents)

            # Process the image using lazy-loaded classifier
            print(f"DEBUG: Processing file {file.filename}")
            classifier = get_classifier()
            result = classifier.predict(temp_path)

            # Format probabilities for the frontend: values up to 1.0 are
            # treated as fractions and scaled to percent, larger values are
            # passed through unchanged.
            confidence = result["confidence"]
            confidence_pct = confidence * 100 if confidence <= 1.0 else confidence

            results.append({
                "filename": file.filename,
                "success": True,
                "prediction": result['prediction'],
                "confidence": confidence_pct,
                "probabilities": [
                    {"name": result['prediction'], "probability": confidence_pct}
                ],
                "details": {
                    "gram_stain": result.get("gram", "Unknown"),
                    "shape": result.get("shape", "Unknown"),
                    "pathogenicity": result.get("risk", "Unknown")
                }
            })

        except Exception as e:
            error_msg = f"{str(e)}\n{traceback.format_exc()}"
            print(f"ERROR processing {file.filename}: {error_msg}")
            # NOTE(review): "trace" sends the server traceback to the client;
            # kept for compatibility with existing consumers of this response.
            results.append({
                "filename": file.filename,
                "success": False,
                "error": str(e),
                "trace": error_msg
            })
        finally:
            # Clean up the temporary file
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)

    return {"results": results}


# Catch-all route for SPA support (but exclude API endpoints).
# This MUST stay the last route registered: routes are matched in registration
# order, so declaring it before /health or /debug_model would intercept those
# GET requests and answer them with the 404 below.
@app.get("/{path_name:path}", response_class=HTMLResponse)
async def catch_all(path_name: str):
    """Serve the landing page for any GET path that is not an API endpoint.

    Raises:
        HTTPException: 404 when the path starts with a reserved API prefix.
    """
    # Don't catch API endpoints - they should return 404 if not found
    if path_name.startswith(_API_PREFIXES):
        raise HTTPException(status_code=404)

    # For everything else, return the root page (useful for SPA routing)
    return await root()

# Forced update at 02:26