Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import re | |
| import os | |
| import warnings | |
| import gradio as gr | |
| import re | |
| import zipfile | |
| import datetime | |
| import openpyxl | |
| from openpyxl.styles import Font, PatternFill | |
| from openpyxl.utils import column_index_from_string, get_column_letter | |
# Module-level cache for the name-mapping table. It stays None until a
# function that needs the table downloads it and stores the DataFrame here.
g_mapping = None

# CSS passed to gr.Blocks(css=...); it styles the element whose elem_id is
# "button" (the clean button) with a yellow/orange gradient.
elems = """
#button {
/* Permalink - use to edit and share this gradient: https://colorzilla.com/gradient-editor/#f6e6b4+0,ed9017+100;Yellow+3D+%231 */
background: #f6e6b4; /* Old browsers */
background: -moz-linear-gradient(top, #f6e6b4 0%, #ed9017 100%); /* FF3.6-15 */
background: -webkit-linear-gradient(top, #f6e6b4 0%,#ed9017 100%); /* Chrome10-25,Safari5.1-6 */
background: linear-gradient(to bottom, #f6e6b4 0%,#ed9017 100%); /* W3C, IE10+, FF16+, Chrome26+, Opera12+, Safari7+ */
filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#f6e6b4', endColorstr='#ed9017',GradientType=0 ); /* IE6-9 */
text-shadow: 2px 2px 10px #000000;
}
"""
def download_csv_as_dataframe(url, timeout=30):
    """Download a Google Drive / Google Sheets file and parse it as CSV.

    Args:
        url: Sharing link. Links containing ``drive.google.com`` are fetched
            through the ``uc?id=`` endpoint; links containing
            ``docs.google.com/spreadsheets`` through the CSV export endpoint.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``pandas.DataFrame`` built from the downloaded CSV text, or ``None``
        (after printing ``Invalid URL``) when the link matches neither host.

    Raises:
        requests.HTTPError: If the server answers with an error status, so an
            HTML error page is never parsed as if it were CSV data.
    """
    import io

    if 'drive.google.com' in url:
        # Google Drive link
        template = 'https://drive.google.com/uc?id={}'
    elif 'docs.google.com/spreadsheets' in url:
        # Google Sheets link
        template = 'https://docs.google.com/spreadsheets/d/{}/export?format=csv'
    else:
        print('Invalid URL')
        return None

    # Prefer the explicit "/d/<id>" path segment; fall back to the
    # second-to-last path segment for links that do not contain one.
    id_match = re.search(r'/d/([A-Za-z0-9_-]+)', url)
    file_id = id_match.group(1) if id_match else url.split('/')[-2]

    # Imported only once the URL is known to be usable, so rejecting a bad
    # link does not depend on the HTTP client being importable.
    import requests

    response = requests.get(template.format(file_id), timeout=timeout)
    response.raise_for_status()

    # Read the content as CSV and convert to DataFrame
    content = response.content.decode('utf-8')
    return pd.read_csv(io.StringIO(content))
def map_names(odf, fname):
    """Rename values in ``odf`` using the mapping rows that match the file name.

    The mapping table is downloaded on first use and cached in ``g_mapping``.
    The first ``type`` value and the first ``company`` value that occur
    (case-insensitively) inside the lower-cased file name select the mapping
    rows; each selected row's ``original`` value is replaced by its ``rename``
    value across the whole frame, one row after another.

    Returns:
        ``(frame, msg)``; ``msg`` is ``None`` when a mapping was applied,
        otherwise the untouched frame is returned together with an
        explanatory message (which is also printed).
    """
    global g_mapping
    if g_mapping is None:
        g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link')
    mapping = g_mapping
    fname = fname.lower()

    def first_value_in_name(column):
        # First unique value of the mapping column that appears in the name.
        for candidate in list(mapping[column].unique()):
            if candidate.lower() in fname:
                return candidate
        return None

    ftype = first_value_in_name('type')
    fcompany = first_value_in_name('company')

    if ftype is None or fcompany is None:
        msg = ' LOB has not been mapped for this file as name must have insurance line of business type (example: as_motor_summary.csv)'
        print(msg)
        return odf, msg

    print(fname, "has been successfully remapped")
    same_type = mapping['type'].str.lower() == ftype.lower()
    same_company = mapping['company'].str.lower() == fcompany.lower()
    for _, rule in mapping[same_type & same_company].iterrows():
        odf = odf.replace(rule['original'], rule['rename'])
    return odf, None
