# Director Data Processor — Gradio app.
# (Hugging Face Spaces page artifacts "Spaces: Sleeping" removed; they were not part of the program.)
| import pandas as pd | |
| import numpy as np | |
| import xlsxwriter | |
| import gradio as gr | |
| from io import BytesIO | |
| import zipfile | |
| import os | |
| import tempfile | |
def process_dataframe(df):
    """Assign a stable Director ID and a running Tenure to each board row.

    The input must contain at least 'Year' and 'Name' columns. Rows are
    processed in ascending year order; each director gets an ID on first
    appearance, tenure increments while service is consecutive and resets
    to 1 after a gap year.

    Parameters:
        df (pd.DataFrame): raw roster with 'Year' and 'Name' columns.

    Returns:
        tuple:
            - pd.DataFrame: rows sorted by year with 'Director ID' and
              'Tenure' columns inserted immediately after 'Name'.
            - dict: {year: [names]} of directors to highlight as new
              (never includes the earliest year).
            - list: sorted unique years.
    """
    # Sort by year so IDs/tenure are assigned chronologically.
    df = df.sort_values('Year').reset_index(drop=True)

    director_id_dict = {}   # Name -> Director ID (fixed on first appearance)
    tenure_dict = {}        # Name -> current consecutive-year tenure
    prev_year_directors = set()
    year_list = sorted(df['Year'].unique())
    new_directors_per_year = {}  # Year -> names flagged as new that year
    current_id = 1

    # The output column layout never changes, so compute it once instead of
    # per row: 'Director ID' and 'Tenure' slot in right after 'Name'.
    name_pos = df.columns.get_loc('Name')
    ordered_cols = (list(df.columns[:name_pos + 1])
                    + ['Director ID', 'Tenure']
                    + list(df.columns[name_pos + 1:]))

    first_year = year_list[0] if year_list else None
    all_rows = []
    for year in year_list:
        current_year_data = df[df['Year'] == year]
        current_year_directors = current_year_data['Name'].tolist()
        for _, row in current_year_data.iterrows():
            director = row['Name']
            if director not in director_id_dict:
                # First appearance ever: assign the next ID, tenure starts at 1.
                director_id_dict[director] = current_id
                current_id += 1
                tenure_dict[director] = 1
                is_new = True
            elif director in prev_year_directors:
                # Served last year too: consecutive service extends tenure.
                tenure_dict[director] += 1
                is_new = False
            else:
                # Returning after an absence: tenure restarts, treated as new.
                tenure_dict[director] = 1
                is_new = True
            # The earliest year is the baseline: its directors are never
            # flagged as "new" for highlighting.
            if is_new and year != first_year:
                new_directors_per_year.setdefault(year, []).append(director)

            row_data = row.to_dict()
            row_data['Director ID'] = director_id_dict[director]
            row_data['Tenure'] = tenure_dict[director]
            all_rows.append({col: row_data[col] for col in ordered_cols})

        prev_year_directors = set(current_year_directors)

    # Build the output frame once — avoids the quadratic cost of
    # concatenating inside the loop.
    processed_data = pd.DataFrame(all_rows, columns=ordered_cols)
    return processed_data, new_directors_per_year, year_list
