import psycopg2


class DataWrapper:
    """Expose a collection of keys as instance attributes.

    ``data`` may be a list of key names (each becomes an attribute set to
    ``None``) or a dict (each item becomes an attribute).  Any other type is
    ignored and yields an instance without attributes.
    """

    def __init__(self, data):
        if isinstance(data, list):
            self.__dict__.update(dict.fromkeys(data))
        elif isinstance(data, dict):
            self.__dict__.update(data)

    def addKey(self, key, val=None):
        """Set attribute ``key`` to ``val``, replacing any existing value."""
        self.__dict__[key] = val

    def __repr__(self):
        return repr(self.__dict__)


class MetaDataLayout:
    """Hold the tables/columns of one schema plus a user-chosen subset.

    The state lives in ``self.datalayout`` under the keys ``"schema"``,
    ``"selectedTables"`` and ``"allTables"``.
    """

    def __init__(self, schemaName, allTablesAndCols):
        self.schemaName = schemaName
        self.datalayout = {
            "schema": self.schemaName,
            "selectedTables": {},
            "allTables": allTablesAndCols,
        }

    def setSelection(self, tablesAndCols):
        """Add tables (with their columns) to the current selection.

        tablesAndCols : {"table1": ["col1", "col2"], "table2": ["cola", "colb"]}

        Tables that are not present in ``allTables`` are skipped and a
        message is printed for each of them.  Existing selections for other
        tables are kept.
        """
        knownTables = self.datalayout["allTables"]
        selected = self.datalayout["selectedTables"]
        for table, cols in tablesAndCols.items():
            if table in knownTables:
                selected[table] = cols
            else:
                print(f"Table {table} doesn't exists in the schema")

    def resetSelection(self):
        """Discard the current selection."""
        # Rebind to a fresh dict (rather than clearing in place) so that a
        # dict previously handed out by getSelectedTablesAndCols is untouched.
        self.datalayout["selectedTables"] = {}

    def getSelectedTablesAndCols(self):
        """Return the mapping of selected table name -> columns."""
        return self.datalayout["selectedTables"]

    def getAllTablesCols(self):
        """Return the mapping of every known table name -> columns."""
        return self.datalayout["allTables"]


class DbEngine:
    """Thin wrapper around a single psycopg2 connection.

    ``dbCreds`` must expose ``database``, ``user``, ``password``, ``host``
    and ``port`` attributes; they are read when ``connect`` is called.
    """

    def __init__(self, dbCreds):
        self.dbCreds = dbCreds
        self.connection = None

    def connect(self):
        """Open the connection unless one is already open."""
        if self.connection is not None and self.connection.closed == 0:
            return
        creds = self.dbCreds
        self.connection = psycopg2.connect(
            database=creds.database,
            user=creds.user,
            password=creds.password,
            host=creds.host,
            port=creds.port,
        )

    def disconnect(self):
        """Close the connection if it exists and is still open."""
        if self.connection is not None and self.connection.closed == 0:
            self.connection.close()

    def execute_query(self, query, params=None):
        """Run ``query`` and return all rows from ``fetchall()``.

        ``params`` is an optional sequence/mapping of values bound to the
        placeholders in ``query`` by the driver; prefer it over formatting
        values into the SQL text.  ``connect`` must have been called first,
        since ``self.connection`` is used as-is.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(query, params)
            return cursor.fetchall()


def _quoteIdentifier(name):
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled, so the name is taken verbatim
    (including its letter case) and cannot terminate the identifier early.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _rollbackQuietly(conn):
    """Roll back ``conn``, ignoring any error raised while doing so."""
    try:
        conn.rollback()
    except Exception:
        # Deliberate best-effort: the connection may be closed or missing,
        # and the caller has already reported the original failure.
        pass


def executeQuery(dbEngine, query, params=None):
    """Run ``query`` through ``dbEngine.execute_query`` and return its rows.

    ``params`` is forwarded only when given, so engines whose
    ``execute_query`` accepts just the query text keep working.
    """
    if params is None:
        return dbEngine.execute_query(query)
    return dbEngine.execute_query(query, params)


def executeColumnsQuery(dbEngine, columnQuery):
    """Execute ``columnQuery`` and return the names of its result columns.

    Returns an empty list when the cursor reports no description (which is
    the case for statements that produce no result set).
    """
    with dbEngine.connection.cursor() as cursor:
        cursor.execute(columnQuery)
        return [desc[0] for desc in (cursor.description or ())]


def closeDbEngine(dbEngine):
    """Disconnect ``dbEngine``."""
    dbEngine.disconnect()


def getAllTablesInfo(dbEngine, schemaName):
    """Return ``{table_name: [column names]}`` for every table in a schema.

    Table names are read from ``information_schema.tables``; the columns of
    each table are taken from the cursor description of a zero-row SELECT.
    ``schemaName`` is matched exactly (case-sensitively).
    """
    # The schema name is bound as a parameter instead of being formatted
    # into the SQL string literal.
    allTablesQuery = (
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = %s"
    )
    tables = executeQuery(dbEngine, allTablesQuery, (schemaName,))

    quotedSchema = _quoteIdentifier(schemaName)
    tablesAndCols = {}
    for row in tables:
        tableName = row[0]
        # Quote identifiers so names with upper case, spaces or quotes
        # resolve to the table that information_schema reported.
        columnsQuery = (
            f"SELECT * FROM {quotedSchema}.{_quoteIdentifier(tableName)} "
            "LIMIT 0"
        )
        tablesAndCols[tableName] = executeColumnsQuery(dbEngine, columnsQuery)
    return tablesAndCols


def getSampleDataForTablesAndCols(dbEngine, schemaName, tablesAndCols, maxRows):
    """Read up to ``maxRows`` rows from each table into a pandas DataFrame.

    Only the keys of ``tablesAndCols`` (table names) are used; every column
    of each table is selected.  Tables that cannot be read are reported with
    a printed message and left out of the result.

    Returns:
        dict mapping table name -> DataFrame for the tables read successfully.

    Raises:
        TypeError, ValueError: if ``maxRows`` cannot be converted to ``int``.
        ImportError: if pandas is not installed.
    """
    # Imported here so the rest of the module does not require pandas.
    import pandas as pd

    # Coerce once so only an integer ever reaches the SQL text.
    rowLimit = int(maxRows)
    conn = dbEngine.connection
    quotedSchema = _quoteIdentifier(schemaName)

    data = {}
    for table in tablesAndCols:
        sqlQuery = (
            f"select * from {quotedSchema}.{_quoteIdentifier(table)} "
            f"limit {rowLimit}"
        )
        try:
            data[table] = pd.read_sql_query(sqlQuery, con=conn)
        except Exception:
            print(f"couldn't read table data. Table: {table}")
            # In case the failed statement left the transaction aborted,
            # reset it so the remaining tables can still be read.
            _rollbackQuietly(conn)
    return data