def get_lob(df):
    """Return the column most likely to hold the line of business, or None.

    Column names are tested against a prioritised list of name patterns. Every
    column matching a pattern is scored by how many of its values occur in the
    ``original`` column of the mapping table (downloaded on first use and
    cached in ``g_mapping``). The highest-scoring column wins; a column that
    matched an earlier pattern is not scored again for a later one.

    Candidates are visited in the frame's own column order, so a tie between
    two columns always resolves to the left-most one instead of depending on
    set iteration order.

    Returns:
        The winning column label, or ``None`` when no candidate column contains
        any known value.
    """
    global g_mapping
    if g_mapping is None:
        g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link')
    known_values = g_mapping['original']

    remaining = list(df.columns)
    best_match_col = None
    max_matches = 0
    for pattern in ("lob", "market_segment", "product", "class_of_business", 'type'):
        # str() keeps non-string column labels from raising on .lower().
        candidates = [col for col in remaining if pattern in str(col).lower()]
        for col in candidates:
            matches = int(df[col].isin(known_values).sum())
            if matches > max_matches:
                best_match_col = col
                max_matches = matches
        taken = set(candidates)
        remaining = [col for col in remaining if col not in taken]
    # best_match_col is only ever set together with a positive match count.
    return best_match_col
def get_paid_amount(df):
    """Return the first column that looks like a paid amount, else None.

    "Gross"/"gross" in a name is treated as "amount". A column qualifies when
    its (case-insensitive) name contains "paid" together with either "amount"
    or "claim".
    """
    def looks_paid(name):
        probe = name.replace("Gross", "amount").replace("gross", "amount").lower()
        return "paid" in probe and ("amount" in probe or "claim" in probe)

    return next((name for name in df.columns if looks_paid(name)), None)
def get_gross_os(df):
    """Return the first column that looks like a gross outstanding amount.

    Columns whose lower-cased name contains "ri" are skipped. In the remaining
    names "gross"/"Gross" is treated as "amount"; a column qualifies when its
    name contains "os" together with either "amount" or "claim".
    Returns None when nothing qualifies.
    """
    for name in df.columns:
        if 'ri' in name.lower():
            continue
        probe = name.replace("gross", "amount").replace("Gross", "Amount").lower()
        if "os" in probe and ("amount" in probe or "claim" in probe):
            return name
    return None
def get_recover_os(df):
    """Return the first column naming outstanding recoveries, else None.

    A column qualifies when its lower-cased name contains both "recover" and
    "os" but does not contain "ed".
    """
    def qualifies(name):
        lowered = name.lower()
        return "recover" in lowered and "os" in lowered and "ed" not in lowered

    matches = (name for name in df.columns if qualifies(name))
    return next(matches, None)
def get_gross_recoveries(df):
    """Return the first column that looks like settled/gross recoveries.

    "settled"/"Settled" in a name is treated as "amount". A column qualifies
    when its (case-insensitive) name contains "recover" together with either
    "amount" or "gross". Returns None when nothing qualifies.
    """
    for name in df.columns:
        probe = name.replace("settled", "amount").replace("Settled", "Amount").lower()
        if "recover" not in probe:
            continue
        if "amount" in probe or "gross" in probe:
            return name
    return None
def get_claim_count(df):
    """Return the first column whose name contains "claim" and "count".

    The comparison is case-insensitive; None is returned when no column
    qualifies.
    """
    for name in df.columns:
        lowered = name.lower()
        if all(part in lowered for part in ("claim", "count")):
            return name
    return None
def get_quarter_bracket(df):
    """Return the column named exactly "quarter_bracket" (any case), else None."""
    exact_matches = (name for name in df.columns if name.lower() == "quarter_bracket")
    return next(exact_matches, None)
def get_earned(df):
    """Return the first column that looks like gross earned premium.

    A column qualifies when its lower-cased name contains "gep", or contains
    both "premium" and "earned". Returns None when nothing qualifies.
    """
    def qualifies(name):
        lowered = name.lower()
        if "gep" in lowered:
            return True
        return "premium" in lowered and "earned" in lowered

    return next((name for name in df.columns if qualifies(name)), None)
def get_erp(df):
    """Return the first column whose lower-cased name contains "erp", else None."""
    candidates = [name for name in df.columns if "erp" in name.lower()]
    return candidates[0] if candidates else None
