# Data_Cleaning / app.py
# Waqasjan123's picture
# Update app.py
# aa07d57 verified
import pandas as pd
import numpy as np
import xlsxwriter
import gradio as gr
from io import BytesIO
import zipfile
import os
import tempfile
def process_dataframe(df):
    """Add a 'Director ID' and a 'Tenure' column to yearly director data.

    Rows are ordered by 'Year' (rows within a year keep their input order).
    Each distinct 'Name' receives a sequential ID, starting at 1, in order of
    first appearance. 'Tenure' counts the consecutive years a director has
    been listed: it grows by one when the director was also listed in the
    preceding year *present in the data*, and restarts at 1 otherwise.

    A director listed several times within the same year is evaluated once
    for that year; all of that director's rows in the year share the same
    tenure value.

    Args:
        df: DataFrame containing at least the columns 'Year' and 'Name'.
            It is not modified. Rows whose 'Year' is missing are left out of
            the result. Existing 'Director ID' / 'Tenure' columns are
            replaced by freshly computed ones.

    Returns:
        A tuple ``(processed_data, new_directors_per_year, year_list)``:

        * processed_data -- the sorted rows with 'Director ID' and 'Tenure'
          inserted directly after 'Name'.
        * new_directors_per_year -- dict mapping a year to the list of
          directors who first appear, or reappear after an absence, in that
          year. The earliest year is never included.
        * year_list -- the distinct years in ascending order.

    Raises:
        KeyError: if the 'Year' or 'Name' column is missing.
    """
    missing = [col for col in ('Year', 'Name') if col not in df.columns]
    if missing:
        raise KeyError(f"Missing required column(s): {', '.join(missing)}")

    # Rows with no Year cannot be assigned to any year, so they are left out.
    # mergesort is stable, which keeps the input order of rows within a year.
    ordered = (
        df[df['Year'].notna()]
        .sort_values('Year', kind='mergesort')
        .reset_index(drop=True)
    )
    # Recompute rather than duplicate columns when re-processing earlier output.
    stale = [col for col in ('Director ID', 'Tenure') if col in ordered.columns]
    if stale:
        ordered = ordered.drop(columns=stale)

    year_list = sorted(ordered['Year'].unique())

    director_id_dict = {}
    tenure_dict = {}
    new_directors_per_year = {}
    prev_year_directors = set()

    # One entry per row of `ordered`, in row order.
    id_column = []
    tenure_column = []

    for year in year_list:
        is_first_year = year == year_list[0]
        seen_this_year = set()

        for director in ordered.loc[ordered['Year'] == year, 'Name']:
            # Only the first row of a director in a given year changes state;
            # repeated rows reuse the values computed for that year.
            if director not in seen_this_year:
                seen_this_year.add(director)
                if director in prev_year_directors:
                    tenure_dict[director] += 1
                else:
                    # Brand new, or returning after at least one year away.
                    director_id_dict.setdefault(
                        director, len(director_id_dict) + 1
                    )
                    tenure_dict[director] = 1
                    if not is_first_year:
                        new_directors_per_year.setdefault(year, []).append(
                            director
                        )
            id_column.append(director_id_dict[director])
            tenure_column.append(tenure_dict[director])

        prev_year_directors = seen_this_year

    name_pos = ordered.columns.get_loc('Name')
    ordered.insert(name_pos + 1, 'Director ID', id_column)
    ordered.insert(name_pos + 2, 'Tenure', tenure_column)

    return ordered, new_directors_per_year, year_list
