# NOTE: source recovered from a Hugging Face Spaces page scrape; the Space's
# build status at capture time was "Runtime error".
# Standard library
import csv
import os
import re
import tempfile

# Third-party
import gradio as gr
import numpy as np
import openpyxl
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Shared sentence-embedding model, loaded once at import time and reused by
# get_embeddings for every request.
model = SentenceTransformer('BAAI/bge-small-en-v1.5')
def filter_excel1(excel_path, min_row, max_row):
    """Extract (category, diagnostic_statement) pairs from the first sheet.

    Reads column B (category) and column F (diagnostic statement) for the
    given row range. Blank category cells are forward-filled with the most
    recent non-empty category — the data uses merged-cell-style grouping
    where a category is written only on its first row.

    Args:
        excel_path: Path to the source workbook.
        min_row, max_row: 1-based inclusive row range to read.

    Returns:
        List of rows, starting with the header
        ["category", "diagnostic_statement"].

    Raises:
        gr.Error: wrapping any failure so it surfaces in the Gradio UI.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook.worksheets[0]
        data = [["category", "diagnostic_statement"]]
        prev_category = ""
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            category = row[1].value        # column B
            statement = row[5].value       # column F
            if category:
                prev_category = category
            else:
                # Forward-fill; leading blank rows get "" rather than None.
                category = prev_category
            data.append([category, statement])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 1: {str(e)}")
def filter_excel2(excel_path, min_row, max_row, sheetname):
    """Extract (description, category) pairs from the named sheet.

    Reads column A (description) and column G (category) for the given row
    range. Any category that is not a string other than "#N/A" — ints,
    None, formula-error leftovers — is normalized to "#N/A" so downstream
    code can treat "#N/A" as "needs similarity lookup". Rows without a
    description are skipped entirely.

    Args:
        excel_path: Path to the workbook being filled.
        min_row, max_row: 1-based inclusive row range to read.
        sheetname: Worksheet to read (e.g. "1Q2024").

    Returns:
        List of rows, starting with the header ["description", "category"].

    Raises:
        gr.Error: wrapping any failure so it surfaces in the Gradio UI.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]
        data = [["description", "category"]]
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            description = row[0].value     # column A
            category = row[6].value        # column G
            # Collapse of the original three-way branch: keep only real,
            # non-"#N/A" strings; everything else becomes "#N/A".
            if not (isinstance(category, str) and category != "#N/A"):
                category = "#N/A"
            if description:
                data.append([description, category])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 2: {str(e)}")
def sheet_lookup(current_sheet_name, excel_file_path):
    """Carry forward the previous quarter's CRI Profile Mapping values.

    For a sheet named like "2Q2024", looks up each row's
    'Monitoring Tool Instance ID-AU' in the previous quarter's sheet
    ("1Q2024"; "1Q" rolls back to "4Q" of the prior year) and writes the
    matched '<q>q CRI Profile Mapping' values into the current sheet,
    filling unmatched rows with '#N/A'. The current sheet is rewritten in
    place in the workbook. Duplicate IDs keep their first occurrence.

    Raises:
        ValueError: if the sheet name is not of the form <digit>Q<4 digits>.
    """
    # fullmatch (was match) so trailing garbage like "1Q2024x" is rejected.
    match = re.fullmatch(r'(\d)Q(\d{4})', current_sheet_name)
    if not match:
        raise ValueError("Invalid sheet name format")
    quarter, year = map(int, match.groups())
    prev_quarter = 4 if quarter == 1 else quarter - 1
    prev_year = year - 1 if quarter == 1 else year
    prev_sheet_name = f"{prev_quarter}Q{prev_year}"
    result_col = f"{quarter}q CRI Profile Mapping"

    # Read everything up front and close the file handle before reopening
    # the same path for writing (the original leaked the ExcelFile handle,
    # which breaks the append-mode rewrite on some platforms).
    with pd.ExcelFile(excel_file_path) as xl:
        current_df = xl.parse(current_sheet_name)
        prev_df = (xl.parse(prev_sheet_name)
                   if prev_sheet_name in xl.sheet_names else None)

    if prev_df is not None:
        lookup_col = 'Monitoring Tool Instance ID-AU'
        current_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        prev_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True)
        value_col = f"{prev_quarter}q CRI Profile Mapping"
        # Dict lookup is faster than a per-row merge for this one-column map.
        lookup_dict = dict(zip(prev_df[lookup_col], prev_df[value_col]))
        current_df[result_col] = current_df[lookup_col].map(lookup_dict).fillna('#N/A')
    else:
        # No previous quarter available: everything needs a fresh mapping.
        current_df[result_col] = '#N/A'
        print(f"Warning: Previous sheet {prev_sheet_name} not found. Filling {result_col} with '#N/A'")

    # Replace only the current sheet; other sheets are left untouched.
    with pd.ExcelWriter(excel_file_path, mode='a', if_sheet_exists='replace') as writer:
        current_df.to_excel(writer, sheet_name=current_sheet_name, index=False)
    print(f"Processing complete for sheet {current_sheet_name}")
def get_embeddings(texts):
    """Encode a list of texts into dense vectors using the shared model."""
    vectors = model.encode(texts)
    return vectors