def quarters(df):
    """Return the columns of ``df`` that hold YYYYMM quarter-end values.

    Every cell is first rendered as text: numbers are truncated to an integer
    string (``201703.0`` -> ``"201703"``), missing numbers become ``"nan"``.
    A column is accepted when

    * every cell is a digit string or ``"nan"``,
    * at least one cell is exactly six characters long, and
    * every digit cell other than ``"0"`` ends in ``03``, ``06``, ``09`` or
      ``12``.

    Accepted columns whose name contains "report" or "effect" are dropped from
    the result.

    The conversion is done column by column with plain Python, which avoids
    ``DataFrame.applymap`` (deprecated in recent pandas releases) and keeps
    non-finite floats from raising during the integer conversion.

    Returns:
        List of accepted column labels, in the frame's column order.
    """
    quarter_months = ('03', '06', '09', '12')

    def as_text(value):
        if isinstance(value, (int, float)):
            try:
                return str(int(value))
            except (ValueError, OverflowError):
                # NaN / infinity have no integer form; keep their text form.
                return str(value)
        return str(value)

    valid_cols = []
    for col in df.columns:
        cells = [as_text(value) for value in df[col].tolist()]
        # All values must be either 'nan' or numeric.
        if not all(cell.isnumeric() or cell == 'nan' for cell in cells):
            continue
        # At least one value must have the six-character YYYYMM length.
        if not any(len(cell) == 6 for cell in cells):
            continue
        # All non-zero numeric values must end on a quarter-end month.
        periods = [cell for cell in cells if cell != '0' and cell.isnumeric()]
        if all(cell[-2:] in quarter_months for cell in periods):
            valid_cols.append(col)

    return [
        col for col in valid_cols
        if "report" not in str(col).lower() and "effect" not in str(col).lower()
    ]
def col_to_ints(df, columns_to_convert):
    """Render the given columns as integer-looking strings, in place.

    Numeric cells are truncated to an integer and converted to text; cells
    whose text form is 'nan' and all non-numeric cells are converted with
    ``str`` unchanged. The frame passed in is modified and also returned.
    """
    def as_text(value):
        is_number = isinstance(value, (int, float))
        if is_number and str(value) != 'nan':
            return str(int(value))
        return str(value)

    for name in columns_to_convert:
        df[name] = df[name].apply(as_text)
    return df
def fill_missing_quarters(df, lob, acc, transaction, start_year=2017,
                          max_end_year=2022, pad_value=0.1):
    """Add padding rows so every LOB has a row for each expected quarter.

    Expected quarters run from March of ``start_year`` to December of the end
    year, where the end year is the year of the largest ``acc`` value, capped
    at ``max_end_year``. For each distinct value of the ``lob`` column a row is
    added for every expected quarter that LOB does not have: ``acc`` gets the
    missing quarter, ``transaction`` gets December of the end year, the LOB
    column gets the LOB and every other column gets ``pad_value``.

    Args:
        df: Input frame. It is not modified; the work is done on a copy.
        lob: Name of the line-of-business column.
        acc: Name of the accident-quarter column (YYYYMM values).
        transaction: Name of the transaction-quarter column. May be the same
            column as ``acc`` (the premium case), in which case it simply
            receives the missing quarter.
        start_year: First year of the expected range.
        max_end_year: Upper cap for the last year of the expected range.
        pad_value: Value written into the non-key columns of padding rows.

    Returns:
        ``(filled_df, notes)``. ``filled_df`` is the padded frame sorted by
        ``acc`` (as YYYYMM strings). ``notes`` holds one message per LOB that
        was padded, optionally a message when no quarter of the result falls
        inside the expected range, and - always as the LAST element, because
        the caller removes the last element - a ``[columns, totals]`` pair
        with the padding total accumulated per padded column.
    """
    def as_text(value):
        if isinstance(value, (int, float)) and str(value) != 'nan':
            return str(int(value))
        return str(value)

    # Work on a copy so the caller's frame is not rewritten behind its back.
    df = df.copy()
    filled = []
    padded_totals = dict()
    print('accident', acc, 'transaction', transaction)
    print('Number of NaN values in', acc, ':', df[acc].isna().sum())
    print('Number of NaN values in', transaction, ':', df[transaction].isna().sum())
    for col in (acc, transaction):  # Only affect acc and transaction
        df[col] = df[col].apply(as_text)

    # The values are strings at this point, so max() is the lexicographic
    # maximum and its first four characters are read as the year.
    end_year = min(int(df[acc].max()[:4]), max_end_year)
    print("the end year", end_year)
    closing_quarter = str(end_year) + '12'
    expected = {
        str(year) + month
        for year in range(start_year, end_year + 1)
        for month in ('03', '06', '09', '12')
    }

    # Find the missing quarters by LOB
    missing_frames = []
    for l in df[lob].unique():
        present = set(df.loc[df[lob] == l, acc])
        # Sorted so the log text and the padding rows are reproducible.
        missing = sorted(expected - present)

        row_data = dict()
        for col in df.columns:
            if col == lob:
                row_data[col] = [l] * len(missing)
            elif col == acc:
                row_data[col] = missing
            elif str(col) == str(transaction):
                row_data[col] = [closing_quarter] * len(missing)
            else:
                # Pad, and accumulate the padding total per column
                # (one pad_value per LOB).
                row_data[col] = [pad_value] * len(missing)
                padded_totals[col] = pad_value + padded_totals.get(col, 0)

        if not missing:
            # Nothing to add; empty frames are kept out of the concat so they
            # cannot influence the resulting column dtypes.
            continue
        filled_warn = (str(l) + ' was filled with the dates {'
                       + ', '.join(repr(q) for q in missing) + '}')
        print(filled_warn)
        filled.append(filled_warn)
        missing_frames.append(pd.DataFrame(row_data, columns=df.columns))

    print("==" * 100)
    # Concatenate the original dataframe and the missing quarters dataframes
    filled_df = pd.concat([df] + missing_frames, ignore_index=True)
    print('Number of NaN values in', acc, 'after concatenation:', filled_df[acc].isna().sum())
    print('Unique values in', acc, 'before conversion:', filled_df[acc].unique())
    # Round-trip through datetime to validate/normalise the YYYYMM values
    filled_df[acc] = pd.to_datetime(filled_df[acc], format='%Y%m').dt.strftime('%Y%m')
    print('Unique values in', acc, 'after conversion:', filled_df[acc].unique())
    print("==" * 100)

    # Sort by quarter (stable, so equal quarters keep their relative order)
    filled_df = filled_df.sort_values(acc, kind='mergesort').reset_index(drop=True)

    # Report when nothing in the result falls inside the expected year range
    years_in_range = {str(year) for year in range(start_year, end_year + 1)}
    if not any(str(q)[:4] in years_in_range for q in filled_df[acc].unique()):
        msg = "No missing quarters between " + str(start_year) + "-" + str(end_year)
        print(msg)
        filled.append(msg)

    # Keep the per-column padding summary as the final element.
    filled.append([list(padded_totals.keys()), list(padded_totals.values())])
    return filled_df, filled
