Spaces:
Runtime error
Runtime error
| import psycopg2 | |
class DataWrapper:
    """Attribute bag whose instance ``__dict__`` is filled from a list or a dict.

    A list supplies attribute names, each initialised to ``None``; a dict
    supplies names together with their values. Any other input leaves the
    instance without attributes.
    """

    def __init__(self, data):
        if isinstance(data, list):
            initial = dict.fromkeys(data)
        elif isinstance(data, dict):
            initial = data
        else:
            # Unsupported input types are ignored; the instance stays empty.
            return
        vars(self).update(initial)

    def addKey(self, key, val=None):
        """Store ``val`` under ``key``, replacing any existing entry."""
        vars(self)[key] = val

    def __repr__(self):
        # The wrapper prints exactly like the dict of its attributes.
        return repr(vars(self))
class MetaDataLayout:
    """Holds a schema's full table/column map plus the caller's current selection.

    The state lives in ``self.datalayout``, a dict with the keys ``"schema"``,
    ``"selectedTables"`` and ``"allTables"``.
    """

    def __init__(self, schemaName, allTablesAndCols):
        self.schemaName = schemaName
        self.datalayout = dict(
            schema=self.schemaName,
            selectedTables={},
            allTables=allTablesAndCols,
        )

    def setSelection(self, tablesAndCols):
        """Add tables and their chosen columns to the selection.

        tablesAndCols : {"table1": ["col1", "col2"], "table2": ["cola", "colb"]}

        A table that is not among the known schema tables is reported on
        stdout and skipped; tables selected earlier are kept.
        """
        layout = self.datalayout
        for name in tablesAndCols:
            if name not in layout['allTables'].keys():
                print(f"Table {name} doesn't exists in the schema")
                continue
            layout['selectedTables'][name] = tablesAndCols[name]
        self.datalayout = layout

    def resetSelection(self):
        """Discard the current selection."""
        self.datalayout['selectedTables'] = {}

    def getSelectedTablesAndCols(self):
        """Return the mapping of selected tables to their selected columns."""
        layout = self.datalayout
        return layout['selectedTables']

    def getAllTablesCols(self):
        """Return the mapping of every known table to its columns."""
        layout = self.datalayout
        return layout['allTables']
class DbEngine:
    """Thin wrapper around a single psycopg2 connection.

    ``dbCreds`` is any object exposing ``database``, ``user``, ``password``,
    ``host`` and ``port`` attributes; they are forwarded to
    ``psycopg2.connect`` when the connection is opened.
    """

    def __init__(self, dbCreds):
        self.dbCreds = dbCreds
        self.connection = None  # opened by connect()

    def connect(self):
        """Open the connection unless one is already open."""
        # psycopg2 reports ``closed == 0`` while a connection is usable.
        if self.connection is not None and self.connection.closed == 0:
            return
        creds = self.dbCreds
        self.connection = psycopg2.connect(
            database=creds.database,
            user=creds.user,
            password=creds.password,
            host=creds.host,
            port=creds.port,
        )

    def disconnect(self):
        """Close the connection if it is currently open; otherwise do nothing."""
        if self.connection is not None and self.connection.closed == 0:
            self.connection.close()

    def execute_query(self, query):
        """Run ``query`` and return every row from ``cursor.fetchall()``.

        The connection is opened first when it is missing or closed, so the
        method no longer fails with an ``AttributeError`` on an engine that
        was never connected.

        Raises:
            Exception: whatever the driver raised while executing or
                fetching. The transaction is rolled back before re-raising.
        """
        self.connect()
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                return cursor.fetchall()
        except Exception:
            # A failed statement leaves the transaction aborted and the server
            # rejects every later command until a rollback; roll back here so
            # the engine stays usable for the next query.
            if self.connection.closed == 0:
                self.connection.rollback()
            raise
def executeQuery(dbEngine, query):
    """Run ``query`` via ``dbEngine.execute_query`` and return its result unchanged."""
    return dbEngine.execute_query(query)
