Spaces:
Sleeping
Sleeping
import os
from urllib.parse import unquote, urlparse

import mysql.connector
from mysql.connector import pooling
from dotenv import load_dotenv
# Pull variables from a local .env file (if present) into the environment
# before the connection string is read.
load_dotenv()

db_url = os.getenv("AIVEN_DATABASE")
if not db_url:
    raise ValueError("AIVEN_DATABASE environment variable not set")

# Parse connection string
# Format: mysql://user:pass@host:port/dbname?ssl-mode=REQUIRED
parsed = urlparse(db_url)

# Credentials inside a URL are percent-encoded whenever they contain reserved
# characters (e.g. "@" is written "%40"); decode them so the server receives
# the real values. Either part may be absent, in which case it stays None.
db_user = unquote(parsed.username) if parsed.username is not None else None
db_password = unquote(parsed.password) if parsed.password is not None else None
db_host = parsed.hostname
db_port = parsed.port or 3306  # fall back to the default MySQL port

# urlparse() has already moved any "?key=value" part into parsed.query, so
# parsed.path is just "/dbname" and needs no further query stripping.
db_name = parsed.path.lstrip('/')
# Build the shared connection pool once, at import time. If construction
# fails, the error is printed and connection_pool stays None, which
# get_db_connection() reports when a connection is requested.
connection_pool = None
try:
    # TLS stays enabled (ssl_disabled=False) but the server certificate is
    # not verified (ssl_verify_cert=False), the equivalent of
    # rejectUnauthorized: false.
    # NOTE(review): skipping certificate verification weakens the TLS
    # guarantee; consider verifying against the provider's CA - confirm.
    pool_settings = {
        "pool_name": "mypool",
        "pool_size": 5,
        "pool_reset_session": True,
        "host": db_host,
        "port": db_port,
        "user": db_user,
        "password": db_password,
        "database": db_name,
        "ssl_disabled": False,
        "ssl_verify_cert": False,
    }
    connection_pool = pooling.MySQLConnectionPool(**pool_settings)
except Exception as exc:
    print(f"Error initializing connection pool: {exc}")
def get_db_connection():
    """Check a connection out of the module-level pool.

    Returns:
        The object returned by ``connection_pool.get_connection()``.

    Raises:
        Exception: If the pool was not created (``connection_pool`` is falsy).
    """
    if connection_pool:
        return connection_pool.get_connection()
    raise Exception("Database connection pool is not initialized")
def execute_query(query, params=(), is_select=False):
    """Run a single SQL statement on a pooled connection.

    Args:
        query: SQL text, passed to ``cursor.execute`` unchanged.
        params: Values bound to the statement's placeholders.
        is_select: When true, fetch and return all rows; otherwise commit
            the transaction and return the cursor's ``lastrowid``.

    Returns:
        The result of ``cursor.fetchall()`` when ``is_select`` is true (the
        cursor is created with ``dictionary=True``), else ``cursor.lastrowid``.

    Raises:
        Exception: If the connection pool is not initialized, plus whatever
            the driver raises while executing the statement. No explicit
            rollback is issued on failure.
    """
    conn = get_db_connection()
    try:
        # The cursor is created inside the outer try so that a failure here
        # still hands the connection back to the pool.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, params)
            if is_select:
                return cursor.fetchall()
            conn.commit()
            return cursor.lastrowid
        finally:
            cursor.close()
    finally:
        # Runs even if cursor.close() raised, so the connection is always
        # closed (returned to the pool).
        conn.close()