Spaces:
Sleeping
Sleeping
File size: 1,754 Bytes
1876bc5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | import os
from urllib.parse import urlparse
import mysql.connector
from mysql.connector import pooling
from dotenv import load_dotenv
load_dotenv()
db_url = os.getenv("AIVEN_DATABASE")
if not db_url:
raise ValueError("AIVEN_DATABASE environment variable not set")
# Parse connection string
# Format: mysql://user:pass@host:port/dbname?ssl-mode=REQUIRED
parsed = urlparse(db_url)
db_user = parsed.username
db_password = parsed.password
db_host = parsed.hostname
db_port = parsed.port or 3306
db_name = parsed.path.lstrip('/')
# Remove query params
if '?' in db_name:
db_name = db_name.split('?')[0]
try:
# Initialize connection pool
# Aiven requires SSL connections, so we configure ssl_verify_cert=False (equivalent to rejectUnauthorized: false)
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
pool_reset_session=True,
host=db_host,
port=db_port,
user=db_user,
password=db_password,
database=db_name,
ssl_disabled=False,
ssl_verify_cert=False
)
except Exception as e:
print(f"Error initializing connection pool: {e}")
connection_pool = None
def get_db_connection():
if not connection_pool:
raise Exception("Database connection pool is not initialized")
return connection_pool.get_connection()
def execute_query(query, params=(), is_select=False):
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
try:
cursor.execute(query, params)
if is_select:
result = cursor.fetchall()
return result
else:
conn.commit()
return cursor.lastrowid
finally:
cursor.close()
conn.close()
|