Spaces:
Runtime error
Runtime error
File size: 4,552 Bytes
3616223 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
"""
Database Utility Functions
This module provides utility functions for connecting to databases,
executing queries, and fetching data.
"""
import sqlite3
import pandas as pd
from typing import List, Dict, Any, Optional, Union, Tuple
def create_connection(db_path: str = "data/pharma_db.sqlite") -> sqlite3.Connection:
    """Open the SQLite database file at ``db_path``.

    Args:
        db_path: Location of the database file handed to ``sqlite3.connect``.

    Returns:
        The newly opened connection.

    Raises:
        sqlite3.Error: Re-raised unchanged after the failure has been printed.
    """
    try:
        return sqlite3.connect(db_path)
    except sqlite3.Error as exc:
        # Report the failure, then let the caller decide how to recover.
        print(f"Error connecting to database: {exc}")
        raise
def execute_query(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> bool:
    """Execute a single SQL statement and commit the result.

    Args:
        conn: Open SQLite connection the statement is run on.
        query: SQL text, optionally containing ``?`` or ``:name`` placeholders.
        params: Values bound to the placeholders. ``None`` or an empty
            container runs the statement without bindings.

    Returns:
        True when the statement executed and was committed; False when SQLite
        reported an error. On error the details are printed and the pending
        transaction on ``conn`` is rolled back.
    """
    try:
        cursor = conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        conn.commit()
        return True
    except sqlite3.Error as e:
        print(f"Error executing query: {e}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        # A failed INSERT/UPDATE/DELETE leaves the implicitly opened
        # transaction pending; without a rollback the connection keeps its
        # write lock and a later commit could persist partial work.
        try:
            conn.rollback()
        except sqlite3.Error:
            # The connection itself is unusable (e.g. already closed), so
            # there is nothing left to undo.
            pass
        return False
def execute_script(conn: sqlite3.Connection, script: str) -> bool:
    """Execute a SQL script containing multiple statements and commit it.

    Args:
        conn: Open SQLite connection the script is run on.
        script: One or more SQL statements separated by semicolons.

    Returns:
        True when every statement ran and the result was committed; False
        when SQLite reported an error. On error the message is printed and
        any transaction the script left open is rolled back.
    """
    try:
        cursor = conn.cursor()
        cursor.executescript(script)
        conn.commit()
        return True
    except sqlite3.Error as e:
        print(f"Error executing script: {e}")
        # A script that issued BEGIN and then failed leaves that transaction
        # open; roll it back so partial changes are neither visible nor
        # committed by a later call.
        try:
            conn.rollback()
        except sqlite3.Error:
            # The connection itself is unusable (e.g. already closed), so
            # there is nothing left to undo.
            pass
        return False
def fetch_data(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> List[Tuple]:
    """Run a query and return every resulting row.

    Args:
        conn: Open SQLite connection to query.
        query: SQL text, optionally containing placeholders.
        params: Values bound to the placeholders; omitted from the call when
            ``None`` or empty.

    Returns:
        All rows produced by the query, or an empty list when SQLite reports
        an error (the error, query and parameters are printed).
    """
    # Only pass bindings through when the caller actually supplied some.
    call_args = (query, params) if params else (query,)
    try:
        return conn.cursor().execute(*call_args).fetchall()
    except sqlite3.Error as err:
        print(f"Error fetching data: {err}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        return []
def fetch_as_dataframe(
    conn: sqlite3.Connection,
    query: str,
    params: Optional[Union[Tuple, List, Dict]] = None
) -> pd.DataFrame:
    """Run a query through pandas and return the result as a DataFrame.

    Args:
        conn: Open SQLite connection to query.
        query: SQL text, optionally containing placeholders.
        params: Values bound to the placeholders; forwarded to
            ``pd.read_sql_query`` only when non-empty.

    Returns:
        The query result, or an empty DataFrame when the read fails (the
        error, query and parameters are printed).
    """
    # Forward ``params`` as a keyword only when there is something to bind.
    extra_kwargs = {"params": params} if params else {}
    try:
        return pd.read_sql_query(query, conn, **extra_kwargs)
    except (sqlite3.Error, pd.io.sql.DatabaseError) as err:
        print(f"Error fetching DataFrame: {err}")
        print(f"Query: {query}")
        if params:
            print(f"Parameters: {params}")
        return pd.DataFrame()
def get_table_names(conn: sqlite3.Connection) -> List[str]:
    """List the name of every ``sqlite_master`` entry of type ``'table'``.

    Args:
        conn: Open SQLite connection to inspect.

    Returns:
        The table names in the order SQLite returns them.
    """
    listing = conn.cursor().execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    )
    # Each result row is a one-element tuple holding the table name.
    return [name for (name,) in listing]
def get_table_schema(conn: sqlite3.Connection, table_name: str) -> List[Dict[str, Any]]:
    """Describe the columns of a table using ``PRAGMA table_info``.

    Args:
        conn: Open SQLite connection to inspect.
        table_name: Name of the table. It is treated as a literal identifier
            and quoted, so names containing spaces, keywords or quote
            characters are handled.

    Returns:
        One dict per column with the keys ``name``, ``type``, ``notnull``,
        ``default`` and ``primary_key``. The list is empty when the table
        does not exist or SQLite reports an error (which is printed).
    """
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier
    # instead: wrap it in double quotes and double any embedded quote.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    try:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info({quoted_name})")
        columns = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error getting schema for {table_name}: {e}")
        return []
    # table_info rows are (cid, name, type, notnull, dflt_value, pk).
    return [
        {
            "name": col[1],
            "type": col[2],
            "notnull": bool(col[3]),
            "default": col[4],
            "primary_key": bool(col[5]),
        }
        for col in columns
    ]
def get_table_row_count(conn: sqlite3.Connection, table_name: str) -> int:
    """Count the rows in a table.

    Args:
        conn: Open SQLite connection to query.
        table_name: Name of the table. It is treated as a literal identifier
            and quoted, so names containing spaces, keywords or quote
            characters are handled and cannot inject additional SQL.

    Returns:
        The number of rows, or 0 when the table does not exist or SQLite
        reports an error (which is printed).
    """
    # Table names cannot be bound as parameters, so quote the identifier
    # instead: wrap it in double quotes and double any embedded quote.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {quoted_name}")
        return cursor.fetchone()[0]
    except sqlite3.Error as e:
        print(f"Error getting row count for {table_name}: {e}")
        return 0
def close_connection(conn: sqlite3.Connection) -> None:
    """Close ``conn``; a falsy value such as ``None`` is ignored.

    Args:
        conn: The connection to close.
    """
    if not conn:
        # Nothing was opened, so there is nothing to release.
        return
    conn.close()
# Example usage
if __name__ == "__main__":
    # Create a connection
    conn = create_connection()
    try:
        # Get list of tables
        tables = get_table_names(conn)
        print(f"Tables in the database: {tables}")
        # Get row counts for each table
        for table in tables:
            count = get_table_row_count(conn, table)
            print(f"Table '{table}' has {count} rows")
    finally:
        # Close the connection even if listing or counting fails
        close_connection(conn)