def drop_missing_rows(df, columns):
    """Split ``df`` into complete rows and rows that were dropped.

    Args:
        df: Input frame.
        columns: Key column names; ``columns[0]`` is the LOB column.

    Returns:
        ``(kept, removed_rows)``. ``kept`` contains the rows with no missing
        value in any of ``columns``. ``removed_rows`` contains the rows that
        have a missing value in at least one of ``columns`` but still have a
        value in the LOB column.
    """
    print("LOB NAME", columns[0])
    # The incomplete-row mask is computed once; filtering with it repeatedly
    # cannot change the outcome.
    incomplete = df[columns].isnull().any(axis=1)
    removed_rows = df[incomplete].dropna(subset=[columns[0]], how='any')
    kept = df.dropna(subset=columns, how='any')
    return kept, removed_rows
| # def write_log(sheet_data_dict): | |
| # workbook = openpyxl.Workbook() | |
| # max_sheet_name_length = 31 | |
| # for sheet_name, data_dict in sheet_data_dict.items(): | |
| # sheet_name = sheet_name[:max_sheet_name_length] | |
| # sheet = workbook.create_sheet(title=sheet_name) | |
| # col_index = 1 # Start from column 1 (A), column 0 does not exist in Excel | |
| # adjacent_col_index = 2 # Initialize adjacent column index to 2 (B) | |
| # row_index = 1 # Initialize row index to 1 to start writing from the first row | |
| # for title, data in data_dict.items(): | |
| # lst, color = data[0], (data[1] if len(data) > 1 else None) | |
| # adjacent = data[2] if len(data) > 2 else False | |
| # if adjacent: | |
| # write_col_index = adjacent_col_index # Use adjacent column | |
| # adjacent_col_index += 1 # Increment adjacent column index for next adjacent data | |
| # else: | |
| # write_col_index = col_index # Use column 1 (A) for non-adjacent data | |
| # row_index = sheet.max_row + 1 if sheet.max_row > 0 else 1 # Start from next available row in column 1 | |
| # # Write title | |
| # title_cell = sheet.cell(row=row_index, column=write_col_index) | |
| # title_cell.value = title | |
| # title_cell.font = Font(size=14, bold=True) | |
| # # Write list items and apply color | |
| # for item_index, item in enumerate(lst, start=row_index + 1): | |
| # cell = sheet.cell(row=item_index, column=write_col_index) | |
| # cell.value = item | |
| # if color: | |
| # fill = PatternFill(start_color=color, end_color=color, fill_type="solid") | |
| # cell.fill = fill | |
| # # Adjust column width | |
| # max_length = 0 | |
| # for cell in sheet[get_column_letter(write_col_index)]: | |
| # try: | |
| # if len(str(cell.value)) > max_length: | |
| # max_length = len(cell.value) | |
| # except: | |
| # pass | |
| # adjusted_width = (max_length + 2) | |
| # sheet.column_dimensions[get_column_letter(write_col_index)].width = adjusted_width | |
| # if "Sheet" in workbook.sheetnames: | |
| # workbook.remove(workbook["Sheet"]) | |
| # workbook.save('Log.xlsx') | |
def write_log(sheet_data_dict):
    """Write the processing log to ``Log.xlsx``, one worksheet per entry.

    Args:
        sheet_data_dict: Maps a sheet name to a dict of sections. Each section
            maps a title to ``[items]``, ``[items, color]`` or
            ``[items, color, adjacent]``. ``color`` is a hex RGB string used
            as the fill of the item cells. A section flagged ``adjacent`` is
            written in the column to the right of the previous section,
            starting on the same row; any other section starts in column A on
            the first row below everything written so far.

    Each section is a bold title cell followed by one cell per item. A column
    is made wide enough for the longest text of any section written into it.
    Characters that are not allowed in worksheet titles are replaced by "_"
    and titles are cut to 31 characters.
    """
    workbook = openpyxl.Workbook()
    max_sheet_name_length = 31
    for sheet_name, data_dict in sheet_data_dict.items():
        safe_title = re.sub(r'[\\/*?:\[\]]', '_', str(sheet_name))
        sheet = workbook.create_sheet(title=safe_title[:max_sheet_name_length])
        # Rows are tracked here rather than read back from sheet.max_row,
        # which reports 1 for a sheet that has no cells yet and would leave
        # the first row blank.
        next_free_row = 1
        start_row_index = 1
        write_col_index = 1
        column_widths = dict()
        for title, data in data_dict.items():
            lst = list(data[0])
            color = data[1] if len(data) > 1 else None
            adjacent = data[2] if len(data) > 2 else False
            if adjacent:
                # Next column, same starting row as the previous section
                write_col_index += 1
            else:
                # Back to column A, below everything written so far
                write_col_index = 1
                start_row_index = next_free_row

            # Write the title
            title_cell = sheet.cell(row=start_row_index, column=write_col_index)
            title_cell.value = title
            title_cell.font = Font(size=14, bold=True)

            # Write list items and apply color
            for item_index, item in enumerate(lst, start=start_row_index + 1):
                cell = sheet.cell(row=item_index, column=write_col_index)
                cell.value = item
                if color:
                    cell.fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
            next_free_row = max(next_free_row, start_row_index + len(lst) + 1)

            # Adjust the column width, never shrinking below what an earlier
            # section in the same column needed
            letter = get_column_letter(write_col_index)
            needed = max(len(str(val)) for val in [title, *lst]) + 2
            column_widths[letter] = max(column_widths.get(letter, 0), needed)
            sheet.column_dimensions[letter].width = column_widths[letter]

    # Drop the default sheet only when it is not the only sheet left.
    if "Sheet" in workbook.sheetnames and len(workbook.sheetnames) > 1:
        workbook.remove(workbook["Sheet"])
    workbook.save('Log.xlsx')
