Spaces:
Runtime error
Runtime error
File size: 3,259 Bytes
37bd4dd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | import psycopg2
class DataWrapper:
    """Attribute bag populated from a list of names or from a dict.

    A list creates one attribute per element, each set to ``None``; a dict is
    copied in key-for-key. Any other input type leaves the instance without
    attributes.
    """

    def __init__(self, data):
        fields = vars(self)
        if isinstance(data, list):
            fields.update(dict.fromkeys(data))
        elif isinstance(data, dict):
            fields.update(data)

    def addKey(self, key, val=None):
        """Set attribute ``key`` to ``val``, replacing any existing value."""
        vars(self)[key] = val

    def __repr__(self):
        # Rendered exactly like the underlying attribute dict.
        return repr(vars(self))
class MetaDataLayout:
    """Tracks every table of a schema plus the subset currently selected.

    The state lives in ``self.datalayout`` under the keys ``"schema"``,
    ``"selectedTables"`` and ``"allTables"``.
    """

    def __init__(self, schemaName, allTablesAndCols):
        self.schemaName = schemaName
        self.datalayout = dict(
            schema=schemaName,
            selectedTables={},
            allTables=allTablesAndCols,
        )

    def setSelection(self, tablesAndCols):
        """Add tables (with their chosen columns) to the selection.

        tablesAndCols : {"table1": ["col1", "col2"], "table2": ["cola", "colb"]}

        Tables missing from ``allTables`` are reported on stdout and skipped.
        """
        known = self.datalayout['allTables']
        chosen = self.datalayout['selectedTables']
        for name, cols in tablesAndCols.items():
            if name not in known:
                print(f"Table {name} doesn't exists in the schema")
                continue
            chosen[name] = cols

    def resetSelection(self):
        """Replace the current selection with a fresh empty mapping."""
        self.datalayout['selectedTables'] = dict()

    def getSelectedTablesAndCols(self):
        """Return the mapping of selected tables to their selected columns."""
        return self.datalayout['selectedTables']

    def getAllTablesCols(self):
        """Return the mapping of every known table to its columns."""
        return self.datalayout['allTables']
class DbEngine:
    """Holds one psycopg2 connection built from a credentials object.

    ``dbCreds`` must expose ``database``, ``user``, ``password``, ``host`` and
    ``port`` attributes; they are read when ``connect`` is called.
    """

    def __init__(self, dbCreds):
        self.dbCreds = dbCreds
        self.connection = None

    def connect(self):
        """Open a connection unless one is already open (``closed == 0``)."""
        if self.connection is not None and self.connection.closed == 0:
            return
        dbCreds = self.dbCreds
        self.connection = psycopg2.connect(
            database=dbCreds.database,
            user=dbCreds.user,
            password=dbCreds.password,
            host=dbCreds.host,
            port=dbCreds.port,
        )

    def disconnect(self):
        """Close the connection if it exists and is still open."""
        if self.connection is not None and self.connection.closed == 0:
            self.connection.close()

    def execute_query(self, query):
        """Execute ``query`` and return all rows from ``cursor.fetchall()``.

        ``connect`` must have been called first; otherwise the attribute
        access on the missing connection raises ``AttributeError``.

        Raises:
            Exception: whatever the driver raises for the statement. The
                connection is rolled back before the error is re-raised.
        """
        connection = self.connection
        with connection.cursor() as cursor:
            try:
                cursor.execute(query)
                return cursor.fetchall()
            except Exception:
                # A failed statement leaves a psycopg2 transaction aborted and
                # every later query on this connection would fail until it is
                # rolled back, so reset it before propagating the error.
                connection.rollback()
                raise
def executeQuery(dbEngine, query):
    """Run ``query`` via ``dbEngine.execute_query`` and return its result."""
    return dbEngine.execute_query(query)
