import sys
import os
import time
import logging
import traceback
from typing import List
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio

# Ensure project root is importable
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import requests as http_requests

# -- Project imports ----------------------------------------------------------
from backend.models import (
    MatchRequest,
    MatchResponse,
    MatchResult,
    BatchMatchRequest,
    BatchMatchResponse,
    HealthResponse,
    ErrorResponse,
)
from backend.matching_service import perform_match

# =========================================================
# LOGGING
# =========================================================
# force=True replaces any handlers installed earlier (e.g. by an imported
# library) so that this format/stream is the one actually in effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True,
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("backend_server")


# =========================================================
# LIFESPAN – startup / shutdown hooks
# =========================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan hook: log startup diagnostics, yield, log shutdown.

    Startup:
      - Pre-warm embedding models (loaded at import time via model.py)
      - Check CSV data
    Shutdown:
      - Nothing to close (CSV-based, no database connections)

    The CSV check is best-effort: any failure is logged as a warning and
    startup continues.
    """
    logger.info("=" * 60)
    logger.info("Entity Matching backend – Starting up")
    logger.info("=" * 60)
    # NOTE(review): this message is logged unconditionally; nothing in this
    # function verifies that the models are actually loaded.
    logger.info("Embedding models loaded (sentence-transformers).")
    try:
        # name_variation_df is not used below; importing it still makes a
        # missing symbol surface here as a warning.
        from services.config import pin_city_state_df, name_variation_df

        csv_loaded = not pin_city_state_df.empty
        logger.info(
            "CSV data source: %s (%d pincode rows)",
            "OK" if csv_loaded else "EMPTY",
            len(pin_city_state_df),
        )
    except Exception as e:
        logger.warning("CSV data source check failed: %s", e)
    logger.info("backend ready to serve requests")
    logger.info("=" * 60)
    yield  # ── app is running ──
    logger.info("Entity Matching backend – Shutting down")


# =========================================================
# APP INSTANCE
# =========================================================
app = FastAPI(
    title="Entity Matching backend",
    description=(
        "Gen AI Record-Level Entity Matching backend.\n\n"
        "Compares two entity records and determines if they represent the same person/entity.\n\n"
        "**Multi-value fields:** `addresses`, `phones`, and `emails` each accept a list "
        "of any length. Matching uses best-of-N for addresses and any-match for phones/emails.\n\n"
        "**Supported matching modes:**\n"
        # Blank line after the list so the next heading is not rendered as a
        # continuation of the bullet item.
        "- `embedding` (default): Sentence Transformers + Fuzzy matching\n\n"
        "**Input formats:**\n"
        "- Nested (recommended for multiple values): pass `addresses`, `phones`, `emails` as lists\n"
        "- Flat (single address/phone/email): pass uppercase keys like `ADDRESSLINE`, `PHONE`, `EMAIL`"
    ),
    version="8.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

# -- CORS middleware ----------------------------------------------------------
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# website issue credentialed cross-origin requests. The FastAPI CORS docs say
# origins/methods/headers must be listed explicitly when credentials are
# enabled. Left as-is to avoid breaking existing clients; restrict before
# production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# =========================================================
# REQUEST LOGGING MIDDLEWARE
# =========================================================
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every request with timing, including requests that raise.

    Unhandled exceptions propagate through ``call_next`` before the global
    ``Exception`` handler turns them into a 500 response, so without the
    ``except`` branch failed requests would never appear in the access log.
    The exception is always re-raised unchanged.
    """
    start = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception:
        elapsed = (time.perf_counter() - start) * 1000
        logger.error(
            "%s %s – unhandled exception (%.1f ms)",
            request.method,
            request.url.path,
            elapsed,
        )
        raise
    elapsed = (time.perf_counter() - start) * 1000
    logger.info(
        "%s %s – %d (%.1f ms)",
        request.method,
        request.url.path,
        response.status_code,
        elapsed,
    )
    return response


