from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import os
import logging
from datetime import datetime
# Import refactored engines
from recommender_core import recommender
from ingestion_service import IngestionService
# Set up logging: configure the root logger at INFO level as an import-time
# side effect, then create the module-level logger used by every handler below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- LIFESPAN MANAGER ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log application startup and shutdown around the serving phase.

    Intended to be passed as ``FastAPI(lifespan=...)``: the statement before
    ``yield`` runs once when the application starts, the statement after it
    runs once at shutdown. Nothing is created or released here; it only logs.

    Args:
        app: The FastAPI application instance (not used by this function).
    """
    logger.info("⏳ Starting up... RecommenderCore is ready.")
    yield  # the application serves requests while suspended here
    logger.info("🛑 Shutting down...")
# --- APP CONFIGURATION ---
# The application object; every route in this module is registered on it.
app = FastAPI(
    title="PPD Risk & Recommendation Engine",
    version="1.5",  # the "/" endpoint reports the same version string
    description="Advanced system with hybrid scoring, multi-field TF-IDF, and offline-first PubMed integration.",
    lifespan=lifespan,  # startup/shutdown logging hook
    docs_url="/docs", # Swagger UI
    redoc_url="/redoc" # ReDoc alternative
)
# --- CORS SETUP ---
# Accept cross-origin requests from any origin, with any method and header.
# NOTE(review): FastAPI's CORS documentation says wildcard origins/methods/
# headers must not be combined with allow_credentials=True. No endpoint in
# this module reads cookies or auth headers — confirm whether credentials are
# needed; if they are, list the frontend origins explicitly instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- DATA MODELS ---
class RecommendationRequest(BaseModel):
    """Request body accepted by ``POST /api/recommend``."""
    # Forwarded to the recommender as ``crisis_level`` and echoed back in the
    # response as ``risk_assessment``.
    risk_level: str
    # Free-text symptom description, forwarded to the recommender unchanged.
    symptoms_text: str
    # Number of recommendations requested; 5 when omitted. The type also
    # admits an explicit null, which is forwarded as-is.
    top_n: Optional[int] = 5
class APIResponse(BaseModel):
    """Response schema declared for ``POST /api/recommend``."""
    # Outcome marker; the recommend endpoint sets it to "success".
    status: str
    # The ``risk_level`` value the client sent, echoed back.
    risk_assessment: str
    # Recommendation records as returned by the recommender; the per-record
    # keys are not constrained by this schema.
    recommendations: List[Dict[str, Any]]
# --- API ENDPOINTS ---
@app.get("/")
def health_check():
    """Report that the service is up and whether the recommender holds data.

    Returns:
        A dict with ``status`` (always "online"), ``engine_ready`` (True only
        when the recommender exists and its ``df`` is present and non-empty)
        and the API ``version`` string.
    """
    engine_ready = False
    if recommender is not None:
        articles = recommender.df
        if articles is not None:
            engine_ready = not articles.empty
    return {"status": "online", "engine_ready": engine_ready, "version": "1.5"}
@app.get("/api/health")
def api_health():
    """Detailed health check for container monitoring.

    Returns:
        On success, a dict with ``status`` ("healthy" when the recommender's
        ``df`` is present and non-empty, otherwise "degraded"), an ISO-format
        ``timestamp`` and a ``checks`` dict reporting ``database``, ``model``
        and ``articles_loaded``. If any check raises, a dict with ``status``
        "unhealthy" and the error text is returned instead.

    Note:
        The handler never raises, so the payload alone carries the health
        state. NOTE(review): monitors that key on the HTTP status code will
        not see the "unhealthy" case — confirm how this endpoint is probed.
    """
    try:
        is_ready = (
            recommender is not None
            and recommender.df is not None
            and not recommender.df.empty
        )
        db_connected = recommender.engine is not None
        model_loaded = (
            recommender.vectorizer is not None
            and recommender.tfidf_matrix is not None
        )
        return {
            "status": "healthy" if is_ready else "degraded",
            "timestamp": datetime.now().isoformat(),
            "checks": {
                "database": "ok" if db_connected else "error",
                "model": "ok" if model_loaded else "error",
                "articles_loaded": len(recommender.df) if is_ready else 0,
            },
        }
    except Exception as e:
        # logger.exception records the traceback, which the previous
        # logger.error(f"...") call discarded; the message text is unchanged.
        logger.exception("Health check failed: %s", e)
        return {"status": "unhealthy", "error": str(e)}
@app.get("/api/stats")
def get_stats():
    """System statistics for monitoring.

    Returns:
        ``{"error": "System not initialized"}`` when the recommender has no
        ``df``; otherwise a dict with the article count, per-``format_type``
        and per-``risk_level`` counts, the vectorizer vocabulary size (0 when
        no vectorizer is loaded) and an ISO-format ``last_updated`` timestamp
        taken at request time.

    Raises:
        HTTPException: 500 with the error text if computing the stats fails.
    """
    try:
        if recommender.df is None:
            return {"error": "System not initialized"}
        stats = {
            "total_articles": len(recommender.df),
            "articles_by_type": recommender.df['format_type'].value_counts().to_dict(),
            "articles_by_risk": recommender.df['risk_level'].value_counts().to_dict(),
            "model_vocabulary_size": len(recommender.vectorizer.vocabulary_) if recommender.vectorizer else 0,
            "last_updated": datetime.now().isoformat(),
        }
        return stats
    except Exception as e:
        # Keep the traceback in the log and chain the HTTP error to its cause.
        logger.exception("Stats error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/recommend", response_model=APIResponse)
def get_recommendations(request: RecommendationRequest):
    """
    Main recommendation endpoint.
    Uses hybrid scoring: Cosine Similarity + Exact Symptom Boost + Source Weighting + Recency Boost.

    Args:
        request: Validated body; ``risk_level`` is forwarded to the
            recommender as ``crisis_level``, ``symptoms_text`` and ``top_n``
            are forwarded unchanged.

    Returns:
        A dict with ``status`` "success", the echoed ``risk_assessment`` and
        the recommender's ``recommendations`` list.

    Raises:
        HTTPException: 500 with the error text if the recommender fails.
    """
    try:
        results = recommender.recommend_articles(
            symptoms_text=request.symptoms_text,
            crisis_level=request.risk_level,
            top_n=request.top_n,
        )
        return {
            "status": "success",
            "risk_assessment": request.risk_level,
            "recommendations": results,
        }
    except Exception as e:
        # Keep the traceback in the log and chain the HTTP error to its cause.
        logger.exception("Recommendation error: %s", e)
        # Expose error for debugging during migration phase
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/article/{article_id}")
def get_article_content(article_id: int):
    """
    Retrieves full article content.
    Handles both direct contributor text and curated PubMed abstracts.

    Args:
        article_id: Identifier passed to ``recommender.get_article_by_id``.

    Returns:
        A dict with ``article_id``, ``title``, ``category``, ``format_type``
        (defaults to "text"), ``external_url`` (None when absent or empty)
        and ``content`` (``content_raw`` if truthy, else ``content_clean``,
        else an empty string).

    Raises:
        HTTPException: 404 when the recommender returns a falsy result.
    """
    # presumably a dict-like record (supports [] and .get) — verify against
    # recommender_core
    article_data = recommender.get_article_by_id(article_id)
    if not article_data:
        raise HTTPException(status_code=404, detail="Article not found")

    external_url = article_data.get('external_url')
    # Prefer the raw text; fall back to the cleaned text when raw is empty.
    content = article_data.get('content_raw') or article_data.get('content_clean')
    return {
        "article_id": int(article_data['article_id']),
        "title": str(article_data['title']),
        "category": str(article_data['category']),
        "format_type": str(article_data.get('format_type', 'text')),
        "external_url": str(external_url) if external_url else None,
        # str(None) would send the literal text "None" as the article body;
        # return an empty string when neither content field is populated.
        "content": str(content) if content else "",
    }
@app.post("/api/admin/rebuild-model")
def rebuild_model():
    """Admin endpoint to trigger a weighted TF-IDF rebuild.

    Builds the TF-IDF model through a fresh ``IngestionService`` and then
    asks the shared recommender to reload it.

    Returns:
        A dict with ``status`` "success" and a confirmation message.

    Raises:
        HTTPException: 500 with the error text if the rebuild or reload fails.
    """
    # NOTE(review): no authentication is visible on this admin route —
    # confirm it is protected upstream.
    try:
        service = IngestionService()
        service.build_tfidf_model()
        recommender.load_model()
        return {"status": "success", "message": "Weighted TF-IDF model rebuilt and reloaded."}
    except Exception as e:
        # Keep the traceback in the log and chain the HTTP error to its cause.
        logger.exception("Rebuild error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/admin/trigger-ingestion")
def trigger_ingestion():
    """Admin endpoint to manually trigger PubMed ingestion.

    Fetches articles for a fixed PubMed query (limit 100). When any are
    returned they are stored, the TF-IDF model is rebuilt and the shared
    recommender reloads it.

    Returns:
        A dict with ``status`` "success" and a message giving the number
        reported by ``store_articles``, or saying that nothing was found.

    Raises:
        HTTPException: 500 with the error text if any step fails.
    """
    try:
        service = IngestionService()
        articles = service.fetch_from_pubmed("postpartum depression OR maternal mental health", limit=100)
        if not articles:
            return {"status": "success", "message": "No new articles found."}
        count = service.store_articles(articles)
        service.build_tfidf_model()
        recommender.load_model()
        return {
            "status": "success",
            "message": f"Ingested {count} new articles and rebuilt model.",
        }
    except Exception as e:
        # Keep the traceback in the log and chain the HTTP error to its cause.
        logger.exception("Ingestion error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/pubmed/search")
def search_pubmed(query: str = "postpartum depression", limit: int = 10):
    """
    Real-time proxy to PubMed API.
    Used by frontend to fetch fresh articles on demand.

    Args:
        query: Search expression forwarded to ``fetch_from_pubmed``.
        limit: Result limit forwarded to ``fetch_from_pubmed``; it is not
            range-checked here.

    Returns:
        A dict with ``status`` "success", the result ``count`` and the
        ``results`` themselves.

    Raises:
        HTTPException: 503 with a fixed message on any failure.
    """
    try:
        service = IngestionService()
        results = service.fetch_from_pubmed(query, limit)
        return {
            "status": "success",
            "count": len(results),
            "results": results,
        }
    except Exception as e:
        # The client only sees a generic 503, so the traceback in the log is
        # the sole record of the real cause; chain it onto the HTTP error too.
        logger.exception("PubMed search error: %s", e)
        raise HTTPException(status_code=503, detail="PubMed service unavailable") from e
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)