"""Embedder Service: a minimal FastAPI wrapper around a SentenceTransformer model.

Requests to /embed are processed strictly one at a time (semaphore of 1), the
model is loaded lazily on first use, and all blocking work runs in the default
thread-pool executor so the event loop stays responsive.
"""

import asyncio
import os
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel

# Model is configurable via env var; default is a small, fast general-purpose encoder.
MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

app = FastAPI(title="Embedder Service", version="1.0.0")


class EmbedRequest(BaseModel):
    # Batch of raw input strings to encode.
    texts: List[str]


class EmbedResponse(BaseModel):
    # One embedding vector per input text, in the same order.
    vectors: List[List[float]]
    # Name of the model that produced the vectors.
    model: str


_model = None
# NOTE: since Python 3.10 asyncio primitives bind to the running loop lazily,
# so creating them at import time (before the loop exists) is safe.
_model_lock = asyncio.Lock()
_sequential_gate = asyncio.Semaphore(1)  # ensure one job at a time


def _lazy_load_model():
    """Load the SentenceTransformer on first call; no-op afterwards.

    This is a blocking, potentially slow call (model download / weight load),
    so callers must run it off the event loop (see embed()). Callers must also
    hold _model_lock to avoid concurrent double-loads.
    """
    global _model
    if _model is None:
        # Lazy import to keep container startup light
        from sentence_transformers import SentenceTransformer

        _model = SentenceTransformer(MODEL_NAME)


@app.get("/health")
async def health():
    """Liveness probe: reports configured model name and whether it is loaded."""
    return {"ok": True, "model": MODEL_NAME, "loaded": _model is not None}


@app.post("/embed", response_model=EmbedResponse)
async def embed(req: EmbedRequest):
    """Encode a batch of texts into L2-normalized embedding vectors.

    Requests are queued and processed strictly sequentially to bound memory
    and CPU use to a single encoding job at a time.
    """
    # Fast path: nothing to encode, skip the queue entirely.
    if not req.texts:
        return EmbedResponse(vectors=[], model=MODEL_NAME)

    # get_running_loop() is the supported call inside a coroutine
    # (get_event_loop() here is deprecated since 3.10).
    loop = asyncio.get_running_loop()

    # Simple sequential queueing: only one request processes at a time
    async with _sequential_gate:
        # Protect model initialization under a lock to avoid concurrent loads.
        # The load itself is slow and blocking, so push it to the executor
        # rather than stalling the event loop (keeps /health responsive).
        async with _model_lock:
            await loop.run_in_executor(None, _lazy_load_model)

        # sentence-transformers encode is sync; run in thread pool so we
        # don't block the loop.
        vectors = await loop.run_in_executor(
            None,
            lambda: _model.encode(
                req.texts,
                show_progress_bar=False,
                normalize_embeddings=True,
            ).tolist(),
        )

    return EmbedResponse(vectors=vectors, model=MODEL_NAME)


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)