| import sys |
| import os |
| import time |
| import logging |
| import traceback |
| from typing import List |
| from contextlib import asynccontextmanager |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| import asyncio |
|
|
| |
# Put the parent of this file's directory at the front of sys.path.
# Presumably this lets the `backend.*` / `services.*` imports further down
# resolve when the module is launched directly -- confirm against how the
# service is actually started.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.dirname(_THIS_DIR)
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
|
|
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
|
|
| import requests as http_requests |
|
|
| |
|
|
|
|
| from backend.models import ( |
| MatchRequest, |
| MatchResponse, |
| MatchResult, |
| BatchMatchRequest, |
| BatchMatchResponse, |
| HealthResponse, |
| ErrorResponse, |
| ) |
|
|
| from backend.matching_service import perform_match |
|
|
| |
| |
| |
# Root logging setup: INFO and above, one pipe-separated line per record,
# written to stdout.  force=True makes basicConfig replace any handlers that
# were already attached to the root logger.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
    force=True,
)

# Module-wide logger used by every handler in this file.
logger = logging.getLogger("backend_server")
|
|
|
|
| |
| |
| |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log startup/shutdown banners and sanity-check the CSV reference data.

    Startup (everything before the ``yield``):
      - Logs that the embedding models are loaded.  Nothing is loaded here;
        per the original note they are loaded at import time via model.py.
      - Imports the CSV-backed objects from ``services.config`` and logs
        whether the pincode table has rows.  Any failure is logged as a
        warning and startup continues.

    Shutdown (after the ``yield``):
      - Only logs a message; there are no connections to close.

    Args:
        app: The FastAPI application being started (not used in the body).
    """
    banner = "=" * 60

    logger.info(banner)
    # The separator is plain ASCII: the previous character was mis-encoded
    # and showed up garbled in the log output.
    logger.info("Entity Matching backend - Starting up")
    logger.info(banner)

    logger.info("Embedding models loaded (sentence-transformers).")

    # Best-effort check: a missing or broken data module must not prevent the
    # server from starting, so every failure is downgraded to a warning.
    try:
        # NOTE(review): name_variation_df is not used below; importing it
        # presumably verifies that it is loadable too -- confirm before
        # removing it.
        from services.config import pin_city_state_df, name_variation_df  # noqa: F401

        csv_status = "EMPTY" if pin_city_state_df.empty else "OK"
        logger.info(
            "CSV data source: %s (%d pincode rows)",
            csv_status,
            len(pin_city_state_df),
        )
    except Exception as exc:
        logger.warning("CSV data source check failed: %s", exc)

    logger.info("backend ready to serve requests")
    logger.info(banner)

    yield

    logger.info("Entity Matching backend - Shutting down")
|
|
|
|
| |
| |
| |
# The application object.  `description` is Markdown shown on the generated
# documentation pages, so blank lines between sections matter: without one, a
# heading that follows a bullet list is rendered as part of the last bullet.
app = FastAPI(
    title="Entity Matching backend",
    description=(
        "Gen AI Record-Level Entity Matching backend.\n\n"
        "Compares two entity records and determines if they represent the same person/entity.\n\n"
        "**Multi-value fields:** `addresses`, `phones`, and `emails` each accept a list "
        "of any length. Matching uses best-of-N for addresses and any-match for phones/emails.\n\n"
        "**Supported matching modes:**\n"
        # The trailing blank line ends the list so the "Input formats"
        # heading below starts its own paragraph.
        "- `embedding` (default): Sentence Transformers + Fuzzy matching\n\n"
        "**Input formats:**\n"
        "- Nested (recommended for multiple values): pass `addresses`, `phones`, `emails` as lists\n"
        "- Flat (single address/phone/email): pass uppercase keys like `ADDRESSLINE`, `PHONE`, `EMAIL`"
    ),
    version="8.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)
|
|
| |
# CORS policy: every origin, method and header is accepted, and credentialed
# requests are allowed.
# NOTE(review): FastAPI's CORS documentation states that wildcard
# origins/methods/headers cannot be combined with allow_credentials=True.
# Confirm whether any client actually sends credentials; if not, disable
# them, otherwise list the permitted origins explicitly.
_cors_policy = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_policy)
|
|
|
|
| |
| |
| |
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code and wall-clock duration of each request.

    Args:
        request: The incoming request.
        call_next: Awaitable callable that passes the request on and returns
            the response.

    Returns:
        The response returned by ``call_next``, unmodified.

    If ``call_next`` raises, the exception propagates and nothing is logged
    here.
    """
    started = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - started) * 1000

    # "->" replaces a mis-encoded separator character that rendered as a
    # stray Greek letter in the log output.
    logger.info(
        "%s %s -> %d (%.1f ms)",
        request.method,
        request.url.path,
        response.status_code,
        elapsed_ms,
    )
    return response
