Spaces:
Sleeping
Sleeping
File size: 28,513 Bytes
3548b0d a2d4254 3548b0d ec471dd 45a8eda ec471dd 3548b0d a2d4254 5cbe607 36c1546 cc753ea 4d81f1a 5cbe607 e12add2 3548b0d a2d4254 3548b0d d6f7a09 3548b0d a28b4e2 3548b0d d6f7a09 3548b0d ecf5eaa 3548b0d a02b03c 8cf2caa 6b636f6 4655ed4 b34225c 4655ed4 a35ed6e 4655ed4 a02b03c a35ed6e a02b03c a35ed6e 4655ed4 3548b0d 4655ed4 1543c02 4655ed4 a35ed6e 4655ed4 a35ed6e 4655ed4 6b636f6 4655ed4 6b636f6 8cf2caa 3548b0d 4655ed4 a02b03c 4655ed4 7a95dc9 266ecaa a35ed6e b34225c 4655ed4 b34225c 4655ed4 3548b0d b34225c a35ed6e 4655ed4 ffbb473 a02b03c ffbb473 4655ed4 b34225c 4655ed4 a02b03c 3548b0d a35ed6e 3548b0d a35ed6e 3548b0d a35ed6e 3548b0d 09a3650 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 e295594 64bd409 cd45ea6 88a93f5 1e00380 cd45ea6 44dead3 cd45ea6 44dead3 1e00380 6c93718 44dead3 1e00380 44dead3 1e00380 09a3650 1e00380 e295594 09a3650 44dead3 e295594 44dead3 882be38 dabc4f8 b50b758 1e00380 b50b758 44dead3 f680a07 1e00380 cd45ea6 09a3650 3548b0d a35ed6e a02b03c 3548b0d a35ed6e 3548b0d a35ed6e 3548b0d a35ed6e a02b03c a35ed6e 3548b0d a35ed6e 3548b0d a35ed6e 3548b0d a02b03c 3548b0d a02b03c 3548b0d a02b03c 3548b0d a02b03c 3548b0d d6f7a09 3548b0d 36c1546 862dfc9 36c1546 862dfc9 3548b0d 36c1546 3548b0d e744fb4 69a4ef9 3548b0d cd45ea6 3548b0d 9b230fd 3548b0d a35ed6e 3548b0d a02b03c 3548b0d cd45ea6 3ecc528 3548b0d cd45ea6 3548b0d a35ed6e a02b03c 3548b0d 36c1546 3548b0d 36c1546 3548b0d a35ed6e a02b03c 3548b0d 36c1546 3548b0d 36c1546 3548b0d cd45ea6 3548b0d d6f7a09 3548b0d a35ed6e 3548b0d 3ecc528 0e822b6 ead5cab a6c27e9 cd45ea6 7330837 cd45ea6 26eee44 32e0037 1866e68 3f207f0 945d196 32e0037 da09cc9 d9eeda6 87d250e 3ecc528 87d250e 945d196 fbe3c34 7330837 09a3650 266ecaa a6c27e9 3548b0d 36c1546 d6f7a09 36c1546 d6f7a09 36c1546 3548b0d 4190cf1 36c1546 3548b0d f680a07 cd45ea6 3548b0d 36c1546 cd45ea6 69a4ef9 3548b0d ce4be35 e12add2 3548b0d 8cfa387 3548b0d 484730e 3548b0d 5cbe607 3548b0d cd45ea6 c4a35dc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 | import pandas as pd
import numpy as np
import re
import os
import warnings
import gradio as gr
import re
import zipfile
import datetime
import openpyxl
from openpyxl.styles import Font, PatternFill
from openpyxl.utils import column_index_from_string, get_column_letter
g_mapping = None
elems = """
#button {
/* Permalink - use to edit and share this gradient: https://colorzilla.com/gradient-editor/#f6e6b4+0,ed9017+100;Yellow+3D+%231 */
background: #f6e6b4; /* Old browsers */
background: -moz-linear-gradient(top, #f6e6b4 0%, #ed9017 100%); /* FF3.6-15 */
background: -webkit-linear-gradient(top, #f6e6b4 0%,#ed9017 100%); /* Chrome10-25,Safari5.1-6 */
background: linear-gradient(to bottom, #f6e6b4 0%,#ed9017 100%); /* W3C, IE10+, FF16+, Chrome26+, Opera12+, Safari7+ */
filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#f6e6b4', endColorstr='#ed9017',GradientType=0 ); /* IE6-9 */
text-shadow: 2px 2px 10px #000000;
}
"""
def download_csv_as_dataframe(url):
import io
import pandas as pd
import requests
if 'drive.google.com' in url:
# Google Drive link
file_id = url.split('/')[-2]
download_url = f'https://drive.google.com/uc?id={file_id}'
elif 'docs.google.com/spreadsheets' in url:
# Google Sheets link
file_id = url.split('/')[-2]
download_url = f'https://docs.google.com/spreadsheets/d/{file_id}/export?format=csv'
else:
print('Invalid URL')
return None
# Send a GET request to download the file
response = requests.get(download_url)
# Read the content as CSV and convert to DataFrame
content = response.content.decode('utf-8')
df = pd.read_csv(io.StringIO(content))
return df
def map_names(odf,fname):
global g_mapping
msg = None
if g_mapping is None:
g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link')
mapping = g_mapping#pd.read_csv("data_automation_mapping.csv")
fname = fname.lower()
ftype = next((element for element in [x for x in list(mapping['type'].unique())] if element.lower() in fname), None)
fcompany = next((element for element in [x for x in list(mapping['company'].unique())] if element.lower() in fname), None)
mapped_frame = None
if ftype is not None and fcompany is not None:
print(fname,"has been successfully remapped")
query_result = mapping[(mapping['type'].str.lower() == ftype.lower()) & (mapping['company'].str.lower() == fcompany.lower())]
mapped_frame = query_result
for index, row in mapped_frame.iterrows():
original_val = row['original']
rename_val = row['rename']
odf = odf.replace(original_val, rename_val)
#display(odf)
mapped_frame = odf
else:
mapped_frame = odf
msg = ' LOB has not been mapped for this file as name must have insurance line of business type (example: as_motor_summary.csv)'
print(msg)
return mapped_frame,msg
def get_lob(df):
global g_mapping
if g_mapping is None:
g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link')
mapping = g_mapping
column_names = set(df.columns)
best_match_col = None
max_matches = 0
for pattern in ["lob", "market_segment", "product", "class_of_business", 'type']:
matching_columns = {col for col in column_names if pattern in col.lower()}
for col in matching_columns:
matches = sum(df[col].isin(g_mapping['original']))
if matches > max_matches:
best_match_col = col
max_matches = matches
column_names -= matching_columns
return best_match_col if max_matches > 0 else None
def get_paid_amount(df):
for col in df.columns:
# Replace "Gross" with "amount" in column name
if "Gross" in col or "gross" in col:
new_col = col.replace("Gross", "amount").replace("gross", "amount")
else:
new_col = col
# If "paid" and "amount" are in the column name, return the column name
if "paid" in new_col.lower() and "amount" in new_col.lower():
return col
# If "paid" and "claim" are in the column name, return the column name
if "paid" in new_col.lower() and "claim" in new_col.lower():
return col
return None
def get_gross_os(df):
for col in df.columns:
if 'ri' in col.lower():
continue
new_col = col.replace("gross", "amount").replace("Gross", "Amount")
if "amount" in new_col.lower() and "os" in new_col.lower():
return col
if "os" in new_col.lower() and "claim" in new_col.lower():
return col
return None
def get_recover_os(df):
for col in df.columns:
# If "recover" and "os" are in the column name, return the column name
if "recover" in col.lower() and "os" in col.lower() and "ed" not in col.lower():
return col
return None
def get_gross_recoveries(df):
for col in df.columns:
# Replace "settled" with "amount" in column name
new_col = col.replace("settled", "amount").replace("Settled", "Amount")
# If "recover" and "amount" are in the column name, return the column name
if "recover" in new_col.lower() and "amount" in new_col.lower():
return col
# If "gross" and "recover" are in the column name, return the column name
if "gross" in new_col.lower() and "recover" in new_col.lower():
return col
return None
def get_claim_count(df):
for col in df.columns:
# If "claim" and "count" are in the column name, return the column name
if "claim" in col.lower() and "count" in col.lower():
return col
return None
def get_quarter_bracket(df):
columns = df.columns
for col in columns:
if col.lower() == "quarter_bracket":
return col
return None
def get_earned(df):
for col in df.columns:
# If "GEP" is in the column name, return the column name
if "gep" in col.lower():
return col
# If "premium" and "earned" are in the column name, return the column name
if "premium" in col.lower() and "earned" in col.lower():
return col
return None
def get_erp(df):
for col in df.columns:
# If "ERP" is in the column name, return the column name
if "erp" in col.lower():
return col
return None
def quarters(df):
valid_cols = []
df = df.applymap(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x))
for col in df.columns:
# Check if all values in column are either 'nan' or numeric
if all(df[col].apply(lambda x: str(x).isnumeric() or str(x) == 'nan')):
# Check if column has at least one value with length of 6
if any(df[col].apply(lambda x: len(str(x))) == 6):
# Check if all non-zero numeric values end with '03', '06', '09', or '12'
filtered = df[df[col] != '0']
filtered = filtered[filtered[col].apply(lambda x: str(x).isnumeric())]
if filtered[col].apply(lambda x: x[-2:]).isin(['03', '06', '09', '12']).all():
valid_cols.append(col)
valid_cols = [elem for elem in valid_cols if "report" not in elem.lower() if "effect" not in elem.lower()]
return valid_cols
def col_to_ints(df,columns_to_convert):
for col in columns_to_convert:
df[col] = df[col].apply(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x))
return df
def fill_missing_quarters(df, lob, acc, transaction):
filled = []
missing_count = 0
lobs_dict = dict()
print('accident',acc,'transaction',transaction)
columns_to_convert = [acc,transaction] # Only affect acc and transaction
print('Number of NaN values in', acc, ':', df[acc].isna().sum())
print('Number of NaN values in', transaction, ':', df[transaction].isna().sum())
for col in columns_to_convert:
df[col] = df[col].apply(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x))
quarters = []
start_year = 2017
end_year = 2022
# df_temp = df.copy(deep=True)
# df_temp = df_temp.dropna()
end_year = min(int(df[acc].max()[:4]), 2022)
print("the end year", end_year)
print("safe and sound")
for year in range(start_year, end_year+1):
for quarter in ['03', '06', '09', '12']:
quarters.append(str(year) + quarter)
# Find the missing quarters by LOB
missing_quarters = []
for l in df[lob].unique():
l_df = df[df[lob] == l]
l_quarters = set(quarters) - set(l_df[acc])
l_missing_df = pd.DataFrame({acc: list(l_quarters),
transaction: [str(end_year)+'12'] * len(l_quarters)})
for col in df.columns: # Fill the missing
#print("\n"*5,col,transaction)
if col != lob: # These two checks are nesscary in case we are filling for the premium then we only fill it with the missing quarters without the 202212 for transactions
if col == acc:
l_missing_df[col] = list(l_quarters)
elif str(col) == str(transaction):
l_missing_df[col] = [str(end_year) + '12'] * len(l_quarters)
else:
# Pad
l_missing_df[col] = 0.1
# Count padding per lob
if col not in lobs_dict:
lobs_dict[col] = 0
lobs_dict[col] = 0.1 + lobs_dict[col]
# Count total paddings
missing_count = missing_count + 1
if len(l_quarters) > 0 :
filled_warn = str(l)+' was filled with the dates '+str(l_quarters)
print(filled_warn)
filled.append(filled_warn)
l_missing_df[lob] = l
missing_quarters.append(l_missing_df)
filled.append([lobs_dict.keys(),lobs_dict.values()])
#filled.append("Total paddings (0.1): "+str(missing_count))
print("=="*100)
print('Unique values in', acc, 'for missing quarters:', l_missing_df[acc].unique())
# Concatenate the original dataframe and the missing quarters dataframe
filled_df = pd.concat([df] + missing_quarters, ignore_index=True)
print('Number of NaN values in', acc, 'after concatenation:', filled_df[acc].isna().sum())
print('Unique values in', acc, 'before conversion:', filled_df[acc].unique())
# Convert the 'accident_quarter_bracket' column to datetime format
filled_df[acc] = pd.to_datetime(filled_df[acc], format='%Y%m').dt.strftime('%Y%m')
print('Unique values in', acc, 'after conversion:', filled_df[acc].unique())
print("=="*100)
# Sort the dataframe by quarter
filled_df = filled_df.sort_values(acc)
# Reset the index
filled_df = filled_df.reset_index(drop=True)
# Print the filled quarters or a message if there are no missing quarters
filled_quarters = filled_df[acc].unique()
filtered_quarters = [q for q in filled_quarters if q[:4] in [str(year1) for year1 in range(start_year, end_year + 1)]]
if len(filtered_quarters) == 0:
msg = "No missing quarters between "+start_year+"-"+str(end_year)
print(msg)
filled.append(msg)
else:
pass#print(filtered_quarters)
#filled_df = filled_df[[acc, transaction] + [col for col in filled_df.columns if col not in [acc, transaction]]]
return filled_df,filled
def drop_missing_rows(df, columns):
#import sys
removed_rows = df[df[columns].isnull().any(axis=1)]
#display(removed_rows)
print("LOB NAME", columns[0])
#sys.exit()
removed_rows = df[df[columns].isnull().any(axis=1)].dropna(subset=columns[0], how='any')
removed_rows = removed_rows[removed_rows[columns].isnull().any(axis=1)].dropna(subset=columns[0], how='any')
df = df.dropna(subset=columns, how='any')
return df,removed_rows
# def write_log(sheet_data_dict):
# workbook = openpyxl.Workbook()
# max_sheet_name_length = 31
# for sheet_name, data_dict in sheet_data_dict.items():
# sheet_name = sheet_name[:max_sheet_name_length]
# sheet = workbook.create_sheet(title=sheet_name)
# col_index = 1 # Start from column 1 (A), column 0 does not exist in Excel
# adjacent_col_index = 2 # Initialize adjacent column index to 2 (B)
# row_index = 1 # Initialize row index to 1 to start writing from the first row
# for title, data in data_dict.items():
# lst, color = data[0], (data[1] if len(data) > 1 else None)
# adjacent = data[2] if len(data) > 2 else False
# if adjacent:
# write_col_index = adjacent_col_index # Use adjacent column
# adjacent_col_index += 1 # Increment adjacent column index for next adjacent data
# else:
# write_col_index = col_index # Use column 1 (A) for non-adjacent data
# row_index = sheet.max_row + 1 if sheet.max_row > 0 else 1 # Start from next available row in column 1
# # Write title
# title_cell = sheet.cell(row=row_index, column=write_col_index)
# title_cell.value = title
# title_cell.font = Font(size=14, bold=True)
# # Write list items and apply color
# for item_index, item in enumerate(lst, start=row_index + 1):
# cell = sheet.cell(row=item_index, column=write_col_index)
# cell.value = item
# if color:
# fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
# cell.fill = fill
# # Adjust column width
# max_length = 0
# for cell in sheet[get_column_letter(write_col_index)]:
# try:
# if len(str(cell.value)) > max_length:
# max_length = len(cell.value)
# except:
# pass
# adjusted_width = (max_length + 2)
# sheet.column_dimensions[get_column_letter(write_col_index)].width = adjusted_width
# if "Sheet" in workbook.sheetnames:
# workbook.remove(workbook["Sheet"])
# workbook.save('Log.xlsx')
def write_log(sheet_data_dict):
workbook = openpyxl.Workbook()
max_sheet_name_length = 31
for sheet_name, data_dict in sheet_data_dict.items():
sheet_name = sheet_name[:max_sheet_name_length]
sheet = workbook.create_sheet(title=sheet_name)
col_index = 1
adjacent_col_index = 1
start_row_index = 1
for title, data in data_dict.items():
lst, color = data[0], (data[1] if len(data) > 1 else None)
adjacent = data[2] if len(data) > 2 else False
if adjacent:
adjacent_col_index += 1 # Move to the next column for adjacent data
write_col_index = adjacent_col_index # Write data in the adjacent column
else:
col_index = 1 # Reset to column 1 (A) for non-adjacent data
adjacent_col_index = col_index # Reset adjacent column index
write_col_index = col_index # Write data in column 1 (A)
start_row_index = sheet.max_row + 1 if sheet.max_row > 0 else 1 # Start from the next available row in column 1 (A)
# Write the title
title_cell = sheet.cell(row=start_row_index, column=write_col_index)
title_cell.value = title
title_cell.font = Font(size=14, bold=True)
# Write list items and apply color
for item_index, item in enumerate(lst, start=start_row_index + 1):
cell = sheet.cell(row=item_index, column=write_col_index)
cell.value = item
if color:
fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
cell.fill = fill
# Adjust the column width
max_length = max(len(str(val)) for val in [title, *lst])
adjusted_width = (max_length + 2)
sheet.column_dimensions[get_column_letter(write_col_index)].width = adjusted_width
if "Sheet" in workbook.sheetnames:
workbook.remove(workbook["Sheet"])
workbook.save('Log.xlsx')
def column_letter(index):
"""Convert a column index into a column letter"""
letters = ""
while index > 0:
index, remainder = divmod(index - 1, 26)
letters = chr(65 + remainder) + letters
return letters
warnings = []
def is_found(c,text):
global warnings
if c[-1] == None:
warnings.append(text+" was not found")
def get_alts(atype):
if atype == 'claim':
return ['lob','accident_quarter_bracket','transaction_quarter_bracket','paid_amount','gross_recoveries_settled','os_amount','gross_os_recoveries','claim_count']
return ['lob','quarter_bracket','gross_premium_earned','ERP']
def filter_claims(df):
print("Sum of Null beginning: ",df.isnull().sum())
print("Sum of Null beginning 2: ",(df == '').sum())
print(df.dtypes)
filled_warn = []
global warnings
warnings = []
columns = []
# Find lob
columns.append(get_lob(df))
is_found(columns,"lob")
if None in columns:
return None,None
# Find quarters
sublist = quarters(df)
print("\n"*10,sublist,"\n"*10)
columns.extend(sublist)
# min_col = min(sublist, key=lambda col: df.dropna()[col].sum())
# max_col = max(sublist, key=lambda col: df.dropna()[col].sum())
min_col = df[sublist].sum().idxmin()
max_col = [col for col in sublist if col != min_col][0]
df,temp = drop_missing_rows(df,columns)
print('missing: ',df[df.columns[1]].isnull().sum())
#df.to_csv("gayassshit.csv")
#temp.to_csv("gayassshit1.csv")
#df.to_csv("before_filling.csv")
#print("\n"*10,columns[0],min_col,max_col,"\n"*10)
df, filled_warn = fill_missing_quarters(df,columns[0],min_col,max_col)
#df.to_csv("after_filling.csv")
#print(columns[0],min_col,max_col)
#temp = fill_missing_quarters(temp,columns[0],min_col,max_col)
df = col_to_ints(df,sublist)
#df = df[[min_col, max_col] + [col for col in df.columns if col not in [min_col, max_col]]]
#display(df)
min_col_index = columns.index(min_col) # Find the index of min_col
max_col_index = columns.index(max_col) # Find the index of max_col
# Rearrange the columns list
if min_col_index > max_col_index:
columns.insert(max_col_index, columns.pop(min_col_index))
is_found(columns,"quarters")
# Find paid amount
columns.append(get_paid_amount(df))
is_found(columns,"paid amount")
# Find gross recoveries
columns.append(get_gross_recoveries(df))
is_found(columns,"gross recoveries")
# Find gross os
columns.append(get_gross_os(df))
is_found(columns,"gross os")
# Find recover os
columns.append(get_recover_os(df))
is_found(columns,"recover os")
# Find claims count
columns.append(get_claim_count(df))
is_found(columns,"claim count")
# Warn
for i,w in enumerate(warnings):
print(str(i+1)+'-',w)
#df = pd.concat([df, temp], ignore_index=True)
df = df.replace('nan',0)
df = df.fillna({col: 0 for col in df.columns if col not in sublist})
return df,columns,temp,filled_warn
def filter_premiums(df):
global warnings
warnings = []
columns = []
filled_warn = []
# Find lob
columns.append(get_lob(df))
is_found(columns,"lob")
if None in columns:
return None,None
# Find quarter bracket
columns.append(get_quarter_bracket(df))
df,filled_warn = fill_missing_quarters(df,columns[0],columns[-1],columns[-1])
is_found(columns,"quarter")
# Find premium earned
columns.append(get_earned(df))
is_found(columns,"premium earned")
# Find ERP
columns.append(get_erp(df))
is_found(columns,"ERP")
# Warn
for i,w in enumerate(warnings):
print(str(i+1)+'-',w)
return df,columns,filled_warn
css_code='body{background-image:url("https://picsum.photos/seed/picsum/200/300");}'
# def unzip_files(zip_file_path):
# file_extension = os.path.splitext(zip_file_path)[1]
# if file_extension == '.zip':
# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
# file_list = zip_ref.namelist()
# csv_excel_files = [file for file in file_list if file.endswith(('.csv', '.xls', '.xlsx'))]
# return csv_excel_files
# else:
# return [zip_file_path]
def unzip_files(zip_file_path):
file_extension = os.path.splitext(zip_file_path)[1]
if file_extension == '.zip':
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
csv_excel_files = [file for file in file_list if file.endswith(('.csv', '.xls', '.xlsx'))]
extracted_files = []
for file in csv_excel_files:
zip_ref.extract(file)
extracted_files.append(file)
return extracted_files
else:
return [zip_file_path]
def zip_files(file_paths):
current_date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
new_file_name = f"processed_files_{current_date}.zip"
with zipfile.ZipFile(new_file_name, 'w') as zipf:
for file_path in file_paths:
file_name = file_path.split('/')[-1]
zipf.write(file_path, file_name)
print(f"{len(file_paths)} files compressed and saved as '{new_file_name}'.")
return new_file_name
def valid(text):
file_extensions = [".zip", ".xlsx", ".csv"]
pattern = r"\b({})\b".format("|".join(map(re.escape, file_extensions)))
match = re.search(pattern, text, flags=re.IGNORECASE)
return bool(match)
def op_outcome(name,msg):
name = os.path.basename(name)
return name+msg
def process(files,button):
global warnings
fail = ' ❌\n'
passe = ' ✔️\n'
warn = ' ⚠️\n'
status = []
cleaned_names = []
if files is None:
msg = 'No file provided'+fail
return None, msg
names = unzip_files(files.name)
sheet_data = dict()
for name in names:
#name = os.path.basename(name)
if valid(name):
# return zip_files([files.name]),'Success'+passe
temp = None
columns = []
filled_warn = []
replacens = dict()
print("Processing:", name)
try:
df = pd.read_csv(name)
except:
df = pd.read_excel(name)
old_cols = df.columns
old_olds = list(old_cols)
sums_old = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in old_cols]
print("Before columns")
print(old_olds)
if "summ" in name:
print("Summary:")
df,columns,filled_warn = filter_premiums(df)
if columns == None:
print(name,'has no LOB column')
print("--"*50)
status.append(op_outcome(name,' has no LOB column'+fail))
continue
altnames = get_alts('summ')
else:
print("Claims:")
df,columns,temp,filled_warn = filter_claims(df)
if columns == None:
print(name,'has no LOB column')
print("--"*50)
status.append(op_outcome(name,' has no LOB column'+fail))
continue
altnames = get_alts('claim')
finalnames = []
for ind,col in enumerate(columns):
if col is not None:
finalnames.append(columns[ind]+" ("+altnames[ind]+")")
columns = [x for x in columns if x is not None]
print("After columns")
print(columns)
df, msg = map_names(df,name)
df = df[columns]
print("temp",temp)
if isinstance(temp,pd.DataFrame):
temp, _ = map_names(temp,name)
temp = temp[columns]
temp = temp[temp.iloc[:, 3:].sum(axis=1) != 0]
df = pd.concat([df, temp], ignore_index=True)
column_mapping = dict(zip(columns, finalnames))
df = df.rename(columns=column_mapping)
# sum new
ncols = df.columns
sums_new = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in ncols]
#display(df)
name = os.path.basename(name)
#print(columns)
#print(warnings)
sheetwarnings = [['No warnings'],'00FF00']
if len(warnings) > 0:
sheetwarnings = [warnings,'FFA500']
filled_warn.pop(-1)
if len(filled_warn) == 0:
filled_warn = ['No fillings']
# else:
# # tempt_list = [element for element in filled_warn[-2][0] if element in columns]
# # filled_warn[-2] = "Padded columns "+str(list(tempt_list))+" with total of "+str(round(filled_warn[-2][1],3))+" each"
# pass
# fillings_amounts = filled_warn[-1][1]
sheet_data[name] = {
"Before columns": [old_olds],
'Sum Before':[sums_old,None,True],
"After columns": [ncols, '00FF00'],
'Sum After':[sums_new,None,True],
#'Filling amount':[fillings_amounts,None,True],
'Fillings':[filled_warn,None],
"Warnings": sheetwarnings
}
c_name = name.split('.')[0]+'_cleaned.csv'
df.to_csv(c_name,index=False)
cleaned_names.append(c_name)
formatted_warnings = ''
if len(warnings) > 0:
formatted_warnings = '📝:\n'+'\n'.join(warnings)
if msg == None:
status.append(op_outcome(name,' was processed'+passe+formatted_warnings))
else:
status.append(op_outcome(name,msg+warn+formatted_warnings))
else:
name = os.path.basename(name)
status.append(op_outcome(name,' Failed (Only .csv, .xlsx, .zip are allowed)'+fail))
if len(cleaned_names) > 0:
write_log(sheet_data)
cleaned_names.append('Log.xlsx')
final_file = zip_files(cleaned_names)
else:
final_file = None
msg = '\n'.join(f"{index + 1}.{value}" for index, value in enumerate(status))
return gr.File.update(value=final_file,visible=True),msg
#return(str(files)+'fole')
with gr.Blocks(css=elems) as demo:
gr.Markdown(
"""
<style>
.inline-container {
display: flex;
align-items: center;
}
.zip-line {
margin-top: 20px;
position: relative;
}
.zip-line img {
position: absolute;
top: 0;
left: 0;
}
</style>
<div class="inline-container">
<img src="https://mustafasa.com/uploads/excel_sheet.png" alt="Excel Sheet" width="50px">
<h1>Upload a singular xlsx/csv file to clean</h1>
</div>
<div class="inline-container zip-line">
<img src="https://mustafasa.com/uploads/zip_icon.png" alt="Zip Icon" width="50px">
<img src="https://mustafasa.com/uploads/excel_sheet.png" alt="Excel Sheet" width="20px">
<h1 style="margin-left: 50px;">Or upload multiple compressed into a zip file</h1>
</div>
"""
)
with gr.Row():
inp = gr.File(label='Input file/s')
with gr.Row():
bt = gr.Button(value='🧹 Clean',elem_id='button')
#bt1 = gr.Button(value='Restart')
for _ in range(2):
with gr.Row():
pass
with gr.Row():
out = gr.File(label='Cleaned files',visible=False)
with gr.Row():
log = gr.Textbox(label='Process log 📄',visible=True)
bt.click(fn = process, inputs=[inp,bt], outputs=[out,log])
demo.launch(debug=True) |