File size: 5,982 Bytes
f4bee9e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | ο»Ώ"""
DATABASE CONNECTION MODULE
Provides database session management for SQLite/PostgreSQL with mock fallback.
"""
import os
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.exc import OperationalError
import sys
# Make the project root importable so the absolute ``database.config``
# import below can be resolved. The root is taken to be two levels above
# this file, i.e. the parent of the directory that contains this module.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from database.config import DATABASE_CONFIG
class MockSession:
    """In-memory stand-in for a database session.

    Exposes the handful of session methods this module's callers use
    (``query``, ``add``, ``commit``, ``close``, ``rollback``) without
    touching a real database. Queries never return rows and ``add`` does
    not store anything.
    """

    class MockQuery:
        """Chainable query stub whose result set is always empty."""

        def __init__(self, data):
            self.data = data

        def all(self):
            return []

        def filter(self, *args, **kwargs):
            return self

        def order_by(self, *args):
            return self

        def limit(self, limit):
            return self

        def first(self):
            return None

        def count(self):
            return 0

        def delete(self):
            return self

    def __init__(self):
        table_names = (
            'deployments',
            'models',
            'security_memory',
            'autonomous_decisions',
            'policy_versions',
            'operator_interactions',
            'system_health',
        )
        # One independent empty list per table name.
        self._data = {table: [] for table in table_names}
        self.committed = False

    def query(self, model_class):
        """Return a query stub; ``model_class`` is accepted but ignored."""
        return self.MockQuery([])

    def add(self, item):
        """Accept ``item`` and discard it."""
        return None

    def commit(self):
        """Record that a commit was requested."""
        self.committed = True

    def close(self):
        """No-op; there is nothing to release."""
        return None

    def rollback(self):
        """No-op; there is nothing to undo."""
        return None
def create_sqlite_engine():
    """Create the SQLite engine used for development and as a fallback.

    The database file ``security_nervous_system.db`` lives in the parent of
    the directory that contains this module.

    Returns:
        The SQLAlchemy engine for that file, or ``None`` if preparing the
        path or building the engine raised. The failure is printed, not
        propagated, so callers can fall back further.
    """
    try:
        db_path = Path(__file__).parent.parent / "security_nervous_system.db"
        # parents=True: also create any missing intermediate directory
        # instead of failing on it.
        db_path.parent.mkdir(parents=True, exist_ok=True)
        engine = create_engine(
            f"sqlite:///{db_path}",
            echo=False,
            # Forwarded to the sqlite3 driver: allow a connection to be used
            # from a thread other than the one that opened it.
            connect_args={"check_same_thread": False},
        )
        print(f"✅ SQLite engine created at {db_path}")
        return engine
    except Exception as e:
        # NOTE(review): the leading "β" looks like a mis-decoded status
        # emoji; confirm the intended character before changing it.
        print(f"β Failed to create SQLite engine: {e}")
        return None
def create_postgresql_engine():
    """Create the PostgreSQL engine for production, falling back to SQLite.

    Connection settings are read as attributes of ``DATABASE_CONFIG``
    (``host``, ``port``, ``user``, ``password``, ``database``,
    ``pool_size``, ``max_overflow`` and optionally ``echo``). When the
    config has no ``host`` attribute, or anything raises while the engine
    is being built, the SQLite engine is used instead.

    Returns:
        The PostgreSQL engine when it could be built; otherwise whatever
        ``create_sqlite_engine()`` returns (an engine or ``None``).
    """
    from urllib.parse import quote_plus

    try:
        # Check if we have PostgreSQL config
        if not hasattr(DATABASE_CONFIG, 'host'):
            print("β οΈ PostgreSQL not configured, using SQLite")
            return create_sqlite_engine()

        # Credentials are URL-escaped so characters such as "@", ":" or "/"
        # in a password cannot be mistaken for URL delimiters.
        user = quote_plus(str(DATABASE_CONFIG.user))
        password = quote_plus(str(DATABASE_CONFIG.password))
        db_url = (
            f"postgresql://{user}:{password}"
            f"@{DATABASE_CONFIG.host}:{DATABASE_CONFIG.port}/{DATABASE_CONFIG.database}"
        )

        # Every other setting is read as an attribute, so do not require a
        # dict-style .get() just for the optional echo flag: use it when the
        # config offers one, otherwise read the attribute.
        config_get = getattr(DATABASE_CONFIG, 'get', None)
        if callable(config_get):
            echo = config_get('echo', False)
        else:
            echo = getattr(DATABASE_CONFIG, 'echo', False)

        engine = create_engine(
            db_url,
            pool_size=DATABASE_CONFIG.pool_size,
            max_overflow=DATABASE_CONFIG.max_overflow,
            pool_recycle=3600,
            echo=echo,
        )
        print(f"✅ PostgreSQL engine created for {DATABASE_CONFIG.database}")
        return engine
    except Exception as e:
        # NOTE(review): the "β" / "π‘" prefixes here and the "β οΈ" above
        # look like mis-decoded emoji; confirm the intended characters.
        print(f"β PostgreSQL connection failed: {e}")
        print("π‘ Falling back to SQLite")
        return create_sqlite_engine()
