enflow-api / db.py
dhruv575
Switching to Mongo upload
87c01a0
import os
import logging
from pymongo import MongoClient, version as pymongo_version
from pymongo.errors import ConnectionFailure
from dotenv import load_dotenv
import re
import gridfs
from bson.objectid import ObjectId
# Read KEY=VALUE pairs from a .env file into os.environ (python-dotenv), so
# MONGO_URI can be supplied that way during development.
load_dotenv()

# Logger shared by everything in this module.
logger = logging.getLogger(__name__)

# hasHandlers() also inspects ancestor loggers, so this only triggers when no
# handler is configured anywhere up the chain (e.g. this module run on its own).
# In that case attach an INFO-level console handler so messages are visible.
if not logger.hasHandlers():
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.INFO)
    logger.setLevel(logging.INFO)
    logger.addHandler(console_handler)
def mask_mongo_uri(uri):
    """Return ``uri`` with the password of any ``user:password@`` credentials masked.

    The match is confined to the userinfo part of the URI: the user name may
    not contain ``:``, ``/`` or ``@`` and the password may not contain ``/`` or
    ``@``. This stops the pattern from running past the host into the path or
    query string, so a credential-less URI such as
    ``mongodb://host:27017/?appName=a@b`` is returned unchanged instead of
    having its port replaced.

    Args:
        uri: MongoDB connection string (may be None or empty).

    Returns:
        The URI with the password replaced by ``********``, or the string
        ``"None"`` when ``uri`` is falsy.
    """
    if not uri:
        return "None"
    # pymongo splits the host part off at the first '/', so a real password can
    # never contain an unescaped '/' (and '@' must be percent-encoded).
    return re.sub(r"://([^:/@]+):([^@/]+)@", r"://\1:********@", uri)
class Database:
    """Shared MongoDB connection exposing the ``enflow`` database.

    Use :meth:`get_instance` to obtain the process-wide instance. Calling
    ``Database()`` directly still works, but it opens an additional MongoClient.

    Attributes:
        mongo_uri: Connection string read from the MONGO_URI environment variable.
        client: The ``MongoClient``; set once the client object has been created.
        db: The ``enflow`` database; set only after the server answered a ping.
    """

    # Lazily created shared instance; see get_instance().
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the shared Database, creating and connecting it on first use.

        The first call runs ``__init__`` and therefore propagates whatever it
        raises (ValueError for a missing MONGO_URI, ConnectionFailure, ...).
        """
        logger.debug("Database.get_instance called.")
        # NOTE(review): not synchronized; concurrent first calls could each
        # construct an instance (and a MongoClient). Confirm this is acceptable.
        if cls._instance is None:
            logger.info("Creating new Database instance.")
            cls._instance = cls()
        else:
            logger.debug("Returning existing Database instance.")
        return cls._instance

    def __init__(self):
        """Connect to the server named by MONGO_URI and select the ``enflow`` database.

        Raises:
            ValueError: if the MONGO_URI environment variable is unset or empty.
            ConnectionFailure: if the server cannot be reached within 10 seconds.
            Exception: any other error while creating the client or pinging the
                server is logged and re-raised unchanged.
        """
        logger.info("Initializing Database singleton.")
        if Database._instance is not None:
            # Direct construction bypasses the singleton. It is allowed, but it
            # opens a second client, so make that visible in the logs.
            logger.warning("Database __init__ called on existing instance.")
        logger.info("Using PyMongo version: %s", pymongo_version)

        self.mongo_uri = os.environ.get('MONGO_URI')
        if not self.mongo_uri:
            logger.error("CRITICAL: MONGO_URI environment variable not set.")
            raise ValueError("MONGO_URI environment variable not set. Cannot initialize Database.")

        masked_uri = mask_mongo_uri(self.mongo_uri)
        logger.info("Attempting to connect to MongoDB using URI: %s", masked_uri)
        try:
            # Bound server selection at 10 s so startup fails fast when the
            # server is unreachable.
            self.client = MongoClient(
                self.mongo_uri,
                serverSelectionTimeoutMS=10000,  # milliseconds
            )
            logger.info("MongoClient initialized. Pinging server...")
            # MongoClient connects lazily; 'ping' forces a round trip so an
            # unreachable server fails here rather than on the first query.
            # It replaces the deprecated 'ismaster' and does not require auth.
            self.client.admin.command('ping')
            self.db = self.client['enflow']
            logger.info("Successfully connected to MongoDB server and selected database.")
        except ConnectionFailure as e:
            logger.error("MongoDB Connection Failure: Could not connect to server. Error: %s", e)
            # Log the (masked) URI to help diagnose network/firewall issues.
            logger.error("Mongo URI used (masked): %s", masked_uri)
            self._discard_client()
            raise  # propagate so application startup can halt
        except Exception as e:
            logger.exception("An unexpected error occurred during MongoDB connection: %s", e)
            logger.error("Mongo URI used (masked): %s", masked_uri)
            self._discard_client()
            raise

    def _discard_client(self):
        """Close a MongoClient left behind by a failed connection attempt, if any.

        Best effort: an error while closing is logged at DEBUG and swallowed so
        the original connection error is the one that propagates.
        """
        client = getattr(self, 'client', None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            logger.debug("Ignoring error while closing MongoClient after failed connect.", exc_info=True)

    def get_db(self):
        """Return the connected ``enflow`` database, or None if connecting never succeeded."""
        if not hasattr(self, 'db'):
            # Indicates __init__ did not complete successfully.
            logger.error("Attempted to get DB instance before successful connection.")
            return None
        return self.db

    def close(self):
        """Close the underlying MongoClient, if one was created.

        NOTE(review): Database._instance still refers to this object afterwards,
        so get_instance() keeps returning it; confirm callers do not reuse it.
        """
        if hasattr(self, 'client'):
            self.client.close()
            logger.info("MongoDB connection closed")
# Helper functions to get collections
def get_collection(collection_name):
    """Look up ``collection_name`` in the shared Database's database.

    Raises:
        RuntimeError: if the shared Database reports no connected database.
    """
    database = Database.get_instance().get_db()
    if database is not None:
        return database[collection_name]
    raise RuntimeError("Database not connected, cannot get collection.")
def get_users_collection():
    """Return the ``users`` collection (via get_collection)."""
    return get_collection(collection_name='users')
def get_departments_collection():
    """Return the ``departments`` collection (via get_collection)."""
    return get_collection(collection_name='departments')
def get_workflows_collection():
    """Return the ``workflows`` collection (via get_collection)."""
    return get_collection(collection_name='workflows')
def get_logs_collection():
    """Return the ``logs`` collection (via get_collection)."""
    return get_collection(collection_name='logs')
def get_incidents_collection():
    """Return the ``incidents`` collection (via get_collection)."""
    return get_collection(collection_name='incidents')
def get_gridfs():
    """Build a GridFS handle for file storage on the shared Database's database.

    A new ``gridfs.GridFS`` object is constructed on every call.

    Raises:
        RuntimeError: if the shared Database reports no connected database.
    """
    database = Database.get_instance().get_db()
    if database is not None:
        return gridfs.GridFS(database)
    raise RuntimeError("Database not connected, cannot get GridFS.")