def process_file(file_obj):
    """Read one uploaded CSV/Excel roster, process it, and write a formatted
    .xlsx (new directors highlighted in blue) to the system temp directory.

    Parameters:
        file_obj: uploaded file object exposing a ``.name`` attribute
            holding the original file path.

    Returns:
        str: path of the processed .xlsx file.

    Raises:
        Exception: any failure is re-raised wrapped with the offending
            filename so the caller can aggregate per-file errors.
    """
    try:
        # Keep the caller's base filename; output swaps the extension to .xlsx.
        original_filename = os.path.splitext(os.path.basename(file_obj.name))[0]
        file_ext = os.path.splitext(file_obj.name)[1].lower()

        if file_ext == '.csv':
            df = pd.read_csv(file_obj)
        elif file_ext in ['.xls', '.xlsx']:
            df = pd.read_excel(file_obj)
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")

        processed_data, new_directors_per_year, year_list = process_dataframe(df)

        # Excel cannot represent NaN/Inf: normalize them to empty strings.
        processed_data = processed_data.replace([np.inf, -np.inf], np.nan)
        processed_data = processed_data.fillna('')

        output = BytesIO()
        # Context manager guarantees the workbook is closed (and resources
        # released) even if a write below raises — the original leaked the
        # writer on error.
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            processed_data.to_excel(writer, index=False, sheet_name='Directors')
            workbook = writer.book
            worksheet = writer.sheets['Directors']
            blue_font = workbook.add_format({'font_color': 'blue'})

            # Column position of 'Name' is loop-invariant: look it up once.
            name_col = processed_data.columns.get_loc('Name')
            for idx, row in processed_data.iterrows():
                year = row['Year']
                director = row['Name']
                # Highlight directors new in this year (never the baseline year).
                if year != year_list[0] and director in new_directors_per_year.get(year, []):
                    worksheet.write(idx + 1, name_col, director, blue_font)
                else:
                    worksheet.write(idx + 1, name_col, director)
                # Re-write the remaining cells, scrubbing any residual NaN/Inf.
                for col_num, value in enumerate(row):
                    if processed_data.columns[col_num] != 'Name':
                        if pd.isnull(value) or value in [np.inf, -np.inf]:
                            value = ''
                        worksheet.write(idx + 1, col_num, value)

        output.seek(0)
        # Persist the in-memory workbook to a temp file Gradio can serve.
        temp_file_path = os.path.join(tempfile.gettempdir(), f"{original_filename}.xlsx")
        with open(temp_file_path, 'wb') as f:
            f.write(output.getvalue())
        return temp_file_path
    except Exception as e:
        # Chain the cause so the full traceback survives; the message carries
        # the filename for the caller's aggregated error report.
        raise Exception(f"Error processing file {file_obj.name}: {str(e)}") from e
def process_files(file_objs):
    """Process every uploaded file and return (download_path, error_text).

    One successful file yields its .xlsx path directly; several are bundled
    into a single zip archive. Returns (None, message) when nothing was
    uploaded or every file failed.
    """
    if not file_objs:
        return None, "No files were uploaded."

    succeeded = []  # (basename, full path) per successfully processed file
    errors = []
    for upload in file_objs:
        try:
            result_path = process_file(upload)
            succeeded.append((os.path.basename(result_path), result_path))
        except Exception as exc:
            errors.append(str(exc))

    if not succeeded:
        # Every file failed — no download, just the error report.
        return None, "\n".join(errors)

    if len(succeeded) == 1:
        # A single result is handed back as-is, no archiving needed.
        _, sole_path = succeeded[0]
        return sole_path, "\n".join(errors)

    # Multiple results: bundle them into one zip in the temp directory.
    archive_name = f"{len(succeeded)} Directors report.zip"
    archive_path = os.path.join(tempfile.gettempdir(), archive_name)
    with zipfile.ZipFile(archive_path, 'w') as archive:
        for member_name, member_path in succeeded:
            archive.write(member_path, arcname=member_name)
    return archive_path, "\n".join(errors)
def gradio_interface(file_objs):
    """Gradio callback: run the batch processor and unpack its result pair."""
    download_path, messages = process_files(file_objs)
    return download_path, messages
# Build the Gradio UI: multi-file upload in; processed file(s) and an
# error-message textbox out.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(label="Upload CSV or Excel files", file_count="multiple"),
    outputs=[
        gr.File(label="Download Processed File(s)"),
        gr.Textbox(label="Error Messages"),
    ],
    title="Director Data Processor",
    description="""
    Upload one or more CSV/XLSX files to process director data.
    The processed files will be available for download.
    If multiple files are uploaded, they will be zipped together.
    """
)
# Launch the interface with share=True to create a public link
iface.launch(share=True)