def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3):
    """Return the categories of the n statements most similar to the query.

    Similarity is cosine similarity between the query embedding and each
    statement embedding, computed directly in numpy (no sklearn round-trip
    through 2-D wrapper lists). Results are ordered most- to least-similar.

    Args:
        query_embedding: 1-D vector for the query text.
        statement_embeddings: 2-D array, one row per statement.
        categories: Category label per statement row.
        n: Number of top matches to return; clamped to len(categories).

    Returns:
        List of up to n category labels, best match first.
    """
    emb = np.asarray(statement_embeddings, dtype=float)
    query = np.asarray(query_embedding, dtype=float)
    # Epsilon floor guards zero-norm vectors (dot product is 0 there anyway,
    # so they score 0 similarity, matching sklearn's handling).
    denom = np.maximum(np.linalg.norm(emb, axis=1) * np.linalg.norm(query), 1e-12)
    similarities = emb @ query / denom
    n = min(n, len(categories))
    top_indices = np.argsort(similarities)[-n:][::-1]
    return [categories[i] for i in top_indices]
def process_data(csv1_data, csv2_data):
    """Fill "#N/A" categories in csv2_data via embedding similarity search.

    csv1_data provides (category, diagnostic_statement) reference rows;
    csv2_data rows are mutated in place: a "#N/A" category is replaced with
    the comma-joined top-3 categories whose statements are most similar to
    the row's description. Results are memoized per description, so repeated
    descriptions reuse the first resolution (and rows with an already-known
    category seed the memo).

    Args:
        csv1_data: Header row + [category, diagnostic_statement] rows.
        csv2_data: Header row + [description, category] rows (mutated).

    Returns:
        csv2_data, with categories filled in.

    Raises:
        gr.Error: wrapping any failure so it surfaces in the Gradio UI.
    """
    try:
        diagnostic_statements = [row[1] for row in csv1_data[1:]]
        categories = [row[0] for row in csv1_data[1:]]
        statement_embeddings = get_embeddings(diagnostic_statements)
        # description -> resolved category string. A dict replaces the
        # original parallel lists + list.index(), which was O(n^2).
        resolved = {}
        for row in csv2_data[1:]:
            description = row[0]
            if description in resolved:
                row[1] = resolved[description]
                continue
            if row[1] != "#N/A":
                # Category already known; remember it for later duplicates.
                resolved[description] = row[1]
                continue
            description_embedding = get_embeddings([description])[0]
            top_categories = get_top_n_categories(
                description_embedding, statement_embeddings, categories)
            row[1] = ', '.join(top_categories)
            resolved[description] = row[1]
        return csv2_data
    except Exception as e:
        raise gr.Error(f"Error processing data: {str(e)}")
def update_excel(excel_path, processed_data, sheetname, min_row=2, max_row=None):
    """Write processed categories back into column G of the named sheet.

    processed_data rows are consumed in order, one per row that has a
    non-empty description in column A — mirroring how filter_excel2
    collected them. min_row/max_row generalize the previously hard-coded
    "row 2 to end of sheet" range; the defaults keep existing callers'
    behavior unchanged.

    Args:
        excel_path: Path to the workbook to update.
        processed_data: [description, category] rows (no header).
        sheetname: Worksheet to update.
        min_row: First 1-based row to consider (default 2, skipping header).
        max_row: Last row to consider, or None for the whole sheet.

    Returns:
        The openpyxl workbook with the sheet updated (not yet saved).

    Raises:
        gr.Error: wrapping any failure so it surfaces in the Gradio UI.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]
        idx = 0
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            description_cell = row[0]   # column A
            category_cell = row[6]      # column G
            if not description_cell.value:
                continue  # filter_excel2 skipped description-less rows too
            # Explicit bounds check instead of the original per-row
            # try/except IndexError used as control flow.
            if idx >= len(processed_data):
                print(f"Warning: Not enough processed data for row {category_cell.row}")
                continue
            sheet.cell(row=category_cell.row, column=category_cell.col_idx,
                       value=processed_data[idx][1])
            idx += 1
        return workbook
    except Exception as e:
        raise gr.Error(f"Error updating Excel: {str(e)}")
def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname):
    """Run the full pipeline for the Gradio UI.

    Steps: carry forward last quarter's mappings, extract rows from both
    workbooks, fill missing categories via similarity search, write the
    results back into Excel 2, and save to a temp file whose path is
    returned for download. Progress is reported via gr.Info; any failure
    is surfaced as gr.Error.
    """
    try:
        gr.Info("Starting processing...")
        gr.Info("Doing lookup...")
        sheet_lookup(sheetname, excel2)
        gr.Info("Processing Excel 1...")
        source_rows = filter_excel1(excel1, min_row1, max_row1)
        gr.Info("Processing Excel 2...")
        target_rows = filter_excel2(excel2, min_row2, max_row2, sheetname)
        gr.Info("Running similarity search...")
        filled_rows = process_data(source_rows, target_rows)
        gr.Info("Updating Excel file...")
        workbook = update_excel(excel2, filled_rows[1:], sheetname)
        gr.Info("Saving updated Excel file...")
        # delete=False so the file survives for Gradio to serve as a download.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            output_path = tmp.name
            workbook.save(output_path)
        gr.Info("Processing complete!")
        return output_path
    except gr.Error:
        # Already user-facing; let Gradio display it as-is.
        raise
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred: {str(e)}")
# Gradio interface: two workbook uploads, row-range controls, and the
# target sheet name; output is the updated workbook as a download.
input_components = [
    gr.File(label="Upload Source Excel (Excel 1)"),
    gr.File(label="Upload Excel to be Filled (Excel 2)"),
    gr.Number(label="Min Row for Excel 1", value=2),
    gr.Number(label="Max Row for Excel 1", value=1000),
    gr.Number(label="Min Row for Excel 2", value=2),
    gr.Number(label="Max Row for Excel 2", value=3009),
    gr.Textbox(label="Sheet Name for Excel 2"),
]

iface = gr.Interface(
    fn=process_files,
    inputs=input_components,
    outputs=gr.File(label="Download Updated Excel"),
    title="Excel Processor",
    description="Upload two Excel files, specify row ranges, and download the processed Excel file.",
)

iface.launch()