import random

import torch
import torch.nn.functional as F
import numpy as np
from transformers import BertTokenizer, BertForMaskedLM, pipeline
from sklearn.metrics.pairwise import cosine_similarity

# The PMI scorer is optional: try the packaged location first, then a
# top-level module, and fall back to None when neither can be imported.
try:
    from modules.syntactic_pmi import SyntacticPMIScorer as _PMIScorer
except ImportError:
    try:
        from syntactic_pmi import SyntacticPMIScorer as _PMIScorer
    except ImportError:
        _PMIScorer = None

# Detections whose probability falls below this value are discarded.
MIN_PROBABILITY = 52.0
# The PMI adjustment is applied only to probabilities below this value.
GRAY_ZONE_UPPER = 65.0

# ---------------------------------------------------------------------------
# Language-dependent constants
# ---------------------------------------------------------------------------

# Pretrained BERT checkpoint name for each supported language code.
BERT_MODELS = {
    "en": "bert-base-uncased",
    "it": "bert-base-multilingual-uncased",
}

# Lower-cased surface forms treated as copula verbs, per language.
COPULA_VERBS = {
    "en": {"is", "was", "are", "were", "be", "'s"},
    "it": {
        "è", "era", "sono", "erano", "essere", "fu", "sarà", "sarebbe",
        "sei", "siete", "eravamo", "eravate", "fosse", "fossero", "sii",
        "sia",
    },
}

# Placeholders used to mask proper nouns before computing the embedding.
# They must be words known to the BERT model of the respective language.
PLACEHOLDER_MAP = {
    "en": {
        "PERSON": "person",
        "GPE": "place",
        "LOC": "place",
        "FAC": "place",
        "ORG": "thing",
        "PRODUCT": "thing",
        "EVENT": "thing",
        "DEFAULT_PROPN": "he",
        "DEFAULT": "it",
    },
    "it": {
        "PERSON": "persona",
        "GPE": "luogo",
        "LOC": "luogo",
        "FAC": "luogo",
        "ORG": "cosa",
        "PRODUCT": "cosa",
        "EVENT": "cosa",
        "DEFAULT_PROPN": "lui",
        "DEFAULT": "esso",
    },
}

# Dependency labels (dep_) accepted for subject / object / modifier arguments.
SUBJ_OBJ_DEPS = {
    "en": {"nsubj", "nsubjpass", "obj", "dobj", "amod"},
    "it": {"nsubj", "nsubj:pass", "obj", "amod"},
}

# Dependency labels of the children that carry the nominal predicate of a
# copula construction.
PREDICATE_DEPS = {
    "en": {"attr", "acomp", "prd", "dobj", "obj"},
    # In Italian UD the nominal predicate IS the head, not a child.
    "it": set(),
}

# Part-of-speech tags a head must carry for its argument to be evaluated.
_ELIGIBLE_HEAD_POS = frozenset({"VERB", "NOUN", "ADJ", "AUX", "PROPN"})


# ---------------------------------------------------------------------------
# Main module
# ---------------------------------------------------------------------------

