Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| import pytz | |
| import os | |
| from config import HUGGING_FACE_TOKEN, TABLES_DATA_DIR, logsDir, RESULT_CSV_DIR | |
| import pandas as pd | |
| import csv | |
| import random | |
| try: | |
| os.makedirs(TABLES_DATA_DIR, exist_ok=True) | |
| except: | |
| pass | |
| try: | |
| os.makedirs(RESULT_CSV_DIR, exist_ok=True) | |
| except: | |
| pass | |
| # Set the time zone to Pacific Time Zone | |
| TIME_ZONE = 'US/Pacific' | |
| TIMEZONE_OBJ = pytz.timezone(TIME_ZONE) | |
| CACHE_TIME_EXPIRE = 20 #days | |
| def getNewCsvFilePath(): | |
| fileName = "ResultCsv_" + "".join([str(random.randint(0,9)) for i in range(3)]) + ".csv" | |
| fileNameWithpath = os.path.join(RESULT_CSV_DIR, fileName) | |
| return fileNameWithpath | |
| def removeAllCsvFiles(): | |
| files = os.listdir(RESULT_CSV_DIR) | |
| for fileName in files: | |
| fileNameWithPath = os.path.join(RESULT_CSV_DIR, fileName) | |
| try: | |
| os.remove(fileNameWithPath) | |
| except: | |
| pass | |
| def append_dict_to_csv(file_path, row_data): | |
| fieldnames = row_data.keys() | |
| with open(file_path, 'a') as csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames) | |
| # Check if the file is empty, and if so, write the header | |
| if csv_file.tell() == 0: | |
| csv_writer.writeheader() | |
| csv_writer.writerow(row_data) | |
| def saveLog(message, level='info') -> None: | |
| global logsDir | |
| if not os.path.isdir(logsDir): | |
| print("Log directory/Data Directory not available.") | |
| return | |
| current_time = datetime.now(TIMEZONE_OBJ) | |
| message = str(message) | |
| log_file_path = os.path.join(logsDir, f"{current_time.strftime('%Y-%m')}-log.csv") | |
| data_dict = {"time":str(current_time), "level": level, "message": message} | |
| append_dict_to_csv(log_file_path, data_dict) | |
| def getAllLogFilesPaths(): | |
| global logsDir | |
| # Save processed data to temporary file | |
| if not os.path.isdir(logsDir): | |
| print("Log directory/Data Directory not available.") | |
| return [] | |
| logFiles = [file for file in os.listdir(logsDir) if 'log' in file.lower()] | |
| print(logFiles,"avaiable logs") | |
| downloadableFilesPaths = [os.path.join(os.path.abspath(logsDir), logFilePath) for logFilePath in logFiles] | |
| return downloadableFilesPaths | |
| def getLocalDbFileName(): | |
| if len(os.listdir(TABLES_DATA_DIR))==0: | |
| return None | |
| localDbName = os.listdir(TABLES_DATA_DIR)[0] #'2023-12-03.db YYYY-MM-DD | |
| return localDbName | |
| def isTablesCacheValid(): | |
| localDbName = getLocalDbFileName() | |
| timeCreatedStr = localDbName.split('.')[0] | |
| timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d') | |
| if timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > datetime.now(): | |
| return True | |
| return False | |
| def removeFile(fileNameWithPath): | |
| if os.path.exists(fileNameWithPath): | |
| os.remove(fileNameWithPath) | |
| print(f"File '{fileNameWithPath}' deleted successfully.") | |
| else: | |
| print(f"File '{fileNameWithPath}' does not exist.") | |
| def saveTablesDataToLocalDB(tablesData): | |
| for prevDbs in os.listdir(TABLES_DATA_DIR): | |
| removeFile(os.path.join(TABLES_DATA_DIR, prevDbs)) | |
| newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db' | |
| localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb) | |
| print(f"saving to local db {localDbNameWithPath}") | |
| conn = sqlite3.connect(localDbNameWithPath) | |
| for tableName in tablesData.keys(): | |
| tablesData[tableName].to_sql(tableName, conn, if_exists='replace', index=False) | |
| conn.close() | |
| def retrieveTablesDataFromLocalDb(tablesList): | |
| print("retreving tables from localDb") | |
| localDbName = getLocalDbFileName() | |
| if localDbName==None: | |
| return {} | |
| localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName) | |
| if not isTablesCacheValid(): | |
| removeFile(localDbNameWithPath) | |
| return {} | |
| conn = sqlite3.connect(localDbNameWithPath) | |
| data = {} | |
| for tableName in tablesList: | |
| try: | |
| sql = f'SELECT * FROM {tableName}' | |
| df = pd.read_sql_query(sql, con=conn) | |
| data[tableName] = df | |
| except: | |
| print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.") | |
| conn.close() | |
| return {} | |
| conn.close() | |
| return data |