cryogenic22 commited on
Commit
3616223
·
verified ·
1 Parent(s): 3e9cac4

Create utils/db_utils.py

Browse files
Files changed (1) hide show
  1. utils/db_utils.py +151 -0
utils/db_utils.py ADDED
@@ -0,0 +1,151 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Database Utility Functions
3
+
4
+ This module provides utility functions for connecting to databases,
5
+ executing queries, and fetching data.
6
+ """
7
+
8
+ import sqlite3
9
+ import pandas as pd
10
+ from typing import List, Dict, Any, Optional, Union, Tuple
11
+
12
+ def create_connection(db_path: str = "data/pharma_db.sqlite") -> sqlite3.Connection:
13
+ """Create a database connection to the SQLite database"""
14
+ try:
15
+ conn = sqlite3.connect(db_path)
16
+ return conn
17
+ except sqlite3.Error as e:
18
+ print(f"Error connecting to database: {e}")
19
+ raise
20
+
21
+ def execute_query(
22
+ conn: sqlite3.Connection,
23
+ query: str,
24
+ params: Optional[Union[Tuple, List, Dict]] = None
25
+ ) -> bool:
26
+ """Execute a SQL query with optional parameters"""
27
+ try:
28
+ cursor = conn.cursor()
29
+ if params:
30
+ cursor.execute(query, params)
31
+ else:
32
+ cursor.execute(query)
33
+ conn.commit()
34
+ return True
35
+ except sqlite3.Error as e:
36
+ print(f"Error executing query: {e}")
37
+ print(f"Query: {query}")
38
+ if params:
39
+ print(f"Parameters: {params}")
40
+ return False
41
+
42
+ def execute_script(conn: sqlite3.Connection, script: str) -> bool:
43
+ """Execute a SQL script containing multiple statements"""
44
+ try:
45
+ cursor = conn.cursor()
46
+ cursor.executescript(script)
47
+ conn.commit()
48
+ return True
49
+ except sqlite3.Error as e:
50
+ print(f"Error executing script: {e}")
51
+ return False
52
+
53
+ def fetch_data(
54
+ conn: sqlite3.Connection,
55
+ query: str,
56
+ params: Optional[Union[Tuple, List, Dict]] = None
57
+ ) -> List[Tuple]:
58
+ """Fetch data from the database using a SQL query"""
59
+ try:
60
+ cursor = conn.cursor()
61
+ if params:
62
+ cursor.execute(query, params)
63
+ else:
64
+ cursor.execute(query)
65
+ return cursor.fetchall()
66
+ except sqlite3.Error as e:
67
+ print(f"Error fetching data: {e}")
68
+ print(f"Query: {query}")
69
+ if params:
70
+ print(f"Parameters: {params}")
71
+ return []
72
+
73
+ def fetch_as_dataframe(
74
+ conn: sqlite3.Connection,
75
+ query: str,
76
+ params: Optional[Union[Tuple, List, Dict]] = None
77
+ ) -> pd.DataFrame:
78
+ """Fetch data from the database as a pandas DataFrame"""
79
+ try:
80
+ if params:
81
+ df = pd.read_sql_query(query, conn, params=params)
82
+ else:
83
+ df = pd.read_sql_query(query, conn)
84
+ return df
85
+ except (sqlite3.Error, pd.io.sql.DatabaseError) as e:
86
+ print(f"Error fetching DataFrame: {e}")
87
+ print(f"Query: {query}")
88
+ if params:
89
+ print(f"Parameters: {params}")
90
+ return pd.DataFrame()
91
+
92
+ def get_table_names(conn: sqlite3.Connection) -> List[str]:
93
+ """Get a list of all tables in the database"""
94
+ query = "SELECT name FROM sqlite_master WHERE type='table'"
95
+ cursor = conn.cursor()
96
+ cursor.execute(query)
97
+ return [row[0] for row in cursor.fetchall()]
98
+
99
+ def get_table_schema(conn: sqlite3.Connection, table_name: str) -> List[Dict[str, Any]]:
100
+ """Get the schema for a specific table"""
101
+ try:
102
+ cursor = conn.cursor()
103
+ cursor.execute(f"PRAGMA table_info({table_name})")
104
+ columns = cursor.fetchall()
105
+
106
+ schema = []
107
+ for col in columns:
108
+ schema.append({
109
+ "name": col[1],
110
+ "type": col[2],
111
+ "notnull": bool(col[3]),
112
+ "default": col[4],
113
+ "primary_key": bool(col[5])
114
+ })
115
+
116
+ return schema
117
+ except sqlite3.Error as e:
118
+ print(f"Error getting schema for {table_name}: {e}")
119
+ return []
120
+
121
+ def get_table_row_count(conn: sqlite3.Connection, table_name: str) -> int:
122
+ """Get the number of rows in a table"""
123
+ try:
124
+ cursor = conn.cursor()
125
+ cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
126
+ return cursor.fetchone()[0]
127
+ except sqlite3.Error as e:
128
+ print(f"Error getting row count for {table_name}: {e}")
129
+ return 0
130
+
131
+ def close_connection(conn: sqlite3.Connection) -> None:
132
+ """Close the database connection"""
133
+ if conn:
134
+ conn.close()
135
+
136
+ # Example usage
137
+ if __name__ == "__main__":
138
+ # Create a connection
139
+ conn = create_connection()
140
+
141
+ # Get list of tables
142
+ tables = get_table_names(conn)
143
+ print(f"Tables in the database: {tables}")
144
+
145
+ # Get row counts for each table
146
+ for table in tables:
147
+ count = get_table_row_count(conn, table)
148
+ print(f"Table '{table}' has {count} rows")
149
+
150
+ # Close the connection
151
+ close_connection(conn)