from __future__ import annotations import logging from contextlib import asynccontextmanager from typing import Any, List from fastapi import FastAPI, Query from pydantic import BaseModel, ConfigDict, Field from .config import settings from .duplicate_scheduler import DuplicateScheduler from .repositories import ( MergeSuggestionRepository, build_client, ) logger = logging.getLogger("DuplicateAPI") mongo_client = build_client() suggestion_repository = MergeSuggestionRepository.from_client(mongo_client) duplicate_scheduler = DuplicateScheduler() @asynccontextmanager async def lifespan(app: FastAPI): # Startup duplicate_scheduler.start(mongo_client) logger.info("Duplicate scheduler started") yield # Shutdown duplicate_scheduler.shutdown() logger.info("Duplicate scheduler stopped") app = FastAPI( title="Expense Duplicate Service", version="1.0.0", description="Detects near-duplicate expense entries and proposes merge suggestions.", lifespan=lifespan, ) class Suggestion(BaseModel): model_config = ConfigDict(populate_by_name=True) id: str = Field(alias="_id") candidate_ids: List[str] message: str details: dict[str, Any] audit: dict[str, Any] status: str @app.get("/health") async def health() -> dict[str, str]: return {"status": "ok"} @app.get("/suggestions", response_model=List[Suggestion]) async def list_suggestions(limit: int = Query(50, ge=1, le=500)) -> List[Suggestion]: docs = suggestion_repository.fetch_recent(limit) return [Suggestion.model_validate(doc) for doc in docs]