import sqlite3 from datetime import datetime, timedelta import pytz import os from config import HUGGING_FACE_TOKEN, TABLES_DATA_DIR, logsDir, RESULT_CSV_DIR import pandas as pd import csv import random try: os.makedirs(TABLES_DATA_DIR, exist_ok=True) except: pass try: os.makedirs(RESULT_CSV_DIR, exist_ok=True) except: pass # Set the time zone to Pacific Time Zone TIME_ZONE = 'US/Pacific' TIMEZONE_OBJ = pytz.timezone(TIME_ZONE) CACHE_TIME_EXPIRE = 20 #days def getNewCsvFilePath(): fileName = "ResultCsv_" + "".join([str(random.randint(0,9)) for i in range(3)]) + ".csv" fileNameWithpath = os.path.join(RESULT_CSV_DIR, fileName) return fileNameWithpath def removeAllCsvFiles(): files = os.listdir(RESULT_CSV_DIR) for fileName in files: fileNameWithPath = os.path.join(RESULT_CSV_DIR, fileName) try: os.remove(fileNameWithPath) except: pass def append_dict_to_csv(file_path, row_data): fieldnames = row_data.keys() with open(file_path, 'a') as csv_file: csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames) # Check if the file is empty, and if so, write the header if csv_file.tell() == 0: csv_writer.writeheader() csv_writer.writerow(row_data) def saveLog(message, level='info') -> None: global logsDir if not os.path.isdir(logsDir): print("Log directory/Data Directory not available.") return current_time = datetime.now(TIMEZONE_OBJ) message = str(message) log_file_path = os.path.join(logsDir, f"{current_time.strftime('%Y-%m')}-log.csv") data_dict = {"time":str(current_time), "level": level, "message": message} append_dict_to_csv(log_file_path, data_dict) def getAllLogFilesPaths(): global logsDir # Save processed data to temporary file if not os.path.isdir(logsDir): print("Log directory/Data Directory not available.") return [] logFiles = [file for file in os.listdir(logsDir) if 'log' in file.lower()] print(logFiles,"avaiable logs") downloadableFilesPaths = [os.path.join(os.path.abspath(logsDir), logFilePath) for logFilePath in logFiles] return downloadableFilesPaths def getLocalDbFileName(): if len(os.listdir(TABLES_DATA_DIR))==0: return None localDbName = os.listdir(TABLES_DATA_DIR)[0] #'2023-12-03.db YYYY-MM-DD return localDbName def isTablesCacheValid(): localDbName = getLocalDbFileName() timeCreatedStr = localDbName.split('.')[0] timeCreated = datetime.strptime(timeCreatedStr, '%Y-%m-%d') if timeCreated + timedelta(days=CACHE_TIME_EXPIRE) > datetime.now(): return True return False def removeFile(fileNameWithPath): if os.path.exists(fileNameWithPath): os.remove(fileNameWithPath) print(f"File '{fileNameWithPath}' deleted successfully.") else: print(f"File '{fileNameWithPath}' does not exist.") def saveTablesDataToLocalDB(tablesData): for prevDbs in os.listdir(TABLES_DATA_DIR): removeFile(os.path.join(TABLES_DATA_DIR, prevDbs)) newLocalDb = datetime.now(TIMEZONE_OBJ).strftime('%Y-%m-%d') + '.db' localDbNameWithPath = os.path.join(TABLES_DATA_DIR, newLocalDb) print(f"saving to local db {localDbNameWithPath}") conn = sqlite3.connect(localDbNameWithPath) for tableName in tablesData.keys(): tablesData[tableName].to_sql(tableName, conn, if_exists='replace', index=False) conn.close() def retrieveTablesDataFromLocalDb(tablesList): print("retreving tables from localDb") localDbName = getLocalDbFileName() if localDbName==None: return {} localDbNameWithPath = os.path.join(TABLES_DATA_DIR, localDbName) if not isTablesCacheValid(): removeFile(localDbNameWithPath) return {} conn = sqlite3.connect(localDbNameWithPath) data = {} for tableName in tablesList: try: sql = f'SELECT * FROM {tableName}' df = pd.read_sql_query(sql, con=conn) data[tableName] = df except: print(f"Couldn't read {tableName} from localDb. Advise to read all the tables.") conn.close() return {} conn.close() return data