import multiprocessing as mp
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import gradio as gr
import pandas as pd
import psutil
import spacy
from docx import Document
| |
|
@dataclass
class ProcessingResult:
    """Outcome of processing a single .docx file.

    Attributes:
        filename: Base name of the processed file.
        names: Person names found in the document (empty on error).
        status: "success" or "error".
        error: Error message when status == "error", otherwise None.
    """

    filename: str
    names: List[str]
    status: str
    # Optional[str], not plain str: the default (and success-path value) is None.
    error: Optional[str] = None
|
class SystemMonitor:
    """Reports a one-line snapshot of host CPU/RAM usage."""

    @staticmethod
    def get_status() -> str:
        """Return current CPU %, RAM % and an HH:MM:SS timestamp as one line."""
        cpu = psutil.cpu_percent()
        ram = psutil.virtual_memory().percent
        stamp = datetime.now().strftime('%H:%M:%S')
        return f"CPU: {cpu}% | RAM: {ram}% | Último update: {stamp}"
|
class TextProcessor:
    """Wraps a spaCy pipeline for person-name extraction over chunked text."""

    def __init__(self, model_name: str = 'zh_core_web_trf',
                 max_chunk_size: int = 100000):
        """Load the spaCy model.

        Args:
            model_name: spaCy model to load (default: Chinese transformer).
            max_chunk_size: Soft upper bound, in characters, on each chunk
                produced by split_text. Parameterized (was hard-coded) so
                callers can tune it to the model's memory limits.
        """
        self.nlp = spacy.load(model_name)
        self.max_chunk_size = max_chunk_size

    def extract_names(self, text: str) -> List[str]:
        """Return every PERSON entity in *text* (duplicates preserved)."""
        doc = self.nlp(text)
        return [ent.text for ent in doc.ents if ent.label_ == 'PERSON']

    def split_text(self, text: str) -> List[str]:
        """Split *text* into newline-delimited chunks of <= max_chunk_size chars.

        Splits only at '\\n' boundaries, so a single paragraph longer than
        max_chunk_size still becomes its own (oversized) chunk.
        """
        result: List[str] = []
        current_chunk: List[str] = []
        current_length = 0

        for paragraph in text.split('\n'):
            paragraph_length = len(paragraph) + 1  # +1 for the joining '\n'
            if current_length + paragraph_length <= self.max_chunk_size:
                current_chunk.append(paragraph)
                current_length += paragraph_length
            else:
                # Bug fix: previously, a first paragraph longer than
                # max_chunk_size emitted an empty-string chunk here.
                if current_chunk:
                    result.append('\n'.join(current_chunk))
                current_chunk = [paragraph]
                current_length = paragraph_length

        if current_chunk:
            result.append('\n'.join(current_chunk))

        return result
|
class DocumentProcessor:
    """Extracts person names from .docx files via a TextProcessor."""

    def __init__(self, text_processor: TextProcessor):
        self.text_processor = text_processor
        # Kept for backward compatibility with existing callers; no longer
        # used to size a process pool (see process_document).
        self.num_processes = mp.cpu_count()

    def process_document(self, file_path: str, progress=None) -> ProcessingResult:
        """Read a .docx file and return the unique PERSON names it contains.

        Args:
            file_path: Path to the .docx file.
            progress: Optional callable(fraction, desc=...) for UI updates.

        Returns:
            ProcessingResult with status "success", or status "error" plus
            the exception message if anything fails.
        """
        try:
            if progress:
                progress(0.1, desc=f"Procesando {os.path.basename(file_path)}...")

            document = Document(file_path)
            text = ' '.join(para.text for para in document.paragraphs)

            if progress:
                progress(0.3, desc="Dividiendo texto en fragmentos...")

            fragments = self.text_processor.split_text(text)

            if progress:
                progress(0.5, desc="Extrayendo nombres...")

            # Bug fix: the previous mp.Pool spawned a full worker pool for
            # every document (and this method already runs inside a thread
            # pool — see NameExtractorApp.process_files), forcing each worker
            # to pickle and reload the spaCy pipeline per call. That is very
            # expensive and pickling a transformer pipeline can fail outright
            # on spawn-based platforms. Run extraction in-process instead;
            # results are identical.
            all_names: List[str] = []
            for fragment in fragments:
                all_names.extend(self.text_processor.extract_names(fragment))

            if progress:
                progress(0.8, desc="Finalizando procesamiento...")

            return ProcessingResult(
                filename=os.path.basename(file_path),
                names=list(set(all_names)),
                status="success"
            )

        except Exception as e:
            # Errors are reported in the result (not raised) so one bad file
            # doesn't abort a whole batch.
            return ProcessingResult(
                filename=os.path.basename(file_path),
                names=[],
                status="error",
                error=str(e)
            )