def column_letter(index):
    """Convert a 1-based column index into spreadsheet letters (1 -> "A").

    Indexes below 1 yield an empty string.
    """
    digits = []
    remaining = index
    while remaining > 0:
        remaining, offset = divmod(remaining - 1, 26)
        digits.append(chr(ord("A") + offset))
    # Digits were produced least-significant first.
    return "".join(reversed(digits))
# Warnings collected for the file currently being processed.
# NOTE: this rebinds the name `warnings`, shadowing the standard-library
# module imported at the top of the file; other functions in this file rely
# on this name being the list.
warnings = []


def is_found(c, text):
    """Record a warning when the most recently detected column is missing.

    Args:
        c: List of detected column names; only the last entry is inspected.
        text: Human-readable name of the column that was looked for.

    Appends ``"<text> was not found"`` to the module-level ``warnings`` list
    when the last entry of ``c`` is ``None``.
    """
    global warnings
    if c[-1] is None:
        warnings.append(text + " was not found")
def get_alts(atype):
    """Return the standard column labels for a file type.

    ``'claim'`` selects the claim labels; any other value selects the
    premium-summary labels. A new list is returned on every call.
    """
    claim_labels = [
        'lob',
        'accident_quarter_bracket',
        'transaction_quarter_bracket',
        'paid_amount',
        'gross_recoveries_settled',
        'os_amount',
        'gross_os_recoveries',
        'claim_count',
    ]
    summary_labels = ['lob', 'quarter_bracket', 'gross_premium_earned', 'ERP']
    return claim_labels if atype == 'claim' else summary_labels
