File size: 1,607 Bytes
e28a7b2 d702071 e28a7b2 d702071 e28a7b2 d702071 e28a7b2 d702071 e28a7b2 d702071 e28a7b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import Any, List
from fastapi import FastAPI, Query
from pydantic import BaseModel, ConfigDict, Field
from .config import settings
from .duplicate_scheduler import DuplicateScheduler
from .repositories import (
MergeSuggestionRepository,
build_client,
)
logger = logging.getLogger("DuplicateAPI")
mongo_client = build_client()
suggestion_repository = MergeSuggestionRepository.from_client(mongo_client)
duplicate_scheduler = DuplicateScheduler()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
duplicate_scheduler.start(mongo_client)
logger.info("Duplicate scheduler started")
yield
# Shutdown
duplicate_scheduler.shutdown()
logger.info("Duplicate scheduler stopped")
app = FastAPI(
title="Expense Duplicate Service",
version="1.0.0",
description="Detects near-duplicate expense entries and proposes merge suggestions.",
lifespan=lifespan,
)
class Suggestion(BaseModel):
model_config = ConfigDict(populate_by_name=True)
id: str = Field(alias="_id")
candidate_ids: List[str]
message: str
details: dict[str, Any]
audit: dict[str, Any]
status: str
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}
@app.get("/suggestions", response_model=List[Suggestion])
async def list_suggestions(limit: int = Query(50, ge=1, le=500)) -> List[Suggestion]:
docs = suggestion_repository.fetch_recent(limit)
return [Suggestion.model_validate(doc) for doc in docs]
|