QueryHelper / persistStorage.py
anumaurya114exp's picture
revert back to new head
d290cd6
import sqlite3
from datetime import datetime, timedelta
import pytz
import os
from config import HUGGING_FACE_TOKEN, TABLES_DATA_DIR, logsDir, RESULT_CSV_DIR
import pandas as pd
import csv
import random
# Make sure the directories for the cached tables DB and the result CSVs
# exist. Creation stays best-effort (a failure must not abort the import),
# but only filesystem errors are tolerated and they are reported instead of
# being silently discarded.
for _requiredDir in (TABLES_DATA_DIR, RESULT_CSV_DIR):
    try:
        os.makedirs(_requiredDir, exist_ok=True)
    except OSError as _err:
        print(f"Could not create directory '{_requiredDir}': {_err}")
# Time zone (Pacific) used for log timestamps and for the date embedded in
# the local DB file name.
TIME_ZONE = 'US/Pacific'
# pytz tzinfo object built from TIME_ZONE; passed to datetime.now(...).
TIMEZONE_OBJ = pytz.timezone(TIME_ZONE)
# Age, in days, after which the local tables cache is treated as expired.
CACHE_TIME_EXPIRE = 20  # days
def getNewCsvFilePath():
    """Build a path inside RESULT_CSV_DIR for a new result CSV file.

    The file name has the form ``ResultCsv_<ddd>.csv`` where ``<ddd>`` is
    three randomly drawn decimal digits. Nothing is created on disk and no
    check is made for an existing file with the same name.

    Returns:
        The joined path (directory + generated file name) as a string.
    """
    digits = []
    for _ in range(3):
        digits.append(str(random.randint(0, 9)))
    suffix = "".join(digits)
    return os.path.join(RESULT_CSV_DIR, f"ResultCsv_{suffix}.csv")
def removeAllCsvFiles():
    """Delete every entry found in RESULT_CSV_DIR, on a best-effort basis.

    Each directory entry is passed to ``os.remove``. Entries that cannot be
    removed (for example sub-directories or files without permission) are
    skipped so the remaining entries are still processed.

    Raises:
        OSError: if RESULT_CSV_DIR itself cannot be listed.
    """
    for fileName in os.listdir(RESULT_CSV_DIR):
        fileNameWithPath = os.path.join(RESULT_CSV_DIR, fileName)
        try:
            os.remove(fileNameWithPath)
        except OSError:
            # Only filesystem failures are tolerated; anything else
            # (KeyboardInterrupt, programming errors) should propagate.
            continue
def append_dict_to_csv(file_path, row_data):
    """Append one row to a CSV file, writing a header first if it is empty.

    Args:
        file_path: Path of the CSV file; it is created when missing.
        row_data: Mapping of column name to value. Its keys are used as the
            CSV field names for both the header and the row.
    """
    fieldnames = row_data.keys()
    # newline='' is required by the csv module: the writer emits its own
    # '\r\n' terminators, and newline translation would otherwise turn them
    # into '\r\r\n' on platforms whose line separator is not '\n'.
    with open(file_path, 'a', newline='') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        # In append mode the position is at end-of-file, so 0 means the
        # file is empty (or new) and still needs its header row.
        if csv_file.tell() == 0:
            csv_writer.writeheader()
        csv_writer.writerow(row_data)
def saveLog(message, level='info') -> None:
    """Append a log entry to the current month's CSV log file.

    The entry is written to ``<logsDir>/<YYYY-MM>-log.csv`` with the columns
    ``time``, ``level`` and ``message``; the timestamp is taken in
    TIMEZONE_OBJ. When ``logsDir`` is not an existing directory a notice is
    printed and nothing is written.

    Args:
        message: Anything convertible with ``str()``.
        level: Free-form severity label stored alongside the message.
    """
    if os.path.isdir(logsDir):
        now = datetime.now(TIMEZONE_OBJ)
        text = str(message)
        monthly_file = os.path.join(logsDir, f"{now.strftime('%Y-%m')}-log.csv")
        append_dict_to_csv(
            monthly_file,
            {"time": str(now), "level": level, "message": text},
        )
    else:
        print("Log directory/Data Directory not available.")
def getAllLogFilesPaths():
    """Return absolute paths of the log files found in ``logsDir``.

    An entry counts as a log file when its name contains ``log``
    (case-insensitive). When ``logsDir`` is not an existing directory a
    notice is printed and an empty list is returned.

    Returns:
        A list of absolute path strings, in ``os.listdir`` order.
    """
    if not os.path.isdir(logsDir):
        print("Log directory/Data Directory not available.")
        return []
    logFiles = []
    for entry in os.listdir(logsDir):
        if 'log' in entry.lower():
            logFiles.append(entry)
    print(logFiles,"avaiable logs")
    baseDir = os.path.abspath(logsDir)
    return [os.path.join(baseDir, name) for name in logFiles]
