pujithapsx's picture
initial push
e9084d7
import sys
import os
import time
import logging
import traceback
from typing import List
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
# Put the parent of this file's directory at the front of sys.path (once),
# presumably so the absolute ``backend.*`` imports below resolve when this
# file is executed directly.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.dirname(_MODULE_DIR)
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import requests as http_requests
# -- Project imports ----------------------------------------------------------
from backend.models import (
MatchRequest,
MatchResponse,
MatchResult,
BatchMatchRequest,
BatchMatchResponse,
HealthResponse,
ErrorResponse,
)
from backend.matching_service import perform_match
# =========================================================
# LOGGING
# =========================================================
# Root logging setup: a single stdout handler at INFO level with
# pipe-separated fields. force=True makes basicConfig replace any handlers
# that were attached to the root logger before this module was imported.
_LOG_FORMAT = "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
    force=True,
)

# Module-wide logger used by every handler in this file.
logger = logging.getLogger("backend_server")
# =========================================================
# LIFESPAN – startup / shutdown hooks
# =========================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan hook: log a startup banner, probe the CSV data
    source, hand control to the running app, then log a shutdown line.

    Startup:
        - Logs that the embedding models are loaded. This is a log message
          only; nothing in this function loads or verifies the models.
        - Imports ``pin_city_state_df`` and ``name_variation_df`` from
          ``services.config`` and logs whether the pincode frame is empty,
          together with its row count. Any failure is logged as a warning
          and startup continues.
    Shutdown:
        - Logs a shutdown line; no resources are released here.
    """
    rule = "=" * 60

    logger.info(rule)
    logger.info("Entity Matching backend – Starting up")
    logger.info(rule)
    logger.info("Embedding models loaded (sentence-transformers).")

    try:
        # name_variation_df is not used below; importing it still makes a
        # missing symbol surface through the warning branch.
        from services.config import pin_city_state_df, name_variation_df

        status = "EMPTY" if pin_city_state_df.empty else "OK"
        row_count = len(pin_city_state_df)
        logger.info("CSV data source: %s (%d pincode rows)", status, row_count)
    except Exception as exc:
        logger.warning("CSV data source check failed: %s", exc)

    logger.info("backend ready to serve requests")
    logger.info(rule)

    yield  # the application serves requests while suspended here

    logger.info("Entity Matching backend – Shutting down")
# =========================================================
# APP INSTANCE
# =========================================================
# The description is rendered as Markdown on the generated docs pages, so each
# section needs a blank line ("\n\n") before it. Without one after the
# "Supported matching modes" bullet, the "Input formats" heading is absorbed
# into that bullet as a continuation line.
app = FastAPI(
    title="Entity Matching backend",
    description=(
        "Gen AI Record-Level Entity Matching backend.\n\n"
        "Compares two entity records and determines if they represent the same person/entity.\n\n"
        "**Multi-value fields:** `addresses`, `phones`, and `emails` each accept a list "
        "of any length. Matching uses best-of-N for addresses and any-match for phones/emails.\n\n"
        "**Supported matching modes:**\n"
        "- `embedding` (default): Sentence Transformers + Fuzzy matching\n\n"
        "**Input formats:**\n"
        "- Nested (recommended for multiple values): pass `addresses`, `phones`, `emails` as lists\n"
        "- Flat (single address/phone/email): pass uppercase keys like `ADDRESSLINE`, `PHONE`, `EMAIL`"
    ),
    version="8.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)
# -- CORS middleware ----------------------------------------------------------
# Credentialed CORS must not be combined with wildcard origins: the FastAPI
# CORS documentation states that allow_origins cannot be ["*"] when
# allow_credentials is True. Credentials therefore stay off while the origin
# list is a wildcard.
# NOTE(review): to support cookies or Authorization headers from browsers,
# replace "*" with an explicit origin list and then set allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# =========================================================
# REQUEST LOGGING MIDDLEWARE
# =========================================================
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """
    HTTP middleware that logs one INFO line per completed request.

    The line carries the HTTP method, the URL path, the response status code
    and the wall-clock time spent in ``call_next`` in milliseconds. The
    response is returned unchanged. If ``call_next`` raises, nothing is
    logged here and the exception propagates.
    """
    started_at = time.perf_counter()
    response = await call_next(request)
    duration_ms = 1000 * (time.perf_counter() - started_at)

    method, path, status = request.method, request.url.path, response.status_code
    logger.info("%s %s – %d (%.1f ms)", method, path, status, duration_ms)
    return response
# =========================================================
# GLOBAL EXCEPTION HANDLER
# =========================================================
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch-all handler: log the exception with its traceback and answer with a
    JSON 500 body.

    Args:
        request: The request being processed when the exception was raised
            (not used here).
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status 500 and the keys ``success`` (False),
        ``error`` and ``detail`` (``str(exc)``).
    """
    # Build the traceback from the exception object that was handed in.
    # traceback.format_exc() reads the interpreter's "currently handled"
    # exception instead, and yields "NoneType: None" whenever this coroutine
    # runs outside the except block that caught ``exc``.
    trace = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    logger.error("Unhandled exception: %s\n%s", exc, trace)

    # NOTE(review): "detail" exposes the raw exception text to the client.
    # Kept for compatibility with existing consumers; consider dropping it for
    # public deployments.
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "error": "Internal server error",
            "detail": str(exc),
        },
    )
