File size: 5,231 Bytes
be86a81 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
"""
Database Connection and Session Management
This module provides:
- Async engine for Neon PostgreSQL
- Session factory for database operations
- Base class for SQLAlchemy models
- Dependency injection for FastAPI
Uses async SQLAlchemy with asyncpg driver for optimal performance.
"""
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import text
from typing import AsyncGenerator
from src.config import get_settings
# Get settings
settings = get_settings()
# Create async engine
# pool_pre_ping=True: Check connection health before use
# pool_recycle=3600: Recycle connections after 1 hour (prevent stale connections)
engine = create_async_engine(
settings.database_url,
echo=settings.debug_mode, # Log SQL in debug mode
future=True,
pool_pre_ping=True,
pool_recycle=3600,
)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
# Base class for all models
class Base(DeclarativeBase):
"""
Base class for SQLAlchemy models.
All models should inherit from this class.
Provides:
- Automatic table name generation (lowercase class name)
- Common columns via mixins (id, created_at, updated_at)
- Declarative mapping
"""
pass
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency injection for FastAPI routes.
Provides a database session that is automatically closed after use.
Yields:
AsyncSession: Database session for async operations
Example:
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def init_db() -> None:
"""
Initialize database connection.
Use this on application startup to verify database connectivity.
This does NOT create tables - use Alembic migrations for that.
Raises:
Exception: If database connection fails
"""
async with engine.begin() as conn:
# Test connection
await conn.execute(text("SELECT 1"))
async def close_db() -> None:
"""
Close database connections.
Use this on application shutdown to gracefully close connections.
"""
await engine.dispose()
# ============================================================================
# SECURITY DOCUMENTATION
# ============================================================================
"""
NEON POSTGRESQL SECURITY RULES
================================
1. DATABASE URL SECURITY
- DATABASE_URL contains sensitive credentials (username, password)
- MUST be loaded from environment variables only
- NEVER commit to version control
- MUST use SSL (sslmode=require in connection string)
2. CONNECTION POOLING
- SQLAlchemy manages connection pooling automatically
- pool_pre_ping checks connection health before use
- pool_recycle prevents stale connections
- Configure pool size based on your load
3. SESSION MANAGEMENT
- Use get_db() dependency injection for FastAPI routes
- Sessions are automatically committed on success
- Sessions are automatically rolled back on error
- Sessions are automatically closed after use
4. SQL INJECTION PREVENTION
- ALWAYS use SQLAlchemy ORM methods (never raw SQL)
- NEVER concatenate strings into queries
- ALWAYS use parameterized queries
- SQLAlchemy automatically escapes parameters
5. USER DATA ISOLATION
- Enforce user_id filtering at application layer
- Never assume database-level security is enough
- Always validate user owns the data they're accessing
- Implement defense in depth
EXAMPLE USAGE PATTERNS
========================
✅ CORRECT: Using ORM with parameterized queries
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
✅ CORRECT: Using dependency injection
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
✅ CORRECT: Explicit user filtering for data isolation
result = await db.execute(
select(Todo).where(Todo.user_id == current_user.id)
)
❌ WRONG: Raw SQL with string concatenation
sql = f"SELECT * FROM users WHERE id = '{user_id}'" # SQL injection risk!
await db.execute(text(sql))
❌ WRONG: Forgetting to filter by user_id
result = await db.execute(select(Todo)) # Returns ALL users' todos!
FOR MORE INFORMATION
- Neon Docs: https://neon.tech/docs
- SQLAlchemy Async: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
- FastAPI Database: https://fastapi.tiangolo.com/tutorial/dependencies/
"""
|