"""
Database Configuration for DetectifAI Backend
This module handles connections to MongoDB Atlas and S3-compatible object storage
(Backblaze B2) for the DetectifAI system.
It provides centralized configuration and connection management.
"""
import os
from pymongo import MongoClient
from minio import Minio
from minio.error import S3Error
from dotenv import load_dotenv
import logging
from datetime import timedelta
# Load environment variables via python-dotenv's load_dotenv() at import time,
# before the configuration class below reads its settings with os.getenv().
load_dotenv()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class DatabaseConfig:
    """Connection settings for MongoDB and the S3-compatible object store.

    Each setting except ``mongo_db_name`` is read from the environment
    variable named in its ``os.getenv`` call; the second argument is the
    fallback used when that variable is unset.
    """

    def __init__(self):
        # SECURITY NOTE(review): the fallbacks for MONGO_URI, MINIO_ACCESS_KEY
        # and MINIO_SECRET_KEY embed credentials in source control. They are
        # kept unchanged so deployments relying on them keep working, but they
        # should be rotated and supplied only through the environment.

        # MongoDB Atlas connection (same as frontend)
        self.mongo_uri = os.getenv(
            'MONGO_URI',
            'mongodb+srv://detectifai_user:DetectifAI123@cluster0.6f9uj.mongodb.net/detectifai?retryWrites=true&w=majority&appName=Cluster0'
        )
        self.mongo_db_name = 'detectifai'

        # S3-compatible object storage (Backblaze B2)
        self.minio_endpoint = os.getenv('MINIO_ENDPOINT', 's3.eu-central-003.backblazeb2.com')
        self.minio_access_key = os.getenv('MINIO_ACCESS_KEY', '00367479ffb7e4e0000000001')
        self.minio_secret_key = os.getenv('MINIO_SECRET_KEY', 'K003opTvf92ijRj5dM7H1dgrlwcGTdA')
        self.minio_video_bucket = os.getenv('MINIO_VIDEO_BUCKET', 'detectifai-videos')
        self.minio_keyframe_bucket = os.getenv('MINIO_KEYFRAME_BUCKET', 'detectifai-keyframes')
        self.minio_reports_bucket = os.getenv('MINIO_REPORTS_BUCKET', 'detectifai-reports')
        # Only the literal string "true" (any letter case) enables TLS.
        self.minio_secure = os.getenv('MINIO_SECURE', 'true').lower() == 'true'

        # Region used for S3 request signing; an explicit MINIO_REGION wins,
        # otherwise it is derived from the endpoint (e.g. 'eu-central-003').
        self.minio_region = os.getenv('MINIO_REGION', self._extract_region(self.minio_endpoint))

    @staticmethod
    def _extract_region(endpoint: str) -> str:
        """Extract the region label from an ``s3.<region>.<domain>`` endpoint.

        Args:
            endpoint: Host name such as ``s3.eu-central-003.backblazeb2.com``.

        Returns:
            The label following the leading ``s3`` (``'eu-central-003'`` in
            the example), or ``''`` when the endpoint does not have that
            shape.
        """
        parts = endpoint.split('.')
        # At least four labels are required (s3 / region / domain / tld).
        # A three-label host such as 's3.amazonaws.com' carries no region, and
        # taking its second label would report the provider name as a region.
        if len(parts) >= 4 and parts[0] == 's3':
            return parts[1]  # e.g. 'eu-central-003'
        return ''