# =========================================================
# ENDPOINTS
# =========================================================
# ── Health Checks ─────────────────────────────────────────────────────────────
@app.get(
    "/backend/v1/health",
    response_model=HealthResponse,
    tags=["Health"],
    summary="Full system health check",
)
async def health_check():
    """
    Report the health of the CSV data source and the embedding model store.

    Each component is probed independently and recorded as ``"healthy"``,
    ``"unhealthy"`` or ``"error: <message>"`` when the probe itself raises.
    The overall status is ``"healthy"`` only when every probed component is
    healthy, otherwise ``"degraded"``.

    Returns:
        A ``HealthResponse`` with the overall status, the service version and
        the per-component map.
    """
    components = {}

    # CSV data: healthy when the pincode frame imports and is non-empty.
    try:
        from services.config import pin_city_state_df

        components["csv_data"] = (
            "healthy" if not pin_city_state_df.empty else "unhealthy"
        )
    except Exception as e:
        components["csv_data"] = f"error: {e}"

    # Embedding models: healthy when MODEL_STORE imports and is truthy.
    try:
        from services.model import MODEL_STORE

        components["embedding_models"] = "healthy" if MODEL_STORE else "unhealthy"
    except Exception as e:
        components["embedding_models"] = f"error: {e}"

    # "not_configured" entries are ignored for the overall verdict. No probe
    # above emits that value today; the filter is kept unchanged.
    overall = (
        "healthy"
        if all(v == "healthy" for v in components.values() if v != "not_configured")
        else "degraded"
    )

    # Report the version declared on the app so this endpoint agrees with the
    # OpenAPI metadata and the root endpoint ("8.0.0") instead of carrying a
    # separate, drifting literal ("8.0").
    return HealthResponse(
        status=overall, version=app.version, components=components
    )
# ── Single Match ──────────────────────────────────────────────────────────────
def _pretty_json(payload) -> str:
    """Render *payload* as indented JSON for the console trace.

    ``default=str`` is deliberate: this output is diagnostic only, so a value
    the ``json`` module cannot encode natively is shown via ``str()`` instead
    of raising ``TypeError``.
    """
    import json  # function-scope import, as in the original handler

    return json.dumps(payload, indent=2, default=str)


def _trace_request(mode, record1, record2) -> None:
    """Print the incoming match request to stdout on a best-effort basis."""
    try:
        print("\n\n" + "=" * 80)
        print(f" NEW MATCH REQUEST RECEIVED (Mode: {mode})")
        print("=" * 80)
        print(f" RECORD 1 INPUT:\n{_pretty_json(record1.model_dump(by_alias=True))}")
        print(f" RECORD 2 INPUT:\n{_pretty_json(record2.model_dump(by_alias=True))}")
        print("-" * 80)
    except Exception as trace_err:
        # Tracing is diagnostic only; it must never fail the request.
        logger.warning("Console trace of match request skipped: %s", trace_err)