# Engine produced by the first successful get_engine() call. It is reused on
# later calls so each caller does not build (and never dispose of) its own
# engine and connection pool.
_cached_engine = None


def get_engine():
    """Return the shared database engine, building it on first use.

    Tries ``create_postgresql_engine()`` first and ``create_sqlite_engine()``
    if that yields nothing. A successfully created engine is cached at
    module level and returned by every later call; a failure is not cached,
    so the next call tries again.

    Returns:
        A SQLAlchemy engine, or ``None`` when no engine could be created.
        Callers treat ``None`` as the signal to switch to mock mode.
    """
    global _cached_engine
    if _cached_engine is not None:
        return _cached_engine

    # Try PostgreSQL first
    engine = create_postgresql_engine()

    # Fallback to SQLite if PostgreSQL fails
    if engine is None:
        engine = create_sqlite_engine()

    # Final fallback: no engine at all, callers use the mock session
    if engine is None:
        # NOTE(review): the "β οΈ" prefix looks like a mis-decoded emoji;
        # confirm the intended character.
        print("β οΈ All database engines failed, using mock mode")
        return None

    _cached_engine = engine
    return engine
def get_session():
    """Get a database session, falling back to a mock when none is usable.

    A real session is returned only after a ``SELECT 1`` probe succeeds on
    it. If no engine is available, or creating/probing the session raises,
    a ``MockSession`` is returned instead and the reason is printed.

    Returns:
        SQLAlchemy session or MockSession
    """
    # Local import: text() is needed because Session.execute() on
    # SQLAlchemy 2.x rejects a bare SQL string, which would send every call
    # down the error fallback below.
    from sqlalchemy import text

    session = None
    try:
        engine = get_engine()
        if engine is None:
            print("π Using MOCK database session (development)")
            return MockSession()

        # Create SQLAlchemy session
        Session = scoped_session(sessionmaker(bind=engine))
        session = Session()

        # Test connection
        session.execute(text("SELECT 1"))
        print("✅ Real database session created")
        return session
    except OperationalError as e:
        print(f"β οΈ Database connection failed: {e}")
        print("π Using MOCK database session (fallback)")
    except Exception as e:
        print(f"β Unexpected database error: {e}")
        print("π Using MOCK database session (error fallback)")

    # Only reached after a failure. Release the half-initialised real
    # session so it does not linger; cleanup is best-effort because the
    # caller must still receive a usable (mock) session.
    if session is not None:
        try:
            session.close()
        except Exception:
            pass
    return MockSession()
def get_session_factory():
    """Return a zero-argument callable that produces new sessions.

    With an engine available this is a ``sessionmaker`` bound to it;
    without one it is a small function that hands out ``MockSession``
    objects.
    """
    engine = get_engine()
    if engine is not None:
        return sessionmaker(bind=engine)

    # No engine: hand back a factory for mock sessions instead.
    def build_mock_session():
        return MockSession()

    return build_mock_session
# Module-level session cache used by get_global_session() and
# close_global_session().
# NOTE(review): this is a plain module global, not thread-local storage, so
# all threads see the same session object.
_session = None
def get_global_session():
    """Return the module-wide session, creating it on first use.

    The session is obtained from ``get_session()`` once and kept in the
    module-level ``_session`` variable; later calls return that same object.
    """
    global _session
    if _session is not None:
        return _session
    _session = get_session()
    return _session
def close_global_session():
    """Close the module-wide session and forget it.

    Does nothing when no global session exists. If ``close()`` raises, the
    exception propagates, but the module-level reference is cleared first
    so the next ``get_global_session()`` call builds a fresh session rather
    than reusing the broken one.
    """
    global _session
    if _session is None:
        return
    try:
        _session.close()
    finally:
        # Always drop the reference, even when close() fails.
        _session = None
    print("✅ Global database session closed")
|