Spaces:
Sleeping
Sleeping
| """ | |
| Prediction endpoint. | |
| POST /predict → returns duration prediction + logs to monitor | |
| POST /feedback → submit delayed ground truth | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| import uuid | |
| from typing import TYPE_CHECKING | |
| import pandas as pd | |
| from fastapi import APIRouter, HTTPException, Request | |
| from src.api.schemas import ( | |
| PredictionRequest, | |
| PredictionResponse, | |
| FeedbackRequest, | |
| FeedbackResponse, | |
| ) | |
| from src.utils.config import resolve, settings | |
| from src.utils.logging_config import get_logger | |
# Router for this module: created with the "/predict" prefix and grouped under
# the "Prediction" tag.
# NOTE(review): no route decorators are visible on the handlers in this view —
# confirm how `predict` and `submit_feedback` are attached to this router.
router = APIRouter(prefix="/predict", tags=["Prediction"])

# Module-level logger, named after this module.
log = get_logger(__name__)

# Location of the prediction log file; `predict` appends one JSON object per
# line to it. Derived from `settings.api.prediction_log_path` via `resolve`
# (presumably turns the configured value into a concrete path — TODO confirm).
_PREDICTION_LOG = resolve(settings.api.prediction_log_path)
async def predict(body: PredictionRequest, request: Request) -> PredictionResponse:
    """
    Produce a duration prediction for a single request payload.

    The model, preprocessor and monitor are read from ``request.app.state``.
    The request body is dumped to a dict, wrapped in a one-row DataFrame,
    transformed by the preprocessor and passed to the model. The first
    predicted value is rounded to two decimals and clamped to a minimum
    of 1.0.

    Side effects, in order:
    1. One JSON line describing the prediction is appended to the
       prediction log file.
    2. The prediction is registered with the monitor under a fresh UUID.
    3. ``app.state.samples_since_last_retrain`` is incremented by one.

    Raises:
        HTTPException: 503 when ``app.state.model`` is ``None``.
    """
    state = request.app.state
    estimator = state.model
    transformer = state.preprocessor
    perf_monitor = state.monitor

    # Guard clause: nothing can be predicted until a model has been loaded.
    if model_missing := estimator is None:
        raise HTTPException(status_code=503, detail="No trained model available. Please train first.")

    payload = body.model_dump()
    prediction_id = str(uuid.uuid4())
    issued_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    # Single-row frame -> preprocessed matrix -> first (only) model output.
    single_row = pd.DataFrame([payload])
    transformed = transformer.transform(single_row)
    raw_value = float(estimator.predict(transformed)[0])

    # Round to two decimals, then enforce the 1.0 lower bound.
    rounded = round(raw_value, 2)
    duration = rounded if rounded > 1.0 else 1.0

    # Append the prediction record to the log file as one JSON line.
    record = {
        "request_id": prediction_id,
        "timestamp": issued_at,
        "features": payload,
        "prediction": duration,
        "model_version": state.model_version,
    }
    serialized = json.dumps(record) + "\n"
    with open(_PREDICTION_LOG, "a", encoding="utf-8") as sink:
        sink.write(serialized)

    # Hand the prediction to the monitor so a later feedback call can match it.
    perf_monitor.log_prediction(prediction_id, duration, payload)
    state.samples_since_last_retrain += 1

    log.debug("Prediction: request_id=%s pred=%.2f min", prediction_id, duration)

    return PredictionResponse(
        request_id=prediction_id,
        predicted_duration_min=duration,
        model_version=state.model_version,
        timestamp=issued_at,
    )
async def submit_feedback(body: FeedbackRequest, request: Request) -> FeedbackResponse:
    """
    Record the actual duration for an earlier prediction.

    The request id and actual duration from the body are forwarded to the
    monitor stored on ``request.app.state``. The monitor's return value is
    reported back as ``matched``, together with a message saying whether a
    pending prediction was found for that request id. An unmatched id is
    reported in the response body, not raised as an error.
    """
    perf_monitor = request.app.state.monitor
    was_matched = perf_monitor.log_ground_truth(body.request_id, body.actual_duration_min)

    # Pick the message that corresponds to the monitor's verdict.
    message = (
        f"Ground truth matched for request_id={body.request_id}"
        if was_matched
        else f"No pending prediction found for request_id={body.request_id}"
    )

    return FeedbackResponse(request_id=body.request_id, matched=was_matched, message=message)