def filter_claims(df):
    """Detect the standard columns of a claims file and pad missing quarters.

    Steps: find the LOB column, find the quarter columns, drop rows with
    missing key values, pad the missing quarters per LOB, then look up the
    paid / recoveries / outstanding / claim-count columns. Every column that
    cannot be found is recorded in the module-level ``warnings`` list (which
    is reset at the start of the call) and printed.

    Of the quarter columns, the one with the smaller column sum is used as
    the accident quarter and the first other one as the transaction quarter.

    Returns:
        ``(df, columns, removed, filled_warn)``:
        the processed frame; the detected column names in standard order
        (``None`` for columns that were not found); the rows removed because
        a key value was missing; and the notes from the quarter padding.
        When no LOB column is found the result is ``(None, None, None, [])``
        so callers unpacking four values can test ``columns`` for ``None``.

    Raises:
        ValueError: If fewer than two quarter columns are detected.
    """
    global warnings
    print("Sum of Null beginning: ", df.isnull().sum())
    print("Sum of Null beginning 2: ", (df == '').sum())
    print(df.dtypes)
    warnings = []
    columns = []

    # Find lob
    columns.append(get_lob(df))
    is_found(columns, "lob")
    if None in columns:
        # Same arity as the success path; the caller unpacks four values.
        return None, None, None, []

    # Find quarters
    sublist = quarters(df)
    print("\n" * 10, sublist, "\n" * 10)
    if len(sublist) < 2:
        raise ValueError(
            "Expected an accident and a transaction quarter column, found: "
            + str(sublist)
        )
    columns.extend(sublist)
    min_col = df[sublist].sum().idxmin()
    max_col = [col for col in sublist if col != min_col][0]

    df, temp = drop_missing_rows(df, columns)
    print('missing: ', df[df.columns[1]].isnull().sum())
    df, filled_warn = fill_missing_quarters(df, columns[0], min_col, max_col)
    df = col_to_ints(df, sublist)

    # Make sure the accident quarter precedes the transaction quarter
    min_col_index = columns.index(min_col)
    max_col_index = columns.index(max_col)
    if min_col_index > max_col_index:
        columns.insert(max_col_index, columns.pop(min_col_index))
    is_found(columns, "quarters")

    # Remaining columns, in the order of the standard claim labels
    for finder, label in (
        (get_paid_amount, "paid amount"),
        (get_gross_recoveries, "gross recoveries"),
        (get_gross_os, "gross os"),
        (get_recover_os, "recover os"),
        (get_claim_count, "claim count"),
    ):
        columns.append(finder(df))
        is_found(columns, label)

    # Warn
    for i, w in enumerate(warnings):
        print(str(i + 1) + '-', w)

    df = df.replace('nan', 0)
    df = df.fillna({col: 0 for col in df.columns if col not in sublist})
    return df, columns, temp, filled_warn
def filter_premiums(df):
    """Detect the standard columns of a premium summary and pad quarters.

    Finds the LOB, quarter-bracket, earned-premium and ERP columns. Every
    column that cannot be found is recorded in the module-level ``warnings``
    list (reset at the start of the call) and printed. When the quarter
    column exists, missing quarters are padded per LOB.

    Returns:
        ``(df, columns, filled_warn)``: the (possibly padded) frame, the
        detected column names in standard order (``None`` for columns that
        were not found) and the notes from the quarter padding.
        When no LOB column is found the result is ``(None, None, [])`` so
        callers unpacking three values can test ``columns`` for ``None``.
    """
    global warnings
    warnings = []
    columns = []

    # Find lob
    columns.append(get_lob(df))
    is_found(columns, "lob")
    if None in columns:
        # Same arity as the success path; the caller unpacks three values.
        return None, None, []

    # Find quarter bracket
    quarter_col = get_quarter_bracket(df)
    columns.append(quarter_col)
    is_found(columns, "quarter")
    if quarter_col is None:
        # Nothing to pad without a quarter column. The notes still end with
        # an (empty) [columns, totals] summary pair, mirroring what
        # fill_missing_quarters returns, because the caller removes the last
        # note unconditionally.
        filled_warn = [[[], []]]
    else:
        df, filled_warn = fill_missing_quarters(df, columns[0], quarter_col, quarter_col)

    # Find premium earned
    columns.append(get_earned(df))
    is_found(columns, "premium earned")

    # Find ERP
    columns.append(get_erp(df))
    is_found(columns, "ERP")

    # Warn
    for i, w in enumerate(warnings):
        print(str(i + 1) + '-', w)
    return df, columns, filled_warn
