Spaces:
Sleeping
Sleeping
import json
import logging
import os
import shutil
import tempfile
import zipfile
from io import BytesIO

import docx2pdf
import gradio as gr
import pandas as pd
from docx import Document
from PyPDF2 import PdfReader, PdfWriter
class DocumentProcessor:
    """Hold the configuration of one document-generation session and run it.

    UI callbacks fill the instance step by step (spreadsheet, keyword
    mappings, conditional and general templates, value associations).
    ``process_documents`` then creates one folder of documents per
    spreadsheet row and returns them as a ZIP archive.

    NOTE(review): this class was recovered from a copy of the file whose
    indentation had been lost. Statement nesting was reconstructed from the
    control flow and should be confirmed against the canonical source.
    """

    _logger = logging.getLogger(__name__)

    # Clark-notation tags of the WordprocessingML nodes edited at XML level.
    _W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    _TEXT_TAG = "{" + _W_NS + "}t"
    _TEXTBOX_TAG = "{" + _W_NS + "}txbxContent"

    # Characters replaced by "_" in folder names: path separators, characters
    # that are reserved in common file systems, and spaces.
    _UNSAFE_NAME_CHARS = str.maketrans(dict.fromkeys('<>:"/\\|?* ', "_"))

    def __init__(self):
        self.input_files = []
        self.excel_data = None          # main DataFrame, one row per output folder
        self.mapping = {}               # keyword -> spreadsheet column name
        self.conditional_docs = {}      # column -> {cell value -> [template files]}
        self.general_docs = []          # templates processed for every row
        self.column_associations = {}   # cell value (str) -> substitute text (str)
        self.first_df = None
        self.second_df = None
        self.folder_column = None       # remembered column used to name folders

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _file_path(file):
        """Return the path of an uploaded file.

        Accepts both objects exposing the path as ``.name`` and plain path
        strings.
        """
        return getattr(file, "name", file)

    @classmethod
    def _safe_folder_name(cls, value, fallback):
        """Turn a spreadsheet cell into a single, safe directory name.

        Separators and reserved characters become ``_`` so the name can
        never address a location outside the output directory. Blank cells
        and the special names ``.`` / ``..`` yield ``fallback``.
        """
        name = "" if pd.isna(value) else str(value).strip()
        name = name.translate(cls._UNSAFE_NAME_CHARS)
        if name in ("", ".", ".."):
            return fallback
        return name

    @staticmethod
    def _zip_directory(source_dir, zip_path):
        """Write every file below ``source_dir`` into a new ZIP at ``zip_path``."""
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for root, _dirs, files in os.walk(source_dir):
                for name in files:
                    file_path = os.path.join(root, name)
                    archive.write(file_path, os.path.relpath(file_path, source_dir))

    def _replacement_for(self, value):
        """Return the text that replaces a keyword for the cell ``value``.

        Missing cells become an empty string instead of the literal "nan".
        A configured association substitutes the text, and a literal
        backslash-n sequence is converted to a real line break.
        """
        replacement = "" if pd.isna(value) else str(value)
        if replacement in self.column_associations:
            # Association values may be non-string; str.replace needs a str.
            replacement = str(self.column_associations[replacement])
        if '\\n' in replacement:
            replacement = self.format_multiline_text(replacement)
        return replacement

    @staticmethod
    def _iter_run_paragraphs(doc):
        """Yield the paragraphs that are edited run by run.

        Covers body paragraphs, header and footer paragraphs, and the
        paragraphs of top-level table cells. Headers and footers linked to
        the previous section are skipped, because the section that owns the
        definition already yields them.
        """
        yield from doc.paragraphs
        for section in doc.sections:
            for part in (section.header, section.footer):
                if part.is_linked_to_previous:
                    continue
                yield from part.paragraphs
        for table in doc.tables:
            for table_row in table.rows:
                for cell in table_row.cells:
                    yield from cell.paragraphs

    def _replace_in_text_nodes(self, root, placeholder, replacement, skip_runs=frozenset()):
        """Replace ``placeholder`` in every ``w:t`` node below ``root`` once.

        Each text node is visited a single time. Nodes whose parent element
        is in ``skip_runs`` are left alone so text already handled at run
        level is not replaced a second time.
        """
        replacement = str(replacement)
        for node in root.iter(self._TEXT_TAG):
            if node.getparent() in skip_runs:
                continue
            if node.text and placeholder in node.text:
                node.text = node.text.replace(placeholder, replacement)

    # ------------------------------------------------------------------
    # Main spreadsheet and keyword mapping
    # ------------------------------------------------------------------
    def load_excel_file(self, excel_file):
        """Load the main Excel file.

        Returns a 4-tuple: status message, dropdown with the column names,
        text preview of the first rows, and a second dropdown with the
        column names.
        """
        if excel_file is None:
            return "No file selected", gr.Dropdown(choices=[]), "", gr.Dropdown(choices=[])
        try:
            self.excel_data = pd.read_excel(self._file_path(excel_file))
            columns = list(self.excel_data.columns)
            preview = self.excel_data.head().to_string()
            return (f"Excel file loaded successfully. Columns: {', '.join(columns)}",
                    gr.Dropdown(choices=columns),
                    preview,
                    gr.Dropdown(choices=columns))
        except Exception as e:
            return f"Error loading Excel file: {str(e)}", gr.Dropdown(choices=[]), "", gr.Dropdown(choices=[])

    def set_folder_column(self, column_name):
        """Remember the column whose values name the output folders."""
        self.folder_column = column_name
        return f"Folder column set to: {column_name}"

    def add_keyword_mapping(self, column_name, keyword):
        """Map ``keyword`` to ``column_name`` and return the updated mapping list."""
        if not column_name or not keyword:
            return "Please select a column and enter a keyword"
        self.mapping[keyword] = column_name
        mappings_text = "\n".join([f"{k} -> {v}" for k, v in self.mapping.items()])
        return f"Mapping added successfully!\n\nCurrent mappings:\n{mappings_text}"

    # ------------------------------------------------------------------
    # Conditional documents
    # ------------------------------------------------------------------
    def load_conditional_files(self, files):
        """Report which uploaded files are usable conditional templates.

        Only ``.docx`` and ``.pdf`` files are counted; the extension check
        is case-insensitive. Nothing is stored on the instance.
        """
        if not files:
            return "No files selected"
        paths = [self._file_path(f) for f in files]
        file_list = [p for p in paths if p.lower().endswith(('.docx', '.pdf'))]
        return f"Loaded {len(file_list)} conditional files: {', '.join([os.path.basename(f) for f in file_list])}"

    def get_excel_columns(self):
        """Return the column names of the main spreadsheet, or an empty list."""
        if self.excel_data is not None:
            return list(self.excel_data.columns)
        return []

    def get_column_unique_values(self, column_name):
        """Return the distinct non-null values of a main-spreadsheet column as strings."""
        if self.excel_data is not None and column_name in self.excel_data.columns:
            values = self.excel_data[column_name].dropna().unique().tolist()
            return [str(v) for v in values]
        return []

    def set_document_condition(self, column_name, condition_value, selected_files):
        """Attach ``selected_files`` to the condition ``column_name == condition_value``.

        Returns a text summary of every condition configured so far.
        """
        if not selected_files or not column_name or not condition_value:
            return "Please select files, column name, and condition value"
        bucket = self.conditional_docs.setdefault(column_name, {}).setdefault(condition_value, [])
        for file in selected_files:
            if file not in bucket:
                bucket.append(file)

        parts = ["Current Conditional Documents:\n\n"]
        for col, conditions in self.conditional_docs.items():
            parts.append(f"Column: {col}\n")
            for value, files in conditions.items():
                parts.append(f" When {col} = '{value}':\n")
                for file in files:
                    parts.append(f" - {os.path.basename(file.name) if hasattr(file, 'name') else file}\n")
            # NOTE(review): blank line placed after each column; the original
            # nesting of this statement could not be recovered with certainty.
            parts.append("\n")
        return "".join(parts)

    # ------------------------------------------------------------------
    # Column associations
    # ------------------------------------------------------------------
    def load_first_excel(self, file):
        """Load the first association spreadsheet; return status and a column dropdown."""
        if file is None:
            return "No file selected", gr.Dropdown(choices=[])
        try:
            self.first_df = pd.read_excel(self._file_path(file))
            columns = list(self.first_df.columns)
            return f"First Excel loaded: {', '.join(columns)}", gr.Dropdown(choices=columns)
        except Exception as e:
            return f"Error: {str(e)}", gr.Dropdown(choices=[])

    def load_second_excel(self, file):
        """Load the second association spreadsheet; return status and a column dropdown."""
        if file is None:
            return "No file selected", gr.Dropdown(choices=[])
        try:
            self.second_df = pd.read_excel(self._file_path(file))
            columns = list(self.second_df.columns)
            return f"Second Excel loaded: {', '.join(columns)}", gr.Dropdown(choices=columns)
        except Exception as e:
            return f"Error: {str(e)}", gr.Dropdown(choices=[])

    def get_column_values(self, df, column_name):
        """Return the distinct non-null values of ``df[column_name]``.

        Values are converted to plain Python objects via ``tolist()`` so
        they do not carry numpy scalar types.
        """
        if df is not None and column_name in df.columns:
            return df[column_name].dropna().unique().tolist()
        return []

    def assign_values(self, first_column, first_value, second_column, second_value):
        """Associate ``first_value`` with ``second_value`` for keyword replacement.

        Both values are stored as strings because the lookup during
        replacement uses the string form of the cell. Re-assigning an
        existing value updates it. Only ``None`` and the empty string count
        as "not filled", so a value of 0 is accepted.
        """
        fields = (first_column, first_value, second_column, second_value)
        if any(field is None or field == "" for field in fields):
            return "Please fill all fields"
        self.column_associations[str(first_value)] = str(second_value)
        associations_text = "\n".join([f"{k} -> {v}" for k, v in self.column_associations.items()])
        return f"Association added. Current associations:\n{associations_text}"

    # ------------------------------------------------------------------
    # General documents and processing
    # ------------------------------------------------------------------
    def load_general_documents(self, files):
        """Store the templates that are processed for every spreadsheet row."""
        if not files:
            return "No files selected"
        self.general_docs = files
        file_names = [os.path.basename(self._file_path(f)) for f in files]
        return f"Loaded {len(files)} general documents: {', '.join(file_names)}"

    def process_documents(self, folder_column=None, progress=gr.Progress()):
        """Generate the documents for every row and pack them into a ZIP.

        Args:
            folder_column: Column whose values name the per-row folders.
                When ``None`` the column stored by ``set_folder_column`` is
                used.
            progress: Gradio progress tracker used to iterate the rows.

        Returns:
            ``(status message, path to the ZIP)``; the path is ``None`` when
            nothing was produced.
        """
        if self.excel_data is None:
            return "Please load an Excel file first", None
        if folder_column is None:
            folder_column = self.folder_column
        if not folder_column:
            return "Please select a column for folder naming in Main Setup tab", None
        if folder_column not in self.excel_data.columns:
            return f"Column '{folder_column}' not found in Excel file", None

        try:
            total_rows = len(self.excel_data)
            # The working directory is removed even when processing fails.
            with tempfile.TemporaryDirectory() as output_dir:
                for index, row in progress.tqdm(self.excel_data.iterrows(), total=total_rows, desc="Processing documents"):
                    folder_name = self._safe_folder_name(row[folder_column], f"row_{index}")
                    client_folder = os.path.join(output_dir, folder_name)
                    os.makedirs(client_folder, exist_ok=True)

                    for file in self.general_docs:
                        self.process_file(file, row, client_folder)

                    for column, conditions in self.conditional_docs.items():
                        if column in row:
                            value = str(row[column])
                            if value in conditions:
                                for doc_file in conditions[value]:
                                    self.process_file(doc_file, row, client_folder)

                # A private directory per run keeps the download name stable
                # while preventing concurrent runs from overwriting each
                # other's archive.
                zip_dir = tempfile.mkdtemp(prefix="processed_documents_")
                try:
                    zip_path = os.path.join(zip_dir, "processed_documents.zip")
                    self._zip_directory(output_dir, zip_path)
                except Exception:
                    shutil.rmtree(zip_dir, ignore_errors=True)
                    raise
            return f"Processing complete! Generated documents for {total_rows} entries.", zip_path
        except Exception as e:
            return f"Error during processing: {str(e)}", None

    def process_file(self, file, row, client_folder):
        """Produce the output for one template inside ``client_folder``.

        ``.docx`` templates get their keywords replaced, are saved, and are
        converted to PDF on a best-effort basis. ``.pdf`` files are copied
        unchanged. Other file types are ignored.
        """
        source_path = self._file_path(file)
        filename = os.path.basename(source_path)
        stem, extension = os.path.splitext(filename)
        extension = extension.lower()

        if extension == ".docx":
            # The template is only read, so it is opened in place.
            doc = Document(source_path)
            self.replace_keywords_in_doc(doc, row)
            new_doc_path = os.path.join(client_folder, filename)
            doc.save(new_doc_path)

            pdf_path = os.path.join(client_folder, stem + ".pdf")
            try:
                docx2pdf.convert(new_doc_path, pdf_path)
            except (Exception, SystemExit):
                # Conversion is optional: keep the .docx and carry on.
                # NOTE(review): SystemExit is caught because the converter is
                # believed to call sys.exit on some platforms - confirm.
                self._logger.warning(
                    "PDF conversion failed for %s; keeping the .docx only",
                    new_doc_path,
                    exc_info=True,
                )
        elif extension == ".pdf":
            shutil.copy(source_path, os.path.join(client_folder, filename))

    # ------------------------------------------------------------------
    # Keyword replacement
    # ------------------------------------------------------------------
    def replace_keywords_in_doc(self, doc, row):
        """Replace every mapped keyword in ``doc`` with the values of ``row``.

        Paragraph, header/footer and top-level table text is edited run by
        run. Everything else in the document body (text boxes, shapes,
        nested tables) is edited at XML level. Each piece of text is
        replaced at most once per keyword.

        A keyword that Word has split across several runs is not detected.
        """
        for keyword, column in self.mapping.items():
            if column not in row:
                continue
            replacement = self._replacement_for(row[column])

            handled_runs = set()
            for paragraph in self._iter_run_paragraphs(doc):
                # Merged cells expose the same paragraph more than once.
                fresh_runs = [run for run in paragraph.runs if run._element not in handled_runs]
                self.replace_text_in_runs(fresh_runs, keyword, replacement)
                handled_runs.update(run._element for run in fresh_runs)

            self._replace_in_text_nodes(doc._element, keyword, replacement, skip_runs=handled_runs)

    def replace_text_in_runs(self, runs, keyword, replacement):
        """Replace ``keyword`` inside each run and clear the run's highlight."""
        for run in runs:
            if keyword in run.text:
                run.text = run.text.replace(keyword, replacement)
                run.font.highlight_color = None

    def format_multiline_text(self, text):
        """Convert literal backslash-n sequences in ``text`` to line breaks."""
        lines = text.split('\\n')
        return '\n'.join(lines)

    def replace_text_in_textboxes(self, document, placeholder, replacement):
        """Replace ``placeholder`` in the text nodes of every text box."""
        replacement = str(replacement)
        seen = set()
        for txbx in document._element.iter(self._TEXTBOX_TAG):
            for node in txbx.iter(self._TEXT_TAG):
                if node in seen:
                    continue
                seen.add(node)
                if node.text and placeholder in node.text:
                    node.text = node.text.replace(placeholder, replacement)

    def replace_text_in_shapes_and_textboxes(self, document, placeholder, replacement):
        """Replace ``placeholder`` in every text node of the document body.

        One pass over all ``w:t`` descendants of the body element, which
        also reaches text inside shapes, inline shapes and text boxes.
        """
        self._replace_in_text_nodes(document._element, placeholder, replacement)