def _trace_result(result) -> None:
    """Print the match outcome to stdout on a best-effort basis."""
    try:
        print("📤 MATCH RESULT OUT:\n" + _pretty_json({
            "overall_decision": result["overall_decision"],
            "reason": result["reason"],
            "field_scores": result["field_scores"],
        }))
        print("=" * 80 + "\n\n")
    except Exception as trace_err:
        # Covers unencodable console output as well as unexpected result
        # shapes; the latter is still reported by the handler when it builds
        # the response.
        logger.warning("Console trace of match result skipped: %s", trace_err)


@app.post(
    "/backend/v1/match",
    response_model=MatchResponse,
    tags=["Matching"],
    summary="Match two entity records",
    responses={
        200: {"description": "Successful matching result"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
        500: {"model": ErrorResponse, "description": "Internal error"},
    },
)
async def match_records(request: MatchRequest):
    """
    Compare two entity records and determine if they represent the same entity.

    **Multi-value fields:**
    Pass `addresses`, `phones`, and `emails` as lists of any length:

    ```json
    {
      "mode": "embedding",
      "record1": {
        "NAME": "RAJESH KUMAR SHARMA",
        "dob": "15-01-1990",
        "phones": ["9876543210", "9123456789"],
        "addresses": [
          {"addressline": "123 MG Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560034"},
          {"addressline": "45 Brigade Road", "city": "Bangalore", "state": "Karnataka", "zipcode": "560025"}
        ]
      },
      "record2": {
        "NAME": "RAJESH K SHARMA",
        "dob": "15/01/1990",
        "phones": ["9876543210"],
        "addresses": [
          {"addressline": "123 Mahatma Gandhi Rd", "city": "Bengaluru", "state": "KA", "zipcode": "560034"}
        ]
      }
    }
    ```

    **Matching strategy for lists:**
    - `addresses`: best-of-N (highest score across all pair combinations)
    - `phones`: any-match (match if any phone pair matches)
    - `emails`: any-match (match if any email pair matches)

    **Modes:**
    - `embedding` (default): Sentence Transformers + RapidFuzz
    """
    mode = request.mode.value
    t0 = time.perf_counter()
    try:
        # Console trace kept for operator visibility. It runs through
        # best-effort helpers so a printing or JSON-encoding problem cannot
        # turn a valid match into a failed response.
        # NOTE(review): this writes the full input records to stdout.
        _trace_request(mode, request.record1, request.record2)

        # perform_match is synchronous (CPU + IO bound); run in thread pool
        # so it doesn't block the asyncio event loop.
        result = await asyncio.to_thread(
            perform_match, request.record1, request.record2, mode=mode
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.info(
            "Match complete — decision=%s mode=%s time=%.1fms",
            result["overall_decision"], mode, elapsed_ms,
        )

        _trace_result(result)

        return MatchResponse(
            success=True,
            result=MatchResult(
                overall_decision=result["overall_decision"],
                reason=result["reason"],
                field_scores=result["field_scores"],
                mode=mode,
            ),
            processing_time_ms=round(elapsed_ms, 2),
        )
    except Exception as e:
        # NOTE(review): failures are reported in-band (success=False) with the
        # default 200 status, not the 400/500 declared in ``responses``. Kept
        # as-is because existing clients may rely on it.
        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.error("Match failed: %s\n%s", e, traceback.format_exc())
        return MatchResponse(
            success=False,
            error=str(e),
            processing_time_ms=round(elapsed_ms, 2),
        )
# =========================================================
# ROOT / INFO
# =========================================================
@app.get("/", tags=["Info"], include_in_schema=False)
async def root():
    """Return a small service descriptor: name, version, docs URL and health URL."""
    descriptor = dict(
        service="Entity Matching backend",
        version="8.0.0",
        docs="/docs",
        health="/backend/v1/health",
    )
    return descriptor
# =========================================================
# MAIN (for direct execution: python backend/server.py)
# =========================================================
if __name__ == "__main__":
    import uvicorn

    # Direct-execution launch: serve the app by import string on all
    # interfaces, port 8000, with auto-reload switched on.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("backend.server:app", **server_options)