|
|
from __future__ import annotations |
|
|
|
|
|
import logging |
|
|
from contextlib import asynccontextmanager |
|
|
from typing import Any, List |
|
|
|
|
|
from fastapi import FastAPI, Query |
|
|
from pydantic import BaseModel, ConfigDict, Field |
|
|
|
|
|
from .config import settings |
|
|
from .duplicate_scheduler import DuplicateScheduler |
|
|
from .repositories import ( |
|
|
MergeSuggestionRepository, |
|
|
build_client, |
|
|
) |
|
|
|
|
|
# Module-level singletons shared by the lifespan hook and the route handlers.
logger = logging.getLogger("DuplicateAPI")

# Built once at import time. The same client backs the suggestion repository
# below and is handed to the scheduler when the application starts.
mongo_client = build_client()
suggestion_repository = MergeSuggestionRepository.from_client(mongo_client)

# Constructed here but not started; starting/stopping is tied to the app lifespan.
duplicate_scheduler = DuplicateScheduler()
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the duplicate scheduler for the lifetime of the application.

    Starts ``duplicate_scheduler`` with the shared ``mongo_client`` before the
    application begins serving, yields while it runs, and shuts the scheduler
    down afterwards.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    # Start outside the ``try``: if starting fails there is nothing to shut down.
    duplicate_scheduler.start(mongo_client)
    logger.info("Duplicate scheduler started")
    try:
        yield
    finally:
        # Always stop the scheduler, even when an exception or cancellation is
        # thrown into the generator at the ``yield``; otherwise it would be
        # left running.
        duplicate_scheduler.shutdown()
        logger.info("Duplicate scheduler stopped")
|
|
|
|
|
|
|
|
# ASGI application object. The lifespan hook ties the duplicate scheduler's
# start/stop to the application's own startup and shutdown.
app = FastAPI(
    lifespan=lifespan,
    title="Expense Duplicate Service",
    description="Detects near-duplicate expense entries and proposes merge suggestions.",
    version="1.0.0",
)
|
|
|
|
|
|
|
|
# Response schema for one merge suggestion as returned by ``/suggestions``.
# Documented with comments rather than a docstring on purpose: pydantic copies
# a model's docstring into the generated JSON schema description.
class Suggestion(BaseModel):
    # Accept input keyed either by field name (``id``) or by alias (``_id``).
    model_config = ConfigDict(populate_by_name=True)

    # Populated from the ``_id`` key of the source document.
    id: str = Field(alias="_id")
    candidate_ids: list[str]
    message: str
    # Free-form mappings; their inner structure is not constrained here.
    details: dict[str, Any]
    audit: dict[str, Any]
    status: str
|
|
|
|
|
|
|
|
@app.get("/health")
async def health() -> dict[str, str]:
    # Liveness probe: always reports a fixed "ok" status and touches no
    # dependencies. (Comment instead of docstring so the OpenAPI description
    # of this route stays unchanged.)
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
|
|
@app.get("/suggestions", response_model=List[Suggestion])
async def list_suggestions(limit: int = Query(50, ge=1, le=500)) -> List[Suggestion]:
    # Return up to ``limit`` recent merge suggestions (1..500, default 50),
    # each validated into a ``Suggestion``. Raises whatever
    # ``Suggestion.model_validate`` raises for a malformed document.
    # (Comment instead of docstring so the OpenAPI description of this route
    # stays unchanged.)
    import asyncio  # function-scope import keeps this block self-contained

    def _load() -> List[Suggestion]:
        # Fetch and validate together so that a lazily-evaluated result
        # (e.g. a cursor) is also consumed off the event loop.
        docs = suggestion_repository.fetch_recent(limit)
        return [Suggestion.model_validate(doc) for doc in docs]

    # ``fetch_recent`` is called synchronously (no ``await``) and is presumably
    # a blocking database query — TODO confirm. Running it in a worker thread
    # keeps this coroutine from stalling the event loop for other requests.
    return await asyncio.to_thread(_load)
|
|
|
|
|
|