MukeshKapoor25's picture
feat(service-professional): Add OTP-based authentication system for service professionals
19a1450
"""
Main FastAPI application for AUTH Microservice.
"""
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError, BaseModel
from typing import Optional, List, Dict, Any
from jose import JWTError
from pymongo.errors import PyMongoError, ConnectionFailure, OperationFailure
from app.core.config import settings
from app.core.logging import setup_logging, get_logger
from app.nosql import connect_to_mongo, close_mongo_connection
from app.system_users.controllers.router import router as system_user_router
from app.auth.controllers.router import router as auth_router
from app.auth.controllers.staff_router import router as staff_router
from app.auth.controllers.customer_router import router as customer_router
from app.auth.controllers.service_professional_router import router as service_professional_router
from app.internal.router import router as internal_router
# Setup logging
setup_logging(
log_level=settings.LOG_LEVEL if hasattr(settings, 'LOG_LEVEL') else "INFO",
log_dir=settings.LOG_DIR if hasattr(settings, 'LOG_DIR') else "logs",
)
logger = get_logger(__name__)
# Standard error response models
class ErrorDetail(BaseModel):
"""Detailed error information"""
field: Optional[str] = None
message: str
type: Optional[str] = None
class ErrorResponse(BaseModel):
"""Standard error response format"""
success: bool = False
error: str
detail: str
errors: Optional[List[ErrorDetail]] = None
request_id: Optional[str] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan events"""
# Startup
logger.info("Starting AUTH Microservice")
await connect_to_mongo()
# Initialize customer authentication collections
try:
from app.auth.db_init_customer import init_customer_auth_collections
await init_customer_auth_collections()
logger.info("Customer authentication collections initialized")
except Exception as e:
logger.error(f"Failed to initialize customer auth collections: {e}")
logger.info("AUTH Microservice started successfully")
yield
# Shutdown
logger.info("Shutting down AUTH Microservice")
await close_mongo_connection()
logger.info("AUTH Microservice shut down successfully")
import os
# Create FastAPI app
app = FastAPI(
title="AUTH Microservice",
description="Authentication & Authorization System - User Management, Login, JWT Tokens & Security",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan,
root_path=os.getenv("ROOT_PATH", "")
)
# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""Log all incoming requests and responses with timing."""
request_id = str(id(request))
start_time = time.time()
# Log request
logger.info(
f"Request started: {request.method} {request.url.path}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"client": request.client.host if request.client else None,
"user_agent": request.headers.get("User-Agent", "")[:100]
}
)
# Process request
try:
response = await call_next(request)
# Calculate processing time
process_time = time.time() - start_time
# Log response
logger.info(
f"Request completed: {request.method} {request.url.path} - Status: {response.status_code}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"process_time": f"{process_time:.3f}s"
}
)
# Add custom headers
response.headers["X-Process-Time"] = f"{process_time:.3f}"
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
process_time = time.time() - start_time
logger.error(
f"Request failed: {request.method} {request.url.path} - Error: {str(e)}",
exc_info=True,
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"process_time": f"{process_time:.3f}s"
}
)
raise
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global exception handlers
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle request validation errors with detailed field information."""
errors = []
for error in exc.errors():
field = " -> ".join(str(loc) for loc in error["loc"])
errors.append({
"field": field,
"message": error["msg"],
"type": error["type"]
})
logger.warning(f"Validation error on {request.url.path}: {errors}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"success": False,
"error": "Validation Error",
"detail": "The request contains invalid data",
"errors": errors
}
)
@app.exception_handler(ValidationError)
async def pydantic_validation_exception_handler(request: Request, exc: ValidationError):
"""Handle Pydantic validation errors."""
logger.warning(f"Pydantic validation error on {request.url.path}: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"success": False,
"error": "Data Validation Error",
"detail": str(exc),
"errors": exc.errors()
}
)
@app.exception_handler(JWTError)
async def jwt_exception_handler(request: Request, exc: JWTError):
"""Handle JWT token errors."""
logger.warning(f"JWT error on {request.url.path}: {str(exc)}")
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"success": False,
"error": "Authentication Error",
"detail": "Invalid or expired token",
"headers": {"WWW-Authenticate": "Bearer"}
}
)
@app.exception_handler(PyMongoError)
async def mongodb_exception_handler(request: Request, exc: PyMongoError):
"""Handle MongoDB errors."""
logger.error(f"MongoDB error on {request.url.path}: {str(exc)}", exc_info=True)
if isinstance(exc, ConnectionFailure):
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={
"success": False,
"error": "Database Connection Error",
"detail": "Unable to connect to the database. Please try again later."
}
)
elif isinstance(exc, OperationFailure):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"error": "Database Operation Error",
"detail": "A database operation failed. Please contact support if the issue persists."
}
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"error": "Database Error",
"detail": "An unexpected database error occurred."
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle all uncaught exceptions."""
logger.error(
f"Unhandled exception on {request.method} {request.url.path}: {str(exc)}",
exc_info=True,
extra={
"method": request.method,
"path": request.url.path,
"client": request.client.host if request.client else None
}
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"success": False,
"error": "Internal Server Error",
"detail": "An unexpected error occurred. Please try again later.",
"request_id": id(request) # Include request ID for tracking
}
)
# Health check endpoint
@app.get("/health", tags=["health"])
async def health_check():
"""
Health check endpoint.
Returns the service status and version.
"""
try:
return {
"status": "healthy",
"service": "auth-microservice",
"version": "1.0.0"
}
except Exception as e:
logger.error(f"Health check failed: {e}")
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={
"status": "unhealthy",
"service": "auth-microservice",
"error": str(e)
}
)
# Debug endpoint to check database status
@app.get("/debug/db-status", tags=["debug"])
async def check_db_status():
"""
Check database connection and user count.
Returns information about database collections and sample data.
Raises:
HTTPException: 500 - Database connection error
"""
try:
from app.nosql import get_database
from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION, SCM_ACCESS_ROLES_COLLECTION
db = get_database()
if db is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Database connection not available"
)
users_count = await db[AUTH_SYSTEM_USERS_COLLECTION].count_documents({})
roles_count = await db[AUTH_ACCESS_ROLES_COLLECTION].count_documents({})
scm_roles_count = await db[SCM_ACCESS_ROLES_COLLECTION].count_documents({})
# Get sample user to verify
sample_user = await db[AUTH_SYSTEM_USERS_COLLECTION].find_one(
{"email": "superadmin@cuatrolabs.com"},
{"email": 1, "username": 1, "role": 1, "status": 1, "_id": 0}
)
return {
"status": "connected",
"database": settings.MONGODB_DB_NAME,
"collections": {
"users": users_count,
"auth_roles": roles_count,
"scm_roles": scm_roles_count
},
"superadmin_exists": sample_user is not None,
"sample_user": sample_user if sample_user else None
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Database status check failed: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to check database status: {str(e)}"
)
# Include routers with new organization
app.include_router(auth_router) # /auth/* - Core authentication endpoints
app.include_router(staff_router) # /staff/* - Staff authentication (mobile OTP)
app.include_router(customer_router) # /customer/* - Customer authentication (OTP)
app.include_router(service_professional_router) # /service-professional/* - Service professional authentication (OTP)
app.include_router(system_user_router) # /users/* - User management endpoints
app.include_router(internal_router) # /internal/* - Internal API endpoints
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8002,
reload=True,
log_level="info"
)