image-captioning-api / backend /app /services /predictor_service.py
apoorvrajdev's picture
feat(api): build production-grade FastAPI inference backend
08f1adc
"""Service layer wrapping the ML ``CaptionPredictor``.
Why this exists between the route and the predictor:
* **Off-loop execution** β€” TensorFlow inference is sync and CPU-bound.
Running it inline blocks the event loop, so requests queue up
sequentially and event-loop-bound work (CORS, metrics, /healthz)
stalls. We push the call to a worker thread via ``anyio.to_thread``.
* **Stable seam for testing** β€” routes depend on this class, not on
the concrete predictor. Tests can substitute a stub service that
returns canned captions without loading TensorFlow.
* **Future extension point** β€” Phase 4 will add a request batcher and
per-model registry behind the same ``caption_image_bytes`` API.
This class never re-implements inference; it delegates entirely to the
existing ``CaptionPredictor`` abstraction.
"""
from __future__ import annotations
import time
from anyio import to_thread
from app.utils.image import bytes_to_tensor
from captioning.inference import CaptionPredictor
from captioning.utils import get_logger
log = get_logger(__name__)
class PredictorService:
"""Holds the singleton predictor and exposes async inference."""
def __init__(
self,
*,
predictor: CaptionPredictor,
model_version: str,
max_upload_bytes: int,
) -> None:
"""Args:
predictor: A ready ``CaptionPredictor`` (weights already loaded).
model_version: Semver string surfaced in responses & health.
max_upload_bytes: Hard cap enforced at the route layer.
"""
self._predictor = predictor
self._model_version = model_version
self._max_upload_bytes = max_upload_bytes
@property
def model_version(self) -> str:
return self._model_version
@property
def decode_strategy(self) -> str:
return self._predictor.decode_strategy
@property
def max_upload_bytes(self) -> int:
return self._max_upload_bytes
async def caption_image_bytes(self, image_bytes: bytes) -> tuple[str, float]:
"""Decode bytes, run inference, and return (caption, latency_ms).
Both the decode and the predict are offloaded to a worker thread so
the event loop stays responsive. Latency is measured around the
predict call only β€” decode timing belongs to a separate span if we
ever need it.
"""
tensor = await to_thread.run_sync(bytes_to_tensor, image_bytes)
start = time.perf_counter()
caption: str = await to_thread.run_sync(self._predictor.predict_tensor, tensor)
latency_ms = (time.perf_counter() - start) * 1000
log.info(
"inference_completed",
model_version=self._model_version,
decode_strategy=self.decode_strategy,
latency_ms=round(latency_ms, 2),
)
return caption, latency_ms