# beb8990 | File size: 6,781 Bytes
"""
Calendar Session Manager - Database session handling for Calendar services using SQLAlchemy
"""
import logging
import os
from typing import Dict
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database.models import Base
logger = logging.getLogger(__name__)
class CalendarSessionManager:
    """Calendar session manager with SQLAlchemy support for calendar services.

    Engines and session factories are created lazily and cached per ``db_id``,
    so repeated lookups for the same database id reuse the same objects until
    ``close_session`` evicts them.
    """

    # Python type reported by a column type -> type name used in the schema dump.
    _PYTHON_TYPE_NAMES = {int: "INTEGER", str: "TEXT", bool: "BOOLEAN", float: "REAL"}

    # Ordered substring fallbacks, used only when a column type cannot report
    # a Python type. The first entry with a matching substring wins.
    _TYPE_NAME_FALLBACKS = (
        (("VARCHAR", "STRING"), "TEXT"),
        (("INTEGER",), "INTEGER"),
        (("TEXT",), "TEXT"),
        (("BOOLEAN",), "BOOLEAN"),
        (("DATETIME",), "DATETIME"),
    )

    def __init__(self):
        self._engines = {}  # Cache engines per database
        self._session_makers = {}  # Cache session makers per database

    def get_db_path(self, db_id: str) -> str:
        """Return the SQLite file path for ``db_id``.

        The ``./mcp_databases`` directory (relative to the current working
        directory) is created if it does not exist yet.
        """
        databases_dir = "./mcp_databases"
        os.makedirs(databases_dir, exist_ok=True)
        return os.path.join(databases_dir, f"calendar_{db_id}.sqlite")

    def get_engine(self, db_id: str):
        """Return the cached SQLAlchemy engine for ``db_id``, creating it on first use."""
        if db_id not in self._engines:
            database_url = f"sqlite:///{self.get_db_path(db_id)}"
            self._engines[db_id] = create_engine(
                database_url,
                echo=False,  # Set to True for SQL logging
                connect_args={"check_same_thread": False},  # Needed for SQLite
                pool_pre_ping=True,  # Verify connections before using them
                pool_recycle=3600,  # Recycle connections after 1 hour
            )
        return self._engines[db_id]

    def get_session_maker(self, db_id: str):
        """Return the cached ``sessionmaker`` bound to the engine for ``db_id``."""
        if db_id not in self._session_makers:
            self._session_makers[db_id] = sessionmaker(bind=self.get_engine(db_id))
        return self._session_makers[db_id]

    def get_session(self, db_id: str):
        """Return a new SQLAlchemy session for ``db_id``.

        Every call produces a fresh session from the cached session maker.
        """
        return self.get_session_maker(db_id)()

    def close_session(self, db_id: str):
        """Drop the cached session maker and dispose the engine for ``db_id``.

        Unknown ids are ignored; the closing log line is emitted either way.
        """
        self._session_makers.pop(db_id, None)
        engine = self._engines.pop(db_id, None)
        if engine is not None:
            engine.dispose()
        logger.info("Closed session for database id '%s'", db_id)

    def init_database(self, db_id: str, create_tables: bool = False):
        """Create the engine for ``db_id`` and optionally create all model tables.

        Args:
            db_id: Identifier of the database to initialize.
            create_tables: When true, run ``Base.metadata.create_all`` on the engine.

        Raises:
            Exception: Any error raised while creating the engine or the
                tables is logged and re-raised unchanged.
        """
        try:
            engine = self.get_engine(db_id)
            # Create all tables
            if create_tables:
                Base.metadata.create_all(engine)
            logger.info("Calendar database %s initialized successfully with SQLAlchemy", db_id)
        except Exception as e:
            logger.error("Failed to initialize database %s: %s", db_id, e)
            raise

    def get_database_schema(self) -> Dict:
        """Describe every table registered on ``Base.metadata``.

        Returns:
            A dict keyed by table name. Each value holds ``table_name``,
            ``columns`` (column name -> definition string), ``foreign_keys``
            and ``indexes`` (lists of definition strings).
        """
        schema = {}
        for table_name, table in Base.metadata.tables.items():
            schema[table_name] = {
                "table_name": table_name,
                "columns": {
                    column.name: self._describe_column(table, column)
                    for column in table.columns
                },
                "foreign_keys": [
                    f"FOREIGN KEY ({fk.parent.name}) "
                    f"REFERENCES {fk.column.table.name}({fk.column.name})"
                    for fk in table.foreign_keys
                ],
                "indexes": [
                    self._describe_index(table_name, index) for index in table.indexes
                ],
            }
        return schema

    @staticmethod
    def _sql_type_name(column) -> str:
        """Map a column's type to the type name used in the schema dump."""
        type_str = str(column.type).upper()
        try:
            python_type = column.type.python_type
        except (AttributeError, NotImplementedError):
            # SQLAlchemy types without a Python equivalent raise
            # NotImplementedError from ``python_type``; hasattr() would let
            # that escape, so fall back to matching on the type's string form.
            for needles, name in CalendarSessionManager._TYPE_NAME_FALLBACKS:
                if any(needle in type_str for needle in needles):
                    return name
            return type_str
        return CalendarSessionManager._PYTHON_TYPE_NAMES.get(python_type, type_str)

    @staticmethod
    def _has_autoincrement(table, column) -> bool:
        """Tell whether a primary-key column gets auto-increment semantics.

        ``Column.autoincrement`` defaults to the string ``"auto"``, which is
        truthy for every column. Per the SQLAlchemy documentation, ``"auto"``
        only applies to a single-column integer primary key with no default;
        this helper mirrors that rule instead of testing truthiness.
        """
        setting = column.autoincrement
        if setting is True:
            return True
        if not setting:
            return False
        try:
            is_integer = column.type.python_type is int
        except (AttributeError, NotImplementedError):
            is_integer = False
        if not is_integer or len(table.primary_key.columns) != 1:
            return False
        if column.default is not None or column.server_default is not None:
            return False
        # "ignore_fk" asks for auto-increment even on foreign-key columns.
        return setting == "ignore_fk" or not column.foreign_keys

    @staticmethod
    def _describe_column(table, column) -> str:
        """Render one column as ``TYPE [PRIMARY KEY ...] [UNIQUE] [NOT NULL] [DEFAULT ...]``."""
        parts = [CalendarSessionManager._sql_type_name(column)]

        if column.primary_key:
            parts.append("PRIMARY KEY")
            if CalendarSessionManager._has_autoincrement(table, column):
                parts.append("AUTOINCREMENT")
        if column.unique:
            parts.append("UNIQUE")
        if not column.nullable:
            parts.append("NOT NULL")

        default = column.default
        if default is not None:
            if not hasattr(default, "arg"):
                parts.append(f"DEFAULT '{default}'")
            elif callable(default.arg):
                # Callable defaults are computed at insert time; no literal to show.
                parts.append("DEFAULT (function)")
            else:
                parts.append(f"DEFAULT '{default.arg}'")

        return " ".join(parts)

    @staticmethod
    def _describe_index(table_name, index) -> str:
        """Render one index as ``[UNIQUE ]INDEX name ON table (col, ...)``."""
        keyword = "UNIQUE INDEX" if index.unique else "INDEX"
        column_names = ", ".join(col.name for col in index.columns)
        return f"{keyword} {index.name} ON {table_name} ({column_names})"