cryogenic22's picture
Rename utils/db_utils.py to utils_core/db_utils.py
9ff697d verified
"""
Database Utility Functions
This module provides utility functions for connecting to databases,
executing queries, and fetching data.
"""
import sqlite3
import pandas as pd
from typing import List, Dict, Any, Optional, Union, Tuple
def create_connection(db_path: str = "data/pharma_db.sqlite") -> sqlite3.Connection:
    """
    Open the SQLite database stored at ``db_path`` and return the connection.

    If ``sqlite3.connect`` raises a ``sqlite3.Error``, the error is printed
    and the same exception is re-raised to the caller.
    """
    try:
        return sqlite3.connect(db_path)
    except sqlite3.Error as e:
        # Report the failure, then let the original exception propagate.
        print(f"Error connecting to database: {e}")
        raise
def execute_query(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> bool:
    """
    Execute a single SQL statement and commit it.

    Args:
        conn: SQLite connection to run the statement on.
        query: SQL statement, optionally containing placeholders.
        params: Values bound to the placeholders. ``None`` or an empty
            container runs the statement without bindings.

    Returns:
        True if the statement ran and was committed. False if SQLite raised
        an error; the error, the query and any parameters are printed, and
        the pending transaction is rolled back.
    """
    try:
        cursor = conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        conn.commit()
        return True
    except sqlite3.Error as e:
        print(f"Error executing query: {e}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        # A failed statement can leave the implicitly opened transaction
        # active; roll it back so the connection does not keep its lock.
        try:
            conn.rollback()
        except sqlite3.Error as rollback_error:
            # e.g. the connection is already closed; still report failure
            # through the return value instead of raising.
            print(f"Error rolling back transaction: {rollback_error}")
        return False
def execute_script(conn: sqlite3.Connection, script: str) -> bool:
    """
    Execute a SQL script containing multiple statements and commit it.

    Args:
        conn: SQLite connection to run the script on.
        script: One or more SQL statements separated by semicolons.

    Returns:
        True if the whole script ran and was committed. False if SQLite
        raised an error; the error is printed and any transaction the
        script left open is rolled back.
    """
    try:
        cursor = conn.cursor()
        cursor.executescript(script)
        conn.commit()
        return True
    except sqlite3.Error as e:
        print(f"Error executing script: {e}")
        # A script that issued BEGIN and then failed would otherwise leave
        # its partial changes pending on the connection.
        try:
            conn.rollback()
        except sqlite3.Error as rollback_error:
            # e.g. the connection is already closed; keep returning False.
            print(f"Error rolling back transaction: {rollback_error}")
        return False
def fetch_data(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> List[Tuple]:
    """
    Run a SQL query and return every resulting row.

    ``params`` is bound to the query's placeholders when it is non-empty.
    If SQLite raises an error, the error, the query and any parameters are
    printed and an empty list is returned.
    """
    # Only pass bindings through when the caller actually supplied some.
    execute_args = (query, params) if params else (query,)
    try:
        cursor = conn.cursor()
        cursor.execute(*execute_args)
        rows = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error fetching data: {e}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        rows = []
    return rows
def fetch_as_dataframe(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> pd.DataFrame:
    """
    Run a SQL query through pandas and return the result as a DataFrame.

    ``params`` is forwarded to ``pd.read_sql_query`` only when it is
    non-empty. If the query fails, the error, the query and any parameters
    are printed and an empty DataFrame is returned.
    """
    # Build the optional keyword once instead of branching on the call.
    read_kwargs = {"params": params} if params else {}
    try:
        return pd.read_sql_query(query, conn, **read_kwargs)
    except (sqlite3.Error, pd.io.sql.DatabaseError) as e:
        print(f"Error fetching DataFrame: {e}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        return pd.DataFrame()
def get_table_names(conn: sqlite3.Connection) -> List[str]:
    """
    Return the name of every entry of type 'table' in ``sqlite_master``.

    Errors raised by SQLite are not caught here; they propagate to the
    caller.
    """
    cursor = conn.cursor()
    records = cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()
    # Each record is a one-column row holding the table name.
    return [record[0] for record in records]
def get_table_schema(conn: sqlite3.Connection, table_name: str) -> List[Dict[str, Any]]:
    """
    Get the schema for a specific table.

    Args:
        conn: SQLite connection to inspect.
        table_name: Bare (unquoted) name of the table.

    Returns:
        One dict per column, in ``PRAGMA table_info`` order, with the keys
        ``name``, ``type``, ``notnull``, ``default`` and ``primary_key``.
        An empty list is returned when the PRAGMA yields no rows or when
        SQLite raises an error (the error is printed).
    """
    # Quote the name as an identifier (doubling embedded quotes), since it
    # is interpolated into the statement text. This keeps names with spaces
    # or reserved words working and stops the name from injecting SQL.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    try:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info({quoted_name})")
        # table_info columns: cid, name, type, notnull, dflt_value, pk
        return [
            {
                "name": col[1],
                "type": col[2],
                "notnull": bool(col[3]),
                "default": col[4],
                "primary_key": bool(col[5]),
            }
            for col in cursor.fetchall()
        ]
    except sqlite3.Error as e:
        print(f"Error getting schema for {table_name}: {e}")
        return []
def get_table_row_count(conn: sqlite3.Connection, table_name: str) -> int:
    """
    Get the number of rows in a table.

    Args:
        conn: SQLite connection to query.
        table_name: Bare (unquoted) name of the table.

    Returns:
        The result of ``SELECT COUNT(*)`` on the table, or 0 when SQLite
        raises an error (the error is printed).
    """
    # Quote the name as an identifier (doubling embedded quotes), since it
    # is interpolated into the statement text. This keeps names with spaces
    # or reserved words working and stops the name from injecting SQL.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {quoted_name}")
        return cursor.fetchone()[0]
    except sqlite3.Error as e:
        print(f"Error getting row count for {table_name}: {e}")
        return 0
def close_connection(conn: sqlite3.Connection) -> None:
    """
    Close ``conn`` if a truthy connection object was supplied.

    A falsy argument such as ``None`` is ignored.
    """
    if not conn:
        return
    conn.close()
# Example usage
if __name__ == "__main__":
    # Create a connection
    conn = create_connection()
    try:
        # Get list of tables
        tables = get_table_names(conn)
        print(f"Tables in the database: {tables}")
        # Get row counts for each table
        for table in tables:
            count = get_table_row_count(conn, table)
            print(f"Table '{table}' has {count} rows")
    finally:
        # Close the connection even if listing or counting raised
        close_connection(conn)