Spaces:
Running
Running
File size: 12,366 Bytes
7da1497 806bf3d 7da1497 806bf3d 7da1497 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 | """
PostgreSQL database connection and session management for App microservice.
Following TMS pattern with SQLAlchemy async engine.
"""
import logging
import ssl
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, text
from app.core.config import settings
logger = logging.getLogger(__name__)
# Database URL validation
DATABASE_URI = settings.POSTGRES_URI
if not DATABASE_URI:
logger.error("POSTGRES_URI is empty or missing from settings")
raise ValueError("POSTGRES_URI is not set. Check environment variables.")
# Parse and log connection details (with masked password)
def mask_connection_string(uri: str) -> str:
"""Mask password in connection string for safe logging"""
if not uri:
return "EMPTY"
try:
# Format: postgresql+asyncpg://user:password@host:port/database
if "@" in uri:
protocol_user_pass, host_db = uri.split("@", 1)
if "://" in protocol_user_pass:
protocol, user_pass = protocol_user_pass.split("://", 1)
if ":" in user_pass:
user, _ = user_pass.split(":", 1)
masked_uri = f"{protocol}://{user}:***@{host_db}"
else:
masked_uri = f"{protocol}://{user_pass}:***@{host_db}"
else:
masked_uri = f"{protocol_user_pass}:***@{host_db}"
else:
masked_uri = uri
return masked_uri
except Exception:
return "INVALID_FORMAT"
def parse_connection_details(uri: str) -> dict:
"""Parse connection string to extract host, port, database"""
try:
# Extract host:port/database from URI
if "@" in uri:
_, host_db = uri.split("@", 1)
if "/" in host_db:
host_port, database = host_db.split("/", 1)
if ":" in host_port:
host, port = host_port.split(":", 1)
else:
host = host_port
port = "5432"
return {
"host": host,
"port": port,
"database": database.split("?")[0] # Remove query params
}
except Exception:
pass
return {"host": "unknown", "port": "unknown", "database": "unknown"}
masked_uri = mask_connection_string(DATABASE_URI)
conn_details = parse_connection_details(DATABASE_URI)
logger.info(
"PostgreSQL connection configured",
extra={
"connection_string": masked_uri,
"host": conn_details["host"],
"port": conn_details["port"],
"database": conn_details["database"],
"ssl_mode": settings.POSTGRES_SSL_MODE
}
)
# Build connect args including optional SSL
CONNECT_ARGS = {
"server_settings": {
"application_name": "cuatrolabs-app-ms",
"jit": "off"
},
"command_timeout": 60,
"statement_cache_size": 0
}
mode = (settings.POSTGRES_SSL_MODE or "disable").lower()
if mode != "disable":
ssl_context: ssl.SSLContext
if mode == "verify-full":
ssl_context = ssl.create_default_context(cafile=settings.POSTGRES_SSL_ROOT_CERT) if settings.POSTGRES_SSL_ROOT_CERT else ssl.create_default_context()
if settings.POSTGRES_SSL_CERT and settings.POSTGRES_SSL_KEY:
try:
ssl_context.load_cert_chain(certfile=settings.POSTGRES_SSL_CERT, keyfile=settings.POSTGRES_SSL_KEY)
except Exception as e:
logger.warning("Failed to load client SSL cert/key for PostgreSQL", exc_info=e)
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
else:
# sslmode=require: encrypt but don't verify server cert
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
CONNECT_ARGS["ssl"] = ssl_context
logger.info("PostgreSQL SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE})
# Create async engine with connection pool settings
async_engine = create_async_engine(
DATABASE_URI,
echo=settings.DEBUG, # Enable SQL logging in debug mode
future=True,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True,
connect_args=CONNECT_ARGS
)
# Create async session factory
async_session = sessionmaker(
async_engine,
expire_on_commit=False,
class_=AsyncSession
)
# Metadata for table creation
metadata = MetaData()
logger.info("PostgreSQL configuration loaded successfully")
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Lifecycle helpers
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def connect_to_database() -> None:
"""Initialize database connection when the application starts."""
import asyncio
# Log connection details at startup (once per worker)
print(f"\n{'='*70}")
print(f"[POSTGRES] Starting Database Connection")
print(f"{'='*70}")
print(f"[POSTGRES] Connection String: {masked_uri}")
print(f"[POSTGRES] Host: {conn_details['host']}")
print(f"[POSTGRES] Port: {conn_details['port']}")
print(f"[POSTGRES] Database: {conn_details['database']}")
print(f"[POSTGRES] SSL Mode: {settings.POSTGRES_SSL_MODE}")
print(f"{'='*70}\n")
# Log connection attempt start
logger.info(
"Starting PostgreSQL connection attempts",
extra={
"max_retries": settings.POSTGRES_CONNECT_MAX_RETRIES,
"initial_delay_ms": settings.POSTGRES_CONNECT_INITIAL_DELAY_MS,
"backoff_multiplier": settings.POSTGRES_CONNECT_BACKOFF_MULTIPLIER
}
)
print(f"[POSTGRES] Attempting to connect (max retries: {settings.POSTGRES_CONNECT_MAX_RETRIES})...")
attempts = 0
max_attempts = settings.POSTGRES_CONNECT_MAX_RETRIES
delay = settings.POSTGRES_CONNECT_INITIAL_DELAY_MS / 1000.0
last_error = None
while attempts < max_attempts:
try:
async with async_engine.begin() as conn:
await conn.execute(text("SELECT 1"))
logger.info("Successfully connected to PostgreSQL database")
print(f"[POSTGRES] β
Connection successful after {attempts + 1} attempt(s)")
return
except Exception as e:
last_error = e
attempts += 1
error_msg = str(e)
error_type = type(e).__name__
logger.warning(
"PostgreSQL connection attempt failed",
extra={
"attempt": attempts,
"max_attempts": max_attempts,
"retry_delay_ms": int(delay * 1000),
"error_type": error_type,
"error_message": error_msg[:200] # Truncate long errors
}
)
print(f"[POSTGRES] β Connection attempt {attempts}/{max_attempts} failed")
print(f"[POSTGRES] Error: {error_type}: {error_msg[:150]}")
if attempts < max_attempts:
print(f"[POSTGRES] Retrying in {delay:.2f}s...")
await asyncio.sleep(delay)
delay = min(delay * settings.POSTGRES_CONNECT_BACKOFF_MULTIPLIER, 30.0)
# All attempts failed
logger.error(
"Failed to connect to PostgreSQL after all retries",
extra={
"total_attempts": attempts,
"final_error_type": type(last_error).__name__,
"final_error": str(last_error)[:500]
},
exc_info=last_error
)
print(f"[POSTGRES] β FATAL: Failed to connect after {attempts} attempts")
print(f"[POSTGRES] Last error: {type(last_error).__name__}: {str(last_error)[:200]}")
print(f"[POSTGRES] Please check:")
print(f"[POSTGRES] 1. Database host is reachable: {conn_details['host']}")
print(f"[POSTGRES] 2. Database credentials are correct")
print(f"[POSTGRES] 3. SSL mode is appropriate: {settings.POSTGRES_SSL_MODE}")
print(f"[POSTGRES] 4. Firewall allows connections to port {conn_details['port']}")
raise last_error
async def disconnect_from_database() -> None:
"""Close database connection when the application shuts down."""
try:
await async_engine.dispose()
logger.info("Successfully disconnected from PostgreSQL database")
except Exception as e:
logger.exception("Error disconnecting from PostgreSQL database")
raise
async def enforce_trans_schema() -> None:
"""Enforce that all tables use the TRANS schema and validate schema compliance."""
try:
async with async_engine.begin() as conn:
# Ensure trans schema exists
await conn.execute(text("CREATE SCHEMA IF NOT EXISTS trans"))
logger.info("β
TRANS schema exists")
# Validate that all models use trans schema
from app.core.database import Base
# Import all models to ensure they're registered with Base
from app.order.models.model import SalesOrder, SalesOrderItem, SalesOrderAddress
# Validate schema compliance
non_trans_tables = []
for table in Base.metadata.tables.values():
if table.schema != 'trans':
non_trans_tables.append(f"{table.name} (schema: {table.schema})")
if non_trans_tables:
error_msg = f"β SCHEMA VIOLATION: The following tables are not using 'trans' schema: {', '.join(non_trans_tables)}"
logger.error(error_msg)
print(f"\n{'='*80}")
print(f"[SCHEMA ERROR] {error_msg}")
print(f"{'='*80}\n")
raise ValueError(error_msg)
logger.info("β
All App tables correctly use 'trans' schema")
print(f"[SCHEMA] β
All App tables correctly use 'trans' schema")
except Exception as e:
logger.exception("Error enforcing TRANS schema")
raise
async def create_tables() -> None:
"""Create all tables defined in models after enforcing schema compliance."""
try:
# First enforce schema compliance
await enforce_trans_schema()
from app.core.database import Base
# Import all models to ensure they're registered
from app.order.models.model import SalesOrder, SalesOrderItem, SalesOrderAddress
async with async_engine.begin() as conn:
# Create all tables (schema already validated)
await conn.run_sync(Base.metadata.create_all)
logger.info("β
Database tables created successfully in TRANS schema")
print(f"[SCHEMA] β
Database tables created successfully in TRANS schema")
except Exception as e:
logger.exception("Error creating database tables")
raise
try:
from app.postgres import (
connect_to_postgres as _connect_to_postgres,
close_postgres_connection as _close_postgres_connection,
get_postgres_connection as _get_postgres_connection,
release_postgres_connection as _release_postgres_connection,
is_postgres_connected as _is_postgres_connected,
)
async def connect_to_postgres() -> None:
await _connect_to_postgres()
async def close_postgres_connection() -> None:
await _close_postgres_connection()
async def get_postgres_connection():
return await _get_postgres_connection()
async def release_postgres_connection(conn) -> None:
await _release_postgres_connection(conn)
def is_postgres_connected() -> bool:
return _is_postgres_connected()
except Exception:
pass |