# Origin: src/parser/parser.py, uploaded by JairoDanielMT with huggingface_hub (revision 9cecc38, 13.1 kB).
import spacy
from spacy.tokens import Span
from typing import List, Dict, Any, Optional
import uuid
from src.ontology.models import PromptIR, CharacterInstance, SceneInstance, IntentGraph, IntentNode, IntentEdge
from src.ontology.matcher import ConceptMatcher
from src.embeddings.engine import EmbeddingEngine
from src.runtime.translator import OfflineTranslator
class Parser:
    """Parse a free-text prompt into a ``PromptIR``.

    The pipeline is: optional translation to English, spaCy analysis, chunk
    extraction, ontology matching (with a disambiguation layer and a semantic
    fallback), intent-graph construction, and finally mapping of the graph
    nodes onto the intermediate representation.
    """

    # Categories whose top match is re-checked by the disambiguation layer.
    _AMBIGUOUS_CATEGORIES = ("character", "hair_color", "eye_color", "style", "emotion")
    # Filler adjectives that are stripped before retrying the matcher.
    _NOISE_WORDS = frozenset({
        "elegant", "luxurious", "beautiful", "cute", "cool",
        "amazing", "classic", "modern", "stylish",
    })
    # Node categories that never receive an edge from the main character.
    _SCENE_CATEGORIES = ("scene", "lighting", "atmosphere")
    # Part-of-speech tags accepted for standalone chunks and fallback nodes.
    _CONTENT_POS = ("ADJ", "NOUN", "PROPN")

    def __init__(self, matcher: "ConceptMatcher", embedding_engine: Optional[Any] = None, language: str = "en"):
        """Load the spaCy pipeline and store the collaborators.

        Args:
            matcher: Object whose ``match(text)`` returns a list of dicts with
                ``"record"``, ``"method"`` and ``"score"`` keys.
            embedding_engine: Optional engine exposing
                ``search(text, category=..., top_k=...)``; when falsy the
                semantic fallback is disabled.
            language: Input language. ``"es"`` enables ``OfflineTranslator``;
                any other value disables translation.
        """
        # We always use an English base model because inputs are translated to English.
        # 'en_core_web_trf' is more accurate for entity extraction, but we fall back
        # to 'sm' if it cannot be loaded.
        try:
            self.nlp = spacy.load("en_core_web_trf")
        except Exception:
            # Deliberately broad so any load failure still triggers the fallback,
            # but (unlike a bare ``except``) KeyboardInterrupt/SystemExit propagate.
            self.nlp = spacy.load("en_core_web_sm")
        self.matcher = matcher
        self.embedding_engine = embedding_engine
        self.translator = OfflineTranslator() if language == "es" else None

    @staticmethod
    def _is_content_token(token) -> bool:
        """Return True for adjectives, nouns, proper nouns and title-cased numerals."""
        return token.pos_ in Parser._CONTENT_POS or (token.pos_ == "NUM" and token.text.istitle())

    def _get_chunk_text(self, token):
        """Return the texts of ``token``'s modifier children (recursively) followed by ``token``.

        Children are visited in the order ``token.children`` yields them; only
        children with a modifier-like dependency label are followed.
        """
        # Gather compounds and amods recursively (including Spanish-specific ones).
        modifiers = []
        for child in token.children:
            if child.dep_ in ("amod", "compound", "nummod", "npadvmod", "flat", "nmod"):
                modifiers.extend(self._get_chunk_text(child))
        modifiers.append(token.text)
        return modifiers

    def _resolve_chunk(self, chunk_text: str, root_token, trace: Dict):
        """Resolve a chunk to ``(record, method, confidence)``.

        Uses the matcher first; if it returns nothing, defers to
        ``_handle_unresolved_chunk``. ``record`` is ``None`` when the chunk
        could not be resolved. ``trace`` is accepted for interface
        compatibility and is not used here.
        """
        match_results = self.matcher.match(chunk_text)
        if not match_results:
            return self._handle_unresolved_chunk(chunk_text, root_token)

        top = match_results[0]
        best_match, method, conf = top["record"], top["method"], top["score"]

        # --- Disambiguation Layer ---
        if best_match.category in self._AMBIGUOUS_CATEGORIES:
            best_match, method, conf = self._apply_disambiguation(
                best_match, root_token, match_results, method, conf
            )
        return best_match, method, conf

    def _apply_disambiguation(self, best_match, root_token, match_results, method, conf):
        """Demote an ambiguous match when the token is used like a common noun.

        A token counts as a common noun when it has a possessive child, or a
        determiner child while its text does not start with an uppercase
        letter. In that case the first match outside the ambiguous categories
        is returned with method ``"disambiguated_alt"``; if there is none the
        result is ``(None, "demoted_to_generic", 0.0)``. Otherwise the inputs
        are returned unchanged.
        """
        child_deps = {child.dep_ for child in root_token.children}
        has_possessive = "poss" in child_deps
        # ``text[:1]`` tolerates an empty token text instead of raising IndexError.
        lowercase_with_article = "det" in child_deps and not root_token.text[:1].isupper()
        if not (has_possessive or lowercase_with_article):
            return best_match, method, conf

        # Try to find an alternative that makes sense for a noun (e.g., accessory,
        # effect, scene, special); avoid picking another character or color.
        alt_result = next(
            (m for m in match_results if m["record"].category not in self._AMBIGUOUS_CATEGORIES),
            None,
        )
        if alt_result:
            return alt_result["record"], "disambiguated_alt", alt_result["score"]
        return None, "demoted_to_generic", 0.0

    def _handle_unresolved_chunk(self, chunk_text: str, root_token):
        """Try noise stripping, then semantic retrieval, for an unmatched chunk.

        Returns ``(record, method, confidence)``; ``(None, "none", 0.0)`` when
        nothing acceptable is found.
        """
        # 1. Soft matching: drop filler adjectives and retry the matcher.
        words = chunk_text.split()
        kept = [w for w in words if w not in self._NOISE_WORDS]
        if kept and len(kept) != len(words):
            soft_matches = self.matcher.match(" ".join(kept))
            if soft_matches:
                return soft_matches[0]["record"], "alias", 0.95

        # 2. Semantic retrieval fallback.
        if not self.embedding_engine:
            return None, "none", 0.0

        target_cat = self._guess_category(root_token)
        results = self.embedding_engine.search(chunk_text, category=target_cat, top_k=1)
        # Only widen the search when the first one was actually category-restricted;
        # otherwise the retry would repeat the identical query.
        if not results and target_cat is not None:
            results = self.embedding_engine.search(chunk_text, category=None, top_k=1)

        if results:
            record, score = results[0]
            # --- Anti-Hallucination Thresholding ---
            # Characters are prone to false positives with common phrases,
            # so they require a much higher score.
            threshold = 0.90 if record.category == "character" else 0.65
            if score >= threshold:
                return record, "semantic", score
        return None, "none", 0.0

    def _guess_category(self, root_token) -> Optional[str]:
        """Guess a search category from the token's syntactic role.

        Subjects map to ``"character"``; tokens whose head lemma is one of
        ``wear``/``have``/``in``/``with`` map to ``"clothing"``; anything else
        yields ``None``.
        """
        if root_token.dep_ in ("nsubj", "nsubjpass"):
            return "character"
        if root_token.head.lemma_ in ("wear", "have", "in", "with"):
            return "clothing"
        return None

    def _build_intent_graph(self, doc, safe_mode: bool, trace: Dict) -> "IntentGraph":
        """Build the intent graph for ``doc`` and record resolutions in ``trace``.

        Unresolved chunks may become fallback attribute nodes. Resolved chunks
        are always traced, but NSFW records are not added to the graph when
        ``safe_mode`` is on.
        """
        graph = IntentGraph()
        for chunk_text, root_token in self._extract_chunks(doc):
            best_match, method, conf = self._resolve_chunk(chunk_text, root_token, trace)
            if not best_match:
                self._add_fallback_node(graph, chunk_text, root_token, trace)
                continue

            # Update trace and stats (also for records that are filtered below).
            self._update_resolution_trace(trace, chunk_text, best_match, method, conf, safe_mode)
            if safe_mode and best_match.nsfw:
                continue

            node = IntentNode(
                id=str(uuid.uuid4()),
                label=best_match.canonical,
                category=best_match.category,
                confidence=conf,
            )
            # Semantic attribute recovery: keep the words the canonical label dropped.
            if method == "semantic":
                self._recover_attributes(node, chunk_text, best_match)
            graph.nodes[node.id] = node

        self._build_edges(graph)
        return graph

    def _extract_chunks(self, doc):
        """Return ``(lowercased_text, root_token)`` pairs covering ``doc``.

        Named entities are taken first, then noun chunks that do not overlap
        an already-covered token, then remaining content tokens together with
        their not-yet-covered simple modifiers.
        """
        chunks = []
        visited = set()

        # 1. Named entities (often capture character names even if POS is odd).
        for ent in doc.ents:
            chunks.append((ent.text.lower().strip(), ent.root))
            visited.update(tok.i for tok in ent)

        # 2. Noun chunks (capture things like "red dress").
        for chunk in doc.noun_chunks:
            # Skip if any token is already captured.
            if any(tok.i in visited for tok in chunk):
                continue
            chunks.append((chunk.text.lower().strip(), chunk.root))
            visited.update(tok.i for tok in chunk)

        # 3. Fallback: standalone adjectives (like "elegant") or unvisited nouns.
        for token in doc:
            if token.i in visited or not self._is_content_token(token):
                continue
            # Gather simple modifiers that were not consumed by an earlier chunk.
            members = []
            for child in token.children:
                if child.dep_ in ("amod", "compound", "flat", "nmod") and child.i not in visited:
                    members.append(child)
                    visited.add(child.i)
            members.append(token)
            visited.add(token.i)
            # Restore the original word order.
            members.sort(key=lambda t: t.i)
            text = " ".join(t.text for t in members).lower().strip()
            chunks.append((text, token))
        return chunks

    def _update_resolution_trace(self, trace, chunk_text, best_match, method, conf, safe_mode):
        """Count the resolution method and record the chunk's resolution in ``trace``.

        Missing ``"resolution_stats"`` / ``"resolved"`` entries are created.
        """
        stats = trace.setdefault("resolution_stats", {})
        stats[method] = stats.get(method, 0) + 1
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": best_match.canonical,
            "confidence": conf,
            "method": method,
            "status": "filtered_nsfw" if (safe_mode and best_match.nsfw) else "added",
        }

    def _recover_attributes(self, node, chunk_text, best_match):
        """Append to ``node.attributes`` the chunk words missing from the canonical label.

        Words are compared against the lowercased canonical label, are
        de-duplicated, and keep their order of appearance in ``chunk_text`` so
        the result is deterministic across runs.
        """
        canonical_words = set(best_match.canonical.lower().split())
        extras = [w for w in dict.fromkeys(chunk_text.split()) if w not in canonical_words]
        if extras:
            node.attributes.extend(extras)

    def _add_fallback_node(self, graph, chunk_text, root_token, trace):
        """Add an unresolved content chunk as a generic ``"attribute"`` node.

        Chunks whose root is not a content token are dropped silently.
        """
        if not self._is_content_token(root_token):
            return
        node = IntentNode(id=str(uuid.uuid4()), label=chunk_text, category="attribute", confidence=0.5)
        graph.nodes[node.id] = node
        trace.setdefault("resolved", {})[chunk_text] = {
            "canonical": chunk_text,
            "confidence": 0.5,
            "method": "fallback_attribute",
            "status": "added",
        }

    def _build_edges(self, graph):
        """Link the first character node to every non-scene node with ``has_attribute``."""
        chars = [n for n in graph.nodes.values() if n.category == "character"]
        if not chars:
            return
        main_char = chars[0]
        for node in graph.nodes.values():
            if node.id != main_char.id and node.category not in self._SCENE_CATEGORIES:
                graph.edges.append(
                    IntentEdge(source_id=main_char.id, target_id=node.id, relation="has_attribute")
                )

    def parse(self, text: str, safe_mode: bool = True) -> "PromptIR":
        """Parse ``text`` into a ``PromptIR``.

        Args:
            text: Raw user prompt.
            safe_mode: When True, records flagged ``nsfw`` are traced but not
                added to the intent graph.

        Returns:
            The populated IR, including the intent graph and a trace holding
            the original text, the working (possibly translated) text and the
            per-chunk resolutions.
        """
        # Step 0: translate to English if needed (always-to-English pipeline).
        working_text = self.translator.translate(text) if self.translator else text
        doc = self.nlp(working_text)

        ir = PromptIR(safe_mode=safe_mode)
        ir.trace["input_original"] = text
        ir.trace["input_working"] = working_text
        ir.trace["resolved"] = {}

        # Building the intent graph handles chunk extraction and matching.
        graph = self._build_intent_graph(doc, safe_mode, ir.trace)
        ir.intent_graph = graph

        self._map_characters_to_ir(graph, ir)
        self._map_global_nodes_to_ir(graph, ir)
        return ir

    def _normalize_text(self, text: str) -> str:
        """Return ``text`` unchanged.

        Deprecated: kept for interface compatibility. Now that native offline
        Spanish aliases exist, regex normalization causes Spanglish.
        """
        return text

    def _map_characters_to_ir(self, graph, ir):
        """Create one ``CharacterInstance`` per character node and append it to ``ir``.

        When the graph has no character node, a placeholder ``"Subject"`` node
        with id ``"subject"`` is inserted into the graph so that at least one
        character exists.
        """
        char_nodes = [n for n in graph.nodes.values() if n.category == "character"]
        if not char_nodes:
            dummy = IntentNode(id="subject", label="Subject", category="character")
            graph.nodes["subject"] = dummy
            char_nodes.append(dummy)

        for cnode in char_nodes:
            ci = CharacterInstance(name=cnode.label)
            if cnode.attributes:
                ci.appearance.extend(cnode.attributes)
            # Apply every node this character points to through an edge.
            for edge in graph.edges:
                if edge.source_id != cnode.id:
                    continue
                target = graph.nodes[edge.target_id]
                self._assign_attribute_to_character(ci, target.category, target.label)
            ir.characters.append(ci)

    def _map_global_nodes_to_ir(self, graph, ir):
        """Copy scene-level nodes onto ``ir`` and remaining attributes onto the first character.

        If ``ir.characters`` is empty, character-bound attributes are skipped
        instead of raising ``IndexError``.
        """
        main_char = ir.characters[0] if ir.characters else None
        for node in graph.nodes.values():
            cat, label = node.category, node.label
            if cat == "scene":
                ir.scene.locations.append(label)
            elif cat == "lighting":
                ir.scene.lighting.append(label)
            elif cat == "atmosphere":
                ir.scene.atmosphere.append(label)
            elif cat == "style":
                ir.style.append(label)
            elif cat == "effect":
                ir.effects.append(label)
            elif cat == "special":
                ir.technical_details.append(label)
            elif cat != "character" and main_char is not None:
                self._assign_attribute_to_character(main_char, cat, label)

    def _assign_attribute_to_character(self, char_instance, category, label):
        """Append ``label`` to the character list that matches ``category``, skipping duplicates.

        Categories without a matching list are ignored.
        """
        if category == "clothing":
            target = char_instance.clothing
        elif category in ("hairstyle", "hair_color", "eye_color", "attribute"):
            target = char_instance.appearance
        elif category == "accessory":
            target = char_instance.accessories
        elif category == "pose":
            target = char_instance.pose
        elif category == "emotion":
            target = char_instance.expression
        else:
            return
        if label not in target:
            target.append(label)