Spaces:
Sleeping
Sleeping
| # agent.py — AGENTE SEMÁNTICO CON SÍNTESIS INTELIGENTE v1.0 | |
import json
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from typing import Any, Dict, List, Optional, Tuple

import faiss
import numpy as np
import spacy
from sentence_transformers import CrossEncoder, SentenceTransformer
from spacy.lang.en import English
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _build_nlp():
    """Build the module-level spaCy pipeline.

    Tries 'en_core_web_sm'; if it is not installed, downloads it once and
    retries. Any failure (including a failed download, which the original
    code did NOT catch because it occurred inside the OSError handler)
    falls back to a blank English pipeline with only a sentencizer.
    """
    try:
        nlp = spacy.load("en_core_web_sm")
        logger.info("✅ spaCy 'en_core_web_sm' cargado.")
        return nlp
    except OSError:
        # Model missing on disk: attempt a one-time download, but guard it —
        # exceptions raised here are not covered by the outer handlers.
        try:
            logger.info("📥 Descargando 'en_core_web_sm'...")
            from spacy.cli import download
            download("en_core_web_sm")
            nlp = spacy.load("en_core_web_sm")
            logger.info("✅ spaCy 'en_core_web_sm' descargado y cargado.")
            return nlp
        except Exception as e:
            logger.warning(f"⚠️ Error con spaCy: {e}. Usando tokenizer básico.")
    except Exception as e:
        logger.warning(f"⚠️ Error con spaCy: {e}. Usando tokenizer básico.")
    # Last resort: tokenizer-only pipeline (no POS tags, no parser).
    nlp = English()
    nlp.add_pipe("sentencizer")
    return nlp


