# calendar-gym/server/database/session_manager.py
# Repository page residue: "santosh-iima's picture" / "Upload 91 files" / commit beb8990 (verified)
"""
Calendar Session Manager - Database session handling for Calendar services using SQLAlchemy
"""
import logging
import os
from typing import Dict
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database.models import Base
logger = logging.getLogger(__name__)


class CalendarSessionManager:
    """Manage per-database SQLAlchemy engines and session factories.

    Each ``db_id`` maps to its own SQLite file inside ``DATABASES_DIR``.
    Engines and ``sessionmaker`` factories are created lazily on first use and
    cached on the instance until :meth:`close_session` drops them.
    """

    # Directory holding the SQLite files (relative to the working directory).
    DATABASES_DIR = "./mcp_databases"

    # Display names for the Python types reported by ``column.type.python_type``.
    _PYTHON_TYPE_NAMES = {int: "INTEGER", str: "TEXT", bool: "BOOLEAN", float: "REAL"}

    # (substring of the upper-cased type string, display name), tried in order
    # when a column type cannot report a Python type.
    _TYPE_NAME_FALLBACKS = (
        ("VARCHAR", "TEXT"),
        ("STRING", "TEXT"),
        ("INTEGER", "INTEGER"),
        ("TEXT", "TEXT"),
        ("BOOLEAN", "BOOLEAN"),
        ("DATETIME", "DATETIME"),
    )

    def __init__(self):
        self._engines = {}  # db_id -> cached engine
        self._session_makers = {}  # db_id -> cached sessionmaker

    def get_db_path(self, db_id: str) -> str:
        """Return the SQLite file path for ``db_id``, creating the directory if needed."""
        databases_dir = self.DATABASES_DIR
        os.makedirs(databases_dir, exist_ok=True)
        # NOTE(review): db_id is interpolated as-is; if it can come from an
        # untrusted caller, path separators should be rejected upstream.
        return os.path.join(databases_dir, f"calendar_{db_id}.sqlite")

    def get_engine(self, db_id: str):
        """Return the cached engine for ``db_id``, creating it on first use."""
        if db_id not in self._engines:
            db_path = self.get_db_path(db_id)
            self._engines[db_id] = create_engine(
                f"sqlite:///{db_path}",
                echo=False,  # Set to True for SQL logging
                connect_args={"check_same_thread": False},  # Needed for SQLite
                pool_pre_ping=True,  # Verify connections before using them
                pool_recycle=3600,  # Recycle connections after 1 hour
            )
        return self._engines[db_id]

    def get_session_maker(self, db_id: str):
        """Return the cached ``sessionmaker`` bound to the engine of ``db_id``."""
        if db_id not in self._session_makers:
            self._session_makers[db_id] = sessionmaker(bind=self.get_engine(db_id))
        return self._session_makers[db_id]

    def get_session(self, db_id: str):
        """Return a new session for ``db_id``; the caller is responsible for closing it."""
        return self.get_session_maker(db_id)()

    def close_session(self, db_id: str):
        """Drop the cached session factory and dispose the engine of ``db_id``.

        Calling this for an unknown ``db_id`` is a no-op apart from the log line.
        """
        self._session_makers.pop(db_id, None)
        # Remove the engine from the cache before disposing it so a failing
        # dispose() cannot leave a half-closed engine behind for reuse.
        engine = self._engines.pop(db_id, None)
        if engine is not None:
            engine.dispose()
        logger.info("Closed session for database id '%s'", db_id)

    def init_database(self, db_id: str, create_tables: bool = False):
        """Create the engine for ``db_id`` and optionally create all model tables.

        Args:
            db_id: Identifier of the database to initialize.
            create_tables: When true, run ``Base.metadata.create_all`` on the engine.

        Raises:
            Exception: Any error from engine or table creation is logged and re-raised.
        """
        try:
            engine = self.get_engine(db_id)
            if create_tables:
                Base.metadata.create_all(engine)
            logger.info("Calendar database %s initialized successfully with SQLAlchemy", db_id)
        except Exception as e:
            logger.error("Failed to initialize database %s: %s", db_id, e)
            raise

    def get_database_schema(self, metadata=None) -> Dict:
        """Describe every table of a metadata collection as plain strings.

        Args:
            metadata: Object exposing a ``tables`` mapping of name -> table.
                Defaults to ``Base.metadata`` (the project's models).

        Returns:
            A dict keyed by table name. Each value holds ``table_name``,
            ``columns`` (column name -> definition string), ``foreign_keys``
            and ``indexes`` (lists of definition strings, sorted by column and
            index name respectively so the output is deterministic).
        """
        if metadata is None:
            metadata = Base.metadata

        schema = {}
        for table_name, table in metadata.tables.items():
            # foreign_keys / indexes are unordered sets on real tables; sort
            # them so repeated calls yield the same description.
            foreign_keys = sorted(table.foreign_keys, key=lambda fk: str(fk.parent.name))
            indexes = sorted(table.indexes, key=lambda index: str(index.name))
            schema[table_name] = {
                "table_name": table_name,
                "columns": {
                    column.name: self._describe_column(column, table)
                    for column in table.columns
                },
                "foreign_keys": [self._describe_foreign_key(fk) for fk in foreign_keys],
                "indexes": [self._describe_index(index, table_name) for index in indexes],
            }
        return schema

    @classmethod
    def _sql_type_name(cls, column_type) -> str:
        """Map a column type to the display name used in the schema description."""
        try:
            python_type = column_type.python_type
        except (AttributeError, NotImplementedError):
            # python_type raises NotImplementedError for types without a known
            # Python type; fall back to matching on the type's string form.
            python_type = None

        if python_type is not None:
            known = cls._PYTHON_TYPE_NAMES.get(python_type)
            return known if known is not None else str(column_type).upper()

        type_str = str(column_type).upper()
        for needle, display_name in cls._TYPE_NAME_FALLBACKS:
            if needle in type_str:
                return display_name
        return type_str

    @staticmethod
    def _auto_increments(column, table, sql_type: str) -> bool:
        """Tell whether a primary-key column should be labelled AUTOINCREMENT.

        ``autoincrement`` defaults to the truthy string ``"auto"``, which only
        implies auto increment for a sole integer primary key without defaults
        or foreign keys; explicit settings are honoured as given.
        """
        setting = column.autoincrement
        if setting != "auto":
            return bool(setting)
        return (
            sql_type == "INTEGER"
            and len(table.primary_key.columns) == 1
            and column.default is None
            and column.server_default is None
            and not column.foreign_keys
        )

    @classmethod
    def _describe_column(cls, column, table) -> str:
        """Build the definition string (type plus constraints) of one column."""
        sql_type = cls._sql_type_name(column.type)
        parts = [sql_type]

        if column.primary_key:
            parts.append("PRIMARY KEY")
            if cls._auto_increments(column, table, sql_type):
                parts.append("AUTOINCREMENT")
        if column.unique:
            parts.append("UNIQUE")
        if not column.nullable:
            parts.append("NOT NULL")

        default = column.default
        if default is not None:
            if not hasattr(default, "arg"):
                parts.append(f"DEFAULT '{default}'")
            elif callable(default.arg):
                # Python-side default factories have no literal to show.
                parts.append("DEFAULT (function)")
            else:
                parts.append(f"DEFAULT '{default.arg}'")

        return " ".join(parts)

    @staticmethod
    def _describe_foreign_key(fk) -> str:
        """Build the ``FOREIGN KEY (...) REFERENCES ...`` string of one foreign key."""
        target = fk.column
        return f"FOREIGN KEY ({fk.parent.name}) REFERENCES {target.table.name}({target.name})"

    @staticmethod
    def _describe_index(index, table_name: str) -> str:
        """Build the ``[UNIQUE ]INDEX name ON table (cols)`` string of one index."""
        kind = "UNIQUE INDEX" if index.unique else "INDEX"
        column_names = ", ".join(col.name for col in index.columns)
        return f"{kind} {index.name} ON {table_name} ({column_names})"