class FigurativeModule:
    """Flag argument/head pairs whose wording BERT finds unexpected.

    For every subject/object/modifier token the module masks the token in
    its sentence, asks a BERT fill-mask pipeline for candidates and compares
    the embedding of the actual word with the embeddings of the candidates.
    A low best similarity is reported as a possible figurative use.
    """

    def __init__(self, action_vehicle_weight=0.6, use_pmi=True):
        """Set up the optional PMI scorer and pre-load the English model.

        Args:
            action_vehicle_weight: when equal to 0, detections whose reason
                is "ACTION ANOMALY" are dropped by ``analyze``.
            use_pmi: try to instantiate the PMI scorer when it was imported.
        """
        self.action_vehicle_weight = action_vehicle_weight

        # PMI scorer (optional): any failure just disables the adjustment.
        self.pmi = None
        if use_pmi and _PMIScorer is not None:
            try:
                self.pmi = _PMIScorer()
            except Exception as e:
                print(f"[FigurativeModule] PMI non disponibile: {e}")

        # 0 when CUDA is available, otherwise -1; passed as `device` to the
        # fill-mask pipeline.
        self.device = 0 if torch.cuda.is_available() else -1

        # BERT bundles keyed by language: English eagerly, others lazily.
        self._bert: dict = {}
        self._load_bert("en")
        # Number of fill-mask candidates requested per masked token.
        self.TOP_K = 20

    # ------------------------------------------------------------------
    # BERT model management
    # ------------------------------------------------------------------

    def _load_bert(self, lang: str) -> None:
        """Load tokenizer, model and fill-mask pipeline for *lang* once.

        Languages missing from ``BERT_MODELS`` use the English checkpoint.
        """
        if lang in self._bert:
            return
        model_name = BERT_MODELS.get(lang, BERT_MODELS["en"])
        print(f">>> Loading BERT model for [{lang}]: {model_name} ...")
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertForMaskedLM.from_pretrained(model_name)
        model.eval()
        if self.device == 0:
            model.to("cuda")
        filler = pipeline("fill-mask", model=model, tokenizer=tokenizer,
                          device=self.device)
        self._bert[lang] = {
            "tokenizer": tokenizer,
            "model": model,
            "mask_filler": filler,
        }

    def _bert_for(self, lang: str) -> dict:
        """Return the BERT bundle for *lang*, loading it on first use."""
        if lang not in self._bert:
            self._load_bert(lang)
        return self._bert[lang]

    # ------------------------------------------------------------------
    # Embedding and similarity
    # ------------------------------------------------------------------

    def get_embedding(self, text: str, lang: str = "en") -> np.ndarray:
        """Return an L2-normalised, attention-masked mean embedding of *text*.

        The last hidden state of the BERT encoder is averaged over the
        positions selected by the attention mask; the first (only) row of
        the batch is returned as a NumPy array.
        """
        b = self._bert_for(lang)
        inputs = b["tokenizer"](text, return_tensors="pt",
                                padding=True, truncation=True)
        if self.device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        with torch.no_grad():
            outputs = b["model"].bert(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"])
        emb = outputs.last_hidden_state
        mask = inputs["attention_mask"].unsqueeze(-1).expand(emb.size()).float()
        # clamp guards against a division by zero on an all-zero mask
        pooled = torch.sum(emb * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)
        pooled = F.normalize(pooled, p=2, dim=1)
        return pooled.cpu().numpy()[0]

    def _cosine_sim(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """Return the cosine similarity of two 1-D vectors as a float."""
        return float(cosine_similarity(v1.reshape(1, -1), v2.reshape(1, -1))[0][0])

    # ------------------------------------------------------------------
    # Placeholders for proper nouns
    # ------------------------------------------------------------------

    def _get_placeholder(self, token, lang: str = "en") -> str:
        """Return the text to embed in place of *token*.

        Ordinary tokens yield their own lower-cased, stripped text. Tokens
        that look like proper nouns yield the placeholder for their entity
        type, or a default when the entity type has no placeholder.
        Languages missing from ``PLACEHOLDER_MAP`` use the English table.
        """
        pm = PLACEHOLDER_MAP.get(lang, PLACEHOLDER_MAP["en"])
        if not self._is_unrecognized_subject(token):
            return token.text.lower().strip()
        ent = token.ent_type_
        if ent in pm:
            return pm[ent]
        # Fallback: proper-noun-like token without a mapped entity type.
        if token.pos_ == "PROPN":
            return pm["DEFAULT_PROPN"]
        return pm["DEFAULT"]

    def _is_unrecognized_subject(self, token) -> bool:
        """Return True when *token* looks like a proper noun.

        That is: tagged PROPN, capitalised and not at index 0, or carrying
        any entity type.
        """
        return (token.pos_ == "PROPN"
                # [:1] instead of [0] so an empty text cannot raise IndexError
                or (token.text[:1].isupper() and token.i > 0)
                or token.ent_type_ != "")

    @staticmethod
    def _threshold_for(is_propn: bool) -> float:
        """Return the similarity threshold below which a pair is anomalous."""
        return 0.85 if is_propn else 0.90

    # ------------------------------------------------------------------
    # BERT best match (MLM + embedding)
    # ------------------------------------------------------------------

    def calculate_best_match(self, sent_obj, target_token, lang: str = "en") -> float:
        """Return how close *target_token* is to BERT's own suggestions.

        The token is replaced by ``[MASK]`` in the space-joined sentence,
        ``TOP_K`` candidates are requested, the actual word is removed from
        them, and the highest cosine similarity between the token's
        placeholder embedding and the first ten remaining candidates is
        returned (never below 0.0).

        Returns:
            The best similarity, or -1.0 when anything fails; the error is
            printed rather than raised.
        """
        try:
            actual_text = target_token.text.lower().strip()
            placeholder = self._get_placeholder(target_token, lang)
            masked_sentence = " ".join(
                "[MASK]" if t.i == target_token.i else t.text
                for t in sent_obj)

            b = self._bert_for(lang)
            candidates = b["mask_filler"](masked_sentence, top_k=self.TOP_K)
            cand_texts = [c["token_str"].strip().lower() for c in candidates]
            cand_texts = [c for c in cand_texts if c != actual_text]

            actual_vector = self.get_embedding(placeholder, lang)
            max_similarity = 0.0
            for cand in cand_texts[:10]:
                sim = self._cosine_sim(actual_vector,
                                       self.get_embedding(cand, lang))
                max_similarity = max(max_similarity, sim)
            return max_similarity
        except Exception as e:
            # Deliberate best-effort: callers rely on the -1.0 sentinel.
            print(f"Error in calculate_best_match: {e}")
            return -1.0

    # ------------------------------------------------------------------
    # Pair evaluation (argument / head)
    # ------------------------------------------------------------------

    def evaluate_pair(self, sent, arg, head, lang: str = "en"):
        """Decide whether the (*arg*, *head*) pair looks anomalous.

        Returns:
            A tuple ``(is_anomalous, s1, s2, reason)`` where ``s1`` / ``s2``
            are the best-match scores of *arg* / *head* and ``reason`` is
            one of "PROPN_DESCRIPTION", "COPULA ANOMALY", "ACTION ANOMALY"
            or "LITERAL". Only ``s1`` drives the decision.

        NOTE(review): the copula test looks at *head* itself. ``analyze``
        passes the nominal predicate as *head* when it finds one, so the
        copula-specific outcomes only occur when the copula verb is the
        token passed in - confirm this is intended.
        """
        s1 = self.calculate_best_match(sent, arg, lang)
        s2 = self.calculate_best_match(sent, head, lang)
        # -1 marks a scoring failure; 1.0 keeps such a pair from being flagged.
        if s1 == -1:
            s1 = 1.0
        if s2 == -1:
            s2 = 1.0

        is_propn = self._is_unrecognized_subject(arg)
        thr = self._threshold_for(is_propn)
        copula = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        is_copula = head.text.lower() in copula

        if is_propn and is_copula:
            return False, s1, s2, "PROPN_DESCRIPTION"
        if s1 < thr:
            reason = "COPULA ANOMALY" if is_copula else "ACTION ANOMALY"
            return True, s1, s2, reason
        return False, s1, s2, "LITERAL"

    # ------------------------------------------------------------------
    # Main analysis
    # ------------------------------------------------------------------

    def _resolve_head(self, head, lang, copula_set, pred_deps):
        """Return the token the argument should be evaluated against."""
        if lang == "it":
            # In Italian UD the copula is a child (dep_ "cop") of the
            # nominal predicate, so the predicate is already `head`.
            return head
        # Otherwise the copula is the head and the nominal predicate is one
        # of its children: use the first child with a predicate label.
        if head.text.lower() in copula_set or head.pos_ == "AUX":
            for child in head.children:
                if child.dep_ in pred_deps:
                    return child
        return head

    def _build_detection(self, sent, token, eval_head, lang):
        """Return the detection record for one pair, or None to skip it."""
        is_meta, s1, s2, reason = self.evaluate_pair(sent, token, eval_head, lang)
        if not is_meta:
            return None
        if reason == "ACTION ANOMALY" and self.action_vehicle_weight == 0:
            return None

        head_lemma = eval_head.lemma_.lower()
        thr = self._threshold_for(self._is_unrecognized_subject(token))
        # 50 at the threshold, +150 per unit of similarity below it, max 99.
        prob = min(99.0, 50 + max(0, thr - s1) * 150)
        if prob < MIN_PROBABILITY:
            return None

        pmi_adj = 0.0
        if self.pmi is not None and prob < GRAY_ZONE_UPPER:
            pmi_adj = self.pmi.adjustment(token.lemma_.lower(), head_lemma, lang)
        prob = round(min(99.0, prob + pmi_adj), 1)
        if prob < MIN_PROBABILITY:
            return None

        return {
            "sentence": sent.text.strip(),
            "term": token.text,
            "term_lemma": token.lemma_.lower(),
            "head": eval_head.text,
            "head_lemma": head_lemma,
            "s1": round(float(s1), 4),
            "s2": round(float(s2), 4),
            "reason": reason,
            "probability": prob,
            "pmi_adj": pmi_adj,
        }

    def analyze(self, doc, sample_rate: float = 1.0, lang: str = "en") -> dict:
        """Scan *doc* and report argument/head pairs that look figurative.

        Args:
            doc: parsed document exposing ``sents``; tokens must expose the
                attributes read here (``dep_``, ``head``, ``pos_``, ...).
            sample_rate: below 1.0, a random subset of at least one
                sentence is analysed instead of the whole document.
            lang: language code selecting model and language tables.

        Returns:
            A dict with "detections" (list of records), "mds_s" (detections
            per analysed sentence), "mds_w" (detections per 1000 analysed
            non-punctuation tokens), "total" and "is_sample".
        """
        # Lazy-load the BERT model for the requested language.
        self._load_bert(lang)

        all_sentences = list(doc.sents)
        is_sample = sample_rate < 1.0
        if not all_sentences:
            # Nothing to analyse. Returning here also avoids asking
            # random.sample() for one sentence out of an empty list, which
            # raises ValueError.
            return {
                "detections": [],
                "mds_s": 0,
                "mds_w": 0,
                "total": 0,
                "is_sample": is_sample,
            }

        if is_sample:
            n = max(1, int(len(all_sentences) * sample_rate))
            sampled = random.sample(all_sentences, n)
        else:
            sampled = all_sentences

        copula_set = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        subj_deps = SUBJ_OBJ_DEPS.get(lang, SUBJ_OBJ_DEPS["en"])
        pred_deps = PREDICATE_DEPS.get(lang, PREDICATE_DEPS["en"])

        num_sents_analyzed = len(sampled)
        total_words_analyzed = sum(
            1 for s in sampled for t in s if not t.is_punct)

        detections = []
        for sent in sampled:
            for token in sent:
                if token.dep_ not in subj_deps:
                    continue
                head = token.head
                if head.pos_ not in _ELIGIBLE_HEAD_POS:
                    continue
                eval_head = self._resolve_head(head, lang, copula_set, pred_deps)
                detection = self._build_detection(sent, token, eval_head, lang)
                if detection is not None:
                    detections.append(detection)

        total = len(detections)
        return {
            "detections": detections,
            "mds_s": total / num_sents_analyzed if num_sents_analyzed > 0 else 0,
            "mds_w": (total / total_words_analyzed * 1000
                      if total_words_analyzed > 0 else 0),
            "total": total,
            "is_sample": is_sample,
        }