import io
import os
import sys
import tempfile
from typing import List
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
# Add the parent directory to sys.path to import bacsense_v2_package
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from bacsense_v2_package.inference import BacSense
from fastapi.responses import HTMLResponse
# ASGI application object; every route in this module is registered on it.
app = FastAPI(title="Bacsense 2.0 API")

# Setup CORS to allow requests from the React frontend: every origin, method
# and header is accepted, and credentialed requests are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive policy -- confirm the frontend really needs credentials,
# otherwise list the frontend origin(s) explicitly.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# Root landing page for Hugging Face Space visibility
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the static landing page returned for ``GET /``.

    The body is a fixed string; it is sent with an HTML content type because
    the route declares ``HTMLResponse`` as its response class.
    """
    landing_page = """
BacSense v2 | Precision API
"""
    return landing_page
# Global classifier instance, created lazily by get_classifier()
CLASSIFIER = None


def get_classifier():
    """Return the shared ``BacSense`` classifier, building it on first use.

    The model directory is ``bacsense_v2_package`` located one level above
    the directory containing this file.  The instance is cached in the
    module-level ``CLASSIFIER`` so later calls reuse it.

    Note: explicit warm-up is skipped to save start-up time, so the first
    prediction will be slightly slower than subsequent ones.
    """
    global CLASSIFIER
    if CLASSIFIER is None:
        model_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', 'bacsense_v2_package')
        )
        CLASSIFIER = BacSense(model_dir=model_dir)
    return CLASSIFIER


# Health check endpoint
@app.get("/health")
async def health():
    """Report a fixed ``ok`` status; does not touch the classifier."""
    return {"status": "ok", "backend": "Hugging Face Space"}


# Debugging endpoint
@app.get("/debug_model")
async def debug_model():
    """Diagnostic endpoint to see why model loading might be failing.

    Returns a dict describing the package directory (path, existence, file
    listing), the importable TensorFlow version (or the import error), the
    outcome of initialising the classifier (with message and traceback on
    failure) and the current ``sys.path``.  Any unexpected failure while
    collecting this data is returned as ``{"error": "<message>"}`` instead of
    being raised.

    NOTE(review): the response exposes filesystem paths, ``sys.path`` and
    tracebacks -- confirm this endpoint should be reachable in production.
    """
    import traceback

    try:
        # Check if directories exist
        root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
        pkg_dir = os.path.join(root_dir, 'bacsense_v2_package')
        pkg_exists = os.path.isdir(pkg_dir)

        # Check files (only listed when the package directory is present)
        pkg_files = os.listdir(pkg_dir) if pkg_exists else []

        # Try a dummy import
        try:
            import tensorflow as tf
            tf_version = tf.__version__
        except Exception as e:
            tf_version = f"Error: {e}"

        # Try initializing (this might fail if deps are missing)
        try:
            get_classifier()
            status = "Initialized OK"
            error = None
        except Exception as e:
            status = "FAILED"
            error = str(e) + "\n" + traceback.format_exc()

        return {
            "root_dir": root_dir,
            "pkg_dir": pkg_dir,
            "pkg_exists": pkg_exists,
            "pkg_files": pkg_files,
            "tensorflow": tf_version,
            "model_status": status,
            "error_detail": error,
            "sys_path": sys.path,
        }
    except Exception as e:
        return {"error": str(e)}


# Path prefixes that belong to the API rather than to the SPA.
_API_PATH_PREFIXES = (
    "docs",
    "redoc",
    "openapi.json",
    "health",
    "debug_model",
    "predict_batch",
    "api",
)


# Catch-all route for SPA support (but exclude API endpoints).
#
# IMPORTANT: routes are tried in registration order and the first full match
# wins.  This wildcard matches every GET path, so it must be declared AFTER
# all other GET routes; declared earlier it intercepts ``/health`` and
# ``/debug_model`` and answers them with 404.  Any GET route added to this
# module must be placed above this definition.  (The POST ``/predict_batch``
# route is unaffected because this route only handles GET.)
@app.get("/{path_name:path}", response_class=HTMLResponse)
async def catch_all(path_name: str):
    """Serve the landing page for any GET path not handled by another route.

    Args:
        path_name: The request path without its leading slash.

    Returns:
        The same content as ``root()``, which is useful for SPA routing.

    Raises:
        HTTPException: 404 when ``path_name`` starts with one of the API
            prefixes, so unknown API-style paths are not answered with HTML.
    """
    # Don't catch API endpoints - they should return 404 if not found
    if path_name.startswith(_API_PATH_PREFIXES):
        raise HTTPException(status_code=404)
    # For everything else, return the root page (useful for SPA routing)
    return await root()
@app.post("/predict_batch")
async def predict_batch(files: List[UploadFile] = File(...)):
    """Classify each uploaded image and return one result entry per file.

    Every upload is written to a temporary ``.png`` file because the
    classifier is given a filesystem path.  The temporary file is removed
    whether or not the prediction succeeds.  A failure on one file is recorded
    in that file's entry (``success: False`` plus the error text and
    traceback) and does not stop the remaining files.

    Args:
        files: The uploaded files from the multipart request.

    Returns:
        ``{"results": [...]}`` with one dict per uploaded file, in upload
        order.

    Raises:
        HTTPException: 400 when no files were uploaded.

    NOTE(review): ``predict`` is called synchronously inside this coroutine,
    so it runs on the event loop for the duration of each prediction.
    """
    import traceback

    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    results = []
    for upload in files:
        tmp_name = None
        try:
            # Pull the whole upload into memory
            payload = await upload.read()

            # BacSense uses cv2.imread and PIL.Image.open with a file path
            # (per the original author's note), so persist the bytes to disk
            handle, tmp_name = tempfile.mkstemp(suffix=".png")
            with os.fdopen(handle, 'wb') as sink:
                sink.write(payload)

            # Run the image through the lazily loaded classifier
            print(f"DEBUG: Processing file {upload.filename}")
            outcome = get_classifier().predict(tmp_name)

            # The frontend expects a percentage: values up to 1.0 are treated
            # as fractions and scaled, anything larger is passed through
            score = outcome["confidence"]
            if score <= 1.0:
                score = score * 100

            label = outcome['prediction']
            details = {
                "gram_stain": outcome.get("gram", "Unknown"),
                "shape": outcome.get("shape", "Unknown"),
                "pathogenicity": outcome.get("risk", "Unknown"),
            }
            results.append({
                "filename": upload.filename,
                "success": True,
                "prediction": label,
                "confidence": score,
                "probabilities": [
                    {"name": label, "probability": score},
                ],
                "details": details,
            })
        except Exception as exc:
            # Record the failure for this file and carry on with the rest
            trace_text = f"{str(exc)}\n{traceback.format_exc()}"
            print(f"ERROR processing {upload.filename}: {trace_text}")
            results.append({
                "filename": upload.filename,
                "success": False,
                "error": str(exc),
                "trace": trace_text,
            })
        finally:
            # Always discard the temporary copy, if one was created
            if tmp_name and os.path.exists(tmp_name):
                os.remove(tmp_name)

    return {"results": results}
# Forced update at 02:26