QueryHelper / persistStorage.py
anumaurya114exp's picture
added get csv file and remove temporary csv files
a39e129
raw
history blame
4.2 kB
import sqlite3
from datetime import datetime, timedelta
import pytz
import os
from config import HUGGING_FACE_TOKEN, TABLES_DATA_DIR, logsDir, RESULT_CSV_DIR
import pandas as pd
import csv
import random
# Make sure the directory used for the local tables database exists.
# Creation is best-effort: a failure is reported but does not abort the import.
try:
    os.makedirs(TABLES_DATA_DIR, exist_ok=True)
except Exception as err:
    # `Exception` (rather than a bare `except:`) keeps KeyboardInterrupt and
    # SystemExit propagating while still tolerating any setup failure.
    print(f"Could not create tables data directory '{TABLES_DATA_DIR}': {err}")

# Set the time zone to Pacific Time Zone
TIME_ZONE = 'US/Pacific'
TIMEZONE_OBJ = pytz.timezone(TIME_ZONE)
# Number of days a local tables database stays valid before it is discarded.
CACHE_TIME_EXPIRE = 20  # days
def getNewCsvFilePath():
    """Build a path inside RESULT_CSV_DIR for a new result CSV file.

    The file name is "ResultCsv_" followed by three random decimal digits.
    Nothing is created on disk, and the name is not checked against files
    that already exist.
    """
    digits = "".join(str(random.randint(0, 9)) for _ in range(3))
    return os.path.join(RESULT_CSV_DIR, "ResultCsv_" + digits)
def removeAllCsvFiles():
    """Delete every entry found directly inside RESULT_CSV_DIR.

    Despite the name, entries are not filtered by extension. Deletion is
    best-effort: an entry that cannot be removed (e.g. a sub-directory or a
    file without write permission) is skipped and the loop continues.
    """
    for fileName in os.listdir(RESULT_CSV_DIR):
        fileNameWithPath = os.path.join(RESULT_CSV_DIR, fileName)
        try:
            os.remove(fileNameWithPath)
        except OSError:
            # Only filesystem errors are ignored; a bare `except:` would also
            # have swallowed KeyboardInterrupt and SystemExit.
            continue
def append_dict_to_csv(file_path, row_data):
    """Append one row to a CSV file, writing the header first if it is empty.

    Args:
        file_path: Path of the CSV file; it is created when missing.
        row_data: Mapping of column name to value. Its keys are used as the
            field names (and as the header of a new file).
    """
    fieldnames = list(row_data.keys())
    # newline='' lets the csv module control line endings itself; without it
    # the writer's "\r\n" terminator is translated to "\r\r\n" on Windows.
    with open(file_path, 'a', newline='') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        # In append mode the stream starts at the end of the file, so a
        # position of 0 means the file is empty and still needs its header.
        if csv_file.tell() == 0:
            csv_writer.writeheader()
        csv_writer.writerow(row_data)
def saveLog(message, level='info') -> None:
    """Append a log entry to the current month's CSV log file in logsDir.

    The entry holds the current time in TIMEZONE_OBJ, the given level and
    the message converted to a string. The file is named "<YYYY-MM>-log.csv".
    When logsDir is not an existing directory, a notice is printed and
    nothing is written.
    """
    if not os.path.isdir(logsDir):
        print("Log directory/Data Directory not available.")
        return

    now = datetime.now(TIMEZONE_OBJ)
    text = str(message)
    monthlyLogName = f"{now.strftime('%Y-%m')}-log.csv"
    entry = {"time": str(now), "level": level, "message": text}
    append_dict_to_csv(os.path.join(logsDir, monthlyLogName), entry)
def getAllLogFilesPaths():
    """Return the absolute paths of the log files found in logsDir.

    An entry counts as a log file when its name contains "log"
    (case-insensitive). When logsDir is not an existing directory, a notice
    is printed and an empty list is returned.
    """
    if not os.path.isdir(logsDir):
        print("Log directory/Data Directory not available.")
        return []

    logNames = []
    for entry in os.listdir(logsDir):
        if 'log' in entry.lower():
            logNames.append(entry)
    print(logNames,"avaiable logs")

    baseDir = os.path.abspath(logsDir)
    return [os.path.join(baseDir, name) for name in logNames]