class DatabaseManager:
    """Central database manager for MongoDB and MinIO connections.

    Both clients are created lazily on first access. A MongoDB failure is
    raised to the caller; an object-storage failure is logged and reported
    as ``None`` so callers can continue without storage.
    """

    def __init__(self):
        self.config = DatabaseConfig()
        self._mongodb_client = None
        self._db = None
        self._minio_client = None

    @property
    def mongo_client(self):
        """Return the MongoDB client, connecting and pinging on first use.

        Raises:
            Exception: whatever ``MongoClient`` or the ``ping`` command
                raised; the error is logged before being re-raised.
        """
        if self._mongodb_client is None:
            client = None
            try:
                client = MongoClient(self.config.mongo_uri)
                # Test connection
                client.admin.command('ping')
            except Exception as e:
                logger.error(f"❌ Failed to connect to MongoDB: {e}")
                # Do not cache (or leak) a client that never passed the ping,
                # so the next access performs a fresh, verified attempt.
                if client is not None:
                    client.close()
                raise
            self._mongodb_client = client
            logger.info("✅ MongoDB connection established successfully")
        return self._mongodb_client

    @property
    def db(self):
        """Return the database named by ``config.mongo_db_name`` (cached)."""
        if self._db is None:
            self._db = self.mongo_client[self.config.mongo_db_name]
        return self._db

    @property
    def minio_client(self):
        """Lazy loading S3-compatible storage client — returns None when unavailable"""
        if self._minio_client is None:
            try:
                self._minio_client = Minio(
                    self.config.minio_endpoint,
                    access_key=self.config.minio_access_key,
                    secret_key=self.config.minio_secret_key,
                    secure=self.config.minio_secure,
                    # An empty region string is passed as None.
                    region=self.config.minio_region or None,
                )
                # Test connection and verify buckets exist
                self._ensure_bucket_exists()
                logger.info("✅ S3 storage connection established (Backblaze B2)")
            except Exception as e:
                logger.warning(f"⚠️ S3 storage unavailable (non-fatal): {e}")
                self._minio_client = None  # keep it None so we can retry later
                return None
        return self._minio_client

    def _ensure_bucket_exists(self):
        """Check that the three configured buckets exist.

        A missing bucket is only logged as a warning; nothing is created.

        Raises:
            S3Error: if a ``bucket_exists`` call fails (logged, then
                re-raised).
        """
        try:
            for bucket_name in [
                self.config.minio_video_bucket,
                self.config.minio_keyframe_bucket,
                self.config.minio_reports_bucket,
            ]:
                if self._minio_client.bucket_exists(bucket_name):
                    logger.info(f"✅ S3 bucket verified: {bucket_name}")
                else:
                    logger.warning(f"⚠️ S3 bucket not found: {bucket_name} — create it in Backblaze B2 dashboard")
        except S3Error as e:
            logger.error(f"❌ Failed to verify S3 buckets: {e}")
            raise

    def test_connections(self):
        """Test both MongoDB and MinIO connections, logging and printing results.

        Returns:
            bool: ``True`` when the MongoDB check succeeded. The storage
            result is reported but does not affect the return value.
        """
        mongodb_success = False
        try:
            # Test MongoDB
            self.mongo_client.admin.command('ping')
            collections = self.db.list_collection_names()
            logger.info(f"✅ MongoDB test successful. Collections: {collections}")
            print(f"✅ MongoDB connected successfully. Collections: {collections}")
            mongodb_success = True
        except Exception as e:
            logger.error(f"❌ MongoDB connection failed: {e}")
            print(f"❌ MongoDB connection failed: {e}")

        try:
            # Test S3 storage (Backblaze B2)
            storage = self.minio_client
            if storage is None:
                # The property yields None when storage is unavailable; report
                # that directly instead of failing with an AttributeError.
                raise ConnectionError("S3 client could not be initialised")
            bucket_names = [bucket.name for bucket in storage.list_buckets()]
            logger.info(f"✅ S3 storage test successful. Buckets: {bucket_names}")
            print(f"✅ S3 storage (Backblaze B2) connected successfully. Buckets: {bucket_names}")
        except Exception as e:
            logger.error(f"❌ S3 storage connection failed: {e}")
            print(f"❌ S3 storage connection failed: {e}")
            print("💡 Check MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY env vars.")

        return mongodb_success  # At minimum, we need MongoDB working

    def close_connections(self):
        """Close the MongoDB client, if one was opened, and forget it."""
        if self._mongodb_client:
            self._mongodb_client.close()
            # Drop the cached handles so a later access builds a fresh client
            # rather than reusing a closed one.
            # NOTE(review): whether a closed MongoClient can be reused depends
            # on the PyMongo version — confirm against the pinned release.
            self._mongodb_client = None
            self._db = None
            logger.info("MongoDB connection closed")
def get_presigned_url(minio_client, bucket_name: str, object_name: str, expires: timedelta = timedelta(hours=1)):
    """Generate presigned URL for S3 object access (works with Backblaze B2).

    Args:
        minio_client: Client exposing ``presigned_get_object``; may be
            ``None`` when object storage is unavailable.
        bucket_name: Bucket that holds the object.
        object_name: Key of the object inside the bucket.
        expires: Validity period of the URL (default: one hour).

    Returns:
        The presigned URL, or ``None`` when no client was supplied or the
        client raised ``S3Error``.
    """
    # DatabaseManager.minio_client yields None when storage is unavailable;
    # treat that like any other failure instead of raising AttributeError.
    if minio_client is None:
        return None
    try:
        return minio_client.presigned_get_object(bucket_name, object_name, expires=expires)
    except S3Error as e:
        logger.error(f"Failed to generate presigned URL for {object_name}: {e}")
        return None
if __name__ == "__main__":
    # Manual smoke test: run this module directly to check the connections.
    db_manager = DatabaseManager()
    try:
        # test_connections() reports on MongoDB and S3 storage, but its return
        # value reflects the MongoDB check only.
        if db_manager.test_connections():
            print("✅ All database connections working!")
        else:
            print("❌ Database connection issues detected")
    finally:
        # Release the MongoDB client even if the check raised.
        db_manager.close_connections()