|
class ResultsExporter:
    """Writes processing results to a temporary Excel report."""

    @staticmethod
    def export_to_excel(results: List[ProcessingResult]) -> str:
        """Write one row per (file, name) pair to a temporary .xlsx file.

        Files that yielded no names — notably every file whose status is
        "error" — still get one row, so their status and error message are
        not dropped from the report.

        Returns:
            Path to the generated Excel file.
        """
        data = []
        for result in results:
            if result.names:
                for name in result.names:
                    data.append({
                        'Archivo': result.filename,
                        'Nombre': name,
                        'Estado': result.status,
                        'Error': result.error
                    })
            else:
                # Bug fix: previously a result with zero names produced no
                # row at all, silently losing its status/error information.
                data.append({
                    'Archivo': result.filename,
                    'Nombre': None,
                    'Estado': result.status,
                    'Error': result.error
                })

        # Explicit columns so an all-empty batch still yields a valid header.
        df = pd.DataFrame(data, columns=['Archivo', 'Nombre', 'Estado', 'Error'])

        temp_dir = tempfile.mkdtemp()
        temp_file_path = os.path.join(temp_dir, "nombres_extraidos.xlsx")

        with pd.ExcelWriter(temp_file_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)

        return temp_file_path
|
class NameExtractorApp:
    """Gradio app: upload .docx files, extract person names, download Excel."""

    def __init__(self):
        self.text_processor = TextProcessor()
        self.document_processor = DocumentProcessor(self.text_processor)
        self.system_monitor = SystemMonitor()
        self.results_exporter = ResultsExporter()

    def process_files(self, files: List[tempfile._TemporaryFileWrapper],
                      progress=None) -> str:
        """Process the uploaded files concurrently and export the results.

        Args:
            files: Uploaded file objects (each exposes a ``.name`` path); may
                be None or empty if the user clicks without selecting files.
            progress: Optional callable(fraction, desc=...) for UI updates.

        Returns:
            Path to the Excel report, or None when no files were provided.
        """
        # Bug fix: an empty upload used to crash — len(None) raised TypeError
        # and ThreadPoolExecutor(max_workers=0) raises ValueError.
        if not files:
            return None

        if progress:
            progress(0, desc="Iniciando procesamiento...")

        results = []
        total_files = len(files)

        # os.cpu_count() may return None on exotic platforms; guard with `or 1`.
        max_workers = min(total_files, (os.cpu_count() or 1) * 2)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_file = {
                executor.submit(self.document_processor.process_document, file.name): file
                for file in files
            }

            for i, future in enumerate(as_completed(future_to_file)):
                results.append(future.result())
                if progress:
                    progress((i + 1) / total_files,
                             desc=f"Procesado {i + 1} de {total_files} archivos...")

        if progress:
            progress(0.9, desc="Generando archivo de resultados...")

        output_file = self.results_exporter.export_to_excel(results)

        if progress:
            progress(1.0, desc="¡Procesamiento completado!")

        return output_file

    def create_interface(self):
        """Build and return the Gradio Blocks UI."""
        with gr.Blocks() as demo:
            gr.Markdown("# Extractor de Nombres - Procesamiento Paralelo")
            gr.Markdown("Sube uno o varios archivos .docx para extraer nombres de personas usando NLP.")

            system_status = gr.Textbox(label="Estado del Sistema", value="Inicializando...")

            file_input = gr.File(file_types=[".docx"], file_count="multiple")
            output_file = gr.File(label="Archivo de resultados")

            process_btn = gr.Button("Procesar Documentos")
            process_btn.click(
                fn=self.process_files,
                inputs=file_input,
                outputs=output_file
            )

            # Refresh the status line every 5 seconds.
            # NOTE(review): `every=` on demo.load is version-dependent in
            # Gradio — confirm against the pinned gradio version.
            demo.load(self.system_monitor.get_status, None, system_status, every=5)

        return demo
|
def main():
    """Build the Gradio app and start serving it."""
    NameExtractorApp().create_interface().launch()


if __name__ == "__main__":
    main()