|
|
|
|
| |
| |
| |
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any otherwise-unhandled exception into a JSON 500 response.

    Args:
        request: The request whose handling raised; used to name the failing
            method and path in the log entry.
        exc: The exception that was raised.

    Returns:
        A ``JSONResponse`` with status 500 and the keys ``success``,
        ``error`` and ``detail``.
    """
    # Attach the traceback from the exception object that was handed in,
    # rather than from whatever exception happens to be "current" when this
    # coroutine runs.
    logger.error(
        "Unhandled exception on %s %s: %s",
        request.method,
        request.url.path,
        exc,
        exc_info=exc,
    )

    # NOTE(review): "detail" exposes the raw exception text to the caller.
    # It is kept because clients may rely on it -- consider a generic message.
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": "Internal server error",
            "detail": str(exc),
        },
    )
|
|
|
|
| |
| |
| |
|
|
| |
@app.get(
    "/backend/v1/health",
    response_model=HealthResponse,
    tags=["Health"],
    summary="Full system health check",
)
async def health_check():
    """Report per-component health plus an overall status.

    Components probed:
      - ``csv_data``: "healthy" when ``services.config.pin_city_state_df`` is
        non-empty, otherwise "unhealthy".
      - ``embedding_models``: "healthy" when ``services.model.MODEL_STORE`` is
        truthy, otherwise "unhealthy".

    A probe that raises is recorded as ``"error: <message>"`` instead of
    failing the endpoint.

    Returns:
        A ``HealthResponse`` whose ``status`` is "healthy" only when every
        component is "healthy" (entries marked "not_configured" are ignored),
        and "degraded" otherwise.  ``version`` is the application's declared
        version.
    """
    components = {}

    # Each probe imports lazily inside its own try block, so a broken module
    # shows up as an "error: ..." entry rather than as a failed health call.
    try:
        from services.config import pin_city_state_df

        components["csv_data"] = (
            "unhealthy" if pin_city_state_df.empty else "healthy"
        )
    except Exception as exc:
        components["csv_data"] = f"error: {exc}"

    try:
        from services.model import MODEL_STORE

        components["embedding_models"] = "healthy" if MODEL_STORE else "unhealthy"
    except Exception as exc:
        components["embedding_models"] = f"error: {exc}"

    # No probe above reports "not_configured"; the value is still tolerated
    # so the overall-status rule is unchanged.
    acceptable = ("healthy", "not_configured")
    all_ok = all(state in acceptable for state in components.values())
    overall = "healthy" if all_ok else "degraded"

    # Take the version from the app object so this endpoint cannot drift from
    # the version declared when the app was created (it previously reported
    # "8.0" while the app declares "8.0.0").
    return HealthResponse(status=overall, version=app.version, components=components)
|
|
|
|
|
|
|
|
| |
def _log_debug_json(label, payload):
    """Emit *payload* as indented JSON on the module logger at DEBUG level.

    Values ``json`` cannot encode are rendered with ``str``; if encoding still
    fails, the payload's ``repr`` is logged instead, so a diagnostic dump can
    never fail the request being served.
    """
    import json  # local import: only needed for these debug dumps

    try:
        rendered = json.dumps(payload, indent=2, default=str)
    except (TypeError, ValueError):
        rendered = repr(payload)
    logger.debug("%s:\n%s", label, rendered)


@app.post(
    "/backend/v1/match",
    response_model=MatchResponse,
    tags=["Matching"],
    summary="Match two entity records",
    responses={
        200: {"description": "Successful matching result"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
        500: {"model": ErrorResponse, "description": "Internal error"},
    },
)
async def match_records(request: MatchRequest):
    """
    Compare two entity records and determine if they represent the same entity.

    **Multi-value fields:**
    Pass `addresses`, `phones`, and `emails` as lists of any length:
    ```json
    {
      "mode": "embedding",
      "record1": {
        "NAME": "RAJESH KUMAR SHARMA",
        "dob": "15-01-1990",
        "phones": ["9876543210", "9123456789"],
        "addresses": [
          {"addressline": "123 MG Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560034"},
          {"addressline": "45 Brigade Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560025"}
        ]
      },
      "record2": {
        "NAME": "RAJESH K SHARMA",
        "dob": "15/01/1990",
        "phones": ["9876543210"],
        "addresses": [
          {"addressline": "123 Mahatma Gandhi Rd", "city": "Bengaluru", "state": "KA", "zipcode": "560034"}
        ]
      }
    }
    ```

    **Matching strategy for lists:**
    - `addresses`: best-of-N (highest score across all pair combinations)
    - `phones`: any-match (match if any phone pair matches)
    - `emails`: any-match (match if any email pair matches)

    **Modes:**
    - `embedding` (default): Sentence Transformers + RapidFuzz
    """
    # Implementation notes:
    # - perform_match runs in a worker thread via asyncio.to_thread so the
    #   event loop is not blocked while it computes.
    # - Any exception is logged and reported as MatchResponse(success=False)
    #   through a normal return.
    #   NOTE(review): the 500 declared in `responses` is therefore not
    #   produced by this function -- confirm that is intended.
    # - Request records and the result are dumped only at DEBUG level.  They
    #   hold personal data (names, dates of birth, phones, addresses), so they
    #   are no longer printed to stdout on every call.
    mode = request.mode.value
    debug_enabled = logger.isEnabledFor(logging.DEBUG)

    t0 = time.perf_counter()
    try:
        if debug_enabled:
            logger.debug("New match request received (mode=%s)", mode)
            _log_debug_json(
                "Record 1 input", request.record1.model_dump(by_alias=True)
            )
            _log_debug_json(
                "Record 2 input", request.record2.model_dump(by_alias=True)
            )

        result = await asyncio.to_thread(
            perform_match, request.record1, request.record2, mode=mode
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.info(
            "Match complete - decision=%s mode=%s time=%.1fms",
            result["overall_decision"], mode, elapsed_ms,
        )

        if debug_enabled:
            _log_debug_json(
                "Match result",
                {
                    "overall_decision": result["overall_decision"],
                    "reason": result["reason"],
                    "field_scores": result["field_scores"],
                },
            )

        return MatchResponse(
            success=True,
            result=MatchResult(
                overall_decision=result["overall_decision"],
                reason=result["reason"],
                field_scores=result["field_scores"],
                mode=mode,
            ),
            processing_time_ms=round(elapsed_ms, 2),
        )
    except Exception as e:
        elapsed_ms = (time.perf_counter() - t0) * 1000
        # logger.exception records the active traceback along with the message.
        logger.exception("Match failed: %s", e)
        return MatchResponse(
            success=False,
            error=str(e),
            processing_time_ms=round(elapsed_ms, 2),
        )
|
|
|
|
|
|
| |
| |
| |
@app.get("/", tags=["Info"], include_in_schema=False)
async def root():
    """Return a small service descriptor: name, version, docs path and health path."""
    descriptor = dict(
        service="Entity Matching backend",
        version="8.0.0",
        docs="/docs",
        health="/backend/v1/health",
    )
    return descriptor
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000, with auto-reload switched on.
    import uvicorn

    _server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("backend.server:app", **_server_options)