# Page-background CSS snippet. NOTE(review): not referenced anywhere in the
# visible source (gr.Blocks is given `elems`) - confirm before removing.
css_code='body{background-image:url("https://picsum.photos/seed/picsum/200/300");}'
| # def unzip_files(zip_file_path): | |
| # file_extension = os.path.splitext(zip_file_path)[1] | |
| # if file_extension == '.zip': | |
| # with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: | |
| # file_list = zip_ref.namelist() | |
| # csv_excel_files = [file for file in file_list if file.endswith(('.csv', '.xls', '.xlsx'))] | |
| # return csv_excel_files | |
| # else: | |
| # return [zip_file_path] | |
def unzip_files(zip_file_path):
    """Extract the spreadsheet members of a zip archive into the working dir.

    Args:
        zip_file_path: Path of the uploaded file.

    Returns:
        For a ``.zip`` file (extension compared case-insensitively): the
        paths, relative to the current working directory, of the extracted
        ``.csv`` / ``.xls`` / ``.xlsx`` members. The paths are the ones the
        extraction actually created, which can differ from the raw member
        names because ``ZipFile.extract`` normalises unsafe names.
        For any other file: a one-element list with the path itself.
    """
    wanted_suffixes = ('.csv', '.xls', '.xlsx')
    if os.path.splitext(zip_file_path)[1].lower() != '.zip':
        return [zip_file_path]

    extracted_files = []
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        for member in zip_ref.namelist():
            if member.lower().endswith(wanted_suffixes):
                created = zip_ref.extract(member)
                extracted_files.append(os.path.relpath(created))
    return extracted_files
def zip_files(file_paths):
    """Bundle files into a timestamped zip archive in the working directory.

    Args:
        file_paths: Paths of the files to add. Each file is stored under its
            base name only (directories are dropped).

    Returns:
        The archive's file name, ``processed_files_<YYYY-MM-DD_HH-MM>.zip``.
    """
    current_date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
    new_file_name = f"processed_files_{current_date}.zip"
    # ZIP_DEFLATED actually compresses the members; the default only stores.
    with zipfile.ZipFile(new_file_name, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for file_path in file_paths:
            # basename handles the platform's path separator, not only "/".
            zipf.write(file_path, os.path.basename(file_path))
    print(f"{len(file_paths)} files compressed and saved as '{new_file_name}'.")
    return new_file_name
def valid(text):
    """Return True when ``text`` names a ``.zip``, ``.xlsx`` or ``.csv`` file.

    Only the final extension is considered and the comparison is
    case-insensitive, so ``report (1).csv`` is accepted while ``data.csv.pdf``
    is rejected.
    """
    file_extensions = {".zip", ".xlsx", ".csv"}
    return os.path.splitext(text)[1].lower() in file_extensions
def op_outcome(name, msg):
    """Return the status line for a file: its base name followed by ``msg``."""
    return f"{os.path.basename(name)}{msg}"
def process(files, button):
    """Gradio click handler: clean the uploaded file(s) and bundle the result.

    Args:
        files: The uploaded file object (its ``.name`` is the path on disk),
            or ``None`` when nothing was uploaded. A zip upload is unpacked
            and each spreadsheet inside is processed.
        button: Value of the button component wired in as the second input;
            unused.

    For every accepted file the frame is read (as CSV, falling back to
    Excel), its standard columns are detected (premium summary when the path
    contains "summ", claims otherwise), values are renamed through the
    mapping table, and the result is written to ``<stem>_cleaned.csv``. A
    ``Log.xlsx`` workbook describing each file is written as well and
    everything is zipped.

    Returns:
        ``(file_update, log_text)``: an update for the output file component
        pointing at the zip (or ``None`` when nothing was produced) and a
        numbered status line per input file. When no file was uploaded the
        result is ``(None, message)``.
    """
    global warnings
    fail = ' ❌\n'
    passe = ' ✔️\n'
    warn = ' ⚠️\n'
    status = []
    cleaned_names = []
    if files is None:
        msg = 'No file provided' + fail
        return None, msg

    names = unzip_files(files.name)
    sheet_data = dict()
    for name in names:
        if not valid(name):
            name = os.path.basename(name)
            status.append(op_outcome(name, ' Failed (Only .csv, .xlsx, .zip are allowed)' + fail))
            continue

        temp = None
        print("Processing:", name)
        try:
            df = pd.read_csv(name)
        except Exception:
            # Not readable as CSV: try the Excel reader. Catching Exception
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            df = pd.read_excel(name)

        old_cols = df.columns
        old_olds = list(old_cols)
        sums_old = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in old_cols]
        print("Before columns")
        print(old_olds)

        if "summ" in name:
            print("Summary:")
            df, columns, filled_warn = filter_premiums(df)
            altnames = get_alts('summ')
        else:
            print("Claims:")
            df, columns, temp, filled_warn = filter_claims(df)
            altnames = get_alts('claim')
        if columns is None:
            print(name, 'has no LOB column')
            print("--" * 50)
            status.append(op_outcome(name, ' has no LOB column' + fail))
            continue

        # Label every detected column with its standard name
        finalnames = []
        for ind, col in enumerate(columns):
            if col is not None:
                finalnames.append(columns[ind] + " (" + altnames[ind] + ")")
        columns = [x for x in columns if x is not None]
        print("After columns")
        print(columns)

        df, msg = map_names(df, name)
        df = df[columns]
        print("temp", temp)
        if isinstance(temp, pd.DataFrame):
            # Re-attach removed rows that still carry a non-zero amount
            temp, _ = map_names(temp, name)
            temp = temp[columns]
            temp = temp[temp.iloc[:, 3:].sum(axis=1) != 0]
            df = pd.concat([df, temp], ignore_index=True)
        column_mapping = dict(zip(columns, finalnames))
        df = df.rename(columns=column_mapping)

        # sum new
        ncols = df.columns
        sums_new = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in ncols]
        name = os.path.basename(name)

        sheetwarnings = [['No warnings'], '00FF00']
        if len(warnings) > 0:
            sheetwarnings = [warnings, 'FFA500']
        # The last note is the padding summary pair, which is not written to
        # the log; guard against an empty list.
        if filled_warn:
            filled_warn.pop(-1)
        if len(filled_warn) == 0:
            filled_warn = ['No fillings']

        sheet_data[name] = {
            "Before columns": [old_olds],
            'Sum Before': [sums_old, None, True],
            "After columns": [ncols, '00FF00'],
            'Sum After': [sums_new, None, True],
            'Fillings': [filled_warn, None],
            "Warnings": sheetwarnings
        }

        # splitext keeps dots inside the stem ("a.b.csv" -> "a.b_cleaned.csv")
        c_name = os.path.splitext(name)[0] + '_cleaned.csv'
        df.to_csv(c_name, index=False)
        cleaned_names.append(c_name)

        formatted_warnings = ''
        if len(warnings) > 0:
            formatted_warnings = '📝:\n' + '\n'.join(warnings)
        if msg is None:
            status.append(op_outcome(name, ' was processed' + passe + formatted_warnings))
        else:
            status.append(op_outcome(name, msg + warn + formatted_warnings))

    if len(cleaned_names) > 0:
        write_log(sheet_data)
        cleaned_names.append('Log.xlsx')
        final_file = zip_files(cleaned_names)
    else:
        final_file = None
    msg = '\n'.join(f"{index + 1}.{value}" for index, value in enumerate(status))
    return gr.File.update(value=final_file, visible=True), msg
