LogicGoInfotechSpaces's picture
Convert API to scheduler-based pattern for duplicate detection
d702071
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import Any, List
from fastapi import FastAPI, Query
from pydantic import BaseModel, ConfigDict, Field
from .config import settings
from .duplicate_scheduler import DuplicateScheduler
from .repositories import (
MergeSuggestionRepository,
build_client,
)
logger = logging.getLogger("DuplicateAPI")
mongo_client = build_client()
suggestion_repository = MergeSuggestionRepository.from_client(mongo_client)
duplicate_scheduler = DuplicateScheduler()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
duplicate_scheduler.start(mongo_client)
logger.info("Duplicate scheduler started")
yield
# Shutdown
duplicate_scheduler.shutdown()
logger.info("Duplicate scheduler stopped")
app = FastAPI(
title="Expense Duplicate Service",
version="1.0.0",
description="Detects near-duplicate expense entries and proposes merge suggestions.",
lifespan=lifespan,
)
class Suggestion(BaseModel):
model_config = ConfigDict(populate_by_name=True)
id: str = Field(alias="_id")
candidate_ids: List[str]
message: str
details: dict[str, Any]
audit: dict[str, Any]
status: str
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}
@app.get("/suggestions", response_model=List[Suggestion])
async def list_suggestions(limit: int = Query(50, ge=1, le=500)) -> List[Suggestion]:
docs = suggestion_repository.fetch_recent(limit)
return [Suggestion.model_validate(doc) for doc in docs]