def process_file(file_obj):
    """Process one uploaded CSV/Excel file and write the result as .xlsx.

    The file is loaded into a DataFrame, passed through
    ``process_dataframe``, and saved to a 'Directors' sheet. Infinite and
    missing values are written as empty cells. Names of directors reported
    as new for a year by ``process_dataframe`` are written in blue.

    Args:
        file_obj: the upload to process; either an object exposing the file
            path as ``.name`` or a plain path string. The extension must be
            .csv, .xls or .xlsx.

    Returns:
        Path of the generated workbook. It is named after the uploaded file
        (with an .xlsx extension) and placed in its own freshly created
        temporary directory, so uploads sharing a base name or concurrent
        requests cannot overwrite each other's output.

    Raises:
        Exception: wraps any failure, with the source file named in the
            message; the underlying error is chained as ``__cause__``.
    """
    # Resolve the path up front so the error message can always refer to it.
    source_path = getattr(file_obj, 'name', file_obj)
    try:
        original_filename, file_ext = os.path.splitext(
            os.path.basename(source_path)
        )
        file_ext = file_ext.lower()

        # Read by path rather than through the upload handle, so the result
        # does not depend on the handle's open mode or read position.
        if file_ext == '.csv':
            df = pd.read_csv(source_path)
        elif file_ext in ('.xls', '.xlsx'):
            df = pd.read_excel(source_path)
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")

        processed_data, new_directors_per_year, _ = process_dataframe(df)

        # Excel cannot store NaN/Inf; write them as empty cells instead.
        processed_data = processed_data.replace([np.inf, -np.inf], np.nan)
        processed_data = processed_data.fillna('')

        # Sets give constant-time membership tests in the row loop below.
        highlighted = {
            year: set(names) for year, names in new_directors_per_year.items()
        }

        output_dir = tempfile.mkdtemp(prefix='directors_')
        temp_file_path = os.path.join(output_dir, f"{original_filename}.xlsx")

        # The context manager finalises the workbook even if writing fails.
        with pd.ExcelWriter(temp_file_path, engine='xlsxwriter') as writer:
            processed_data.to_excel(writer, index=False, sheet_name='Directors')

            if highlighted:
                worksheet = writer.sheets['Directors']
                blue_font = writer.book.add_format({'font_color': 'blue'})
                name_col = processed_data.columns.get_loc('Name')

                # to_excel has already written every cell; only the names
                # that need the blue font are rewritten, so the formatting
                # pandas applied to the other cells is left intact.
                rows = zip(processed_data['Year'], processed_data['Name'])
                # Sheet row 0 holds the header, so data starts at row 1.
                for row_num, (year, director) in enumerate(rows, start=1):
                    if director in highlighted.get(year, ()):
                        worksheet.write(row_num, name_col, director, blue_font)

        return temp_file_path
    except Exception as e:
        # Re-raised with the file name so process_files can report it.
        raise Exception(f"Error processing file {source_path}: {str(e)}") from e
def process_files(file_objs):
    """Process every uploaded file and bundle the results for download.

    Each file is handled by ``process_file``; a failure in one file is
    recorded and does not stop the others.

    Args:
        file_objs: list of uploaded files, or None/empty when nothing was
            uploaded.

    Returns:
        A tuple ``(path, messages)``:

        * path -- the single processed workbook when exactly one file
          succeeded, a zip archive of all processed workbooks when several
          succeeded, or None when none did.
        * messages -- the collected error messages joined by newlines
          (empty string when there were none).
    """
    if not file_objs:
        return None, "No files were uploaded."

    processed_paths = []
    error_messages = []
    for file_obj in file_objs:
        try:
            processed_paths.append(process_file(file_obj))
        except Exception as e:
            # Collected for display; remaining files are still processed.
            error_messages.append(str(e))

    errors = "\n".join(error_messages)

    if not processed_paths:
        return None, errors
    if len(processed_paths) == 1:
        return processed_paths[0], errors

    # A private directory keeps concurrent requests from overwriting each
    # other's archive while preserving the download file name.
    zip_dir = tempfile.mkdtemp(prefix='directors_zip_')
    zip_filename = f"{len(processed_paths)} Directors report.zip"
    final_zip_path = os.path.join(zip_dir, zip_filename)

    used_names = set()
    with zipfile.ZipFile(final_zip_path, 'w') as zip_file:
        for file_path in processed_paths:
            stem, ext = os.path.splitext(os.path.basename(file_path))
            # Two uploads may share a base name; give later ones a numeric
            # suffix so the archive has no duplicate entries.
            arcname = f"{stem}{ext}"
            counter = 2
            while arcname in used_names:
                arcname = f"{stem} ({counter}){ext}"
                counter += 1
            used_names.add(arcname)
            zip_file.write(file_path, arcname=arcname)

    return final_zip_path, errors
def gradio_interface(file_objs):
    """Gradio callback: forward the uploads to ``process_files``.

    Returns the ``(download_path, error_text)`` pair produced by
    ``process_files``, matching the two output components of the interface.
    """
    return process_files(file_objs)
# Web UI: a multi-file upload feeding gradio_interface, with one output for
# the downloadable result and one for any error messages.
iface = gr.Interface(
    title="Director Data Processor",
    description="""
Upload one or more CSV/XLSX files to process director data.
The processed files will be available for download.
If multiple files are uploaded, they will be zipped together.
""",
    fn=gradio_interface,
    inputs=gr.File(file_count="multiple", label="Upload CSV or Excel files"),
    outputs=[
        gr.File(label="Download Processed File(s)"),
        gr.Textbox(label="Error Messages"),
    ],
)

# share=True asks Gradio to create a public link for the running app.
iface.launch(share=True)