anumaurya114exp commited on
Commit
522ce80
·
1 Parent(s): 7cf13a6

Persist tables data in Hugging Face persistent storage

Browse files
Files changed (2) hide show
  1. persistStorage.py +68 -11
  2. utils.py +9 -1
persistStorage.py CHANGED
@@ -1,26 +1,26 @@
1
- from huggingface_hub import HfFileSystem
2
 
3
- from datetime import datetime
4
  import pytz
5
  import os
6
  from config import HUGGING_FACE_TOKEN
 
7
  import csv
8
 
 
9
  logsDir = os.getenv("HF_HOME", "/data")
10
 
11
- # # Create a new file
12
- # with open(os.path.join(data_dir, "my_data.txt"), "a") as f:
13
- # f.write("Hello World! From pesistent storage line 2")
14
 
15
- # # Read the data from the file
16
- # with open(os.path.join(data_dir, "my_data.txt"), "r") as f:
17
- # data = f.read()
18
- # # Print the data
19
- # print(data)
20
 
21
  # Set the time zone to Pacific Time Zone
22
  TIME_ZONE = 'US/Pacific'
23
  TIMEZONE_OBJ = pytz.timezone(TIME_ZONE)
 
24
 
25
  def append_dict_to_csv(file_path, row_data):
26
  fieldnames = row_data.keys()
@@ -54,4 +54,61 @@ def getAllLogFilesPaths():
54
  print(logFiles,"avaiable logs")
55
 
56
  downloadableFilesPaths = [os.path.join(os.path.abspath(logsDir), logFilePath) for logFilePath in logFiles]
57
- return downloadableFilesPaths
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sqlite3
2
 
3
+ from datetime import datetime, timedelta
4
  import pytz
5
  import os
6
  from config import HUGGING_FACE_TOKEN
7
+ import pandas as pd
8
  import csv
9
 
10
+
11
  logsDir = os.getenv("HF_HOME", "/data")
12
 
13
+ TABLES_DATA_DIR = os.path.join(os.getenv("HF_HOME", "/data"), "tablesData")
 
 
14
 
15
# Ensure the cache directory for table snapshots exists. exist_ok=True already
# makes this a no-op when the directory is present, so only real OS errors
# (permissions, read-only filesystem) can surface here. Keep the original
# best-effort behavior (don't crash at import time) but don't hide the reason.
try:
    os.makedirs(TABLES_DATA_DIR, exist_ok=True)
except OSError as e:
    print(f"Couldn't create tables data dir '{TABLES_DATA_DIR}': {e}")
 
19
 
20
  # Set the time zone to Pacific Time Zone
21
  TIME_ZONE = 'US/Pacific'
22
  TIMEZONE_OBJ = pytz.timezone(TIME_ZONE)
23
+ CACHE_TIME_EXPIRE = 5 #days
24
 
25
  def append_dict_to_csv(file_path, row_data):
26
  fieldnames = row_data.keys()
 
54
  print(logFiles,"avaiable logs")
55
 
56
  downloadableFilesPaths = [os.path.join(os.path.abspath(logsDir), logFilePath) for logFilePath in logFiles]
57
+ return downloadableFilesPaths
58
+
59
def getLocalDbFileName():
    """Return the filename of the cached SQLite db in TABLES_DATA_DIR, or None.

    The cache directory is expected to hold at most one db file named after
    its creation date ('YYYY-MM-DD.db', e.g. '2023-12-03.db'). If several
    files are present, the first directory entry wins (os.listdir order is
    arbitrary).
    """
    # List the directory once instead of twice: avoids a second syscall and a
    # race between the emptiness check and the [0] index access.
    entries = os.listdir(TABLES_DATA_DIR)
    if not entries:
        return None
    return entries[0]  # '2023-12-03.db' — YYYY-MM-DD
64
+
65
def isTablesCacheValid():
    """Return True if the cached db is younger than CACHE_TIME_EXPIRE days.

    The creation date is parsed from the cached db's filename
    ('YYYY-MM-DD.db'). Returns False when no cache file exists at all
    (the original crashed with AttributeError on None in that case).
    """
    localDbName = getLocalDbFileName()
    if localDbName is None:
        # No cache file -> nothing valid to serve.
        return False
    timeCreatedStr = localDbName.split('.')[0]
    timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d')
    # NOTE(review): the filename date is written in US/Pacific (see
    # saveTablesDataToLocalDB) but compared against naive server-local now();
    # near the expiry boundary this can be off by the tz offset — confirm.
    return timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > datetime.now()
72
+
73
def removeFile(fileNameWithPath):
    """Delete the given file if it exists, logging the outcome either way."""
    # Guard clause: nothing to do when the file is already gone.
    if not os.path.exists(fileNameWithPath):
        print(f"File '{fileNameWithPath}' does not exist.")
        return
    os.remove(fileNameWithPath)
    print(f"File '{fileNameWithPath}' deleted successfully.")
