Spaces:
Sleeping
Sleeping
| import spacy | |
| from spacy.tokens import Span | |
| from typing import List, Dict, Any, Optional | |
| import uuid | |
| from src.ontology.models import PromptIR, CharacterInstance, SceneInstance, IntentGraph, IntentNode, IntentEdge | |
| from src.ontology.matcher import ConceptMatcher | |
| from src.embeddings.engine import EmbeddingEngine | |
| from src.runtime.translator import OfflineTranslator | |
class Parser:
    """Turn a free-text prompt into a ``PromptIR``.

    The pipeline implemented by :meth:`parse` is: optional translation of the
    input to English, spaCy analysis, chunk extraction, resolution of every
    chunk against the ``ConceptMatcher`` (with a noise-stripping retry and an
    optional embedding-search fallback), construction of an intent graph, and
    finally mapping of that graph onto the IR's characters and global fields.
    """

    # Categories whose match may really be an ordinary noun phrase; matches in
    # these categories go through the disambiguation layer.
    _AMBIGUOUS_CATEGORIES = frozenset(
        {"character", "hair_color", "eye_color", "style", "emotion"}
    )
    # Decorative words stripped from a chunk before retrying the matcher.
    _NOISE_WORDS = frozenset(
        {
            "elegant", "luxurious", "beautiful", "cute", "cool",
            "amazing", "classic", "modern", "stylish",
        }
    )
    # Dependency labels gathered recursively by _get_chunk_text.
    _CHUNK_MODIFIER_DEPS = ("amod", "compound", "nummod", "npadvmod", "flat", "nmod")
    # Dependency labels gathered (one level deep) by the fallback pass of
    # _extract_chunks.
    _FALLBACK_MODIFIER_DEPS = ("amod", "compound", "flat", "nmod")
    # Categories that _build_edges never attaches to the main character.
    _SCENE_CATEGORIES = ("scene", "lighting", "atmosphere")
    # Node category -> name of the CharacterInstance list that receives it.
    _CHARACTER_FIELDS = {
        "clothing": "clothing",
        "hairstyle": "appearance",
        "hair_color": "appearance",
        "eye_color": "appearance",
        "attribute": "appearance",
        "accessory": "accessories",
        "pose": "pose",
        "emotion": "expression",
    }

    def __init__(self, matcher: "ConceptMatcher", embedding_engine: Optional[Any] = None, language: str = "en"):
        """Load the spaCy pipeline and store the collaborators.

        Args:
            matcher: Object whose ``match(text)`` returns a list of result
                dicts with ``"record"``, ``"method"`` and ``"score"`` keys.
            embedding_engine: Optional object exposing
                ``search(text, category=..., top_k=...)``; when ``None`` the
                semantic fallback is disabled.
            language: Input language. Only ``"es"`` enables the offline
                translator; any other value parses the text as given.
        """
        # An English base model is always used because inputs are translated
        # to English. 'en_core_web_trf' is more accurate for entity
        # extraction; fall back to 'sm' when it cannot be loaded.
        try:
            self.nlp = spacy.load("en_core_web_trf")
        except Exception:
            # Deliberately broad (any load failure triggers the fallback), but
            # no longer a bare ``except`` so KeyboardInterrupt / SystemExit
            # propagate.
            self.nlp = spacy.load("en_core_web_sm")
        self.matcher = matcher
        self.embedding_engine = embedding_engine
        self.translator = OfflineTranslator() if language == "es" else None

    @staticmethod
    def _is_content_token(token) -> bool:
        """Return True for adjectives, nouns, proper nouns and title-cased numerals."""
        if token.pos_ in ("ADJ", "NOUN", "PROPN"):
            return True
        return token.pos_ == "NUM" and token.text.istitle()

    def _get_chunk_text(self, token):
        """Return the texts of ``token``'s modifier subtrees followed by ``token.text``.

        Children whose dependency label is one of ``_CHUNK_MODIFIER_DEPS`` are
        expanded recursively, in child-iteration order; the token's own text
        is always the last element of the returned list.
        """
        pieces = []
        for child in token.children:
            if child.dep_ in self._CHUNK_MODIFIER_DEPS:
                pieces.extend(self._get_chunk_text(child))
        pieces.append(token.text)
        return pieces

    def _resolve_chunk(self, chunk_text: str, root_token, trace: Dict):
        """Resolve one chunk to ``(record, method, confidence)``.

        The first matcher result wins. When the matcher returns nothing the
        chunk is handed to :meth:`_handle_unresolved_chunk`. Matches in an
        ambiguous category are passed through :meth:`_apply_disambiguation`,
        which may replace the record or return ``None`` for it.

        ``trace`` is accepted for interface compatibility and is not used here.
        """
        match_results = self.matcher.match(chunk_text)
        if not match_results:
            return self._handle_unresolved_chunk(chunk_text, root_token)

        top = match_results[0]
        best_match, method, conf = top["record"], top["method"], top["score"]

        # --- Disambiguation Layer ---
        if best_match.category in self._AMBIGUOUS_CATEGORIES:
            best_match, method, conf = self._apply_disambiguation(
                best_match, root_token, match_results, method, conf
            )
        return best_match, method, conf

    def _apply_disambiguation(self, best_match, root_token, match_results, method, conf):
        """Demote an ambiguous match that is used like a common noun.

        The match is kept unchanged unless the root token has a possessive
        child, or has a determiner child while its text does not start with an
        upper-case letter. Otherwise the first result outside the ambiguous
        categories is returned with method ``"disambiguated_alt"``; if there
        is none, ``(None, "demoted_to_generic", 0.0)`` is returned.
        """
        child_deps = {child.dep_ for child in root_token.children}
        has_possessive = "poss" in child_deps
        has_article = "det" in child_deps
        # Slice instead of index so an empty token text cannot raise IndexError.
        capitalised = root_token.text[:1].isupper()
        if not (has_possessive or (has_article and not capitalised)):
            return best_match, method, conf

        # Prefer an alternative that makes sense for a noun (e.g. accessory,
        # effect, scene, special); never another ambiguous category.
        for candidate in match_results:
            if candidate["record"].category not in self._AMBIGUOUS_CATEGORIES:
                return candidate["record"], "disambiguated_alt", candidate["score"]
        return None, "demoted_to_generic", 0.0

    def _handle_unresolved_chunk(self, chunk_text: str, root_token):
        """Try to resolve a chunk the matcher did not recognise.

        1. Soft matching: if the chunk contains noise words, retry the matcher
           on the remaining words; a hit is returned as ``"alias"`` with a
           fixed confidence of 0.95.
        2. Semantic retrieval: if an embedding engine is configured, search
           within the guessed category first and then without a category
           filter. The hit is accepted only if its score reaches the
           threshold (0.90 for characters, 0.65 otherwise).

        Returns:
            ``(record, method, score)``, or ``(None, "none", 0.0)`` when
            nothing acceptable was found.
        """
        # 1. Soft Matching (Noise stripping)
        words = chunk_text.split()
        kept = [w for w in words if w not in self._NOISE_WORDS]
        if kept and len(kept) != len(words):
            soft_matches = self.matcher.match(" ".join(kept))
            if soft_matches:
                return soft_matches[0]["record"], "alias", 0.95

        # 2. Semantic Retrieval Fallback
        if self.embedding_engine is None:
            return None, "none", 0.0

        target_cat = self._guess_category(root_token)
        results = self.embedding_engine.search(chunk_text, category=target_cat, top_k=1)
        if not results:
            results = self.embedding_engine.search(chunk_text, category=None, top_k=1)
        if results:
            record, score = results[0]
            # --- Anti-Hallucination Thresholding ---
            # Characters are prone to false positives with common phrases, so
            # they require a much higher score.
            threshold = 0.90 if record.category == "character" else 0.65
            if score >= threshold:
                return record, "semantic", score
        return None, "none", 0.0

    def _guess_category(self, root_token) -> Optional[str]:
        """Guess a category from syntax: subjects are characters, objects of
        wear/have/in/with are clothing, anything else is unknown (``None``)."""
        if root_token.dep_ in ("nsubj", "nsubjpass"):
            return "character"
        if root_token.head.lemma_ in ("wear", "have", "in", "with"):
            return "clothing"
        return None

    def _build_intent_graph(self, doc, safe_mode: bool, trace: Dict) -> "IntentGraph":
        """Build the intent graph for a parsed document.

        Every extracted chunk is resolved; unresolved chunks may become
        generic fallback nodes, resolved ones are recorded in ``trace`` and,
        unless filtered by ``safe_mode``, added as nodes. Edges are built once
        all nodes exist.
        """
        graph = IntentGraph()
        for chunk_text, root_token in self._extract_chunks(doc):
            best_match, method, conf = self._resolve_chunk(chunk_text, root_token, trace)
            if not best_match:
                self._add_fallback_node(graph, chunk_text, root_token, trace)
                continue

            # The trace is updated before filtering so NSFW hits are still
            # reported (with status "filtered_nsfw").
            self._update_resolution_trace(trace, chunk_text, best_match, method, conf, safe_mode)
            if safe_mode and best_match.nsfw:
                continue

            node = IntentNode(
                id=str(uuid.uuid4()),
                label=best_match.canonical,
                category=best_match.category,
                confidence=conf,
            )
            # Semantic attribute recovery
            if method == "semantic":
                self._recover_attributes(node, chunk_text, best_match)
            graph.nodes[node.id] = node

        self._build_edges(graph)
        return graph

    def _extract_chunks(self, doc):
        """Split ``doc`` into ``(lower-cased text, root token)`` chunks.

        Three passes, each skipping tokens claimed by an earlier one:
        named entities, noun chunks that do not overlap anything already
        claimed, and finally stand-alone content tokens together with their
        not-yet-claimed direct modifier children.
        """
        chunks = []
        visited = set()

        # 1. Named entities (often capture character names even if POS is odd).
        for ent in doc.ents:
            chunks.append((ent.text.lower().strip(), ent.root))
            visited.update(token.i for token in ent)

        # 2. Noun chunks (e.g. "red dress"), skipped entirely when any of
        #    their tokens was already captured.
        for chunk in doc.noun_chunks:
            if any(token.i in visited for token in chunk):
                continue
            chunks.append((chunk.text.lower().strip(), chunk.root))
            visited.update(token.i for token in chunk)

        # 3. Fallback: standalone adjectives (like "elegant") or unvisited nouns.
        for token in doc:
            if token.i in visited or not self._is_content_token(token):
                continue
            members = [
                child
                for child in token.children
                if child.dep_ in self._FALLBACK_MODIFIER_DEPS and child.i not in visited
            ]
            members.append(token)
            visited.update(member.i for member in members)
            # Restore the original word order.
            members.sort(key=lambda t: t.i)
            text = " ".join(member.text for member in members).lower().strip()
            chunks.append((text, token))
        return chunks

    def _update_resolution_trace(self, trace, chunk_text, best_match, method, conf, safe_mode):
        """Record a resolved chunk in ``trace``.

        Increments ``trace["resolution_stats"][method]`` and stores the
        resolution details under ``trace["resolved"][chunk_text]``. Both
        sub-dicts are created on demand.
        """
        stats = trace.setdefault("resolution_stats", {})
        stats[method] = stats.get(method, 0) + 1

        filtered = safe_mode and best_match.nsfw
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": best_match.canonical,
            "confidence": conf,
            "method": method,
            "status": "filtered_nsfw" if filtered else "added",
        }

    def _recover_attributes(self, node, chunk_text, best_match):
        """Append to ``node.attributes`` the chunk words missing from the canonical label.

        Words are kept in their order of first appearance in ``chunk_text`` so
        the result is deterministic (a plain set difference would depend on
        string hashing).
        """
        canonical_words = set(best_match.canonical.lower().split())
        extra = [w for w in dict.fromkeys(chunk_text.split()) if w not in canonical_words]
        if extra:
            node.attributes.extend(extra)

    def _add_fallback_node(self, graph, chunk_text, root_token, trace):
        """Add an unresolved chunk as a generic ``"attribute"`` node.

        Only chunks rooted at a content token are kept; they get a fixed
        confidence of 0.5 and are recorded in ``trace["resolved"]`` with
        method ``"fallback_attribute"``.
        """
        if not self._is_content_token(root_token):
            return
        node = IntentNode(id=str(uuid.uuid4()), label=chunk_text, category="attribute", confidence=0.5)
        graph.nodes[node.id] = node
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": chunk_text,
            "confidence": 0.5,
            "method": "fallback_attribute",
            "status": "added",
        }

    def _build_edges(self, graph):
        """Link the first character node to every non-scene node with ``has_attribute``.

        Does nothing when the graph contains no character node.
        """
        main_char = next(
            (n for n in graph.nodes.values() if n.category == "character"), None
        )
        if main_char is None:
            return
        # NOTE(review): this also links style/effect/special nodes and any
        # additional characters to the main character; confirm that is intended.
        for node in graph.nodes.values():
            if node.id != main_char.id and node.category not in self._SCENE_CATEGORIES:
                graph.edges.append(
                    IntentEdge(source_id=main_char.id, target_id=node.id, relation="has_attribute")
                )

    def parse(self, text: str, safe_mode: bool = True) -> "PromptIR":
        """Parse ``text`` into a ``PromptIR``.

        Args:
            text: Raw prompt text.
            safe_mode: When true, matches flagged ``nsfw`` are traced but not
                added to the graph.

        Returns:
            The populated IR; ``ir.trace`` holds the original and working
            texts plus the per-chunk resolution details.
        """
        # Step 0: translate to English if needed (always-to-English pipeline).
        working_text = text
        if self.translator is not None:
            working_text = self.translator.translate(text)

        doc = self.nlp(working_text)
        ir = PromptIR(safe_mode=safe_mode)
        ir.trace["input_original"] = text
        ir.trace["input_working"] = working_text
        ir.trace["resolved"] = {}

        # Building the intent graph handles chunk extraction and matching.
        graph = self._build_intent_graph(doc, safe_mode, ir.trace)
        ir.intent_graph = graph

        # Characters must be mapped first: the global pass attaches leftover
        # attributes to ir.characters[0].
        self._map_characters_to_ir(graph, ir)
        self._map_global_nodes_to_ir(graph, ir)
        return ir

    def _normalize_text(self, text: str) -> str:
        """Return ``text`` unchanged.

        Deprecated: kept for interface compatibility only. Regex
        normalization was dropped because it produced Spanglish once native
        offline Spanish aliases became available.
        """
        return text

    def _map_characters_to_ir(self, graph, ir):
        """Create one ``CharacterInstance`` per character node and append it to ``ir.characters``.

        When the graph has no character node, a placeholder node with id
        ``"subject"`` is inserted into the graph so there is always at least
        one character. Each character receives its node's own attributes plus
        every node it points to through an edge.
        """
        char_nodes = [n for n in graph.nodes.values() if n.category == "character"]
        if not char_nodes:
            placeholder = IntentNode(id="subject", label="Subject", category="character")
            graph.nodes["subject"] = placeholder
            char_nodes.append(placeholder)

        for cnode in char_nodes:
            instance = CharacterInstance(name=cnode.label)
            if cnode.attributes:
                instance.appearance.extend(cnode.attributes)
            for edge in graph.edges:
                if edge.source_id != cnode.id:
                    continue
                target = graph.nodes[edge.target_id]
                self._assign_attribute_to_character(instance, target.category, target.label)
            ir.characters.append(instance)

    def _map_global_nodes_to_ir(self, graph, ir):
        """Copy scene-level nodes onto ``ir`` and leftover attributes onto the first character.

        Character nodes are skipped. Nodes of any category not handled
        globally are assigned to ``ir.characters[0]``; if ``ir`` has no
        characters yet they are skipped instead of raising ``IndexError``.
        """
        main_char = ir.characters[0] if ir.characters else None
        for node in graph.nodes.values():
            cat, label = node.category, node.label
            if cat == "scene":
                ir.scene.locations.append(label)
            elif cat == "lighting":
                ir.scene.lighting.append(label)
            elif cat == "atmosphere":
                ir.scene.atmosphere.append(label)
            elif cat == "style":
                ir.style.append(label)
            elif cat == "effect":
                ir.effects.append(label)
            elif cat == "special":
                ir.technical_details.append(label)
            elif cat != "character" and main_char is not None:
                self._assign_attribute_to_character(main_char, cat, label)

    def _assign_attribute_to_character(self, char_instance, category, label):
        """Append ``label`` to the character list that corresponds to ``category``.

        Duplicates are not added, and categories without a corresponding
        character list are ignored.
        """
        field_name = self._CHARACTER_FIELDS.get(category)
        if field_name is None:
            return
        bucket = getattr(char_instance, field_name)
        if label not in bucket:
            bucket.append(label)