NLP = _build_nlp()
class ImprovedSemanticAgent:
    """Semantic prompt-enhancement agent over a precomputed FAISS index.

    Embeds a user prompt, retrieves the nearest indexed captions and splices
    their most informative fragments into the prompt. Falls back to fixed
    per-category suffixes when retrieval yields nothing useful.
    """

    def __init__(self):
        logger.info("🚀 Cargando modelo de embeddings (bge-small-en-v1.5)...")
        self.embedding_model = SentenceTransformer('BAAI/bge-small-en-v1.5')
        # NOTE(review): the reranker is loaded but never used by any method
        # in this class — confirm whether it can be dropped to save memory.
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L6-v2', max_length=512)
        self.index = None            # FAISS index, set by _load_precomputed_index
        self.indexed_examples = []   # metadata dicts; each must carry a 'caption' key
        self.is_ready = False
        self.total_indexed = 0
        self.searches_performed = 0
        logger.info("✅ Agente semántico inicializado")

    def _lazy_init(self) -> str:
        """Load the on-disk index once, with a 60 s timeout.

        Returns:
            A human-readable status string (success, timeout or error).
        """
        if self.is_ready:
            return "✅ Agente ya inicializado"
        try:
            with ThreadPoolExecutor() as executor:
                future = executor.submit(self._load_precomputed_index)
                try:
                    return future.result(timeout=60)
                except FutureTimeoutError:
                    return "❌ Timeout inicializando agente"
        except Exception as e:
            return f"❌ Error: {str(e)}"

    def _load_precomputed_index(self) -> str:
        """Read 'faiss_index.bin' + 'metadata.json' from the CWD and mark the agent ready."""
        if not os.path.exists("faiss_index.bin") or not os.path.exists("metadata.json"):
            return "❌ Archivos de índice no encontrados"
        self.index = faiss.read_index("faiss_index.bin")
        with open("metadata.json", 'r', encoding='utf-8') as f:
            self.indexed_examples = json.load(f)
        self.total_indexed = len(self.indexed_examples)
        self.is_ready = True
        return f"✅ ¡Listo! {self.total_indexed:,} ejemplos cargados"

    def _extract_core_entities(self, text: str) -> set:
        """Extract lowercase lemmas of content words, noun chunks and a few
        hand-picked visual-attribute tags from *text*.

        Returns an empty set for blank input.
        """
        if not text.strip():
            return set()
        doc = NLP(text.lower())
        entities = set()
        for token in doc:
            if token.pos_ in ("NOUN", "PROPN", "ADJ") and len(token.text) >= 3 and not token.is_stop:
                entities.add(token.lemma_)
        # noun_chunks requires a dependency parse; the blank-English fallback
        # pipeline has no parser and raises ValueError, so degrade gracefully
        # instead of crashing the whole extraction.
        try:
            for chunk in doc.noun_chunks:
                if len(chunk.text) > 2 and not all(t.is_stop for t in chunk):
                    entities.add(chunk.lemma_.replace(" ", "_"))
        except ValueError:
            pass
        text_lower = text.lower()
        if "fire" in text_lower or "flame" in text_lower:
            entities.add("on_fire")
        if "ice" in text_lower or "frozen" in text_lower:
            entities.add("frozen")
        if "gold" in text_lower or "golden" in text_lower:
            entities.add("golden")
        return entities

    def enhance_prompt(self, user_prompt: str, category: str = "auto") -> Tuple[str, str]:
        """Public entry point: enhance *user_prompt* via semantic retrieval.

        Lazily initializes the index on first use.

        Returns:
            (enhanced_prompt, status_message). On initialization failure the
            prompt is returned unchanged with a warning status.
        """
        if not self.is_ready:
            init_status = self._lazy_init()
            if not self.is_ready:
                return user_prompt, f"⚠️ {init_status}"
        start_time = time.time()
        self.searches_performed += 1
        enhanced, search_info = self._do_enhancement(user_prompt, category)
        elapsed = time.time() - start_time
        return enhanced, f"{search_info} (Tiempo: {elapsed:.2f}s)"

    def _do_enhancement(self, user_prompt: str, category: str) -> Tuple[str, str]:
        """Retrieve the 5 nearest captions and splice their novel fragments
        into the prompt; fall back to structural suffixes otherwise."""
        try:
            logger.info(f"🔍 Analizando: '{user_prompt}'")
            query_embedding = self.embedding_model.encode(
                [user_prompt], convert_to_numpy=True, normalize_embeddings=True
            )[0]
            query_embedding = query_embedding.astype('float32').reshape(1, -1)
            distances, indices = self.index.search(query_embedding, 5)
            candidates = []
            for idx in indices[0]:
                # FAISS pads with -1 when fewer than k neighbors exist; a bare
                # `idx < len(...)` would wrongly accept that as list index -1.
                if 0 <= idx < len(self.indexed_examples):
                    candidates.append(self.indexed_examples[idx]['caption'])
            if not candidates:
                return self._structural_fallback(user_prompt, category), "🔧 Fallback estructural"
            # Keep only caption fragments that add at least two words the
            # user did not already type.
            user_words = set(user_prompt.lower().split())
            all_parts = []
            for caption in candidates:
                parts = [p.strip() for p in caption.split(',') if 8 <= len(p) <= 120]
                for part in parts:
                    part_lower = part.lower()
                    if len(set(part_lower.split()) - user_words) >= 2:
                        all_parts.append(part)
            # Deduplicate while preserving retrieval order.
            seen = set()
            unique_parts = []
            for p in all_parts:
                if p not in seen:
                    unique_parts.append(p)
                    seen.add(p)
            selected = unique_parts[:6]
            if selected:
                additions = ", ".join(selected)
                enhanced = f"{user_prompt}, {additions}"
                return enhanced, f"✨ Prompt sintetizado con {len(candidates)} ejemplos"
            else:
                return self._structural_fallback(user_prompt, category), "🔧 Fallback estructural (sin frases útiles)"
        except Exception as e:
            logger.error(f"❌ Error en _do_enhancement: {e}")
            return user_prompt, f"❌ Error: {str(e)}"

    def _structural_fallback(self, prompt: str, category: str) -> str:
        """Append a fixed per-category quality suffix to *prompt*.

        Unknown categories get a generic high-quality suffix.
        """
        enhancements = {
            "entity": ", highly detailed, sharp focus, professional photography, 8k resolution",
            "composition": ", cinematic composition, atmospheric perspective, golden hour, ultra-detailed",
            "style": ", artistic rendering, masterpiece, vibrant colors, museum quality",
            "imaginative": ", fantasy art, dreamlike atmosphere, magical lighting, intricate details",
            "text": ", typography design, clear lettering, high contrast, professional layout"
        }
        return prompt + enhancements.get(category, ", high quality, detailed, professional, 8k resolution")

    def get_semantic_example(self, category: str, user_prompt: str = "") -> Optional[str]:
        """Return one indexed caption similar to *user_prompt*, cycling through
        the top-k across calls so repeated requests yield different examples."""
        try:
            if not self.is_ready:
                return "⚠️ Agente no inicializado"
            search_text = user_prompt if user_prompt.strip() else "detailed professional artwork"
            search_embedding = self.embedding_model.encode(
                [search_text], convert_to_numpy=True, normalize_embeddings=True
            )[0]
            search_embedding = search_embedding.astype('float32').reshape(1, -1)
            k = min(20, len(self.indexed_examples))
            distances, indices = self.index.search(search_embedding, k)
            used_indices = getattr(self, '_used_indices', set())
            for idx in indices[0]:
                # Skip FAISS's -1 padding and anything already served.
                if 0 <= idx < len(self.indexed_examples) and idx not in used_indices:
                    used_indices.add(idx)
                    self._used_indices = used_indices
                    return self.indexed_examples[idx]['caption']
            # Every top-k hit was already served: reset the cycle and reuse
            # the best (first) valid hit.
            self._used_indices = set()
            if indices[0].size > 0 and 0 <= indices[0][0] < len(self.indexed_examples):
                idx = indices[0][0]
                self._used_indices.add(idx)
                return self.indexed_examples[idx]['caption']
            return "🔍 No encontrado"
        except Exception as e:
            return f"❌ Error: {str(e)}"

    def get_stats(self) -> Dict:
        """Return a nested dict of usage counters for display/monitoring."""
        return {
            "agente": {
                "total_indexado": self.total_indexed,
                "búsquedas_realizadas": self.searches_performed,
                "listo": self.is_ready
            }
        }