79
+
80
+
81
def saveTablesDataToLocalDB(tablesData):
    """Persist {table_name: DataFrame} into a fresh SQLite db under TABLES_DATA_DIR.

    Any previously cached db files are deleted first, then every frame is
    written to a new db named after today's date in US/Pacific
    ('YYYY-MM-DD.db') — the name is how isTablesCacheValid later recovers the
    creation date.
    """
    # Drop all previous snapshots so the directory holds exactly one db.
    for prevDbs in os.listdir(TABLES_DATA_DIR):
        removeFile(os.path.join(TABLES_DATA_DIR, prevDbs))
    newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db'
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb)
    print(f"saving to local db {localDbNameWithPath}")
    conn = sqlite3.connect(localDbNameWithPath)
    try:
        for tableName, frame in tablesData.items():
            frame.to_sql(tableName, conn, if_exists='replace', index=False)
    finally:
        # Close even when a write raises, so the db file isn't left locked
        # (the original leaked the connection on a to_sql failure).
        conn.close()
91
+
92
def retrieveTablesDataFromLocalDb(tablesList):
    """Load the requested tables from the local SQLite cache.

    Returns {table_name: DataFrame} for every name in tablesList, or {} when
    there is no cache file, the cache has expired (the stale file is deleted
    in that case), or any single table cannot be read — a partial cache is
    treated as invalid so the caller falls back to the real database.
    """
    print("retreving tables from localDb")
    localDbName = getLocalDbFileName()
    if localDbName is None:
        return {}
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName)
    if not isTablesCacheValid():
        # Stale cache: drop the file so the next save starts clean.
        removeFile(localDbNameWithPath)
        return {}

    conn = sqlite3.connect(localDbNameWithPath)
    try:
        data = {}
        for tableName in tablesList:
            try:
                # Quote the identifier — table names can't be parameterized in SQL.
                sql = f'SELECT * FROM "{tableName}"'
                df = pd.read_sql_query(sql, con=conn)
                data[tableName] = df
            except Exception:
                # Narrowed from a bare except (which also caught
                # KeyboardInterrupt/SystemExit).
                print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.")
                return {}
        return data
    finally:
        # Single close point replaces the duplicated conn.close() calls.
        conn.close()
utils.py CHANGED
@@ -1,6 +1,7 @@
1
  import psycopg2
2
  import re
3
  import pandas as pd
 
4
 
5
  class DataWrapper:
6
  def __init__(self, data):
@@ -111,15 +112,22 @@ def getAllTablesInfo(dbEngine, schemaName):
111
  return tablesAndCols
112
 
113
  def getSampleDataForTablesAndCols(dbEngine, schemaName, tablesAndCols, maxRows):
114
- data = {}
 
 
 
 
115
  dbEngine.connect()
116
  conn = dbEngine.getConnection()
 
117
  for table in tablesAndCols.keys():
118
  try:
119
  sqlQuery = f"""select * from {schemaName}.{table} limit {maxRows}"""
120
  data[table] = pd.read_sql_query(sqlQuery, con=conn)
121
  except:
122
  print(f"couldn't read table data. Table: {table}")
 
 
123
  return data
124
 
125
  # Function to test the generated sql query
 
1
  import psycopg2
2
  import re
3
  import pandas as pd
4
+ from persistStorage import retrieveTablesDataFromLocalDb, saveTablesDataToLocalDB
5
 
6
  class DataWrapper:
7
  def __init__(self, data):
 
112
  return tablesAndCols
113
 
114
def getSampleDataForTablesAndCols(dbEngine, schemaName, tablesAndCols, maxRows):
    """Return {table_name: DataFrame} of up to maxRows sample rows per table.

    Tries the local SQLite cache first; on a miss, queries the warehouse
    through dbEngine and refreshes the cache with whatever was fetched.
    """
    data = retrieveTablesDataFromLocalDb(list(tablesAndCols.keys()))
    if data != {}:
        # BUGFIX: the original printed "Didn't find any cache/valid cache."
        # here — i.e. on a cache HIT. Log the hit and serve the cache.
        print("Serving tables data from local cache.")
        return data
    print("Didn't find any cache/valid cache.")

    dbEngine.connect()
    conn = dbEngine.getConnection()
    print("Getting data from aws redshift")
    for table in tablesAndCols.keys():
        try:
            sqlQuery = f"""select * from {schemaName}.{table} limit {maxRows}"""
            data[table] = pd.read_sql_query(sqlQuery, con=conn)
        except Exception:
            # Best-effort: an unreadable table gets an empty frame so callers
            # and the cache still see a consistent key set.
            print(f"couldn't read table data. Table: {table}")
            data[table] = pd.DataFrame({})
    saveTablesDataToLocalDB(data)
    return data
132
 
133
  # Function to test the generated sql query