def executeColumnsQuery(dbEngine, columnQuery):
    """Execute ``columnQuery`` and return the column names of its result set.

    Each name is the first item of the matching entry in
    ``cursor.description``, in the order the cursor reports them.
    """
    names = []
    with dbEngine.connection.cursor() as cur:
        cur.execute(columnQuery)
        for entry in cur.description:
            names.append(entry[0])
    return names
def closeDbEngine(dbEngine):
    """Disconnect ``dbEngine`` by delegating to its ``disconnect`` method."""
    engine = dbEngine
    engine.disconnect()
def getAllTablesInfo(dbEngine, schemaName):
    """Map every table of ``schemaName`` to the list of its column names.

    Table names come from ``information_schema.tables``; the columns of each
    table are read from the description of a ``SELECT * ... LIMIT 0`` probe,
    so no rows are fetched.

    Args:
        dbEngine: engine passed on to ``executeQuery`` and
            ``executeColumnsQuery``.
        schemaName: schema to inspect, spelled exactly as it is stored in the
            catalog (the lookup compares it as a string literal).

    Returns:
        dict of table name -> list of column names.
    """
    def quoteIdentifier(name):
        # Double-quoted identifiers keep their exact case and may contain
        # spaces or reserved words; an embedded quote is escaped by doubling.
        return '"' + str(name).replace('"', '""') + '"'

    # executeQuery accepts only a finished SQL string, so the schema name is
    # embedded as a literal with its single quotes doubled.
    # NOTE(review): doubling quotes is sufficient only while the server runs
    # with standard_conforming_strings = on (the PostgreSQL default) — confirm.
    schemaLiteral = str(schemaName).replace("'", "''")
    allTablesQuery = f"""SELECT table_name FROM information_schema.tables
    WHERE table_schema = '{schemaLiteral}'"""

    quotedSchema = quoteIdentifier(schemaName)
    tablesAndCols = {}
    for row in executeQuery(dbEngine, allTablesQuery):
        tableName = row[0]
        columnsQuery = (
            f"SELECT * FROM {quotedSchema}.{quoteIdentifier(tableName)} LIMIT 0"
        )
        tablesAndCols[tableName] = executeColumnsQuery(dbEngine, columnsQuery)
    return tablesAndCols
def getSampleDataForTablesAndCols(dbEngine, schemaName, tablesAndCols, maxRows):
    """Read up to ``maxRows`` rows of each listed table into a DataFrame.

    Args:
        dbEngine: object whose ``connection`` attribute is the DB-API
            connection handed to ``pandas.read_sql_query``.
        schemaName: schema that contains the tables.
        tablesAndCols: mapping keyed by table name. Only the keys are used;
            every column of each table is selected.
        maxRows: row limit per table; converted with ``int()``.

    Returns:
        dict of table name -> DataFrame. A table that cannot be read is
        reported on stdout and left out of the result.

    Raises:
        TypeError, ValueError: if ``maxRows`` cannot be converted to an int.
    """
    # Imported locally: the file's visible import block does not bring pandas
    # into scope, so the name ``pd`` was otherwise undefined here.
    import pandas as pd

    def quoteIdentifier(name):
        # Double-quoted identifiers keep their exact case and may contain
        # spaces or reserved words; an embedded quote is escaped by doubling.
        return '"' + str(name).replace('"', '""') + '"'

    rowLimit = int(maxRows)  # guarantees only a number reaches the SQL text
    quotedSchema = quoteIdentifier(schemaName)
    conn = dbEngine.connection
    data = {}
    for table in tablesAndCols:
        sqlQuery = (
            f"select * from {quotedSchema}.{quoteIdentifier(table)} limit {rowLimit}"
        )
        try:
            data[table] = pd.read_sql_query(sqlQuery, con=conn)
        except Exception as err:
            print(f"couldn't read table data. Table: {table}. Reason: {err}")
            # A failed statement leaves the transaction aborted; roll back so
            # the remaining tables can still be read. The rollback is best
            # effort, matching the per-table error tolerance of this function.
            try:
                conn.rollback()
            except Exception:
                pass
    return data