# Module-level processor instance shared by the UI callbacks.
processor = DocumentProcessor()
# Build the Gradio interface
def create_interface():
    """Build the Gradio Blocks UI and wire it to the module-level ``processor``.

    Returns:
        The ``gr.Blocks`` application; the caller is responsible for
        launching it.

    NOTE(review): the copy of the file this function was recovered from had
    lost its indentation. The nesting of rows, columns and tabs below was
    reconstructed and should be checked against the canonical source.
    """
    with gr.Blocks(title="Document Processor", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Document Processor")
        gr.Markdown("Process documents by replacing keywords with Excel data")

        with gr.Tabs():
            # Main tab
            with gr.TabItem("Main Setup"):
                with gr.Row():
                    excel_file = gr.File(label="Upload Excel File", file_types=[".xlsx", ".xls"])
                    folder_column = gr.Dropdown(label="Select Column for Folder Naming", choices=[], interactive=True)
                excel_status = gr.Textbox(label="Excel Status", interactive=False)
                excel_preview = gr.Textbox(label="Excel Preview", lines=10, interactive=False)
                gr.Markdown("### Keyword Mapping")
                gr.Markdown("Select a column and enter the keyword that will be replaced in documents")
                with gr.Row():
                    mapping_column = gr.Dropdown(label="Select Column", choices=[], interactive=True)
                    keyword_input = gr.Textbox(label="Enter Keyword (e.g., {{name}}, {{address}})", placeholder="{{keyword}}")
                add_mapping_btn = gr.Button("Add Mapping", variant="secondary")
                mapping_status = gr.Textbox(label="Current Mappings", lines=8, interactive=False)

            # Conditional documents tab
            with gr.TabItem("Conditional Documents"):
                gr.Markdown("### Set up conditional documents that will only be included based on Excel data")
                conditional_files = gr.File(
                    label="Upload Conditional Documents",
                    file_count="multiple",
                    file_types=[".docx", ".pdf"]
                )
                conditional_status = gr.Textbox(label="Conditional Files Status", interactive=False)
                gr.Markdown("### Set Conditions")
                gr.Markdown("Choose which documents to include based on Excel column values")
                with gr.Row():
                    condition_column = gr.Dropdown(
                        label="Select Excel Column for Condition",
                        choices=[],
                        interactive=True
                    )
                    condition_value = gr.Dropdown(
                        label="Select Value from Column",
                        choices=[],
                        interactive=True
                    )
                selected_docs = gr.CheckboxGroup(
                    label="Select Documents to Include for this Condition",
                    choices=[],
                    interactive=True
                )
                set_condition_btn = gr.Button("Set Condition for Selected Documents")
                condition_result = gr.Textbox(label="Condition Status", lines=8, interactive=False)

            # Column associations tab
            with gr.TabItem("Column Associations"):
                gr.Markdown("### Set up value mappings between different Excel files")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**First Excel File**")
                        first_excel = gr.File(label="First Excel File", file_types=[".xlsx", ".xls"])
                        first_status = gr.Textbox(label="First Excel Status", interactive=False)
                        first_column = gr.Dropdown(label="First Column", choices=[])
                        first_value = gr.Dropdown(label="First Value", choices=[])
                    with gr.Column():
                        gr.Markdown("**Second Excel File**")
                        second_excel = gr.File(label="Second Excel File", file_types=[".xlsx", ".xls"])
                        second_status = gr.Textbox(label="Second Excel Status", interactive=False)
                        second_column = gr.Dropdown(label="Second Column", choices=[])
                        second_value = gr.Dropdown(label="Second Value", choices=[])
                assign_btn = gr.Button("Create Association")
                associations_display = gr.Textbox(label="Current Associations", lines=10, interactive=False)

            # General documents tab
            with gr.TabItem("General Documents"):
                gr.Markdown("### Upload documents that will be processed for all entries")
                general_files = gr.File(
                    label="Upload General Documents",
                    file_count="multiple",
                    file_types=[".docx", ".pdf"]
                )
                general_status = gr.Textbox(label="General Documents Status", interactive=False)

            # Processing tab
            with gr.TabItem("Process Documents"):
                gr.Markdown("## Ready to Process?")
                gr.Markdown("Make sure you have completed all the setup in the previous tabs:")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown(
                            "\n"
                            "### Checklist:\n"
                            "✅ **Main Setup**: Excel file loaded and folder column selected\n"
                            "✅ **Keyword Mapping**: Keywords mapped to Excel columns\n"
                            "✅ **Conditional Documents**: Documents with conditions set (optional)\n"
                            "✅ **Column Associations**: Value mappings configured (optional)\n"
                            "✅ **General Documents**: Documents for all entries uploaded (optional)\n"
                        )
                    with gr.Column():
                        process_btn = gr.Button(
                            "🚀 Process All Documents",
                            variant="primary",
                            size="lg"
                        )
                        process_status = gr.Textbox(label="Processing Status", lines=3, interactive=False)
                        download_file = gr.File(label="📥 Download Processed Documents")

        # ALL EVENTS ARE WIRED AFTER EVERY COMPONENT HAS BEEN DEFINED

        # --- Main Setup events ---
        def on_excel_change(file):
            # Load the workbook once and fan the result out to all outputs.
            status, folder_choices, preview, mapping_choices = processor.load_excel_file(file)
            condition_choices = gr.Dropdown(choices=processor.get_excel_columns())
            return status, folder_choices, preview, mapping_choices, condition_choices

        excel_file.change(
            fn=on_excel_change,
            inputs=[excel_file],
            outputs=[excel_status, folder_column, excel_preview, mapping_column, condition_column]
        )
        add_mapping_btn.click(
            fn=processor.add_keyword_mapping,
            inputs=[mapping_column, keyword_input],
            outputs=[mapping_status]
        )
        folder_column.change(
            fn=processor.set_folder_column,
            inputs=[folder_column],
            outputs=[]
        )

        # --- Conditional Documents events ---
        def on_conditional_files_change(files):
            names = [os.path.basename(f.name) for f in files] if files else []
            return processor.load_conditional_files(files), gr.CheckboxGroup(choices=names)

        def on_condition_column_change(column_name):
            values = processor.get_column_unique_values(column_name) if column_name else []
            return gr.Dropdown(choices=values)

        def on_set_condition(col, val, selected_doc_names, files):
            # The checkbox group carries base names; map them back to files.
            chosen = [f for f in files if os.path.basename(f.name) in selected_doc_names] if files else []
            return processor.set_document_condition(col, val, chosen)

        conditional_files.change(
            fn=on_conditional_files_change,
            inputs=[conditional_files],
            outputs=[conditional_status, selected_docs]
        )
        condition_column.change(
            fn=on_condition_column_change,
            inputs=[condition_column],
            outputs=[condition_value]
        )
        set_condition_btn.click(
            fn=on_set_condition,
            inputs=[condition_column, condition_value, selected_docs, conditional_files],
            outputs=[condition_result]
        )

        # --- Column Associations events ---
        first_excel.change(
            fn=processor.load_first_excel,
            inputs=[first_excel],
            outputs=[first_status, first_column]
        )
        second_excel.change(
            fn=processor.load_second_excel,
            inputs=[second_excel],
            outputs=[second_status, second_column]
        )
        first_column.change(
            fn=lambda col: gr.Dropdown(choices=processor.get_column_values(processor.first_df, col) if processor.first_df is not None else []),
            inputs=[first_column],
            outputs=[first_value]
        )
        second_column.change(
            fn=lambda col: gr.Dropdown(choices=processor.get_column_values(processor.second_df, col) if processor.second_df is not None else []),
            inputs=[second_column],
            outputs=[second_value]
        )
        assign_btn.click(
            fn=processor.assign_values,
            inputs=[first_column, first_value, second_column, second_value],
            outputs=[associations_display]
        )

        # --- General Documents events ---
        general_files.change(
            fn=processor.load_general_documents,
            inputs=[general_files],
            outputs=[general_status]
        )

        # --- Processing events ---
        # Passing None makes the processor use the folder column it stored.
        process_btn.click(
            fn=lambda: processor.process_documents(None),
            inputs=[],
            outputs=[process_status, download_file]
        )

    return demo
# Build and launch the application when the file is run as a script.
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(share=True)