| import random |
| import torch |
| import torch.nn.functional as F |
| import numpy as np |
| from transformers import BertTokenizer, BertForMaskedLM, pipeline |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
# Probability cut-offs applied by FigurativeModule.analyze to each candidate
# detection (probabilities there are capped at 99.0).
#
# A detection whose probability is below MIN_PROBABILITY is discarded.
MIN_PROBABILITY = 52.0
# A detection whose probability is below GRAY_ZONE_UPPER gets its probability
# shifted by the PMI scorer's adjustment, when a scorer is available.
GRAY_ZONE_UPPER = 65.0
|
|
| try: |
| from modules.syntactic_pmi import SyntacticPMIScorer as _PMIScorer |
| except ImportError: |
| try: |
| from syntactic_pmi import SyntacticPMIScorer as _PMIScorer |
| except ImportError: |
| _PMIScorer = None |
|
|
| |
| |
| |
|
|
# Language code -> name of the pretrained BERT checkpoint handed to
# ``from_pretrained``. Language codes missing from this table fall back to
# the "en" entry at lookup time.
BERT_MODELS = {
    "en": "bert-base-uncased",
    "it": "bert-base-multilingual-uncased",
}
|
|
# Language code -> lower-cased surface forms that are treated as copulas.
# Membership is tested against ``head.text.lower()``, so every entry must be
# lower case.
COPULA_VERBS = {
    "en": {"is", "was", "are", "were", "be", "'s"},
    "it": {
        "è", "era", "sono", "erano", "essere", "fu", "sarà", "sarebbe",
        "sei", "siete", "eravamo", "eravate", "fosse", "fossero", "sii", "sia",
    },
}
|
|
| |
| |
# Language code -> generic word that stands in for a name-like token before it
# is embedded. Keys are entity-type labels (matched against
# ``token.ent_type_``) plus two fallbacks:
#   DEFAULT_PROPN  used when the entity type is not listed and the token is
#                  tagged PROPN
#   DEFAULT        used when the entity type is not listed and the token is
#                  not tagged PROPN
PLACEHOLDER_MAP = {
    "en": {
        "PERSON": "person",
        "GPE": "place",
        "LOC": "place",
        "FAC": "place",
        "ORG": "thing",
        "PRODUCT": "thing",
        "EVENT": "thing",
        "DEFAULT_PROPN": "he",
        "DEFAULT": "it",
    },
    "it": {
        "PERSON": "persona",
        "GPE": "luogo",
        "LOC": "luogo",
        "FAC": "luogo",
        "ORG": "cosa",
        "PRODUCT": "cosa",
        "EVENT": "cosa",
        "DEFAULT_PROPN": "lui",
        "DEFAULT": "esso",
    },
}
|
|
| |
# Language code -> dependency labels (``token.dep_``) of the tokens that
# FigurativeModule.analyze examines; tokens with any other label are skipped.
SUBJ_OBJ_DEPS = {
    "en": {"nsubj", "nsubjpass", "obj", "dobj", "amod"},
    "it": {"nsubj", "nsubj:pass", "obj", "amod"},
}
|
|
| |
# Language code -> dependency labels that identify the predicate among the
# children of a copula/auxiliary head. The first child carrying one of these
# labels replaces the copula as the head being evaluated.
# The "it" entry is empty: the Italian branch of FigurativeModule.analyze does
# not consult this table.
PREDICATE_DEPS = {
    "en": {"attr", "acomp", "prd", "dobj", "obj"},
    "it": set(),
}
|
|
|
|
| |
| |
| |
|
|
class FigurativeModule:
    """Flag possibly figurative term/head pairs with a BERT fill-mask model.

    A token is scored by replacing it with ``[MASK]`` in its sentence, asking
    the fill-mask pipeline for substitutes, and taking the highest cosine
    similarity between the embedding of the actual word (or of a generic
    placeholder when the word looks like a name) and the embeddings of the
    proposed substitutes. A score below a threshold is reported as an anomaly.

    Tokens, sentences and documents are expected to expose spaCy-style
    attributes (``text``, ``i``, ``pos_``, ``dep_``, ``ent_type_``, ``lemma_``,
    ``head``, ``children``, ``is_punct``, ``doc.sents``).
    """

    # Similarity thresholds: a term whose score is below the threshold is
    # flagged. Name-like terms use the lower (more lenient) value.
    _THRESHOLD_PROPER = 0.85
    _THRESHOLD_DEFAULT = 0.90

    # Only terms whose syntactic head carries one of these POS tags are scored.
    _HEAD_POS = frozenset({"VERB", "NOUN", "ADJ", "AUX", "PROPN"})

    # How many of the fill-mask candidates are embedded and compared.
    _MAX_CANDIDATES = 10

    def __init__(self, action_vehicle_weight=0.6, use_pmi=True):
        """Set up the optional PMI scorer and eagerly load the English model.

        Args:
            action_vehicle_weight: When equal to 0, detections whose reason is
                "ACTION ANOMALY" are dropped by :meth:`analyze`. No other use
                of the value is made in this class.
            use_pmi: When true and the PMI scorer class could be imported, an
                instance is created; failure to create it is reported on
                stdout and the module continues without PMI.
        """
        self.action_vehicle_weight = action_vehicle_weight

        self.pmi = None
        if use_pmi and _PMIScorer is not None:
            try:
                self.pmi = _PMIScorer()
            except Exception as e:  # best effort: PMI is an optional refinement
                print(f"[FigurativeModule] PMI non disponibile: {e}")

        # Device index in the form the transformers pipeline expects:
        # 0 for the first CUDA device, -1 for CPU.
        self.device = 0 if torch.cuda.is_available() else -1

        # Number of substitutes requested from the fill-mask pipeline.
        self.TOP_K = 20

        # lang -> {"tokenizer", "model", "mask_filler", "model_name"}
        self._bert: dict = {}
        self._load_bert("en")

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------

    def _load_bert(self, lang: str) -> None:
        """Make sure a tokenizer/model/pipeline bundle exists for ``lang``.

        Languages missing from ``BERT_MODELS`` use the English model. If the
        required model has already been loaded under another language code,
        that bundle is shared instead of loading a second copy.
        """
        if lang in self._bert:
            return

        model_name = BERT_MODELS.get(lang, BERT_MODELS["en"])

        already_loaded = next(
            (bundle for bundle in self._bert.values()
             if bundle.get("model_name") == model_name),
            None,
        )
        if already_loaded is not None:
            self._bert[lang] = already_loaded
            return

        print(f">>> Loading BERT model for [{lang}]: {model_name} ...")
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertForMaskedLM.from_pretrained(model_name)
        model.eval()
        if self.device == 0:
            model.to("cuda")
        filler = pipeline("fill-mask", model=model, tokenizer=tokenizer,
                          device=self.device)
        self._bert[lang] = {
            "tokenizer": tokenizer,
            "model": model,
            "mask_filler": filler,
            "model_name": model_name,
        }

    def _bert_for(self, lang: str) -> dict:
        """Return the model bundle for ``lang``, loading it on first use."""
        if lang not in self._bert:
            self._load_bert(lang)
        return self._bert[lang]

    # ------------------------------------------------------------------
    # Embeddings
    # ------------------------------------------------------------------

    def get_embedding(self, text: str, lang: str = "en") -> np.ndarray:
        """Return an L2-normalised, attention-mask-weighted mean embedding.

        The text is encoded with the language's tokenizer, passed through the
        encoder (``model.bert``), and the last hidden states are averaged over
        the positions selected by the attention mask.
        """
        b = self._bert_for(lang)
        inputs = b["tokenizer"](text, return_tensors="pt",
                                padding=True, truncation=True)
        if self.device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}

        with torch.no_grad():
            outputs = b["model"].bert(inputs["input_ids"],
                                      attention_mask=inputs["attention_mask"])
            emb = outputs.last_hidden_state
            mask = inputs["attention_mask"].unsqueeze(-1).expand(emb.size()).float()
            # clamp guards against division by zero for an all-zero mask
            pooled = torch.sum(emb * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)
            pooled = F.normalize(pooled, p=2, dim=1)

        return pooled.cpu().numpy()[0]

    def _cosine_sim(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """Return the cosine similarity of two 1-D vectors as a Python float."""
        return float(cosine_similarity(v1.reshape(1, -1), v2.reshape(1, -1))[0][0])

    # ------------------------------------------------------------------
    # Name handling
    # ------------------------------------------------------------------

    @staticmethod
    def _is_sentence_initial(token) -> bool:
        """Tell whether ``token`` opens its sentence.

        Uses the token's ``is_sent_start`` flag when it is available and set;
        when it is missing or ``None`` (boundaries unknown) only the very
        first token of the document (``token.i == 0``) counts as initial.
        """
        starts = getattr(token, "is_sent_start", None)
        if starts is None:
            return token.i == 0
        return bool(starts)

    def _is_unrecognized_subject(self, token) -> bool:
        """Tell whether ``token`` should be treated as a name.

        True when the token is tagged ``PROPN``, carries an entity type, or is
        capitalised somewhere other than the start of its sentence.
        Sentence-initial capitals are ignored because they say nothing about
        the word being a name; an empty token text is never capitalised.
        """
        text = token.text
        capitalised_mid_sentence = (
            bool(text)
            and text[0].isupper()
            and not self._is_sentence_initial(token)
        )
        return bool(
            token.pos_ == "PROPN"
            or capitalised_mid_sentence
            or token.ent_type_ != ""
        )

    def _get_placeholder(self, token, lang: str = "en") -> str:
        """Return the text to embed in place of ``token``.

        Ordinary words are returned lower-cased and stripped. Name-like tokens
        (see :meth:`_is_unrecognized_subject`) are replaced by the generic
        word that ``PLACEHOLDER_MAP`` assigns to their entity type, or by the
        map's ``DEFAULT_PROPN`` / ``DEFAULT`` fallback.
        """
        pm = PLACEHOLDER_MAP.get(lang, PLACEHOLDER_MAP["en"])
        if not self._is_unrecognized_subject(token):
            return token.text.lower().strip()

        ent = token.ent_type_
        if ent in pm:
            return pm[ent]
        if token.pos_ == "PROPN":
            return pm["DEFAULT_PROPN"]
        return pm["DEFAULT"]

    def _threshold_for(self, token) -> float:
        """Return the similarity threshold that applies to ``token``."""
        if self._is_unrecognized_subject(token):
            return self._THRESHOLD_PROPER
        return self._THRESHOLD_DEFAULT

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------

    def calculate_best_match(self, sent_obj, target_token, lang: str = "en") -> float:
        """Score how well ``target_token`` fits its sentence.

        Returns:
            The highest cosine similarity (never below 0.0) between the
            token's placeholder embedding and the embeddings of up to
            ``_MAX_CANDIDATES`` fill-mask substitutes other than the token
            itself, or ``-1.0`` if anything goes wrong while scoring (the
            error is printed).
        """
        try:
            actual_text = target_token.text.lower().strip()
            placeholder = self._get_placeholder(target_token, lang)

            masked_sentence = " ".join(
                "[MASK]" if t.i == target_token.i else t.text
                for t in sent_obj
            )

            b = self._bert_for(lang)
            candidates = b["mask_filler"](masked_sentence, top_k=self.TOP_K)
            cand_texts = [c["token_str"].strip().lower() for c in candidates]
            # the word actually used tells us nothing about alternatives
            cand_texts = [c for c in cand_texts if c != actual_text]

            actual_vector = self.get_embedding(placeholder, lang)

            max_similarity = 0.0
            for cand in cand_texts[:self._MAX_CANDIDATES]:
                sim = self._cosine_sim(actual_vector, self.get_embedding(cand, lang))
                max_similarity = max(max_similarity, sim)
            return max_similarity

        except Exception as e:
            # Scoring is best effort; callers treat -1.0 as "could not score".
            print(f"Error in calculate_best_match: {e}")
            return -1.0

    def evaluate_pair(self, sent, arg, head, lang: str = "en"):
        """Decide whether the pair (``arg``, ``head``) looks figurative.

        Returns:
            A tuple ``(is_anomalous, s1, s2, reason)`` where ``s1`` / ``s2``
            are the scores of ``arg`` / ``head`` and ``reason`` is one of
            ``"PROPN_DESCRIPTION"``, ``"COPULA ANOMALY"``,
            ``"ACTION ANOMALY"`` or ``"LITERAL"``. Only ``s1`` takes part in
            the decision.
        """
        s1 = self.calculate_best_match(sent, arg, lang)
        s2 = self.calculate_best_match(sent, head, lang)

        # A failed scoring (-1.0) becomes a perfect score so that it can never
        # fall below the threshold and produce a detection.
        if s1 == -1:
            s1 = 1.0
        if s2 == -1:
            s2 = 1.0

        is_propn = self._is_unrecognized_subject(arg)
        thr = self._THRESHOLD_PROPER if is_propn else self._THRESHOLD_DEFAULT

        copula = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        is_copula = head.text.lower() in copula

        # A name linked to a copula is never reported.
        if is_propn and is_copula:
            return False, s1, s2, "PROPN_DESCRIPTION"

        if s1 < thr:
            reason = "COPULA ANOMALY" if is_copula else "ACTION ANOMALY"
            return True, s1, s2, reason

        return False, s1, s2, "LITERAL"

    @staticmethod
    def _anomaly_probability(s1: float, thr: float) -> float:
        """Map the distance of ``s1`` below ``thr`` to a probability.

        The result is ``50 + 150 * max(0, thr - s1)``, capped at 99.0.
        """
        return min(99.0, 50 + max(0.0, thr - s1) * 150)

    # ------------------------------------------------------------------
    # Document analysis
    # ------------------------------------------------------------------

    @staticmethod
    def _sample_sentences(sentences: list, sample_rate: float) -> list:
        """Return the sentences to analyse.

        With ``sample_rate`` below 1.0 a random subset of
        ``max(1, int(len(sentences) * sample_rate))`` sentences is drawn;
        otherwise, or when there are no sentences at all, the input is
        returned unchanged. Indices are sampled and sorted so the subset
        keeps document order.
        """
        if not (sample_rate < 1.0) or not sentences:
            return sentences
        n = max(1, int(len(sentences) * sample_rate))
        chosen = sorted(random.sample(range(len(sentences)), n))
        return [sentences[i] for i in chosen]

    @staticmethod
    def _predicate_head(head, lang: str, copula_set, pred_deps):
        """Return the predicate that should replace a copula head, if any.

        For Italian the head is always used as-is, whether or not it has a
        ``cop`` child, so ``None`` is returned. For every other language,
        when ``head`` is a listed copula form or is tagged ``AUX``, the first
        child whose dependency label is in ``pred_deps`` is returned; ``None``
        when there is no such child or the head is not a copula/auxiliary.
        """
        if lang == "it":
            return None
        if head.text.lower() not in copula_set and head.pos_ != "AUX":
            return None
        return next(
            (child for child in head.children if child.dep_ in pred_deps),
            None,
        )

    def analyze(self, doc, sample_rate: float = 1.0, lang: str = "en") -> dict:
        """Scan ``doc`` for anomalous term/head pairs.

        Args:
            doc: Parsed document providing ``sents``.
            sample_rate: Fraction of sentences to analyse; values below 1.0
                trigger random sampling (at least one sentence is kept).
            lang: Language code selecting the model and the lookup tables;
                unknown codes use the English tables.

        Returns:
            A dict with:
              ``detections``  list of per-detection dicts (sentence, term,
                              term_lemma, head, head_lemma, s1, s2, reason,
                              probability, pmi_adj);
              ``mds_s``       detections per analysed sentence;
              ``mds_w``       detections per 1000 analysed non-punctuation
                              tokens;
              ``total``       number of detections;
              ``is_sample``   whether ``sample_rate`` was below 1.0.
        """
        self._load_bert(lang)

        copula_set = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        subj_deps = SUBJ_OBJ_DEPS.get(lang, SUBJ_OBJ_DEPS["en"])
        pred_deps = PREDICATE_DEPS.get(lang, PREDICATE_DEPS["en"])

        sampled = self._sample_sentences(list(doc.sents), sample_rate)
        num_sents_analyzed = len(sampled)
        total_words_analyzed = sum(
            1 for s in sampled for t in s if not t.is_punct
        )

        detections = []

        for sent in sampled:
            for token in sent:
                if token.dep_ not in subj_deps:
                    continue

                head = token.head
                if head.pos_ not in self._HEAD_POS:
                    continue

                target_head = self._predicate_head(
                    head, lang, copula_set, pred_deps)
                scored_head = head if target_head is None else target_head

                is_meta, s1, s2, reason = self.evaluate_pair(
                    sent, token, scored_head, lang)
                if not is_meta:
                    continue

                if reason == "ACTION ANOMALY" and self.action_vehicle_weight == 0:
                    continue

                display_head = scored_head.text
                display_head_lemma = scored_head.lemma_.lower()

                prob = self._anomaly_probability(
                    s1, self._threshold_for(token))
                if prob < MIN_PROBABILITY:
                    continue

                # Borderline detections are re-weighted by the PMI scorer and
                # may drop below the reporting threshold as a result.
                pmi_adj = 0.0
                if self.pmi is not None and prob < GRAY_ZONE_UPPER:
                    pmi_adj = self.pmi.adjustment(
                        token.lemma_.lower(), display_head_lemma, lang)
                    prob = round(min(99.0, prob + pmi_adj), 1)
                    if prob < MIN_PROBABILITY:
                        continue

                detections.append({
                    "sentence": sent.text.strip(),
                    "term": token.text,
                    "term_lemma": token.lemma_.lower(),
                    "head": display_head,
                    "head_lemma": display_head_lemma,
                    "s1": round(float(s1), 4),
                    "s2": round(float(s2), 4),
                    "reason": reason,
                    "probability": round(prob, 1),
                    "pmi_adj": pmi_adj,
                })

        total = len(detections)
        return {
            "detections": detections,
            "mds_s": total / num_sents_analyzed if num_sents_analyzed > 0 else 0,
            "mds_w": total / total_words_analyzed * 1000 if total_words_analyzed > 0 else 0,
            "total": total,
            "is_sample": sample_rate < 1.0,
        }
|
|