import uuid
from typing import List, Dict, Any, Optional

import spacy
from spacy.tokens import Span

from src.ontology.models import PromptIR, CharacterInstance, SceneInstance, IntentGraph, IntentNode, IntentEdge
from src.ontology.matcher import ConceptMatcher
from src.embeddings.engine import EmbeddingEngine
from src.runtime.translator import OfflineTranslator


class Parser:
    """Turn a free-text prompt into a ``PromptIR``.

    The pipeline is: optional translation to English, spaCy analysis, chunk
    extraction, resolution of each chunk against the concept matcher (with an
    optional embedding-based fallback), construction of an intent graph, and
    finally projection of that graph onto the ``PromptIR`` fields.
    """

    # Categories whose top match is re-examined by the disambiguation layer.
    _AMBIGUOUS_CATEGORIES = ("character", "hair_color", "eye_color", "style", "emotion")

    # Categories that describe the scene and are never attached to a character.
    _SCENE_CATEGORIES = ("scene", "lighting", "atmosphere")

    # Dependency labels treated as modifiers of a head token.
    _CHUNK_MODIFIER_DEPS = ("amod", "compound", "nummod", "npadvmod", "flat", "nmod")
    _FALLBACK_MODIFIER_DEPS = ("amod", "compound", "flat", "nmod")

    # Words stripped from a chunk before retrying the matcher.
    _NOISE_WORDS = frozenset({
        "elegant", "luxurious", "beautiful", "cute", "cool",
        "amazing", "classic", "modern", "stylish",
    })

    # Minimum embedding score accepted for a semantic match.
    _CHARACTER_SEMANTIC_THRESHOLD = 0.90
    _DEFAULT_SEMANTIC_THRESHOLD = 0.65

    # Node category -> name of the CharacterInstance list that receives the label.
    _CHARACTER_FIELD_BY_CATEGORY = {
        "clothing": "clothing",
        "hairstyle": "appearance",
        "hair_color": "appearance",
        "eye_color": "appearance",
        "attribute": "appearance",
        "accessory": "accessories",
        "pose": "pose",
        "emotion": "expression",
    }

    def __init__(self, matcher: ConceptMatcher, embedding_engine: Optional[Any] = None, language: str = "en"):
        """Load the spaCy pipeline and store the collaborators.

        Args:
            matcher: Concept matcher used to resolve chunk text.
            embedding_engine: Optional engine exposing ``search``; when falsy,
                the semantic fallback is disabled.
            language: Input language. Only ``"es"`` enables the offline
                translator; any other value leaves the input untranslated.
        """
        # We now always use an English base model because we translate inputs to English.
        # 'en_core_web_trf' is more accurate for entity extraction, but we fall back to 'sm'
        # if it is not available.
        try:
            self.nlp = spacy.load("en_core_web_trf")
        except Exception:
            # Best-effort fallback. ``Exception`` (rather than a bare ``except``) keeps
            # KeyboardInterrupt / SystemExit propagating.
            self.nlp = spacy.load("en_core_web_sm")
        self.matcher = matcher
        self.embedding_engine = embedding_engine
        self.translator = OfflineTranslator() if language == "es" else None

    def _get_chunk_text(self, token):
        """Return the texts of ``token``'s modifier subtree followed by ``token`` itself.

        Children whose dependency label is one of ``_CHUNK_MODIFIER_DEPS`` are
        expanded recursively (including Spanish-specific labels). Not referenced
        by the other methods of this class.
        """
        modifiers = []
        for child in token.children:
            if child.dep_ in self._CHUNK_MODIFIER_DEPS:
                modifiers.extend(self._get_chunk_text(child))
        modifiers.append(token.text)
        return modifiers

    def _resolve_chunk(self, chunk_text: str, root_token, trace: Dict):
        """Resolve one chunk to ``(record, method, confidence)``.

        Uses the matcher's first result; when the matcher returns nothing the
        unresolved-chunk fallbacks are tried instead. ``record`` is ``None``
        when nothing acceptable was found. ``trace`` is accepted for interface
        compatibility and is not used here.
        """
        match_results = self.matcher.match(chunk_text)
        if not match_results:
            return self._handle_unresolved_chunk(chunk_text, root_token)

        top = match_results[0]
        best_match = top["record"]
        method = top["method"]
        conf = top["score"]

        # --- Disambiguation Layer ---
        if best_match.category in self._AMBIGUOUS_CATEGORIES:
            best_match, method, conf = self._apply_disambiguation(
                best_match, root_token, match_results, method, conf
            )
        return best_match, method, conf

    def _apply_disambiguation(self, best_match, root_token, match_results, method, conf):
        """Re-evaluate a match from an ambiguous category using the token's syntax.

        The match is kept unless the root token has a possessive child, or has
        a determiner child while not starting with an uppercase letter. In
        those cases the first match outside the ambiguous categories is
        returned with method ``"disambiguated_alt"``; if none exists the result
        is ``(None, "demoted_to_generic", 0.0)``.
        """
        has_possessive = any(child.dep_ == "poss" for child in root_token.children)
        has_article = any(child.dep_ == "det" for child in root_token.children)
        # ``[:1]`` instead of ``[0]`` so an empty token text cannot raise.
        is_capitalized = root_token.text[:1].isupper()
        if not (has_possessive or (has_article and not is_capitalized)):
            return best_match, method, conf

        # Try to find an alternative that makes sense for a noun
        # (e.g., accessory, effect, scene, special). Avoid picking another
        # character or color while disambiguating.
        alt_result = next(
            (m for m in match_results if m["record"].category not in self._AMBIGUOUS_CATEGORIES),
            None,
        )
        if alt_result:
            return alt_result["record"], "disambiguated_alt", alt_result["score"]
        return None, "demoted_to_generic", 0.0

    def _handle_unresolved_chunk(self, chunk_text: str, root_token):
        """Fallbacks for a chunk the matcher could not resolve directly.

        Returns ``(record, method, confidence)``; ``(None, "none", 0.0)`` when
        every fallback fails.
        """
        # 1. Soft Matching (noise stripping)
        words = chunk_text.split()
        if any(w in self._NOISE_WORDS for w in words):
            clean_text = " ".join(w for w in words if w not in self._NOISE_WORDS).strip()
            if clean_text:
                soft_matches = self.matcher.match(clean_text)
                if soft_matches:
                    return soft_matches[0]["record"], "alias", 0.95

        # 2. Semantic Retrieval Fallback
        if not self.embedding_engine:
            return None, "none", 0.0

        target_cat = self._guess_category(root_token)
        results = self.embedding_engine.search(chunk_text, category=target_cat, top_k=1)
        # Retry without a category filter only when the first query actually had one;
        # otherwise the retry would repeat the identical call.
        if not results and target_cat is not None:
            results = self.embedding_engine.search(chunk_text, category=None, top_k=1)

        if results:
            record, score = results[0]
            # --- Anti-Hallucination Thresholding ---
            # Characters are prone to false positives with common phrases.
            # Require very high confidence.
            if record.category == "character":
                threshold = self._CHARACTER_SEMANTIC_THRESHOLD
            else:
                threshold = self._DEFAULT_SEMANTIC_THRESHOLD
            if score >= threshold:
                return record, "semantic", score
        return None, "none", 0.0

    def _guess_category(self, root_token) -> Optional[str]:
        """Guess a category from syntax: subjects -> ``"character"``; tokens headed by
        wear/have/in/with -> ``"clothing"``; otherwise ``None``."""
        if root_token.dep_ in ("nsubj", "nsubjpass"):
            return "character"
        if root_token.head.lemma_ in ("wear", "have", "in", "with"):
            return "clothing"
        return None

    def _build_intent_graph(self, doc, safe_mode: bool, trace: Dict) -> IntentGraph:
        """Build the intent graph for ``doc`` and record each resolution in ``trace``.

        Unresolved chunks go through ``_add_fallback_node``. Resolved records
        flagged ``nsfw`` are traced but not added to the graph when
        ``safe_mode`` is true.
        """
        graph = IntentGraph()
        for chunk_text, root_token in self._extract_chunks(doc):
            best_match, method, conf = self._resolve_chunk(chunk_text, root_token, trace)
            if not best_match:
                self._add_fallback_node(graph, chunk_text, root_token, trace)
                continue

            # Update trace and stats (done before the NSFW filter so filtered
            # items still appear in the trace).
            self._update_resolution_trace(trace, chunk_text, best_match, method, conf, safe_mode)
            if safe_mode and best_match.nsfw:
                continue

            node = IntentNode(
                id=str(uuid.uuid4()),
                label=best_match.canonical,
                category=best_match.category,
                confidence=conf,
            )
            # Semantic attribute recovery
            if method == "semantic":
                self._recover_attributes(node, chunk_text, best_match)
            graph.nodes[node.id] = node

        self._build_edges(graph)
        return graph

    def _extract_chunks(self, doc):
        """Return ``(lowercased_text, root_token)`` pairs covering the document.

        Named entities are taken first, then noun chunks that do not overlap
        them, then any remaining adjective/noun/proper-noun (or title-cased
        numeral) tokens together with their unvisited simple modifiers.
        """
        chunks = []
        visited = set()

        # 1. Extract Named Entities (often captures character names even if POS is weird)
        for ent in doc.ents:
            chunks.append((ent.text.lower().strip(), ent.root))
            visited.update(token.i for token in ent)

        # 2. Extract Noun Chunks (captures things like "red dress" or "vestido rojo")
        for chunk in doc.noun_chunks:
            # Skip if already captured by entities
            if any(token.i in visited for token in chunk):
                continue
            chunks.append((chunk.text.lower().strip(), chunk.root))
            visited.update(token.i for token in chunk)

        # 3. Fallback: standalone adjectives (like "elegant", "cute") or unvisited nouns
        for token in doc:
            if token.i in visited:
                continue
            is_candidate = token.pos_ in ("ADJ", "NOUN", "PROPN") or (
                token.pos_ == "NUM" and token.text.istitle()
            )
            if not is_candidate:
                continue

            # Try to gather simple modifiers if not already visited
            members = []
            for child in token.children:
                if child.dep_ in self._FALLBACK_MODIFIER_DEPS and child.i not in visited:
                    members.append(child)
                    visited.add(child.i)
            members.append(token)
            visited.add(token.i)

            # Sort by original token index so the text reads in document order
            members.sort(key=lambda t: t.i)
            text = " ".join(t.text for t in members).lower().strip()
            chunks.append((text, token))
        return chunks

    def _update_resolution_trace(self, trace, chunk_text, best_match, method, conf, safe_mode):
        """Count the resolution method and record the chunk's outcome in ``trace``."""
        stats = trace.setdefault("resolution_stats", {})
        stats[method] = stats.get(method, 0) + 1
        # ``setdefault`` so the method also works on a trace without a "resolved" entry.
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": best_match.canonical,
            "confidence": conf,
            "method": method,
            "status": "filtered_nsfw" if (safe_mode and best_match.nsfw) else "added",
        }

    def _recover_attributes(self, node, chunk_text, best_match):
        """Append to ``node.attributes`` the chunk words absent from the canonical label.

        Words are kept in their order of first appearance in ``chunk_text`` so
        the result is deterministic (iterating a set difference is not).
        """
        canonical_words = set(best_match.canonical.lower().split())
        extra = list(dict.fromkeys(w for w in chunk_text.split() if w not in canonical_words))
        if extra:
            node.attributes.extend(extra)

    def _add_fallback_node(self, graph, chunk_text, root_token, trace):
        """Add an unresolved chunk as a generic ``"attribute"`` node (confidence 0.5).

        Only applies when the root token is an adjective, noun, proper noun, or
        a title-cased numeral; other chunks are dropped silently.
        """
        is_candidate = root_token.pos_ in ("ADJ", "NOUN", "PROPN") or (
            root_token.pos_ == "NUM" and root_token.text.istitle()
        )
        if not is_candidate:
            return
        node = IntentNode(id=str(uuid.uuid4()), label=chunk_text, category="attribute", confidence=0.5)
        graph.nodes[node.id] = node
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": chunk_text,
            "confidence": 0.5,
            "method": "fallback_attribute",
            "status": "added",
        }

    def _build_edges(self, graph):
        """Link the first character node to every non-scene node with ``has_attribute``.

        Does nothing when the graph has no character node.
        NOTE(review): other character nodes are linked as well - confirm this is intended.
        """
        chars = [n for n in graph.nodes.values() if n.category == "character"]
        if not chars:
            return
        main_char = chars[0]
        for node in graph.nodes.values():
            if node.id != main_char.id and node.category not in self._SCENE_CATEGORIES:
                graph.edges.append(
                    IntentEdge(source_id=main_char.id, target_id=node.id, relation="has_attribute")
                )

    def parse(self, text: str, safe_mode: bool = True) -> PromptIR:
        """Parse ``text`` into a ``PromptIR``.

        Args:
            text: The raw prompt.
            safe_mode: When true, records flagged ``nsfw`` are excluded from
                the graph (they are still listed in the trace).

        Returns:
            The populated ``PromptIR``; its ``trace`` holds ``input_original``,
            ``input_working`` and ``resolved``.
        """
        # Step 0: Translate to English if needed (Always-to-English Pipeline)
        working_text = text
        if self.translator:
            working_text = self.translator.translate(text)

        doc = self.nlp(working_text)

        ir = PromptIR(safe_mode=safe_mode)
        ir.trace["input_original"] = text
        ir.trace["input_working"] = working_text
        ir.trace["resolved"] = {}

        # Building the intent graph handles chunk extraction and matching
        graph = self._build_intent_graph(doc, safe_mode, ir.trace)
        ir.intent_graph = graph

        self._map_characters_to_ir(graph, ir)
        self._map_global_nodes_to_ir(graph, ir)
        return ir

    def _normalize_text(self, text: str) -> str:
        """Return ``text`` unchanged.

        Deprecated: kept for interface compatibility. Now that we have native
        offline Spanish aliases, regex normalization causes Spanglish.
        """
        return text

    def _map_characters_to_ir(self, graph, ir):
        """Create one ``CharacterInstance`` per character node and append it to ``ir``.

        When the graph has no character node, a placeholder node with id
        ``"subject"`` is inserted into the graph so there is always one character.
        """
        char_nodes = [n for n in graph.nodes.values() if n.category == "character"]
        if not char_nodes:
            dummy = IntentNode(id="subject", label="Subject", category="character")
            graph.nodes["subject"] = dummy
            char_nodes.append(dummy)

        for cnode in char_nodes:
            ci = CharacterInstance(name=cnode.label)
            if cnode.attributes:
                ci.appearance.extend(cnode.attributes)
            # Nodes reached by an edge that starts at this character
            attached = [e.target_id for e in graph.edges if e.source_id == cnode.id]
            for tid in attached:
                tnode = graph.nodes[tid]
                self._assign_attribute_to_character(ci, tnode.category, tnode.label)
            ir.characters.append(ci)

    def _map_global_nodes_to_ir(self, graph, ir):
        """Copy scene/style/effect/special labels onto ``ir``; give the rest to the first character.

        Character nodes are skipped. If ``ir.characters`` is empty, the
        per-character assignment is skipped instead of raising.
        """
        main_char = ir.characters[0] if ir.characters else None
        for node in graph.nodes.values():
            cat, label = node.category, node.label
            if cat == "scene":
                ir.scene.locations.append(label)
            elif cat == "lighting":
                ir.scene.lighting.append(label)
            elif cat == "atmosphere":
                ir.scene.atmosphere.append(label)
            elif cat == "style":
                ir.style.append(label)
            elif cat == "effect":
                ir.effects.append(label)
            elif cat == "special":
                ir.technical_details.append(label)
            elif cat != "character" and main_char is not None:
                self._assign_attribute_to_character(main_char, cat, label)

    def _assign_attribute_to_character(self, char_instance, category, label):
        """Append ``label`` to the character list that corresponds to ``category``.

        Labels already present are not added again; categories without a
        mapping in ``_CHARACTER_FIELD_BY_CATEGORY`` are ignored.
        """
        field_name = self._CHARACTER_FIELD_BY_CATEGORY.get(category)
        if field_name is None:
            return
        values = getattr(char_instance, field_name)
        if label not in values:
            values.append(label)