PupaClic commited on
Commit ·
537cb9b
1
Parent(s): 7120b03
redis
Browse files- REDIS_CONNECTION_UPDATE.md +102 -0
- app/app.py +19 -7
- app/nosql.py +30 -47
REDIS_CONNECTION_UPDATE.md
ADDED
|
@@ -0,0 +1,102 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
# Redis Connection Update - EMS
|
| 2 |
+
|
| 3 |
+
## Summary
|
| 4 |
+
Updated EMS Redis and MongoDB connection handling to use centralized `insightfy_utils` connectors instead of direct motor/redis imports, making it consistent with other microservices.
|
| 5 |
+
|
| 6 |
+
## Changes Made
|
| 7 |
+
|
| 8 |
+
### 1. `app/nosql.py` - Refactored to Use Centralized Connectors
|
| 9 |
+
**Before:**
|
| 10 |
+
- Used `motor.motor_asyncio.AsyncIOMotorClient` directly
|
| 11 |
+
- Used `redis.asyncio.Redis` directly
|
| 12 |
+
- More verbose implementation
|
| 13 |
+
|
| 14 |
+
**After:**
|
| 15 |
+
- ✅ Uses `create_mongo_connection()` from `insightfy_utils.db.mongo_connector`
|
| 16 |
+
- ✅ Uses `create_redis_connection()` from `insightfy_utils.db.redis_connector`
|
| 17 |
+
- ✅ Cleaner, more maintainable code
|
| 18 |
+
- ✅ Consistent with ANS/TMS/RMS/MPMS pattern
|
| 19 |
+
- ✅ Maintains all health checks and lifecycle functions
|
| 20 |
+
|
| 21 |
+
### 2. `app/app.py` - Migrated to Modern Lifespan Pattern
|
| 22 |
+
**Before:**
|
| 23 |
+
```python
|
| 24 |
+
@app.on_event("startup")
|
| 25 |
+
async def startup():
|
| 26 |
+
await connect_to_database()
|
| 27 |
+
|
| 28 |
+
@app.on_event("shutdown")
|
| 29 |
+
async def shutdown():
|
| 30 |
+
await disconnect_from_database()
|
| 31 |
+
```
|
| 32 |
+
|
| 33 |
+
**After:**
|
| 34 |
+
```python
|
| 35 |
+
@asynccontextmanager
|
| 36 |
+
async def lifespan(app: FastAPI):
|
| 37 |
+
"""Application lifespan manager."""
|
| 38 |
+
# Startup
|
| 39 |
+
await connect_to_database() # SQL
|
| 40 |
+
await connect_stores() # MongoDB + Redis
|
| 41 |
+
|
| 42 |
+
yield
|
| 43 |
+
|
| 44 |
+
# Shutdown
|
| 45 |
+
await disconnect_from_database() # SQL
|
| 46 |
+
await disconnect_stores() # MongoDB + Redis
|
| 47 |
+
|
| 48 |
+
app.router.lifespan_context = lifespan
|
| 49 |
+
```
|
| 50 |
+
|
| 51 |
+
## Benefits
|
| 52 |
+
|
| 53 |
+
1. **Consistency**: Now matches the pattern used across all microservices
|
| 54 |
+
2. **Modern FastAPI**: Uses `@asynccontextmanager` instead of deprecated `@app.on_event()`
|
| 55 |
+
3. **Maintainability**: Uses centralized connectors from `insightfy_utils`
|
| 56 |
+
4. **Reliability**: Proper lifecycle management with health checks for all stores
|
| 57 |
+
5. **Less Code**: Removed verbose motor/redis imports
|
| 58 |
+
|
| 59 |
+
## Configuration
|
| 60 |
+
|
| 61 |
+
No changes to configuration - still supports both formats:
|
| 62 |
+
|
| 63 |
+
### Option 1: Single URL (Recommended)
|
| 64 |
+
```bash
|
| 65 |
+
CACHE_URL=redis://localhost:6379/0
|
| 66 |
+
```
|
| 67 |
+
|
| 68 |
+
### Option 2: Separate Components (Backward Compatible)
|
| 69 |
+
```bash
|
| 70 |
+
CACHE_URI=localhost:6379
|
| 71 |
+
CACHE_K=your_password # optional
|
| 72 |
+
CACHE_DB=0 # optional
|
| 73 |
+
```
|
| 74 |
+
|
| 75 |
+
## Migration Notes
|
| 76 |
+
|
| 77 |
+
- **No breaking changes** - all existing configurations work as before
|
| 78 |
+
- All existing imports remain unchanged
|
| 79 |
+
- NoSQL stores now properly managed in application lifecycle
|
| 80 |
+
- Migrated from deprecated `@app.on_event()` to modern `lifespan` pattern
|
| 81 |
+
|
| 82 |
+
## Testing
|
| 83 |
+
|
| 84 |
+
Verify the changes:
|
| 85 |
+
|
| 86 |
+
```bash
|
| 87 |
+
cd insightfy-bloom-ms-ems
|
| 88 |
+
python -m uvicorn app.app:app --reload
|
| 89 |
+
|
| 90 |
+
# Check logs for:
|
| 91 |
+
# - "Starting EMS application"
|
| 92 |
+
# - "MongoDB client initialized"
|
| 93 |
+
# - "Redis client initialized"
|
| 94 |
+
# - "Store connectivity OK (Mongo & Redis)"
|
| 95 |
+
# - "Database connections established"
|
| 96 |
+
```
|
| 97 |
+
|
| 98 |
+
## Related Files
|
| 99 |
+
- ✅ `app/nosql.py` - Updated to use centralized connectors
|
| 100 |
+
- ✅ `app/app.py` - Migrated to lifespan pattern
|
| 101 |
+
- ℹ️ `settings.py` - No changes needed
|
| 102 |
+
- ℹ️ All other files - No changes needed
|
app/app.py
CHANGED
|
@@ -86,14 +86,26 @@ async def health():
|
|
| 86 |
return {"status": "healthy", "service": "insightfy-bloom-ms-ems", "version": "1.0"}
|
| 87 |
|
| 88 |
# Ensure database connection is started/stopped with FastAPI lifecycle
|
|
|
|
| 89 |
from app.sql import connect_to_database, disconnect_from_database
|
|
|
|
| 90 |
|
| 91 |
-
@
|
| 92 |
-
async def
|
|
|
|
|
|
|
|
|
|
| 93 |
await connect_to_database()
|
| 94 |
-
|
| 95 |
-
|
| 96 |
-
|
| 97 |
-
|
|
|
|
|
|
|
|
|
|
| 98 |
await disconnect_from_database()
|
| 99 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 86 |
return {"status": "healthy", "service": "insightfy-bloom-ms-ems", "version": "1.0"}
|
| 87 |
|
| 88 |
# Ensure database connection is started/stopped with FastAPI lifecycle
|
| 89 |
+
from contextlib import asynccontextmanager
|
| 90 |
from app.sql import connect_to_database, disconnect_from_database
|
| 91 |
+
from app.nosql import connect_stores, disconnect_stores
|
| 92 |
|
| 93 |
+
@asynccontextmanager
|
| 94 |
+
async def lifespan(app: FastAPI):
|
| 95 |
+
"""Application lifespan manager."""
|
| 96 |
+
# Startup
|
| 97 |
+
logger.info("Starting EMS application")
|
| 98 |
await connect_to_database()
|
| 99 |
+
await connect_stores()
|
| 100 |
+
logger.info("Database connections established")
|
| 101 |
+
|
| 102 |
+
yield
|
| 103 |
+
|
| 104 |
+
# Shutdown
|
| 105 |
+
logger.info("Shutting down EMS application")
|
| 106 |
await disconnect_from_database()
|
| 107 |
+
await disconnect_stores()
|
| 108 |
+
logger.info("Application shutdown complete")
|
| 109 |
+
|
| 110 |
+
# Update app to use lifespan
|
| 111 |
+
app.router.lifespan_context = lifespan
|
app/nosql.py
CHANGED
|
@@ -1,15 +1,10 @@
|
|
| 1 |
# data_stores.py (or your current filename)
|
| 2 |
from __future__ import annotations
|
| 3 |
|
| 4 |
-
from
|
|
|
|
|
|
|
| 5 |
|
| 6 |
-
import motor.motor_asyncio
|
| 7 |
-
import redis.asyncio as redis
|
| 8 |
-
from redis.exceptions import RedisError
|
| 9 |
-
from insightfy_utils.db import MongoConnector, RedisConnector
|
| 10 |
-
from insightfy_utils.logging import setup_logging, get_logger
|
| 11 |
-
|
| 12 |
-
# Single source of truth
|
| 13 |
from settings import (
|
| 14 |
MONGO_URI,
|
| 15 |
MONGO_DB_NAME,
|
|
@@ -19,10 +14,6 @@ from settings import (
|
|
| 19 |
CACHE_DB,
|
| 20 |
)
|
| 21 |
|
| 22 |
-
# -----------------------------------------------------------------------------
|
| 23 |
-
# Logging (migrated to insightfy-utils)
|
| 24 |
-
# -----------------------------------------------------------------------------
|
| 25 |
-
setup_logging(level="INFO", format_type="json", app_name="insightfy-bloom-ms-ems-nosql")
|
| 26 |
logger = get_logger(__name__)
|
| 27 |
|
| 28 |
# -----------------------------------------------------------------------------
|
|
@@ -39,45 +30,39 @@ if not CACHE_URL and not CACHE_URI:
|
|
| 39 |
)
|
| 40 |
|
| 41 |
# -----------------------------------------------------------------------------
|
| 42 |
-
# MongoDB client
|
| 43 |
# -----------------------------------------------------------------------------
|
| 44 |
try:
|
| 45 |
-
mongo_client =
|
| 46 |
MONGO_URI,
|
| 47 |
-
|
| 48 |
-
|
| 49 |
-
|
|
|
|
| 50 |
)
|
| 51 |
-
|
| 52 |
-
logger.info("MongoDB client initialized", extra={"database": MONGO_DB_NAME})
|
| 53 |
except Exception as e:
|
| 54 |
-
logger.
|
| 55 |
raise
|
| 56 |
|
| 57 |
# -----------------------------------------------------------------------------
|
| 58 |
-
# Redis client
|
| 59 |
# -----------------------------------------------------------------------------
|
| 60 |
|
| 61 |
|
| 62 |
-
def _redis_from_settings()
|
| 63 |
-
"""
|
| 64 |
-
Prefer a single URL (CACHE_URL / REDIS_URL). If not provided,
|
| 65 |
-
fall back to CACHE_URI ("host:port") + optional password.
|
| 66 |
-
"""
|
| 67 |
if CACHE_URL:
|
| 68 |
-
|
| 69 |
return redis.from_url(CACHE_URL, decode_responses=True)
|
| 70 |
|
| 71 |
-
# Fallback: host:port (required), password optional
|
| 72 |
try:
|
| 73 |
host, port_str = CACHE_URI.split(":")
|
| 74 |
port = int(port_str)
|
| 75 |
except Exception:
|
| 76 |
-
raise ValueError(
|
| 77 |
-
"Invalid CACHE_URI format. Expected 'host:port', e.g., 'localhost:6379'."
|
| 78 |
-
)
|
| 79 |
|
| 80 |
-
return
|
| 81 |
host=host,
|
| 82 |
port=port,
|
| 83 |
password=CACHE_K or None,
|
|
@@ -88,31 +73,33 @@ def _redis_from_settings() -> redis.Redis:
|
|
| 88 |
|
| 89 |
try:
|
| 90 |
redis_client = _redis_from_settings()
|
| 91 |
-
logger.info("Redis client initialized
|
| 92 |
except Exception as e:
|
| 93 |
-
logger.
|
| 94 |
raise
|
| 95 |
|
| 96 |
# -----------------------------------------------------------------------------
|
| 97 |
-
#
|
| 98 |
# -----------------------------------------------------------------------------
|
| 99 |
|
| 100 |
|
| 101 |
async def ping_mongo() -> bool:
|
|
|
|
| 102 |
try:
|
| 103 |
await mongo_db.command("ping")
|
| 104 |
return True
|
| 105 |
except Exception:
|
| 106 |
-
logger.
|
| 107 |
return False
|
| 108 |
|
| 109 |
|
| 110 |
async def ping_redis() -> bool:
|
|
|
|
| 111 |
try:
|
| 112 |
pong = await redis_client.ping()
|
| 113 |
return bool(pong)
|
| 114 |
-
except
|
| 115 |
-
logger.
|
| 116 |
return False
|
| 117 |
|
| 118 |
# -----------------------------------------------------------------------------
|
|
@@ -121,29 +108,25 @@ async def ping_redis() -> bool:
|
|
| 121 |
|
| 122 |
|
| 123 |
async def connect_stores() -> None:
|
| 124 |
-
"""
|
| 125 |
-
No-op for motor/redis (lazy connections), but you can force a ping to fail fast.
|
| 126 |
-
"""
|
| 127 |
ok_mongo = await ping_mongo()
|
| 128 |
ok_redis = await ping_redis()
|
| 129 |
if not (ok_mongo and ok_redis):
|
| 130 |
raise RuntimeError("Store connectivity check failed (mongo or redis).")
|
| 131 |
-
logger.info("Store connectivity OK (Mongo & Redis)
|
| 132 |
|
| 133 |
|
| 134 |
async def disconnect_stores() -> None:
|
| 135 |
-
"""
|
| 136 |
-
Close drivers on shutdown.
|
| 137 |
-
"""
|
| 138 |
try:
|
| 139 |
mongo_client.close()
|
| 140 |
except Exception:
|
| 141 |
-
logger.
|
| 142 |
|
| 143 |
try:
|
| 144 |
await redis_client.close()
|
| 145 |
except Exception:
|
| 146 |
-
logger.
|
| 147 |
|
| 148 |
# Alias for backward compatibility
|
| 149 |
db = mongo_db
|
|
|
|
| 1 |
# data_stores.py (or your current filename)
|
| 2 |
from __future__ import annotations
|
| 3 |
|
| 4 |
+
from insightfy_utils.logging import get_logger
|
| 5 |
+
from insightfy_utils.db.mongo_connector import create_mongo_connection
|
| 6 |
+
from insightfy_utils.db.redis_connector import create_redis_connection
|
| 7 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 8 |
from settings import (
|
| 9 |
MONGO_URI,
|
| 10 |
MONGO_DB_NAME,
|
|
|
|
| 14 |
CACHE_DB,
|
| 15 |
)
|
| 16 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 17 |
logger = get_logger(__name__)
|
| 18 |
|
| 19 |
# -----------------------------------------------------------------------------
|
|
|
|
| 30 |
)
|
| 31 |
|
| 32 |
# -----------------------------------------------------------------------------
|
| 33 |
+
# MongoDB client
|
| 34 |
# -----------------------------------------------------------------------------
|
| 35 |
try:
|
| 36 |
+
mongo_client, mongo_db = create_mongo_connection(
|
| 37 |
MONGO_URI,
|
| 38 |
+
MONGO_DB_NAME,
|
| 39 |
+
server_selection_timeout_ms=60_000,
|
| 40 |
+
socket_timeout_ms=60_000,
|
| 41 |
+
connect_timeout_ms=60_000,
|
| 42 |
)
|
| 43 |
+
logger.info("MongoDB client initialized", extra={"db_name": MONGO_DB_NAME})
|
|
|
|
| 44 |
except Exception as e:
|
| 45 |
+
logger.error("Failed to initialize MongoDB client", exc_info=e)
|
| 46 |
raise
|
| 47 |
|
| 48 |
# -----------------------------------------------------------------------------
|
| 49 |
+
# Redis client
|
| 50 |
# -----------------------------------------------------------------------------
|
| 51 |
|
| 52 |
|
| 53 |
+
def _redis_from_settings():
|
| 54 |
+
"""Create Redis client from settings."""
|
|
|
|
|
|
|
|
|
|
| 55 |
if CACHE_URL:
|
| 56 |
+
import redis.asyncio as redis
|
| 57 |
return redis.from_url(CACHE_URL, decode_responses=True)
|
| 58 |
|
|
|
|
| 59 |
try:
|
| 60 |
host, port_str = CACHE_URI.split(":")
|
| 61 |
port = int(port_str)
|
| 62 |
except Exception:
|
| 63 |
+
raise ValueError("Invalid CACHE_URI format. Expected 'host:port'")
|
|
|
|
|
|
|
| 64 |
|
| 65 |
+
return create_redis_connection(
|
| 66 |
host=host,
|
| 67 |
port=port,
|
| 68 |
password=CACHE_K or None,
|
|
|
|
| 73 |
|
| 74 |
try:
|
| 75 |
redis_client = _redis_from_settings()
|
| 76 |
+
logger.info("Redis client initialized")
|
| 77 |
except Exception as e:
|
| 78 |
+
logger.error("Failed to initialize Redis client", exc_info=e)
|
| 79 |
raise
|
| 80 |
|
| 81 |
# -----------------------------------------------------------------------------
|
| 82 |
+
# Health check helpers
|
| 83 |
# -----------------------------------------------------------------------------
|
| 84 |
|
| 85 |
|
| 86 |
async def ping_mongo() -> bool:
|
| 87 |
+
"""Check MongoDB connectivity."""
|
| 88 |
try:
|
| 89 |
await mongo_db.command("ping")
|
| 90 |
return True
|
| 91 |
except Exception:
|
| 92 |
+
logger.error("Mongo ping failed", exc_info=True)
|
| 93 |
return False
|
| 94 |
|
| 95 |
|
| 96 |
async def ping_redis() -> bool:
|
| 97 |
+
"""Check Redis connectivity."""
|
| 98 |
try:
|
| 99 |
pong = await redis_client.ping()
|
| 100 |
return bool(pong)
|
| 101 |
+
except Exception:
|
| 102 |
+
logger.error("Redis ping failed", exc_info=True)
|
| 103 |
return False
|
| 104 |
|
| 105 |
# -----------------------------------------------------------------------------
|
|
|
|
| 108 |
|
| 109 |
|
| 110 |
async def connect_stores() -> None:
|
| 111 |
+
"""Verify store connectivity on startup."""
|
|
|
|
|
|
|
| 112 |
ok_mongo = await ping_mongo()
|
| 113 |
ok_redis = await ping_redis()
|
| 114 |
if not (ok_mongo and ok_redis):
|
| 115 |
raise RuntimeError("Store connectivity check failed (mongo or redis).")
|
| 116 |
+
logger.info("Store connectivity OK (Mongo & Redis)")
|
| 117 |
|
| 118 |
|
| 119 |
async def disconnect_stores() -> None:
|
| 120 |
+
"""Close connections on shutdown."""
|
|
|
|
|
|
|
| 121 |
try:
|
| 122 |
mongo_client.close()
|
| 123 |
except Exception:
|
| 124 |
+
logger.error("Error closing Mongo client", exc_info=True)
|
| 125 |
|
| 126 |
try:
|
| 127 |
await redis_client.close()
|
| 128 |
except Exception:
|
| 129 |
+
logger.error("Error closing Redis client", exc_info=True)
|
| 130 |
|
| 131 |
# Alias for backward compatibility
|
| 132 |
db = mongo_db
|