# CYTA - Complete corrected code (download, translation, embeddings, Gradio) v4.1

import os
import json
import re
import time

import numpy as np
import pandas as pd
import tensorflow_hub as hub
from sklearn.neighbors import NearestNeighbors
import gradio as gr
from deep_translator import GoogleTranslator
from google.oauth2 import service_account
from googleapiclient.discovery import build

# -------------------------
# Config
# -------------------------
SERVICE_ACCOUNT_JSON = os.environ.get("CytaKey")  # adjust if your service-account JSON lives elsewhere
DEFAULT_SHEET_RANGE = "Data!A:Z"
TRANSLATOR_SRC = "es"
TRANSLATOR_TGT = "en"
# Fixed tolerance (minimum cosine similarity for a hit)
DEFAULT_THRESHOLD = 0.28

# -------------------------
# 1. Bases
# -------------------------
def get_bases_info():
    return [
        {
            "nombre": "Facultad de Ciencia USACH",
            "url": os.environ.get("URL_FC_USACH"),
            "fecha": "11-10-2025;FCiencia",
            "resumen": "Lista de académicos Facultad de Ciencia con base de datos de patentes, papers y líneas de investigación."
        },
        {
            "nombre": "Lista ScienceUp: Facultades de Ciencia USACH, PUCV, UCN",
            "url": os.environ.get("URL_SCIENCEUP"),
            "fecha": "15-10-2025;ScienceUp",
            "resumen": "Información del Catálogo de capacidades ScienceUp: solo laboratorios y líneas de investigación."
        },
        {
            "nombre": "Lista USACH",
            "url": os.environ.get("URL_USACH"),
            "fecha": "28-05-2024;Usach",
            "resumen": "Información general de líneas de investigación de todos los académicos de la Universidad de Santiago de Chile (en construcción)."
        },
        {
            "nombre": "Facultad de Ciencia USACH Plus",
            "url": os.environ.get("URL_PUBS_CIENCIA"),
            "fecha": "13-11-2025;FCienciaPlus",
            "resumen": "Base extendida: papers + autores asociados a la Facultad de Ciencia."
        }
    ]

# -------------------------
# 2. Utilities
# -------------------------
def preprocess(text):
    if text is None:
        return ""
    return str(text).replace('\n', ' ').strip()


def download_google_sheet(sheet_id, output_path="data.xlsx", range_name=DEFAULT_SHEET_RANGE,
                          service_account_json=None):
    SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly']
    # use the secret by default when no credential is passed in
    if service_account_json is None:
        service_account_json = SERVICE_ACCOUNT_JSON
    if not service_account_json:
        print("❌ No hay credenciales de servicio disponibles (variable de entorno CytaKey).")
        return None
    creds_info = json.loads(service_account_json)
    creds = service_account.Credentials.from_service_account_info(creds_info, scopes=SCOPES)
    service = build('sheets', 'v4', credentials=creds)
    try:
        result = service.spreadsheets().values().get(
            spreadsheetId=sheet_id, range=range_name
        ).execute()
        values = result.get('values', [])
        if not values:
            print("⚠️ No se encontraron datos en la hoja.")
            return None
        df = pd.DataFrame(values[1:], columns=values[0])
        df.to_excel(output_path, index=False)
        print(f"✅ Descarga completada y guardada en {output_path}")
        return output_path
    except Exception as e:
        print(f"❌ Error descargando hoja: {e}")
        return None

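# Illustrative usage sketch (never called by the app): download one sheet and extract the
# research-lines column. The sheet ID below is a hypothetical placeholder; credentials are
# read from the CytaKey environment variable exactly as in download_google_sheet above.
def _demo_descarga():
    path = download_google_sheet("<SHEET_ID_DE_PRUEBA>", output_path="demo.xlsx",
                                 range_name="Data!A:Z")
    if path:
        textos = excel_to_texts(path, column_index=22)
        print(f"{len(textos)} textos extraídos")
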
def excel_to_texts(path, column_index=22, sheet_name=0):
    """Convert one column of an Excel file into a list of preprocessed texts."""
    try:
        xls = pd.ExcelFile(path)
        # Resolve which sheet to use
        if isinstance(sheet_name, int):
            sheet_to_use = xls.sheet_names[sheet_name] if sheet_name < len(xls.sheet_names) else xls.sheet_names[0]
        else:
            if sheet_name in xls.sheet_names:
                sheet_to_use = sheet_name
            else:
                sheet_to_use = xls.sheet_names[0]
                print(f"⚠️ Hoja '{sheet_name}' no encontrada. Usando '{sheet_to_use}' en su lugar.")
        df = pd.read_excel(xls, sheet_name=sheet_to_use)
        # Resolve the column: it can be an index (int) or a name (str)
        if isinstance(column_index, int):
            if column_index >= len(df.columns):
                raise ValueError(f"El índice de columna {column_index} excede el número de columnas ({len(df.columns)})")
            col = df.columns[column_index]
        else:
            if column_index not in df.columns:
                col = df.columns[22] if len(df.columns) > 22 else df.columns[0]
                print(f"⚠️ Columna '{column_index}' no encontrada. Usando '{col}' en su lugar.")
            else:
                col = column_index
        texts = df[col].dropna().astype(str).tolist()
        texts = [preprocess(t) for t in texts]
        print(f"✅ Transformados {len(texts)} registros desde Excel (hoja: '{sheet_to_use}', columna: '{col}')")
        return texts
    except Exception as e:
        print(f"❌ Error en excel_to_texts: {e}")
        return []


def translate_texts(texts, src=TRANSLATOR_SRC, tgt=TRANSLATOR_TGT, cache_path=None,
                    max_chunk_len=4500, sleep_between=0.35):
    # Load the cache if it exists and build an original -> translated dict
    cache_map = {}
    if cache_path and os.path.exists(cache_path):
        try:
            df_cache = pd.read_excel(cache_path)
            if "Original" in df_cache.columns and "Translated" in df_cache.columns:
                cache_map = {str(o): str(t) for o, t in zip(df_cache["Original"].astype(str),
                                                            df_cache["Translated"].astype(str))}
                print(f"✅ Usando cache de traducción: {cache_path} (entradas: {len(cache_map)})")
        except Exception as e:
            print(f"⚠️ No se pudo leer cache {cache_path}: {e}")

    translator = GoogleTranslator(source=src, target=tgt)
    translated = []
    # rows translated in this run, kept for caching later
    new_cache_rows = []

    for idx, txt in enumerate(texts):
        orig = "" if txt is None else str(txt)
        if orig in cache_map:
            translated.append(cache_map[orig])
            continue
        # short texts are translated in a single call
        if len(orig) <= max_chunk_len:
            try:
                translated_text = translator.translate(orig)
                translated.append(translated_text)
                new_cache_rows.append((orig, translated_text))
                time.sleep(sleep_between)
            except Exception as e:
                print(f"❌ Error traduciendo fila {idx+1}, usando original: {e}")
                translated.append(orig)
        else:
            # split into fragments, trying to cut at a period so sentences stay intact
            parts = []
            start = 0
            L = len(orig)
            while start < L:
                end = min(start + max_chunk_len, L)
                if end < L:
                    # look for the last period before 'end' to cut at a sentence boundary (if any)
                    cut = orig.rfind('.', start, end)
                    if cut == -1 or cut <= start:
                        cut = end
                    else:
                        cut = cut + 1  # include the period
                else:
                    cut = end
                part = orig[start:cut].strip()
                if part:
                    parts.append(part)
                start = cut
            # translate the fragments sequentially
            translated_parts = []
            for j, part in enumerate(parts):
                try:
                    tp = translator.translate(part)
                    translated_parts.append(tp)
                    time.sleep(sleep_between)
                except Exception as e:
                    print(f"❌ Error traduciendo fragmento {j+1} de fila {idx+1}: {e}")
                    translated_parts.append(part)  # fall back to the original fragment
            translated_text = " ".join(translated_parts)
            translated.append(translated_text)
            new_cache_rows.append((orig, translated_text))

    # Save the incremental cache when a cache path was given (kept on for testing, as in cyta 3)
    if cache_path:
        try:
            if os.path.exists(cache_path):
                # read the existing file and append, avoiding exact duplicates
                df_old = pd.read_excel(cache_path)
                if "Original" in df_old.columns and "Translated" in df_old.columns:
                    rows = list(zip(df_old["Original"].astype(str), df_old["Translated"].astype(str)))
                else:
                    rows = []
                rows.extend(new_cache_rows)
            else:
                rows = new_cache_rows
            # drop duplicates, keeping the first occurrence
            seen = set()
            rows_uniq = []
            for o, t in rows:
                if o not in seen:
                    rows_uniq.append((o, t))
                    seen.add(o)
            df_out = pd.DataFrame(rows_uniq, columns=["Original", "Translated"])
            df_out.to_excel(cache_path, index=False)
            print(f"✅ Traducciones guardadas/actualizadas en cache: {cache_path} (total: {len(df_out)})")
        except Exception as e:
            print(f"⚠️ No se pudo guardar cache en {cache_path}: {e}")

    return translated

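# Illustrative sketch (assumes network access to the translation backend used by
# deep_translator; never called by the app): repeated runs reuse the Excel cache file,
# so rows that were already translated are not sent to the translator again.
def _demo_traduccion():
    textos = ["Óptica cuántica y fotónica.", "Modelación matemática de epidemias."]
    ingles = translate_texts(textos, src="es", tgt="en", cache_path="Data_demo_en.xlsx")
    for es, en in zip(textos, ingles):
        print(es, "->", en)
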
def text_to_chunks(texts, word_length=400):
    """
    Split each academic's text into multiple semantic sub-chunks, keeping a shared
    ID (group_id) so they can be grouped back together later.
    """
    try:
        chunks = []
        for idx, raw in enumerate(texts):
            if not raw:
                continue
            group_id = f"Row_{idx+1}"  # this ID ties together the chunks of the same academic
            # split on ';' as well as on newlines and sentence-ending periods
            parts = [p.strip() for p in re.split(r';|\n|\.\s{1,2}', str(raw)) if p.strip()]
            # group by size (tunable)
            current_chunk = []
            for part in parts:
                current_chunk.append(part)
                # roughly every 3 sentences (tune this according to text length)
                if len(current_chunk) >= 3:
                    chunks.append({
                        "group_id": group_id,
                        "text": ". ".join(current_chunk)
                    })
                    current_chunk = []
            # keep whatever is left over
            if current_chunk:
                chunks.append({
                    "group_id": group_id,
                    "text": ". ".join(current_chunk)
                })
        print(f"✅ Generados {len(chunks)} sub-chunks")
        return chunks
    except Exception as e:
        print(f"Error en text_to_chunks: {e}")
        return []


# SemanticSearch class (replacement)
# -------------------------
class SemanticSearch:
    def __init__(self, model_url='https://tfhub.dev/google/universal-sentence-encoder-large/5'):
        self.use = hub.load(model_url)
        self.fitted = False

    def fit(self, data, batch=1000, n_neighbors=6, save_path="embeddings.json"):
        """
        data: list of strings OR list of dicts {'group_id': ..., 'text': ...}
        """
        # keep the raw data (it may be a list of dicts)
        self.data = data
        # build the list of plain texts that feeds the encoder
        texts_for_emb = []
        for item in data:
            if isinstance(item, dict):
                texts_for_emb.append(item.get("text", ""))
            else:
                texts_for_emb.append(str(item))
        self.embeddings = self.get_text_embedding(texts_for_emb, batch=batch)
        n_neighbors = min(n_neighbors, len(self.embeddings))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors, metric="cosine")
        self.nn.fit(self.embeddings)
        self.fitted = True
        emb_list = self.embeddings.tolist()
        # persist both the chunks (raw data) and the embeddings
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump({"chunks": self.data, "embeddings": emb_list}, f, ensure_ascii=False, indent=2)
        print(f"Embeddings guardados en {save_path}")

    def load(self, path="embeddings.json", n_neighbors=6):
        with open(path, "r", encoding="utf-8") as f:
            obj = json.load(f)
        self.data = obj["chunks"]  # may be a list of dicts
        self.embeddings = np.array(obj["embeddings"])
        n_neighbors = min(n_neighbors, len(self.embeddings))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors, metric="cosine")
        self.nn.fit(self.embeddings)
        self.fitted = True
        print(f"Embeddings cargados desde {path}")

    def __call__(self, text, return_data=True, threshold=0.35, top_k=10):
        inp_emb = self.use([text])
        distances, neighbors = self.nn.kneighbors(inp_emb, return_distance=True)
        neighbors, distances = neighbors[0], distances[0]
        results = []
        for idx, dist in zip(neighbors[:top_k], distances[:top_k]):
            sim = 1.0 - float(dist)
            results.append({"chunk": self.data[idx], "distance": float(dist), "similarity": sim})
        # filter by similarity: 'threshold' is a cosine distance, so 1 - threshold is the minimum similarity
        filtered = [r for r in results if r["similarity"] >= (1.0 - threshold)]
        return filtered if return_data else results

    def get_text_embedding(self, texts, batch=1000):
        embeddings = []
        for i in range(0, len(texts), batch):
            text_batch = texts[i:(i + batch)]
            emb_batch = self.use(text_batch)  # these are plain strings now
            embeddings.append(emb_batch)
        return np.vstack(embeddings)

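# Illustrative sketch of SemanticSearch on two hand-written chunks (assumes the Universal
# Sentence Encoder can be fetched from TF Hub; the function is never called by the app).
def _demo_semantic_search():
    chunks = [
        {"group_id": "Row_1", "text": "Quantum optics and photonics laboratory"},
        {"group_id": "Row_2", "text": "Mathematical modelling of infectious diseases"},
    ]
    buscador = SemanticSearch()
    buscador.fit(chunks, batch=32, n_neighbors=2, save_path="embeddings_demo.json")
    # threshold is a distance: 0.9 keeps anything with similarity >= 0.1
    for r in buscador("epidemic models", threshold=0.9, top_k=2):
        print(round(r["similarity"], 2), r["chunk"]["group_id"])
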
# -------------------------
# 4.1 Prepare a recommender with translation and mapping
# -------------------------
CACHE = {}  # stores, per base: {'recommender': obj, 'originals': [], 'translated': []}


def prepare_recommender(base_name, sheet_id, column_index=22, sheet_name="Data",
                        service_account_json=SERVICE_ACCOUNT_JSON):
    safe = base_name.replace(" ", "_").replace(":", "").replace(";", "")
    embeddings_file = f"embeddings_{safe}.json"
    translated_excel = f"Data_{safe}_en.xlsx"
    original_excel = f"Data_{safe}.xlsx"
    mapping_file = f"mapping_{safe}.json"

    # If the embeddings already exist, load embeddings and mapping and return early
    if os.path.exists(embeddings_file) and os.path.exists(mapping_file):
        recommender = SemanticSearch()
        recommender.load(embeddings_file)
        with open(mapping_file, "r", encoding="utf-8") as f:
            mapping = json.load(f)
        CACHE[base_name] = {"recommender": recommender,
                            "originals": mapping["originals"],
                            "translated": mapping["translated"]}
        print(f"✅ Modelo y mapping cargados para {base_name}")
        return recommender

    # 1) download the sheet and save the original Excel
    path = download_google_sheet(sheet_id, output_path=original_excel,
                                 range_name=f"{sheet_name}!A:Z",
                                 service_account_json=service_account_json)
    if path is None:
        print("❌ Error: no se pudo descargar la hoja.")
        return None

    # 2) extract the Spanish texts
    import openpyxl
    wb = openpyxl.load_workbook(original_excel, read_only=True)
    sheet_name_real = wb.sheetnames[0]  # use the first sheet
    wb.close()
    print(f"📘 Usando hoja '{sheet_name_real}' del Excel descargado")
    texts_es = excel_to_texts(original_excel, column_index=column_index, sheet_name=sheet_name_real)
    if not texts_es:
        print("❌ No hay textos para procesar.")
        return None

    # 3) translate to English (with cache)
    texts_en = translate_texts(texts_es, src="es", tgt="en", cache_path=translated_excel)

    # save the mapping (original <-> translated)
    mapping = {"originals": texts_es, "translated": texts_en}
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(mapping, f, ensure_ascii=False, indent=2)

    # 4) build chunks from the translated texts and fit the model
    chunks = text_to_chunks(texts_en, word_length=5000)
    recommender = SemanticSearch()
    recommender.fit(chunks, batch=512, n_neighbors=10, save_path=embeddings_file)

    # 5) store in CACHE for immediate use
    CACHE[base_name] = {"recommender": recommender, "originals": texts_es, "translated": texts_en}
    return recommender


# -------------------------
# Helpers to map chunks back to the original rows
# -------------------------
def get_row_from_chunk(chunk):
    if isinstance(chunk, dict):
        gid = chunk.get("group_id", "")
        # look for the row number in the group_id
        m = re.search(r'Row[_\-\s]?(\d+)', gid)
        if m:
            return int(m.group(1)) - 1
        # in case 'text' still carries the old "[Row N]" prefix
        txt = chunk.get("text", "")
    else:
        txt = str(chunk)
    m = re.search(r'\[Row\s+(\d+)\]', txt)
    if m:
        return int(m.group(1)) - 1
    return None


def aggregate_unique_originals(results, originals):
    best = {}
    for r in results:
        chunk = r.get("chunk")
        row = get_row_from_chunk(chunk)
        if row is None or row >= len(originals):
            continue
        sim = r.get("similarity", 0)
        if row not in best or sim > best[row]["similarity"]:
            best[row] = {"original": originals[row], "similarity": sim, "chunk": chunk}
    ranked = sorted(best.values(), key=lambda x: x["similarity"], reverse=True)
    return ranked

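# Illustrative sketch (hypothetical data, never called): chunk-level hits collapse back
# to one entry per academic, keeping only the best similarity per row.
def _demo_agrupacion():
    originals = ["Investigador A: óptica", "Investigador B: epidemias"]
    resultados = [
        {"chunk": {"group_id": "Row_2", "text": "epidemic models"}, "similarity": 0.61},
        {"chunk": {"group_id": "Row_2", "text": "disease dynamics"}, "similarity": 0.48},
        {"chunk": {"group_id": "Row_1", "text": "photonics"}, "similarity": 0.30},
    ]
    for item in aggregate_unique_originals(resultados, originals):
        print(f"{item['similarity']:.2f}", item["original"])
    # expected output:
    # 0.61 Investigador B: epidemias
    # 0.30 Investigador A: óptica
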
# -------------------------
# 4.2 Recommender plus
# -------------------------
def prepare_recommender_plus(base_name, sheet_id_plus, column_index=22,
                             service_account_json=SERVICE_ACCOUNT_JSON):
    """
    Extended logic for 'Plus' bases. Combines:
      - the simple base (same base without 'Plus')
      - the secondary base (papers)
    and builds an extended texts_es: papers plus the linked authors.
    """
    # ============================================================================
    # 4.2.1. Determine the associated simple base
    # ============================================================================
    base_simple = base_name.replace(" Plus", "").strip()
    # look it up in the standard list of bases
    base_info_simple = next((b for b in get_bases_info() if b["nombre"] == base_simple), None)
    if base_info_simple is None:
        print(f"❌ No existe base simple asociada a {base_name}")
        return None
    sheet_id_simple = base_info_simple["url"]

    # ============================================================================
    # 4.2.2. Download both sheets and save them as Excel files
    # ============================================================================
    safe = base_name.replace(" ", "_").replace(":", "").replace(";", "")
    embeddings_file = f"embeddings_{safe}.json"
    translated_excel = f"Data_{safe}_en.xlsx"
    mapping_file = f"mapping_{safe}.json"
    original_excel_simple = f"Data_{safe}_simple.xlsx"
    original_excel_plus = f"Data_{safe}_plus.xlsx"

    # If the embeddings already exist: load them and return early
    if os.path.exists(embeddings_file) and os.path.exists(mapping_file):
        recommender = SemanticSearch()
        recommender.load(embeddings_file)
        with open(mapping_file, "r", encoding="utf-8") as f:
            mapping = json.load(f)
        CACHE[base_name] = {"recommender": recommender,
                            "originals": mapping["originals"],
                            "translated": mapping["translated"]}
        print(f"✅ Modelo y mapping cargados para {base_name}")
        return recommender

    # download the simple base
    p1 = download_google_sheet(sheet_id_simple, output_path=original_excel_simple,
                               range_name="Data!A:Z", service_account_json=service_account_json)
    if p1 is None:
        print("❌ No se pudo descargar la base simple.")
        return None

    # download the secondary base (papers)
    p2 = download_google_sheet(sheet_id_plus, output_path=original_excel_plus,
                               range_name="Data!A:Z", service_account_json=service_account_json)
    if p2 is None:
        print("❌ No se pudo descargar la base secundaria.")
        return None

    # ============================================================================
    # 4.2.3. Load the simple base: valid authors (columns "A", "W")
    # ============================================================================
    df_simple = pd.read_excel(original_excel_simple, sheet_name=0)
    fac_autores_validos = {}
    for _, row in df_simple.iterrows():
        nombre = str(row.get("A", "")).strip()
        texto_w = str(row.get("W", "")).strip()
        if texto_w:
            fac_autores_validos[nombre.lower()] = texto_w

    # ============================================================================
    # 4.2.4. Load the plus base: papers (column "W") + authors (column "M")
    # ============================================================================
    df_plus = pd.read_excel(original_excel_plus, sheet_name=0)
    textos_es_finales = []
    for _, row in df_plus.iterrows():
        texto_paper = str(row.get("W", "")).strip()
        autores_raw = str(row.get("M", "")).strip()
        if texto_paper:
            textos_es_finales.append(texto_paper)
        autores = [a.strip() for a in autores_raw.split("\\") if a.strip()]
        for autor in autores:
            autor_key = autor.lower()
            if autor_key in fac_autores_validos:
                textos_es_finales.append(fac_autores_validos[autor_key])
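    # Note on the expected layout (an assumption, documented rather than enforced): the
    # header row of both downloaded sheets must literally contain columns named "A", "W"
    # and "M". For example, a plus-base row with W = "Deep learning para rayos X" and
    # M = "Pérez\Soto" contributes the paper text plus the research lines stored under
    # "pérez" in fac_autores_validos, whenever that author exists in the simple base.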
    # ============================================================================
    # 4.2.5. Translate, chunk and fit (same process as the original function)
    # ============================================================================
    if not textos_es_finales:
        print("❌ No hay textos para procesar.")
        return None

    texts_en = translate_texts(textos_es_finales, src="es", tgt="en", cache_path=translated_excel)

    mapping = {"originals": textos_es_finales, "translated": texts_en}
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(mapping, f, ensure_ascii=False, indent=2)

    chunks = text_to_chunks(texts_en, word_length=5000)
    recommender = SemanticSearch()
    recommender.fit(chunks, batch=512, n_neighbors=10, save_path=embeddings_file)

    CACHE[base_name] = {"recommender": recommender,
                        "originals": textos_es_finales,
                        "translated": texts_en}
    return recommender


# -------------------------
# 5. UI functions
# -------------------------
def cargar_base(nombre_base):  # patched
    base = next((b for b in get_bases_info() if b["nombre"] == nombre_base), None)
    if base is None:
        return f"Base '{nombre_base}' no encontrada.", None
    if nombre_base in CACHE:
        return f"Base '{nombre_base}' ya cargada.", nombre_base

    # --- New logic: 'Plus' bases ---
    if nombre_base.endswith(" Plus"):
        # sheet_id of the secondary base (publications)
        sheet_id_plus = base["url"]
        recommender = prepare_recommender_plus(nombre_base, sheet_id_plus)
    else:
        # normal logic
        recommender = prepare_recommender(nombre_base, base["url"], column_index=22, sheet_name="Data")

    if recommender is None:
        return "Error al preparar la base.", None
    return f"✅ Base '{nombre_base}' lista.", nombre_base


def buscar_investigadores(consulta, base_seleccionada, threshold=None):
    """
    The threshold parameter is kept for compatibility, but the fixed DEFAULT_THRESHOLD is used.
    """
    if base_seleccionada not in CACHE:
        return "Primero selecciona y carga una base."

    # translate the query
    try:
        consulta_en = GoogleTranslator(source='es', target='en').translate(consulta)
    except Exception as e:
        print(f"Error traduciendo consulta: {e}")
        consulta_en = consulta

    recommender = CACHE[base_seleccionada]["recommender"]
    originals = CACHE[base_seleccionada]["originals"]

    # fixed tolerance: DEFAULT_THRESHOLD is a minimum similarity, so pass 1 - threshold as the distance
    results = recommender(consulta_en, threshold=1 - DEFAULT_THRESHOLD, top_k=30)
    if not results:
        return "No se encontraron coincidencias con el umbral actual. Prueba con otra consulta."

    ranked = aggregate_unique_originals(results, originals)

    # format the output, showing the similarity
    salida = []
    for r in ranked[:10]:
        sim_pct = int(r["similarity"] * 100)
        salida.append(f"[{sim_pct}%] {r['original']}")
    return "\n\n".join(salida)

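# Illustrative sketch of the full flow without the web UI (never called by the app;
# the base name must match an entry returned by get_bases_info()).
def _demo_consulta_consola():
    estado, base = cargar_base("Facultad de Ciencia USACH")
    print(estado)
    if base:
        print(buscar_investigadores("modelos matemáticos de epidemias", base))
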
# -------------------------
# 6. Gradio interface
# -------------------------
Bases_info = get_bases_info()
nombres_bases = [b["nombre"] for b in Bases_info]

css_code = """
.background-video {
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    object-fit: cover;
    opacity: 0.9;
    filter: blur(1px);
    z-index: 0;
}

/* Header with three logos */
.header-bar {
    display: flex;
    justify-content: center;
    align-items: center;
    gap: 60px;
    position: relative;
    z-index: 3;
    margin-top: 15px;
}
.header-bar img {
    max-height: 150px;
    width: auto;
}

/* Slightly larger centre logo */
.header-bar .center-logo {
    max-height: 180px;
    width: auto;
}

/* Footer */
.footer-logo {
    position: relative;
    bottom: 15px;
    left: 50%;
    transform: translateX(-50%);
    max-height: 165px;
    width: auto;
    opacity: 0.9;
    z-index: 3;
}

/* Main content */
.z-index-1 {
    position: relative;
    z-index: 10; /* now above the logos */
}

/* Transparent background, no scroll */
.gradio-container {
    background: transparent !important;
    overflow: hidden;
}

/* Visible title */
h1 {
    color: #000000 !important;
}

/* Orange button inline to the right of the textbox */
.inline-row {
    display: flex;
    gap: 8px;
    align-items: center;
}
.btn-orange {
    background-color: #ff7f11 !important;
    color: white !important;
    border: none !important;
    padding: 8px 14px !important;
    font-weight: 600 !important;
    border-radius: 8px !important;
}
"""

with gr.Blocks(css=css_code, title="Cyta: Buscador de capacidades académicas") as demo:
    # --- Background video ---
    gr.HTML('')
    # --- Logos ---
    gr.HTML('')

    # --- Main header ---
    gr.Markdown('')
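    # Minimal wiring sketch (assumption: component names and layout are illustrative; it
    # reuses the .inline-row / .btn-orange CSS classes defined above and the
    # cargar_base / buscar_investigadores callbacks).
    with gr.Column(elem_classes="z-index-1"):
        selector_base = gr.Dropdown(choices=nombres_bases, label="Base de datos")
        estado_carga = gr.Markdown("")
        base_activa = gr.State(None)
        with gr.Row(elem_classes="inline-row"):
            consulta = gr.Textbox(label="Consulta", placeholder="Describe la capacidad que buscas...")
            boton_buscar = gr.Button("Buscar", elem_classes="btn-orange")
        resultados = gr.Textbox(label="Resultados", lines=12)
        selector_base.change(cargar_base, inputs=selector_base, outputs=[estado_carga, base_activa])
        boton_buscar.click(buscar_investigadores, inputs=[consulta, base_activa], outputs=resultados)
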
demo.launch()