Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from pymongo import MongoClient, version as pymongo_version | |
| from pymongo.errors import ConnectionFailure | |
| from dotenv import load_dotenv | |
| import re | |
| import gridfs | |
| from bson.objectid import ObjectId | |
| # Load environment variables | |
| load_dotenv() | |
| # Set up logging | |
| logger = logging.getLogger(__name__) | |
| # Ensure logger has handlers if running this module standalone for tests | |
| if not logger.hasHandlers(): | |
| formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| console_handler = logging.StreamHandler() | |
| console_handler.setLevel(logging.INFO) | |
| console_handler.setFormatter(formatter) | |
| logger.addHandler(console_handler) | |
| logger.setLevel(logging.INFO) | |
| def mask_mongo_uri(uri): | |
| """Masks the password in a MongoDB URI for logging.""" | |
| if not uri: | |
| return "None" | |
| # Regex to find username:password@ part | |
| return re.sub(r"://([^:]+):([^@]+)@", r"://\1:********@", uri) | |
| class Database: | |
| _instance = None | |
| def get_instance(cls): | |
| logger.debug("Database.get_instance called.") | |
| if cls._instance is None: | |
| logger.info("Creating new Database instance.") | |
| cls._instance = cls() | |
| else: | |
| logger.debug("Returning existing Database instance.") | |
| return cls._instance | |
| def __init__(self): | |
| logger.info("Initializing Database singleton.") | |
| if Database._instance is not None: | |
| logger.warning("Database __init__ called on existing instance.") | |
| # In a strict singleton, you might raise an exception, | |
| # but for robustness, maybe just return? | |
| # For now, we allow re-initialization check but proceed. | |
| # raise Exception("This class is a singleton!") | |
| pass # Let it proceed but log it | |
| logger.info(f"Using PyMongo version: {pymongo_version}") | |
| self.mongo_uri = os.environ.get('MONGO_URI') | |
| if not self.mongo_uri: | |
| logger.error("CRITICAL: MONGO_URI environment variable not set.") | |
| raise ValueError("MONGO_URI environment variable not set. Cannot initialize Database.") | |
| masked_uri = mask_mongo_uri(self.mongo_uri) | |
| logger.info(f"Attempting to connect to MongoDB using URI: {masked_uri}") | |
| try: | |
| # Explicitly set serverSelectionTimeoutMS (e.g., 10 seconds) | |
| self.client = MongoClient( | |
| self.mongo_uri, | |
| serverSelectionTimeoutMS=10000 # Timeout in milliseconds | |
| ) | |
| # The ismaster command is cheap and does not require auth. | |
| logger.info("MongoClient initialized. Pinging server...") | |
| self.client.admin.command('ismaster') # More specific ping/connection check | |
| self.db = self.client['enflow'] # Select the database | |
| logger.info("Successfully connected to MongoDB server and selected database.") | |
| except ConnectionFailure as e: | |
| logger.error(f"MongoDB Connection Failure: Could not connect to server. Error: {str(e)}") | |
| # Log details that might help diagnose network/firewall issues | |
| logger.error(f"Mongo URI used (masked): {masked_uri}") | |
| raise # Re-raise the exception to halt app startup if needed | |
| except Exception as e: | |
| logger.error(f"An unexpected error occurred during MongoDB connection: {str(e)}", exc_info=True) | |
| logger.error(f"Mongo URI used (masked): {masked_uri}") | |
| raise # Re-raise the exception | |
| def get_db(self): | |
| if not hasattr(self, 'db'): | |
| logger.error("Attempted to get DB instance before successful connection.") | |
| # Depending on how you handle errors in __init__, this might indicate a startup failure | |
| return None # Or raise an exception | |
| return self.db | |
| def close(self): | |
| if hasattr(self, 'client'): | |
| self.client.close() | |
| logger.info("MongoDB connection closed") | |
| # Helper functions to get collections | |
| def get_collection(collection_name): | |
| db_instance = Database.get_instance() | |
| db = db_instance.get_db() | |
| if db is None: | |
| raise RuntimeError("Database not connected, cannot get collection.") | |
| return db[collection_name] | |
| def get_users_collection(): | |
| return get_collection('users') | |
| def get_departments_collection(): | |
| return get_collection('departments') | |
| def get_workflows_collection(): | |
| return get_collection('workflows') | |
| def get_logs_collection(): | |
| return get_collection('logs') | |
| def get_incidents_collection(): | |
| return get_collection('incidents') | |
| def get_gridfs(): | |
| """Get a GridFS instance for file storage""" | |
| db_instance = Database.get_instance() | |
| db = db_instance.get_db() | |
| if db is None: | |
| raise RuntimeError("Database not connected, cannot get GridFS.") | |
| return gridfs.GridFS(db) |