| #return(str(files)+'fole') | |
# Build the Gradio UI: header, upload box, clean button, result file and
# process log. The custom CSS in `elems` styles the button.
with gr.Blocks(css=elems) as demo:
    # Static header explaining the two accepted upload forms
    # (single xlsx/csv file, or several files inside a zip).
    gr.Markdown(
        """
<style>
.inline-container {
display: flex;
align-items: center;
}
.zip-line {
margin-top: 20px;
position: relative;
}
.zip-line img {
position: absolute;
top: 0;
left: 0;
}
</style>
<div class="inline-container">
<img src="https://mustafasa.com/uploads/excel_sheet.png" alt="Excel Sheet" width="50px">
<h1>Upload a singular xlsx/csv file to clean</h1>
</div>
<div class="inline-container zip-line">
<img src="https://mustafasa.com/uploads/zip_icon.png" alt="Zip Icon" width="50px">
<img src="https://mustafasa.com/uploads/excel_sheet.png" alt="Excel Sheet" width="20px">
<h1 style="margin-left: 50px;">Or upload multiple compressed into a zip file</h1>
</div>
"""
    )
    with gr.Row():
        inp = gr.File(label='Input file/s')
    with gr.Row():
        # elem_id matches the "#button" rule in `elems`
        bt = gr.Button(value='🧹 Clean',elem_id='button')
    #bt1 = gr.Button(value='Restart')
    # Two empty rows between the button and the outputs
    for _ in range(2):
        with gr.Row():
            pass
    with gr.Row():
        # Created hidden; process() returns an update with visible=True
        out = gr.File(label='Cleaned files',visible=False)
    with gr.Row():
        log = gr.Textbox(label='Process log 📄',visible=True)
    # The button itself is passed as the second input, matching
    # process(files, button).
    bt.click(fn = process, inputs=[inp,bt], outputs=[out,log])
demo.launch(debug=True)