# kamau1's picture
# refactor: remove reconciliation system and all related code, tasks, and docs
# d12a170
"""
FastAPI Application Entry Point
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
import logging
# Import routers
from app.api.v1 import pages
from app.config import settings
# Setup logging - import configured logging
from app.core.logging import get_logger
from app.core.rate_limit import limiter
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Directory named "static" that sits next to this module.
static_dir = Path(__file__).parent / "static"
# Application instance. Interactive docs are served at /api/docs (Swagger UI)
# and /api/redoc (ReDoc).
app = FastAPI(
title="SwiftOps API",
description="Field Service Management Platform",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
# Expose the shared limiter on application state and register slowapi's stock
# handler for RateLimitExceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Startup event
def _log_database_status():
    """Log database reachability plus the table and active-user counts."""
    from app.core.database import SessionLocal
    from sqlalchemy import text

    logger.info("📦 Database:")
    try:
        db = SessionLocal()
        try:
            db.execute(text("SELECT 1"))
            # Count tables
            table_count = db.execute(text("""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)).scalar()
            # Count active users
            user_count = db.execute(text(
                "SELECT COUNT(*) FROM users WHERE deleted_at IS NULL"
            )).scalar()
        finally:
            # Always release the session, even when one of the queries fails.
            db.close()
        logger.info(f" ✓ Connected | {table_count} tables | {user_count} users")
    except Exception as e:
        # Best-effort check: report a truncated error and keep starting up.
        logger.error(f" ✗ Connection failed | {str(e)[:50]}...")


def _log_cache_status():
    """Log the OTP service's storage backend and, for Redis, a set/get/delete round-trip."""
    logger.info("💾 Cache & Sessions:")
    try:
        from app.services.otp_service import OTPService

        # Instantiate during startup (OTPService is presumably a singleton -
        # confirm in app.services.otp_service).
        otp_service = OTPService()
        if otp_service.storage_type != 'redis':
            logger.warning(" ○ Redis: Using fallback storage")
            return

        # Test operations silently; the key expires after 5 seconds in case
        # the delete below never runs.
        test_key = "health_check_test"
        client = otp_service.redis_client
        client.setex(test_key, 5, "test")
        test_result = client.get(test_key)
        client.delete(test_key)
        # A client created without response decoding returns bytes.
        if test_result in ("test", b"test"):
            logger.info(" ✓ Redis: Connected")
        else:
            logger.warning(" ⚠ Redis: Connected but operations failed")
    except Exception:
        logger.warning(" ⚠ Redis: Connection failed")
        logger.debug("Redis startup check failed", exc_info=True)


def _log_external_services_status():
    """Log the status reported by run_all_health_checks() for each external service."""
    logger.info("🔌 External Services:")
    try:
        from app.core.health_checks import run_all_health_checks

        health_results = run_all_health_checks()
        # (result key, display name, status string that earns a check mark)
        simple_services = (
            ("cloudinary", "Cloudinary", "Connected"),
            ("resend", "Resend", "Configured"),
            ("wasender", "WASender", "Connected"),
        )
        for key, label, ok_status in simple_services:
            state = health_results[key]["status"]
            status = "✓" if state == ok_status else "○"
            logger.info(f" {status} {label}: {state}")

        # Supabase Storage additionally reports a bucket count when connected.
        storage = health_results["supabase_storage"]
        connected = storage["status"] == "Connected"
        status = "✓" if connected else "○"
        bucket_info = f" | {storage.get('buckets', 0)} buckets" if connected else ""
        logger.info(f" {status} Supabase: {storage['status']}{bucket_info}")
    except Exception:
        logger.warning(" ⚠ External services check failed")
        logger.debug("External services startup check failed", exc_info=True)


@app.on_event("startup")
async def startup_event():
    """Log a startup banner and the health of the database, cache and external services.

    Every check is best-effort: a failing dependency is logged and startup
    continues. The checks use blocking calls, which run once before the
    application starts serving requests.
    """
    logger.info("=" * 60)
    logger.info(f"🚀 {settings.APP_NAME} v1.0.0 | {settings.ENVIRONMENT.upper()}")
    logger.info("📊 Dashboard: Enabled")
    logger.info("=" * 60)

    _log_database_status()
    # Cache is checked (and the OTP service instantiated) before external services.
    _log_cache_status()
    _log_external_services_status()

    logger.info("=" * 60)
    logger.info("✅ Startup complete | Ready to serve requests")
    logger.info("=" * 60)
@app.on_event("shutdown")
async def shutdown_event():
    """Log the start and the end of application shutdown."""
    for message in ("Shutting down application...", "Shutdown complete"):
        logger.info(message)
# Mount static files
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")

# CORS Configuration
import os

# ALLOWED_ORIGINS is a comma-separated list. Surrounding whitespace is trimmed
# and empty entries are dropped, so "a, b," is read as ["a", "b"]; an
# untrimmed " b" would never match a request's Origin header.
allowed_origins = [
    origin.strip()
    for origin in os.getenv(
        "ALLOWED_ORIGINS",
        "http://localhost:3000,http://localhost:8080,https://swiftops.atomio.tech",
    ).split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include page routes (HTML templates)
app.include_router(pages.router)

# Include API routes (imported here, after the app is configured, as in the original)
from app.api.v1.router import api_router
app.include_router(api_router, prefix="/api/v1")
# Health check endpoint (API)
@app.get("/health", tags=["Health"])
async def health_check():
    """
    Health check endpoint - Returns system health status

    Returns:
        dict with "status" ("healthy", or "degraded" when the database check
        fails), "version", "service", "environment" and a "components"
        mapping holding one entry each for "database" and "redis".
    """
    from app.core.database import SessionLocal
    from sqlalchemy import text

    health_status = {
        "status": "healthy",
        "version": "1.0.0",
        "service": "SwiftOps API",
        "environment": settings.ENVIRONMENT,
        "components": {},
    }

    # Check Database
    try:
        db = SessionLocal()
        try:
            db.execute(text("SELECT 1"))
            table_count = db.execute(text("""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
            """)).scalar()
        finally:
            # Release the session even when a query raises, so repeated
            # failing health checks do not leak sessions.
            db.close()
        health_status["components"]["database"] = {
            "status": "connected",
            "tables": table_count,
        }
    except Exception:
        health_status["components"]["database"] = {
            "status": "error"
        }
        health_status["status"] = "degraded"

    # Check Redis. OTPService() is expected to hand back the instance created
    # at startup (presumably a singleton - confirm in otp_service). A failure
    # here is reported per component but does not change the overall status.
    try:
        from app.services.otp_service import OTPService

        otp_service = OTPService()
        health_status["components"]["redis"] = {
            "status": "connected" if otp_service.storage_type == 'redis' else "fallback",
            "storage_type": otp_service.storage_type,
        }
    except Exception:
        health_status["components"]["redis"] = {
            "status": "error"
        }

    return health_status