| | import os |
| | import pandas as pd |
| | import gzip |
| | import pickle |
| | import numpy as np |
| |
|
def get_file_path_end(file_path):
    """Return the last component of ``file_path`` with its final extension removed.

    Only the last suffix is stripped, so ``"data.csv.gz"`` yields ``"data.csv"``.
    """
    stem = os.path.splitext(os.path.basename(file_path))[0]
    return stem
| |
|
def get_file_path_end_with_ext(file_path):
    """Return the last component of ``file_path``, extension included."""
    # The tail returned by os.path.split is the same value basename() gives.
    _, tail = os.path.split(file_path)
    return tail
| |
|
def ensure_output_folder_exists(output_folder="output/"):
    """Make sure an output folder exists, creating it (and parents) if needed.

    Args:
        output_folder: Folder to check or create. Defaults to ``"output/"``,
            which is resolved relative to the current working directory.

    Raises:
        FileExistsError: If ``output_folder`` exists but is not a directory.
    """
    already_present = os.path.isdir(output_folder)

    # exist_ok=True keeps this safe if another process creates the folder
    # between the check above and this call.
    os.makedirs(output_folder, exist_ok=True)

    if already_present:
        print(f"The '{output_folder}' folder already exists.")
    else:
        print(f"Created the '{output_folder}' folder.")
| |
|
def detect_file_type(filename):
    """Detect the file type based on its extension.

    Matching is case-insensitive, so ``"DATA.CSV"`` is treated like
    ``"data.csv"``.

    Args:
        filename: File name or path whose suffix is inspected.

    Returns:
        One of ``'csv'``, ``'xlsx'``, ``'parquet'``, ``'pkl.gz'``, ``'pkl'``
        or ``'npz'``. ``.csv.gz`` and ``.zip`` files are reported as ``'csv'``.

    Raises:
        ValueError: If the suffix is not one of the supported types.
    """
    # Checked in order; each entry maps a tuple of suffixes to a type label.
    suffix_table = (
        (('.csv', '.csv.gz', '.zip'), 'csv'),
        (('.xlsx',), 'xlsx'),
        (('.parquet',), 'parquet'),
        (('.pkl.gz',), 'pkl.gz'),
        (('.pkl',), 'pkl'),
        (('.npz',), 'npz'),
    )

    lowered = filename.lower()
    for suffixes, file_type in suffix_table:
        if lowered.endswith(suffixes):
            return file_type

    raise ValueError("Unsupported file type.")
| |
|
| |
|
def read_file(filename, headers=0):
    """Read a file into memory based on its detected type.

    Args:
        filename: Path of the file to load. Its suffix selects the loader
            (see ``detect_file_type``).
        headers: Passed as ``header`` to ``pandas.read_csv`` and
            ``pandas.read_excel``. Ignored for every other format.

    Returns:
        The loaded object: whatever pandas returns for csv/xlsx/parquet, the
        unpickled object for pkl/pkl.gz, or the array stored under ``'arr_0'``
        for npz files.

    Raises:
        ValueError: If the file type is not supported.
    """
    file_type = detect_file_type(filename)

    print("Loading in file")

    if file_type == 'csv':
        file = pd.read_csv(filename, low_memory=False, header=headers)
    elif file_type == 'xlsx':
        file = pd.read_excel(filename, header=headers)
    elif file_type == 'parquet':
        # read_parquet has no ``header`` option; unknown keywords are passed
        # on to the parquet engine, so the argument is deliberately omitted.
        file = pd.read_parquet(filename)
    elif file_type == 'pkl.gz':
        # NOTE: unpickling can execute arbitrary code - only load trusted files.
        with gzip.open(filename, 'rb') as handle:
            file = pickle.load(handle)
    elif file_type == 'pkl':
        # NOTE: unpickling can execute arbitrary code - only load trusted files.
        with open(filename, 'rb') as handle:
            file = pickle.load(handle)
    elif file_type == 'npz':
        # The context manager closes the underlying archive once the array
        # has been read out of it.
        with np.load(filename) as archive:
            file = archive['arr_0']

        # NOTE(review): indentation was lost in the source; this scaling is
        # applied to .npz arrays only, presumably because "compress" files
        # were multiplied by 100 before saving - confirm.
        if "compress" in filename:
            # Not in-place, so integer arrays are divided without a dtype error.
            file = file / 100
    else:
        raise ValueError("Unsupported file type.")

    print("File load complete")

    return file
| |
|
| | |
def add_folder_to_path(folder_path: str):
    """Prepend a folder's absolute path to the process ``PATH`` variable.

    Nothing is changed if ``folder_path`` is not an existing directory, or if
    its absolute path is already one of the ``PATH`` entries. A message
    describing the outcome is printed in every case.

    Args:
        folder_path: Folder to add; may be relative or absolute.
    """
    if not os.path.isdir(folder_path):
        print(f"Folder not found at {folder_path} - not added to PATH")
        return

    print(folder_path, "folder exists.")

    absolute_path = os.path.abspath(folder_path)

    # PATH may be unset or empty; treat both as "no entries".
    current_path = os.environ.get('PATH', '')
    existing_entries = current_path.split(os.pathsep) if current_path else []

    if absolute_path in existing_entries:
        print(f"Directory {folder_path} already exists in PATH.")
        return

    if current_path:
        full_path_extension = absolute_path + os.pathsep + current_path
    else:
        # Avoid a trailing separator, which would add an empty PATH entry.
        full_path_extension = absolute_path

    os.environ['PATH'] = full_path_extension
    print("Updated PATH with: ", full_path_extension)
| |
|
def custom_regex_load(in_file, headers=None):
    """Load custom regex data from the first CSV among the supplied files.

    Args:
        in_file: Iterable of file entries. Each entry is either an object
            with a ``.name`` attribute holding the file path, or a plain path
            string. ``None`` or an empty iterable means no file was supplied.
        headers: Forwarded to ``read_file`` as its ``headers`` argument.

    Returns:
        A ``(message, data)`` tuple. ``data`` is whatever ``read_file``
        returns for the first entry whose name contains "csv"
        (case-insensitive), or an empty DataFrame when no such entry exists.
    """
    custom_regex = pd.DataFrame()

    if in_file is None:
        in_file = []

    # Accept both upload objects (path in ``.name``) and plain path strings.
    file_list = [getattr(entry, "name", entry) for entry in in_file]
    regex_file_names = [name for name in file_list if "csv" in name.lower()]

    if not regex_file_names:
        error = "No regex file provided."
        print(error)
        return error, custom_regex

    custom_regex = read_file(regex_file_names[0], headers)

    output_text = "Data file loaded."
    print(output_text)
    return output_text, custom_regex
| |
|