def getLocalDbFileName():
    """Return the name of the first entry in TABLES_DATA_DIR, or None.

    The cached database is expected to be named after its creation date,
    e.g. ``2023-12-03.db`` (YYYY-MM-DD).

    Returns:
        The bare file name (no directory) of the first directory entry, or
        ``None`` when the directory is empty.
    """
    # List the directory once: listing it twice lets a concurrent delete
    # slip in between the emptiness check and the index access.
    entries = os.listdir(TABLES_DATA_DIR)
    if not entries:
        return None
    return entries[0]
def isTablesCacheValid():
    """Tell whether the local tables DB is younger than CACHE_TIME_EXPIRE days.

    The creation date is parsed from the DB file name (``YYYY-MM-DD.db``).

    Returns:
        True when a DB file exists, its name parses as a date, and that date
        plus CACHE_TIME_EXPIRE days is still in the future; False otherwise
        (including when there is no DB file or its name is not a date).
    """
    localDbName = getLocalDbFileName()
    if localDbName is None:
        # No cached DB at all: nothing to be valid.
        return False
    timeCreatedStr = localDbName.split('.')[0]
    try:
        timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d')
    except ValueError:
        # A file whose name is not a date cannot be aged; treat as expired.
        return False
    # The file name is stamped with the date in TIMEZONE_OBJ, so compare
    # against the wall-clock time of that same zone (made naive to match the
    # naive datetime returned by strptime).
    now = datetime.now(TIMEZONE_OBJ).replace(tzinfo=None)
    return timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > now
def removeFile(fileNameWithPath):
    """Delete a file and print whether it was deleted or was already missing.

    Args:
        fileNameWithPath: Path of the file to delete.

    Raises:
        OSError: for failures other than the file not existing (for example
            a permission error or the path being a directory).
    """
    # Attempt the removal directly instead of checking existence first, so a
    # file that vanishes between the check and the delete cannot raise.
    try:
        os.remove(fileNameWithPath)
    except FileNotFoundError:
        print(f"File '{fileNameWithPath}' does not exist.")
    else:
        print(f"File '{fileNameWithPath}' deleted successfully.")
def saveTablesDataToLocalDB(tablesData):
    """Replace the local tables cache with a fresh SQLite DB named by date.

    Every existing entry in TABLES_DATA_DIR is removed first, then a new
    ``YYYY-MM-DD.db`` file (date taken in TIMEZONE_OBJ) is created and each
    table is written into it.

    Args:
        tablesData: Mapping of table name to an object exposing ``to_sql``
            (presumably a pandas DataFrame; confirm against callers).
    """
    for prevDbs in os.listdir(TABLES_DATA_DIR):
        removeFile(os.path.join(TABLES_DATA_DIR, prevDbs))
    newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db'
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb)
    print(f"saving to local db {localDbNameWithPath}")
    conn = sqlite3.connect(localDbNameWithPath)
    try:
        for tableName, tableData in tablesData.items():
            tableData.to_sql(tableName, conn, if_exists='replace', index=False)
    finally:
        # Close the connection even when a table fails to write, so the DB
        # file handle is not leaked.
        conn.close()
def retrieveTablesDataFromLocalDb(tablesList):
    """Load the requested tables from the local SQLite cache.

    Args:
        tablesList: Iterable of table names to read.

    Returns:
        A dict mapping each requested table name to the DataFrame read from
        the cache. An empty dict is returned when there is no cache file,
        when the cache has expired (the file is then deleted), or when any
        one of the requested tables cannot be read, so callers never receive
        a partial result.
    """
    print("retreving tables from localDb")
    localDbName = getLocalDbFileName()
    if localDbName is None:
        return {}
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName)
    if not isTablesCacheValid():
        removeFile(localDbNameWithPath)
        return {}
    conn = sqlite3.connect(localDbNameWithPath)
    data = {}
    try:
        for tableName in tablesList:
            try:
                # Quote the identifier (doubling embedded quotes) so names
                # containing spaces, dashes or SQL keywords can be read back.
                quotedName = '"' + str(tableName).replace('"', '""') + '"'
                sql = f'SELECT * FROM {quotedName}'
                data[tableName] = pd.read_sql_query(sql, con=conn)
            except Exception:
                # Any failure on one table invalidates the whole result.
                print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.")
                return {}
    finally:
        # Single close point for both the success and the failure path.
        conn.close()
    return data