# =========================================================
# GLOBAL EXCEPTION HANDLER
# =========================================================
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Convert any otherwise-unhandled exception into a JSON 500 response.

    The traceback is logged from the exception object itself (``exc_info``),
    so logging does not depend on being called inside an active ``except``
    block.
    """
    logger.error("Unhandled exception: %s", exc, exc_info=exc)
    # NOTE(review): "detail" exposes the raw exception text to the client.
    # Kept for backward compatibility; consider hiding it in production.
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": "Internal server error",
            "detail": str(exc),
        },
    )


# =========================================================
# ENDPOINTS
# =========================================================

# ── Health Checks ─────────────────────────────────────────────────────────────
@app.get(
    "/backend/v1/health",
    response_model=HealthResponse,
    tags=["Health"],
    summary="Full system health check",
)
async def health_check():
    """Check the health of all system components.

    Each component is probed independently; a failing probe is reported as
    ``"error: <message>"`` instead of failing the endpoint. The overall
    status is ``"healthy"`` only if every probed component is healthy,
    otherwise ``"degraded"``.
    """
    components = {}

    try:
        from services.config import pin_city_state_df

        components["csv_data"] = (
            "healthy" if not pin_city_state_df.empty else "unhealthy"
        )
    except Exception as e:
        components["csv_data"] = f"error: {e}"

    try:
        from services.model import MODEL_STORE

        components["embedding_models"] = "healthy" if MODEL_STORE else "unhealthy"
    except Exception as e:
        components["embedding_models"] = f"error: {e}"

    overall = (
        "healthy"
        if all(v == "healthy" for v in components.values() if v != "not_configured")
        else "degraded"
    )
    # Report the version from the app instance so it matches "/" and the
    # OpenAPI schema (previously a hard-coded "8.0").
    return HealthResponse(status=overall, version=app.version, components=components)


# ── Single Match ──────────────────────────────────────────────────────────────
def _render_json(payload) -> str:
    """Pretty-print *payload* for console diagnostics.

    ``default=str`` is deliberate: this output is for humans only, and values
    that are not JSON-native should be shown rather than abort the dump.
    """
    import json

    return json.dumps(payload, indent=2, default=str)


def _print_match_request(mode, record1, record2) -> None:
    """Echo an incoming match request to the terminal; never raises."""
    try:
        rec1_json = _render_json(record1.model_dump(by_alias=True))
        rec2_json = _render_json(record2.model_dump(by_alias=True))
        print("\n\n" + "=" * 80)
        print(f" NEW MATCH REQUEST RECEIVED (Mode: {mode})")
        print("=" * 80)
        print(f" RECORD 1 INPUT:\n{rec1_json}")
        print(f" RECORD 2 INPUT:\n{rec2_json}")
        print("-" * 80)
    except Exception as exc:
        # A diagnostic echo must not fail the request.
        logger.warning("Could not print match request to terminal: %s", exc)


def _print_match_result(result) -> None:
    """Echo a match result to the terminal; never raises."""
    try:
        summary = _render_json(
            {
                "overall_decision": result["overall_decision"],
                "reason": result["reason"],
                "field_scores": result["field_scores"],
            }
        )
        print("📤 MATCH RESULT OUT:\n" + summary)
        print("=" * 80 + "\n\n")
    except Exception as exc:
        # Covers serialisation problems and consoles that cannot encode the
        # emoji; the computed result is still returned to the caller.
        logger.warning("Could not print match result to terminal: %s", exc)


@app.post(
    "/backend/v1/match",
    response_model=MatchResponse,
    tags=["Matching"],
    summary="Match two entity records",
    responses={
        200: {"description": "Successful matching result"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
        500: {"model": ErrorResponse, "description": "Internal error"},
    },
)
async def match_records(request: MatchRequest):
    """
    Compare two entity records and determine if they represent the same entity.

    **Multi-value fields:** Pass `addresses`, `phones`, and `emails` as lists of any length:

    ```json
    {
      "mode": "embedding",
      "record1": {
        "NAME": "RAJESH KUMAR SHARMA",
        "dob": "15-01-1990",
        "phones": ["9876543210", "9123456789"],
        "addresses": [
          {"addressline": "123 MG Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560034"},
          {"addressline": "45 Brigade Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560025"}
        ]
      },
      "record2": {
        "NAME": "RAJESH K SHARMA",
        "dob": "15/01/1990",
        "phones": ["9876543210"],
        "addresses": [
          {"addressline": "123 Mahatma Gandhi Rd", "city": "Bengaluru", "state": "KA", "zipcode": "560034"}
        ]
      }
    }
    ```

    **Matching strategy for lists:**
    - `addresses`: best-of-N (highest score across all pair combinations)
    - `phones`: any-match (match if any phone pair matches)
    - `emails`: any-match (match if any email pair matches)

    **Modes:**
    - `embedding` (default): Sentence Transformers + RapidFuzz
    """
    mode = request.mode.value
    t0 = time.perf_counter()

    # Pre-print to terminal specifically for user visibility. Done outside
    # the try block (and guarded internally) so a printing/serialisation
    # problem cannot turn into a failed match.
    _print_match_request(mode, request.record1, request.record2)

    try:
        # perform_match is synchronous (CPU + IO bound); run in thread pool
        # so it doesn't block the asyncio event loop.
        result = await asyncio.to_thread(
            perform_match, request.record1, request.record2, mode=mode
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000

        logger.info(
            "Match complete — decision=%s mode=%s time=%.1fms",
            result["overall_decision"],
            mode,
            elapsed_ms,
        )

        # Post-print to terminal specifically for user visibility
        _print_match_result(result)

        return MatchResponse(
            success=True,
            result=MatchResult(
                overall_decision=result["overall_decision"],
                reason=result["reason"],
                field_scores=result["field_scores"],
                mode=mode,
            ),
            processing_time_ms=round(elapsed_ms, 2),
        )

    except Exception as e:
        elapsed_ms = (time.perf_counter() - t0) * 1000
        # logger.exception attaches the active traceback automatically.
        logger.exception("Match failed: %s", e)
        # Failures are reported in the response body (success=False) rather
        # than as an HTTP error status; existing clients rely on this shape.
        return MatchResponse(
            success=False,
            error=str(e),
            processing_time_ms=round(elapsed_ms, 2),
        )


# =========================================================
# ROOT / INFO
# =========================================================
@app.get("/", tags=["Info"], include_in_schema=False)
async def root():
    """Return basic service metadata and pointers to docs and health check."""
    return {
        "service": "Entity Matching backend",
        "version": app.version,
        "docs": "/docs",
        "health": "/backend/v1/health",
    }


# =========================================================
# MAIN  (for direct execution: python backend/server.py)
# =========================================================
if __name__ == "__main__":
    import uvicorn

    # reload=True requires the app to be passed as an import string.
    uvicorn.run(
        "backend.server:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
    )