Spaces:
Runtime error
Runtime error
File size: 4,275 Bytes
522ce80 68a4e98 522ce80 be452a0 d290cd6 522ce80 4a842c4 d290cd6 4a842c4 522ce80 68a4e98 d290cd6 b73a352 454d802 be452a0 d290cd6 be452a0 b73a352 be452a0 1dda07c 1fcd9e2 1dda07c f80313a 1fcd9e2 1dda07c 522ce80 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | import sqlite3
from datetime import datetime, timedelta
import pytz
import os
from config import HUGGING_FACE_TOKEN, TABLES_DATA_DIR, logsDir, RESULT_CSV_DIR
import pandas as pd
import csv
import random
try:
os.makedirs(TABLES_DATA_DIR, exist_ok=True)
except:
pass
try:
os.makedirs(RESULT_CSV_DIR, exist_ok=True)
except:
pass
# Set the time zone to Pacific Time Zone
TIME_ZONE = 'US/Pacific'
TIMEZONE_OBJ = pytz.timezone(TIME_ZONE)
CACHE_TIME_EXPIRE = 20 #days
def getNewCsvFilePath():
fileName = "ResultCsv_" + "".join([str(random.randint(0,9)) for i in range(3)]) + ".csv"
fileNameWithpath = os.path.join(RESULT_CSV_DIR, fileName)
return fileNameWithpath
def removeAllCsvFiles():
files = os.listdir(RESULT_CSV_DIR)
for fileName in files:
fileNameWithPath = os.path.join(RESULT_CSV_DIR, fileName)
try:
os.remove(fileNameWithPath)
except:
pass
def append_dict_to_csv(file_path, row_data):
fieldnames = row_data.keys()
with open(file_path, 'a') as csv_file:
csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
# Check if the file is empty, and if so, write the header
if csv_file.tell() == 0:
csv_writer.writeheader()
csv_writer.writerow(row_data)
def saveLog(message, level='info') -> None:
global logsDir
if not os.path.isdir(logsDir):
print("Log directory/Data Directory not available.")
return
current_time = datetime.now(TIMEZONE_OBJ)
message = str(message)
log_file_path = os.path.join(logsDir, f"{current_time.strftime('%Y-%m')}-log.csv")
data_dict = {"time":str(current_time), "level": level, "message": message}
append_dict_to_csv(log_file_path, data_dict)
def getAllLogFilesPaths():
global logsDir
# Save processed data to temporary file
if not os.path.isdir(logsDir):
print("Log directory/Data Directory not available.")
return []
logFiles = [file for file in os.listdir(logsDir) if 'log' in file.lower()]
print(logFiles,"avaiable logs")
downloadableFilesPaths = [os.path.join(os.path.abspath(logsDir), logFilePath) for logFilePath in logFiles]
return downloadableFilesPaths
def getLocalDbFileName():
if len(os.listdir(TABLES_DATA_DIR))==0:
return None
localDbName = os.listdir(TABLES_DATA_DIR)[0] #'2023-12-03.db YYYY-MM-DD
return localDbName
def isTablesCacheValid():
localDbName = getLocalDbFileName()
timeCreatedStr = localDbName.split('.')[0]
timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d')
if timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > datetime.now():
return True
return False
def removeFile(fileNameWithPath):
if os.path.exists(fileNameWithPath):
os.remove(fileNameWithPath)
print(f"File '{fileNameWithPath}' deleted successfully.")
else:
print(f"File '{fileNameWithPath}' does not exist.")
def saveTablesDataToLocalDB(tablesData):
for prevDbs in os.listdir(TABLES_DATA_DIR):
removeFile(os.path.join(TABLES_DATA_DIR, prevDbs))
newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db'
localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb)
print(f"saving to local db {localDbNameWithPath}")
conn = sqlite3.connect(localDbNameWithPath)
for tableName in tablesData.keys():
tablesData[tableName].to_sql(tableName, conn, if_exists='replace', index=False)
conn.close()
def retrieveTablesDataFromLocalDb(tablesList):
print("retreving tables from localDb")
localDbName = getLocalDbFileName()
if localDbName==None:
return {}
localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName)
if not isTablesCacheValid():
removeFile(localDbNameWithPath)
return {}
conn = sqlite3.connect(localDbNameWithPath)
data = {}
for tableName in tablesList:
try:
sql = f'SELECT * FROM {tableName}'
df = pd.read_sql_query(sql, con=conn)
data[tableName] = df
except:
print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.")
conn.close()
return {}
conn.close()
return data |