import os
import time
from pathlib import Path
from typing import Dict, List, Tuple

import gradio as gr
import pandas as pd
from gradio import Progress
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from tools.config import (
    DO_INITIAL_TABULAR_DATA_CLEAN,
    MAX_SIMULTANEOUS_FILES,
    MAX_TABLE_ROWS,
    REMOVE_DUPLICATE_ROWS,
)
from tools.data_anonymise import initial_clean
from tools.helper_functions import OUTPUT_FOLDER, read_file
from tools.load_spacy_model_custom_recognisers import nlp
from tools.secure_path_utils import secure_join


def clean_and_stem_text_series(
    df: pd.DataFrame,
    column: str,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
):
    """
    Clean and stem text columns in a data frame for tabular data.
    """

    # Function to apply lemmatisation and remove stopwords
    def _apply_lemmatization(text):
        doc = nlp(text)
        # Keep only alphabetic tokens and remove stopwords
        lemmatized_words = [
            token.lemma_ for token in doc if token.is_alpha and not token.is_stop
        ]
        return " ".join(lemmatized_words)

    # Always create the text_clean column first
    if do_initial_clean_dup:
        df["text_clean"] = initial_clean(df[column])
    else:
        df["text_clean"] = df[column]

    df["text_clean"] = df["text_clean"].apply(_apply_lemmatization)
    df["text_clean"] = df[
        "text_clean"
    ].str.lower()  # .str.replace(r'[^\w\s]', '', regex=True)

    return df
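
# Illustrative example (hypothetical data; exact lemmas depend on the loaded spaCy model):
#   df = pd.DataFrame({"response": ["The cats are running", "A cat ran"]})
#   df = clean_and_stem_text_series(df, "response", do_initial_clean_dup=False)
#   # df["text_clean"] would then hold lemmatised, lower-cased text such as "cat run"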


def convert_tabular_data_to_analysis_format(
    df: pd.DataFrame, file_name: str, text_columns: List[str] = None
) -> List[Tuple[str, pd.DataFrame]]:
    """
    Convert tabular data (CSV/XLSX) to the format needed for duplicate analysis.

    Args:
        df (pd.DataFrame): The input DataFrame
        file_name (str): Name of the file
        text_columns (List[str], optional): Columns to analyze for duplicates.
            If None, all string columns are used.

    Returns:
        List[Tuple[str, pd.DataFrame]]: List containing a (file_name, processed_df) tuple
    """
    if text_columns is None:
        # Auto-detect text columns (string type columns)
        text_columns = df.select_dtypes(include=["object", "string"]).columns.tolist()

    text_columns = [col for col in text_columns if col in df.columns]

    if not text_columns:
        print(f"No text columns found in {file_name}")
        return list()

    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    # Create a combined text column from all text columns
    df_copy["combined_text"] = (
        df_copy[text_columns].fillna("").astype(str).agg(" ".join, axis=1)
    )

    # Add a row identifier
    df_copy["row_id"] = df_copy.index

    # Create the format expected by the duplicate detection system,
    # using 'row_number' as the row number and 'text' as the combined text
    processed_df = pd.DataFrame(
        {
            "row_number": df_copy["row_id"],
            "text": df_copy["combined_text"],
            "file": file_name,
        }
    )

    # Add the original row data for reference
    for col in text_columns:
        processed_df[f"original_{col}"] = df_copy[col]

    return [(file_name, processed_df)]
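
# Illustrative example (hypothetical column names): for a DataFrame with "name" and "comment"
# columns, convert_tabular_data_to_analysis_format(df, "survey.csv", ["comment"]) returns
# [("survey.csv", processed_df)], where processed_df has "row_number", "text" and "file"
# columns plus an "original_comment" copy of the source column.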


def find_duplicate_cells_in_tabular_data(
    input_files: List[str],
    similarity_threshold: float = 0.95,
    min_word_count: int = 3,
    text_columns: List[str] = [],
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
    progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
    """
    Find duplicate cells/text in tabular data files (CSV, XLSX, Parquet).

    Args:
        input_files (List[str]): List of file paths to analyze
        similarity_threshold (float): Minimum similarity score to consider duplicates
        min_word_count (int): Minimum word count for text to be considered
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to remove duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Names of Excel sheets to search for data
        progress (Progress): Progress tracking object

    Returns:
        Tuple containing:
        - results_df: DataFrame with duplicate matches
        - output_paths: List of output file paths
        - full_data_by_file: Dictionary of processed data by file
    """
    if not input_files:
        raise gr.Error("Please upload files to analyze.")

    progress(0.1, desc="Loading and processing files...")

    all_data_to_process = list()
    full_data_by_file = dict()
    file_paths = list()

    # Process each file
    for file_path in input_files:
        try:
            if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
                temp_df = pd.DataFrame()

                # Try finding each sheet in the given list until a match is found
                for sheet_name in in_excel_tabular_sheets:
                    temp_df = read_file(file_path, excel_sheet_name=sheet_name)

                    # If the sheet was successfully loaded
                    if not temp_df.empty:
                        if temp_df.shape[0] > MAX_TABLE_ROWS:
                            out_message = f"Number of rows in {file_path} for sheet {sheet_name} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
                            print(out_message)
                            raise Exception(out_message)

                        file_name = os.path.basename(file_path) + "_" + sheet_name
                        file_paths.append(file_path)

                        # Convert to analysis format
                        processed_data = convert_tabular_data_to_analysis_format(
                            temp_df, file_name, text_columns
                        )

                        if processed_data:
                            all_data_to_process.extend(processed_data)
                            full_data_by_file[file_name] = processed_data[0][1]

                        temp_df = pd.DataFrame()
            else:
                temp_df = read_file(file_path)

                if temp_df.shape[0] > MAX_TABLE_ROWS:
                    out_message = f"Number of rows in {file_path} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
                    print(out_message)
                    raise Exception(out_message)

                file_name = os.path.basename(file_path)
                file_paths.append(file_path)

                # Convert to analysis format
                processed_data = convert_tabular_data_to_analysis_format(
                    temp_df, file_name, text_columns
                )

                if processed_data:
                    all_data_to_process.extend(processed_data)
                    full_data_by_file[file_name] = processed_data[0][1]
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue

    if not all_data_to_process:
        raise gr.Error("No valid data found in uploaded files.")

    progress(0.2, desc="Combining data...")

    # Combine all data
    combined_df = pd.concat(
        [data[1] for data in all_data_to_process], ignore_index=True
    )
    combined_df = combined_df.drop_duplicates(subset=["row_number", "file"])

    progress(0.3, desc="Cleaning and preparing text...")

    # Clean and prepare text
    combined_df = clean_and_stem_text_series(
        combined_df, "text", do_initial_clean_dup=do_initial_clean_dup
    )

    # Filter by minimum word count
    combined_df["word_count"] = (
        combined_df["text_clean"].str.split().str.len().fillna(0)
    )
    combined_df = combined_df[combined_df["word_count"] >= min_word_count].copy()

    if len(combined_df) < 2:
        return pd.DataFrame(), [], full_data_by_file

    progress(0.4, desc="Calculating similarities...")

    # Calculate similarities
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(combined_df["text_clean"])
    similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False)

    # Find similar pairs (upper triangle only, so each pair is reported once)
    coo_matrix = similarity_matrix.tocoo()
    similar_pairs = [
        (r, c, v)
        for r, c, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data)
        if r < c and v >= similarity_threshold
    ]

    if not similar_pairs:
        gr.Info("No duplicate cells found.")
        return pd.DataFrame(), [], full_data_by_file

    progress(0.7, desc="Processing results...")

    # Create the results DataFrame
    results_data = []
    for row1, row2, similarity in similar_pairs:
        row1_data = combined_df.iloc[row1]
        row2_data = combined_df.iloc[row2]

        results_data.append(
            {
                "File1": row1_data["file"],
                "Row1": int(row1_data["row_number"]),
                "File2": row2_data["file"],
                "Row2": int(row2_data["row_number"]),
                "Similarity_Score": round(similarity, 3),
                "Text1": (
                    row1_data["text"][:200] + "..."
                    if len(row1_data["text"]) > 200
                    else row1_data["text"]
                ),
                "Text2": (
                    row2_data["text"][:200] + "..."
                    if len(row2_data["text"]) > 200
                    else row2_data["text"]
                ),
                "Original_Index1": row1,
                "Original_Index2": row2,
            }
        )

    results_df = pd.DataFrame(results_data)
    results_df = results_df.sort_values(["File1", "Row1", "File2", "Row2"])

    progress(0.9, desc="Saving results...")

    # Save results
    output_paths = save_tabular_duplicate_results(
        results_df,
        output_folder,
        file_paths,
        remove_duplicate_rows=remove_duplicate_rows,
        in_excel_tabular_sheets=in_excel_tabular_sheets,
    )

    gr.Info(f"Found {len(results_df)} duplicate cell matches")

    return results_df, output_paths, full_data_by_file
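
# Illustrative call (hypothetical file and column names): compare the "comment" column of two
# uploaded CSVs and flag rows that are at least 95% similar by TF-IDF cosine similarity.
#   results_df, output_paths, full_data = find_duplicate_cells_in_tabular_data(
#       input_files=["survey_a.csv", "survey_b.csv"],
#       similarity_threshold=0.95,
#       min_word_count=3,
#       text_columns=["comment"],
#   )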


def save_tabular_duplicate_results(
    results_df: pd.DataFrame,
    output_folder: str,
    file_paths: List[str],
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
) -> List[str]:
    """
    Save tabular duplicate detection results to files.

    Args:
        results_df (pd.DataFrame): Results DataFrame
        output_folder (str): Output folder path
        file_paths (List[str]): List of file paths
        remove_duplicate_rows (bool): Whether to remove duplicate rows
        in_excel_tabular_sheets (List[str]): Names of the Excel sheets to save the results to

    Returns:
        List[str]: List of output file paths
    """
    output_paths = list()
    output_folder_path = Path(output_folder)
    output_folder_path.mkdir(exist_ok=True)

    if results_df.empty:
        print("No duplicate matches to save.")
        return list()

    # Save main results
    results_file = output_folder_path / "tabular_duplicate_results.csv"
    results_df.to_csv(results_file, index=False, encoding="utf-8-sig")
    output_paths.append(str(results_file))

    # Group results by original file to handle Excel files properly
    excel_files_processed = dict()  # Track which Excel files have been processed

    # Save per-file duplicate lists
    for file_name, group in results_df.groupby("File2"):
        # Check for matches with original file names
        for original_file in file_paths:
            original_file_name = os.path.basename(original_file)

            if original_file_name in file_name:
                original_file_extension = os.path.splitext(original_file)[-1]

                if original_file_extension in [".xlsx", ".xls"]:
                    # Split the string using secure regex to handle both .xlsx_ and .xls_ delimiters
                    from tools.secure_regex_utils import safe_split_filename

                    parts = safe_split_filename(
                        os.path.basename(file_name), [".xlsx_", ".xls_"]
                    )
                    # The sheet name is the last part after splitting
                    file_sheet_name = parts[-1]
                    file_path = original_file

                    # Initialize Excel file tracking if not already done
                    if file_path not in excel_files_processed:
                        excel_files_processed[file_path] = {
                            "sheets_data": dict(),
                            "all_sheets": list(),
                            "processed_sheets": set(),
                        }

                    # Read the original Excel file to get all sheet names
                    if not excel_files_processed[file_path]["all_sheets"]:
                        try:
                            excel_file = pd.ExcelFile(file_path)
                            excel_files_processed[file_path][
                                "all_sheets"
                            ] = excel_file.sheet_names
                        except Exception as e:
                            print(f"Error reading Excel file {file_path}: {e}")
                            continue

                    # Read the current sheet
                    df = read_file(file_path, excel_sheet_name=file_sheet_name)

                    # Create the duplicate rows file for this sheet
                    file_stem = Path(file_name).stem
                    duplicate_rows_file = (
                        output_folder_path
                        / f"{file_stem}_{file_sheet_name}_duplicate_rows.csv"
                    )

                    # Get the unique row numbers to remove
                    rows_to_remove = sorted(group["Row2"].unique())
                    duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
                    duplicate_df.to_csv(duplicate_rows_file, index=False)
                    output_paths.append(str(duplicate_rows_file))

                    # Process the sheet data
                    df_cleaned = df.copy()
                    df_cleaned["duplicated"] = False
                    df_cleaned.loc[rows_to_remove, "duplicated"] = True

                    if remove_duplicate_rows:
                        df_cleaned = df_cleaned.drop(index=rows_to_remove)

                    # Store the processed sheet data
                    excel_files_processed[file_path]["sheets_data"][
                        file_sheet_name
                    ] = df_cleaned
                    excel_files_processed[file_path]["processed_sheets"].add(
                        file_sheet_name
                    )
                else:
                    file_sheet_name = ""
                    file_path = original_file
                    print("file_path after match:", file_path)

                    file_base_name = os.path.basename(file_path)
                    df = read_file(file_path)

                    file_stem = Path(file_name).stem
                    duplicate_rows_file = (
                        output_folder_path / f"{file_stem}_duplicate_rows.csv"
                    )

                    # Get the unique row numbers to remove
                    rows_to_remove = sorted(group["Row2"].unique())
                    duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
                    duplicate_df.to_csv(duplicate_rows_file, index=False)
                    output_paths.append(str(duplicate_rows_file))

                    df_cleaned = df.copy()
                    df_cleaned["duplicated"] = False
                    df_cleaned.loc[rows_to_remove, "duplicated"] = True

                    if remove_duplicate_rows:
                        df_cleaned = df_cleaned.drop(index=rows_to_remove)

                    file_ext = os.path.splitext(file_name)[-1]
                    if file_ext in [".parquet"]:
                        output_path = secure_join(
                            output_folder, f"{file_base_name}_deduplicated.parquet"
                        )
                        df_cleaned.to_parquet(output_path, index=False)
                    else:
                        output_path = secure_join(
                            output_folder, f"{file_base_name}_deduplicated.csv"
                        )
                        df_cleaned.to_csv(
                            output_path, index=False, encoding="utf-8-sig"
                        )
                    output_paths.append(str(output_path))

                break

    # Process Excel files to create complete deduplicated files
    for file_path, file_data in excel_files_processed.items():
        try:
            # Create the output filename
            file_base_name = os.path.splitext(os.path.basename(file_path))[0]
            file_ext = os.path.splitext(file_path)[-1]
            output_path = secure_join(
                output_folder, f"{file_base_name}_deduplicated{file_ext}"
            )

            # Create the Excel writer
            with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
                # Write all sheets
                for sheet_name in file_data["all_sheets"]:
                    if sheet_name in file_data["processed_sheets"]:
                        # Use the processed (deduplicated) version
                        file_data["sheets_data"][sheet_name].to_excel(
                            writer, sheet_name=sheet_name, index=False
                        )
                    else:
                        # Use the original sheet (no duplicates found)
                        original_df = read_file(file_path, excel_sheet_name=sheet_name)
                        original_df.to_excel(writer, sheet_name=sheet_name, index=False)

            output_paths.append(str(output_path))
            print(f"Created deduplicated Excel file: {output_path}")
        except Exception as e:
            print(f"Error creating deduplicated Excel file for {file_path}: {e}")
            continue

    return output_paths
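
# Files written by save_tabular_duplicate_results (summary of the logic above):
#   - tabular_duplicate_results.csv: every matched pair with its similarity score
#   - <file or sheet>_duplicate_rows.csv: the row numbers flagged for removal, one file per match group
#   - <file>_deduplicated.<ext>: the source data with a "duplicated" flag column and, when
#     remove_duplicate_rows is set, the flagged rows dropped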


def remove_duplicate_rows_from_tabular_data(
    file_path: str,
    duplicate_rows: List[int],
    output_folder: str = OUTPUT_FOLDER,
    in_excel_tabular_sheets: str = "",
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
) -> str:
    """
    Remove duplicate rows from a tabular data file.

    Args:
        file_path (str): Path to the input file
        duplicate_rows (List[int]): List of row indices to remove
        output_folder (str): Output folder for the cleaned file
        in_excel_tabular_sheets (str): Name of the Excel sheet to save the results to
        remove_duplicate_rows (bool): Whether to remove duplicate rows

    Returns:
        str: Path to the cleaned file
    """
    try:
        # Load the file
        df = read_file(
            file_path,
            excel_sheet_name=in_excel_tabular_sheets if in_excel_tabular_sheets else "",
        )

        # Remove duplicate rows (0-indexed)
        df_cleaned = df.drop(index=duplicate_rows).reset_index(drop=True)

        # Save the cleaned file
        file_name = os.path.basename(file_path)
        file_stem = os.path.splitext(file_name)[0]
        file_ext = os.path.splitext(file_name)[-1]
        output_path = secure_join(output_folder, f"{file_stem}_deduplicated{file_ext}")

        if file_ext in [".xlsx", ".xls"]:
            # to_excel requires a string sheet name; fall back to pandas' default "Sheet1"
            df_cleaned.to_excel(
                output_path,
                index=False,
                sheet_name=(
                    in_excel_tabular_sheets if in_excel_tabular_sheets else "Sheet1"
                ),
            )
        elif file_ext in [".parquet"]:
            df_cleaned.to_parquet(output_path, index=False)
        else:
            df_cleaned.to_csv(output_path, index=False, encoding="utf-8-sig")

        return output_path
    except Exception as e:
        print(f"Error removing duplicates from {file_path}: {e}")
        raise
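
# Illustrative call (hypothetical paths): drop rows 4 and 7 from a CSV and write
# "responses_deduplicated.csv" into the output folder.
#   cleaned_path = remove_duplicate_rows_from_tabular_data(
#       file_path="responses.csv", duplicate_rows=[4, 7], output_folder=OUTPUT_FOLDER
#   )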


def run_tabular_duplicate_analysis(
    files: List[str],
    threshold: float,
    min_words: int,
    text_columns: List[str] = [],
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
    progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
    """
    Main function to run tabular duplicate analysis.

    Args:
        files (List[str]): List of file paths
        threshold (float): Similarity threshold
        min_words (int): Minimum word count
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to remove duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Names of Excel sheets to search for data
        progress (Progress): Progress tracking

    Returns:
        Tuple containing the results DataFrame, output paths, and full data by file
    """
    return find_duplicate_cells_in_tabular_data(
        input_files=files,
        similarity_threshold=threshold,
        min_word_count=min_words,
        text_columns=text_columns if text_columns else [],
        output_folder=output_folder,
        do_initial_clean_dup=do_initial_clean_dup,
        in_excel_tabular_sheets=(
            in_excel_tabular_sheets if in_excel_tabular_sheets else []
        ),
        remove_duplicate_rows=remove_duplicate_rows,
    )


# Function to update column choices when files are uploaded
def update_tabular_column_choices(files, in_excel_tabular_sheets: List[str] = []):
    if not files:
        return gr.update(choices=[])

    all_columns = set()
    for file in files:
        try:
            file_extension = os.path.splitext(file.name)[-1]

            if file_extension in [".xlsx", ".xls"]:
                for sheet_name in in_excel_tabular_sheets:
                    df = read_file(file.name, excel_sheet_name=sheet_name)

                    # Get text columns
                    text_cols = df.select_dtypes(
                        include=["object", "string"]
                    ).columns.tolist()
                    all_columns.update(text_cols)
            else:
                df = read_file(file.name)

                # Get text columns
                text_cols = df.select_dtypes(
                    include=["object", "string"]
                ).columns.tolist()
                all_columns.update(text_cols)
        except Exception as e:
            print(f"Error reading {file.name}: {e}")
            continue

    return gr.Dropdown(choices=sorted(list(all_columns)))
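
# Example wiring (a sketch only; the Gradio component names below are assumptions, not
# defined in this module):
#   in_data_files.upload(
#       fn=update_tabular_column_choices,
#       inputs=[in_data_files, in_excel_sheets],
#       outputs=[in_text_columns],
#   )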


# Function to handle tabular duplicate detection
def run_tabular_duplicate_detection(
    files,
    threshold,
    min_words,
    text_columns,
    output_folder: str = OUTPUT_FOLDER,
    do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
    in_excel_tabular_sheets: List[str] = [],
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
    if not files:
        print("No files uploaded")
        return pd.DataFrame(), [], gr.Dropdown(choices=[]), 0, "deduplicate"

    start_time = time.time()
    task_textbox = "deduplicate"

    # If the output folder doesn't end with a forward slash, add one
    if not output_folder.endswith("/"):
        output_folder = output_folder + "/"

    file_paths = list()
    if isinstance(files, str):
        # If 'files' is a single string, treat it as a list with one element
        file_paths.append(files)
    elif isinstance(files, list):
        # If 'files' is a list, iterate through its elements
        for f_item in files:
            if isinstance(f_item, str):
                # If an element is a string, it's a direct file path
                file_paths.append(f_item)
            elif hasattr(f_item, "name"):
                # If an element has a '.name' attribute (e.g., a Gradio File object), use its name
                file_paths.append(f_item.name)
            else:
                # Log a warning for unexpected element types within the list
                print(
                    f"Warning: Skipping an element in 'files' list that is neither a string nor has a '.name' attribute: {type(f_item)}"
                )
    elif hasattr(files, "name"):
        # Handle the case where a single file object (e.g. gr.File) is passed directly, not in a list
        file_paths.append(files.name)
    else:
        # Raise an error for any other unexpected type of the 'files' argument itself
        raise TypeError(
            f"Unexpected type for 'files' argument: {type(files)}. Expected str, list of str/file objects, or a single file object."
        )

    if len(file_paths) > MAX_SIMULTANEOUS_FILES:
        out_message = f"Number of files to deduplicate is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files."
        print(out_message)
        raise Exception(out_message)

    results_df, output_paths, full_data = run_tabular_duplicate_analysis(
        files=file_paths,
        threshold=threshold,
        min_words=min_words,
        text_columns=text_columns if text_columns else [],
        output_folder=output_folder,
        do_initial_clean_dup=do_initial_clean_dup,
        in_excel_tabular_sheets=(
            in_excel_tabular_sheets if in_excel_tabular_sheets else None
        ),
        remove_duplicate_rows=remove_duplicate_rows,
    )

    # Update file choices for cleaning
    file_choices = list(set(file_paths))

    end_time = time.time()
    processing_time = round(end_time - start_time, 2)

    return (
        results_df,
        output_paths,
        gr.Dropdown(choices=file_choices),
        processing_time,
        task_textbox,
    )


# Function to handle row selection for preview
def handle_tabular_row_selection(results_df, evt: gr.SelectData):
    if not evt:
        return None, "", ""
    if not isinstance(results_df, pd.DataFrame):
        return None, "", ""
    elif results_df.empty:
        return None, "", ""

    selected_index = evt.index[0]
    if selected_index >= len(results_df):
        return None, "", ""

    row = results_df.iloc[selected_index]
    return selected_index, row["Text1"], row["Text2"]


# Function to clean duplicates from selected file
def clean_tabular_duplicates(
    file_name,
    results_df,
    output_folder,
    in_excel_tabular_sheets: str = "",
    remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
    if not file_name or results_df.empty:
        return None

    # Get duplicate rows for this file
    file_duplicates = results_df[results_df["File2"] == file_name]["Row2"].tolist()

    if not file_duplicates:
        return None

    try:
        # Find the original file path.
        # This is a simplified approach - in practice you might want to store file paths
        cleaned_file = remove_duplicate_rows_from_tabular_data(
            file_path=file_name,
            duplicate_rows=file_duplicates,
            output_folder=output_folder,
            in_excel_tabular_sheets=in_excel_tabular_sheets,
            remove_duplicate_rows=remove_duplicate_rows,
        )
        return cleaned_file
    except Exception as e:
        print(f"Error cleaning duplicates: {e}")
        return None
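

# Minimal smoke-test sketch (an assumption, not part of the original app): exercises the
# pure-DataFrame helpers on in-memory data so nothing touches the Gradio UI or the
# filesystem. The example rows and the "comment" column name are made up.
if __name__ == "__main__":
    example_df = pd.DataFrame(
        {
            "comment": [
                "The cats are running fast",
                "A cat ran fast",
                "Completely unrelated text about something else",
            ]
        }
    )
    converted = convert_tabular_data_to_analysis_format(
        example_df, "example.csv", text_columns=["comment"]
    )
    _, processed = converted[0]
    processed = clean_and_stem_text_series(processed, "text", do_initial_clean_dup=False)
    print(processed[["row_number", "text", "text_clean"]])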