File size: 6,994 Bytes
74d3aee 6a73e96 74d3aee 6a73e96 74d3aee 6a73e96 74d3aee 6a73e96 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | """
Database connector with support for ODBC and TDS drivers.
Secure handling of credentials via environment variables.
"""
import os
import logging
from typing import Optional, Dict, Any
from contextlib import contextmanager
from urllib.parse import quote_plus
import pandas as pd
from sqlalchemy import create_engine, text, pool
from sqlalchemy.engine import Engine
from tenacity import retry, stop_after_attempt, wait_exponential
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DatabaseConnector:
"""Handles database connections and query execution."""
def __init__(self):
self.engine: Optional[Engine] = None
self.connection_string: Optional[str] = None
self._init_engine()
def _get_env_var(self, key: str, default: str = "", mask_log: bool = False) -> str:
"""Safely retrieve environment variable."""
value = os.getenv(key, default)
if mask_log and value:
logger.info(f"{key}: {'*' * 8}")
else:
logger.info(f"{key}: {value if value else '(not set)'}")
return value
def _build_connection_string(self) -> Optional[str]:
"""Build SQLAlchemy connection string from environment variables."""
host = self._get_env_var("DB_HOST")
port = self._get_env_var("DB_PORT", "1433")
database = self._get_env_var("DB_NAME")
user = self._get_env_var("DB_USER", mask_log=True)
password = self._get_env_var("DB_PASSWORD", mask_log=True)
driver = self._get_env_var("DB_DRIVER", "tds") # tds or odbc
encrypt = self._get_env_var("DB_ENCRYPT", "false")
if not all([host, database, user, password]):
logger.warning("Database credentials incomplete. Demo mode will be used.")
return None
try:
if driver == "odbc":
# ODBC connection string
driver_name = "{ODBC Driver 18 for SQL Server}"
params = {
"DRIVER": driver_name,
"SERVER": f"{host},{port}",
"DATABASE": database,
"UID": user,
"PWD": password,
"Encrypt": "yes" if encrypt.lower() == "true" else "no",
"TrustServerCertificate": "yes"
}
conn_str = "mssql+pyodbc://?" + "&".join(
f"{k}={quote_plus(str(v))}" for k, v in params.items()
)
else:
# python-tds connection string
conn_str = (
f"mssql+pytds://{quote_plus(user)}:{quote_plus(password)}"
f"@{host}:{port}/{database}"
)
if encrypt.lower() == "true":
conn_str += "?encryption=required"
logger.info(f"Connection string built using {driver} driver")
return conn_str
except Exception as e:
logger.error(f"Error building connection string: {str(e)}")
return None
def _init_engine(self):
"""Initialize SQLAlchemy engine."""
self.connection_string = self._build_connection_string()
if not self.connection_string:
logger.warning("No valid connection string. Database features disabled.")
return
try:
self.engine = create_engine(
self.connection_string,
poolclass=pool.QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Verify connections before using
echo=False
)
logger.info("Database engine initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize database engine: {str(e)}")
self.engine = None
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def test_connection(self) -> bool:
"""Test database connectivity."""
if not self.engine:
return False
try:
with self.engine.connect() as conn:
result = conn.execute(text("SELECT 1 AS test"))
row = result.fetchone()
if row and row[0] == 1:
logger.info("Database connection test successful")
return True
return False
except Exception as e:
logger.error(f"Database connection test failed: {str(e)}")
return False
def execute_query(
self,
query: str,
params: Optional[Dict[str, Any]] = None
) -> Optional[pd.DataFrame]:
"""
Execute a SQL query and return results as pandas DataFrame.
Args:
query: SQL query string with :param placeholders
params: Dictionary of parameter values
Returns:
DataFrame with results or None on error
"""
if not self.engine:
logger.error("No database engine available")
return None
try:
with self.engine.connect() as conn:
result = pd.read_sql_query(
text(query),
conn,
params=params or {}
)
logger.info(f"Query executed successfully, returned {len(result)} rows")
return result
except Exception as e:
# Mask any credential info in error messages
error_msg = str(e)
for key in ["PASSWORD", "PWD", "UID", "password", "user"]:
if key.lower() in error_msg.lower():
error_msg = "Database query error (credentials masked)"
break
logger.error(f"Query execution failed: {error_msg}")
return None
def execute_scalar(
self,
query: str,
params: Optional[Dict[str, Any]] = None
) -> Optional[Any]:
"""Execute query and return single scalar value."""
df = self.execute_query(query, params)
if df is not None and not df.empty:
return df.iloc[0, 0]
return None
@contextmanager
def get_connection(self):
"""Context manager for raw database connections."""
if not self.engine:
raise RuntimeError("No database engine available")
conn = self.engine.connect()
try:
yield conn
finally:
conn.close()
def is_available(self) -> bool:
"""Check if database is available."""
if self.engine is None:
return False
try:
return self.test_connection()
except Exception:
return False
# Global database connector instance
db_connector = DatabaseConnector() |