Spaces:
Sleeping
Sleeping
| # src/prism/database/connection.py | |
| import os | |
| import psycopg2 | |
| from dotenv import load_dotenv | |
| from psycopg2.extras import RealDictCursor | |
| from ..utils import logger | |
| load_dotenv() | |
class DatabaseConnection:
    """Manage a single psycopg2 connection and run queries on it.

    The connection string is read from the ``DATABASE_URL`` environment
    variable when the object is created; the connection itself is only
    opened by :meth:`connect`. Cursors are created with ``RealDictCursor``.

    The object can also be used as a context manager, which connects on
    entry and closes on exit::

        with DatabaseConnection() as db:
            rows = db.execute_query("SELECT 1 AS one")
    """

    def __init__(self):
        """Read ``DATABASE_URL`` from the environment; open nothing yet."""
        self.database_url = os.getenv("DATABASE_URL")
        self.conn = None

    def __enter__(self):
        """Open the connection and return this manager."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.close()

    def connect(self):
        """Open the database connection, reusing one that is already open.

        Returns:
            The psycopg2 connection, which is also stored on ``self.conn``.

        Raises:
            Exception: Whatever ``psycopg2.connect`` raises; it is logged
                and re-raised unchanged.
        """
        # Opening a second connection here would orphan the first one
        # without closing it. ``closed`` is 0 while the connection is usable.
        if self.conn is not None and not self.conn.closed:
            return self.conn

        try:
            logger.debug("Establishing database connection...")
            self.conn = psycopg2.connect(
                self.database_url, cursor_factory=RealDictCursor
            )
            # NOTE(review): the "β" in these log messages looks like a
            # mis-decoded status glyph; kept as-is, confirm the intended one.
            logger.debug("β Database connection established")
            return self.conn
        except Exception as e:
            logger.error(f"β Database connection error: {e}")
            raise

    def close(self):
        """Close the connection if one was opened; otherwise do nothing."""
        if self.conn:
            logger.debug("Closing database connection...")
            self.conn.close()
            logger.debug("Database connection closed")

    def _require_connection(self):
        """Return ``self.conn`` or raise a clear error if it was never opened."""
        if self.conn is None:
            raise RuntimeError(
                "Database connection is not open; call connect() first"
            )
        return self.conn

    def execute_query(self, query, params=None, *, fetch=None):
        """Execute one SQL statement and return its rows or its row count.

        The transaction is committed on success and rolled back on failure.

        Args:
            query: SQL text; values should be passed through ``params``
                rather than formatted into the string.
            params: Values bound to the statement's placeholders, or ``None``.
            fetch: ``None`` (default) fetches rows only when the statement
                starts with ``SELECT``. Pass ``True`` to fetch rows from
                statements such as ``WITH ...`` or ``... RETURNING``, or
                ``False`` to always return the affected-row count.

        Returns:
            The list from ``cursor.fetchall()`` when rows are fetched,
            otherwise ``cursor.rowcount``.

        Raises:
            RuntimeError: If :meth:`connect` has not been called.
            Exception: Any error raised while executing the statement; the
                transaction is rolled back and the error logged first.
        """
        conn = self._require_connection()
        # The cursor is closed when the ``with`` block exits, on any path.
        with conn.cursor() as cursor:
            try:
                statement = query.strip().upper()
                query_type = statement.split()[0] if statement else "UNKNOWN"
                wants_rows = (
                    statement.startswith("SELECT") if fetch is None else fetch
                )

                logger.debug(f"Executing {query_type} query...")
                cursor.execute(query, params)

                if wants_rows:
                    results = cursor.fetchall()
                    # psycopg2 opens a transaction on execute(); commit so a
                    # read does not leave the session idle in transaction
                    # and side effects of called functions are persisted.
                    conn.commit()
                    logger.debug(f"Query returned {len(results)} rows")
                    return results

                conn.commit()
                rowcount = cursor.rowcount
                logger.debug(f"Query affected {rowcount} rows")
                return rowcount
            except Exception as e:
                conn.rollback()
                logger.error(f"β Query execution error: {e}")
                raise

    def initialize_schema(self, schema_file):
        """Run the SQL in ``schema_file`` and commit it.

        Args:
            schema_file: Path to a UTF-8 encoded SQL file.

        Raises:
            RuntimeError: If :meth:`connect` has not been called.
            OSError: If the schema file cannot be read.
            Exception: Any error raised while executing the SQL; the
                transaction is rolled back and the error logged first.
        """
        # Check the connection first so a missing connect() is reported
        # before any file I/O happens.
        conn = self._require_connection()

        logger.debug(f"Reading schema file: {schema_file}")
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(schema_file, encoding="utf-8") as f:
            schema_sql = f.read()

        with conn.cursor() as cursor:
            try:
                logger.debug("Executing schema SQL...")
                cursor.execute(schema_sql)
                conn.commit()
                logger.info("β Database schema initialized successfully")
            except Exception as e:
                conn.rollback()
                logger.error(f"β Schema initialization error: {e}")
                raise