def getLocalDbFileName():
    """Return the name of the local database file stored in TABLES_DATA_DIR.

    Returns:
        The first entry listed in TABLES_DATA_DIR (expected to look like
        '2023-12-03.db', i.e. YYYY-MM-DD.db), or None when the directory is
        empty.
    """
    # List the directory once: a second listing could see different content
    # (e.g. the file removed in between) and fail with IndexError.
    entries = os.listdir(TABLES_DATA_DIR)
    if not entries:
        return None
    return entries[0]
def isTablesCacheValid():
    """Tell whether the local tables database is still within its lifetime.

    The creation date is parsed from the database file name (YYYY-MM-DD.db).

    Returns:
        True when the file is younger than CACHE_TIME_EXPIRE days. False when
        it has expired, when there is no database file, or when the file name
        does not start with a valid YYYY-MM-DD date.
    """
    localDbName = getLocalDbFileName()
    if localDbName is None:
        # No cached database at all: nothing to validate.
        return False
    timeCreatedStr = localDbName.split('.')[0]
    try:
        timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d')
    except ValueError:
        # A file whose name carries no parsable date cannot be trusted.
        return False
    # The file name is stamped with the date in TIMEZONE_OBJ, so "now" is
    # taken in the same zone; tzinfo is dropped because strptime yields a
    # naive datetime and naive/aware values cannot be compared.
    now = datetime.now(TIMEZONE_OBJ).replace(tzinfo=None)
    return timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > now
def removeFile(fileNameWithPath):
    """Delete a file and print whether it was deleted or was already missing.

    Args:
        fileNameWithPath: Path of the file to delete.

    Raises:
        OSError: For failures other than a missing file (e.g. the path is a
            directory or permission is denied).
    """
    # Attempt the removal directly instead of checking for existence first,
    # so a file that disappears between check and removal cannot raise.
    try:
        os.remove(fileNameWithPath)
    except FileNotFoundError:
        print(f"File '{fileNameWithPath}' does not exist.")
    else:
        print(f"File '{fileNameWithPath}' deleted successfully.")
def saveTablesDataToLocalDB(tablesData):
    """Replace the local database with a fresh one built from tablesData.

    Every existing entry in TABLES_DATA_DIR is removed, then a new SQLite
    file named after today's date in TIMEZONE_OBJ (YYYY-MM-DD.db) is created
    and each table is written to it.

    Args:
        tablesData: Mapping of table name to an object providing ``to_sql``
            (presumably pandas DataFrames; confirm against callers).
    """
    for prevDbs in os.listdir(TABLES_DATA_DIR):
        removeFile(os.path.join(TABLES_DATA_DIR, prevDbs))
    newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db'
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb)
    print(f"saving to local db {localDbNameWithPath}")
    conn = sqlite3.connect(localDbNameWithPath)
    try:
        for tableName, tableData in tablesData.items():
            tableData.to_sql(tableName, conn, if_exists='replace', index=False)
    finally:
        # Close the connection even when writing a table fails.
        conn.close()
def retrieveTablesDataFromLocalDb(tablesList):
    """Load the requested tables from the local database.

    Args:
        tablesList: Iterable of table names to read.

    Returns:
        Dict mapping each table name to the DataFrame read from it. An empty
        dict is returned when there is no local database, when it has expired
        (the file is then deleted), or when any requested table cannot be
        read, so callers never receive a partial result.
    """
    print("retreving tables from localDb")
    localDbName = getLocalDbFileName()
    if localDbName is None:
        return {}
    localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName)
    if not isTablesCacheValid():
        removeFile(localDbNameWithPath)
        return {}
    conn = sqlite3.connect(localDbNameWithPath)
    try:
        data = {}
        for tableName in tablesList:
            try:
                # NOTE(review): the table name is interpolated into the SQL
                # text unquoted, so tablesList must come from a trusted source.
                sql = f'SELECT * FROM {tableName}'
                data[tableName] = pd.read_sql_query(sql, con=conn)
            except Exception:
                print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.")
                return {}
        return data
    finally:
        # Single exit point for closing the connection on every path.
        conn.close()