def executeColumnsQuery(dbEngine, columnQuery):
    """Execute ``columnQuery`` and return the names of its result columns.

    Names are the first item of each entry in ``cursor.description``.

    Args:
        dbEngine: object whose ``connection`` attribute provides ``cursor()``
            and ``rollback()``.
        columnQuery: SQL text to execute.

    Returns:
        List of column names; empty when the statement produces no result set
        (``cursor.description`` is ``None``).

    Raises:
        Exception: whatever the driver raises for the statement. The
            connection is rolled back before the error is re-raised.
    """
    connection = dbEngine.connection
    with connection.cursor() as cursor:
        try:
            cursor.execute(columnQuery)
        except Exception:
            # A failed statement leaves a psycopg2 transaction aborted; roll
            # back so later queries on this connection can still run.
            connection.rollback()
            raise
        description = cursor.description
    if description is None:
        return []
    return [column[0] for column in description]
def closeDbEngine(dbEngine):
    """Shut down ``dbEngine`` by delegating to its ``disconnect`` method."""
    dbEngine.disconnect()
    return None
def getAllTablesInfo(dbEngine, schemaName):
    """Map every table listed for ``schemaName`` to its column names.

    Table names come from ``information_schema.tables``; the columns of each
    table are read from the description of a zero-row ``SELECT *``.

    Args:
        dbEngine: connected engine passed on to ``executeQuery`` and
            ``executeColumnsQuery``.
        schemaName: schema to inspect, matched exactly as given.

    Returns:
        Dict of ``{tableName: [columnName, ...]}``.
    """

    def quoteIdent(name):
        # Double-quote the identifier so mixed-case or special-character names
        # resolve exactly as stored, and a stray quote cannot end it early.
        return '"' + str(name).replace('"', '""') + '"'

    # The query helpers accept only SQL text, so the schema name is embedded
    # as a literal with its single quotes doubled.
    # NOTE(review): binding it as a query parameter would be more robust.
    schemaLiteral = str(schemaName).replace("'", "''")
    allTablesQuery = (
        "SELECT table_name FROM information_schema.tables "
        f"WHERE table_schema = '{schemaLiteral}'"
    )
    tables = executeQuery(dbEngine, allTablesQuery)

    quotedSchema = quoteIdent(schemaName)
    tablesAndCols = {}
    for row in tables:
        tableName = row[0]
        # LIMIT 0 fetches no rows; only the result description is needed.
        columnsQuery = f"SELECT * FROM {quotedSchema}.{quoteIdent(tableName)} LIMIT 0"
        tablesAndCols[tableName] = executeColumnsQuery(dbEngine, columnsQuery)
    return tablesAndCols
def getSampleDataForTablesAndCols(dbEngine, schemaName, tablesAndCols, maxRows):
    """Read up to ``maxRows`` rows from each listed table into a DataFrame.

    Reading is best-effort: a table that cannot be read is reported on stdout
    and left out of the result.

    Args:
        dbEngine: object whose ``connection`` attribute is an open DB-API
            connection.
        schemaName: schema that contains the tables.
        tablesAndCols: mapping keyed by table name; only the keys are used.
        maxRows: row limit per table; must be convertible with ``int()``.

    Returns:
        Dict of ``{tableName: DataFrame}`` for the tables that were read.

    Raises:
        TypeError, ValueError: if ``maxRows`` is not convertible to ``int``.
    """
    # Imported here because pandas is not imported at module level in this file.
    import pandas as pd

    def quoteIdent(name):
        # Double-quote the identifier so mixed-case or special-character names
        # resolve exactly as stored, and a stray quote cannot end it early.
        return '"' + str(name).replace('"', '""') + '"'

    # Coerce once so only an integer is ever interpolated into the SQL text.
    rowLimit = int(maxRows)
    conn = dbEngine.connection
    quotedSchema = quoteIdent(schemaName)

    data = {}
    for table in tablesAndCols:
        sqlQuery = f"select * from {quotedSchema}.{quoteIdent(table)} limit {rowLimit}"
        try:
            data[table] = pd.read_sql_query(sqlQuery, con=conn)
        except Exception:
            print(f"couldn't read table data. Table: {table}")
            # A failed statement leaves a psycopg2 transaction aborted; roll
            # back so the remaining tables can still be read.
            try:
                conn.rollback()
            except Exception:
                # The connection itself is unusable; later tables will report
                # their own read failure, keeping this loop best-effort.
                pass
    return data