# CO3 / src / modules / figurative.py
# amaisto's picture
# Upload figurative.py
# 2fe94a4 verified
import random
import torch
import torch.nn.functional as F
import numpy as np
from transformers import BertTokenizer, BertForMaskedLM, pipeline
from sklearn.metrics.pairwise import cosine_similarity
# Detections whose computed probability falls below this value are discarded
# by FigurativeModule.analyze (before and after the optional PMI adjustment).
MIN_PROBABILITY = 52.0
# Upper bound of the "gray zone": only detections with a probability below
# this value are passed through the PMI adjustment in FigurativeModule.analyze.
GRAY_ZONE_UPPER = 65.0
# Optional dependency: the syntactic PMI scorer. It is looked up first inside
# the ``modules`` package and then as a top-level module; ``_PMIScorer`` stays
# ``None`` when neither import succeeds.
_PMIScorer = None
try:
    from modules.syntactic_pmi import SyntacticPMIScorer as _PMIScorer
except ImportError:
    try:
        from syntactic_pmi import SyntacticPMIScorer as _PMIScorer
    except ImportError:
        # Neither location is importable: keep the ``None`` default.
        pass
# ---------------------------------------------------------------------------
# Language-dependent constants
# ---------------------------------------------------------------------------

# Checkpoint name to load for each supported language code.
BERT_MODELS = {
    "en": "bert-base-uncased",
    "it": "bert-base-multilingual-uncased",
}

# Lower-cased surface forms treated as copula verbs, per language.
COPULA_VERBS = {
    "en": {"is", "was", "are", "were", "be", "'s"},
    "it": {
        "è", "era", "sono", "erano",
        "essere", "fu", "sarà", "sarebbe",
        "sei", "siete", "eravamo", "eravate",
        "fosse", "fossero", "sii", "sia",
    },
}

# Placeholders used to mask proper nouns before computing the embedding.
# They must be words known to the BERT model of the respective language.
PLACEHOLDER_MAP = {
    "en": {
        "PERSON": "person",
        "GPE": "place",
        "LOC": "place",
        "FAC": "place",
        "ORG": "thing",
        "PRODUCT": "thing",
        "EVENT": "thing",
        "DEFAULT_PROPN": "he",
        "DEFAULT": "it",
    },
    "it": {
        "PERSON": "persona",
        "GPE": "luogo",
        "LOC": "luogo",
        "FAC": "luogo",
        "ORG": "cosa",
        "PRODUCT": "cosa",
        "EVENT": "cosa",
        "DEFAULT_PROPN": "lui",
        "DEFAULT": "esso",
    },
}

# dep_ labels expected for subject / object / modifier tokens.
SUBJ_OBJ_DEPS = {
    "en": {"nsubj", "nsubjpass", "obj", "dobj", "amod"},
    "it": {"nsubj", "nsubj:pass", "obj", "amod"},
}

# dep_ labels of the children that represent the nominal predicate in a
# copula construction.
PREDICATE_DEPS = {
    "en": {"attr", "acomp", "prd", "dobj", "obj"},
    # In Italian UD the nominal predicate IS the head, not a child.
    "it": set(),
}
# ---------------------------------------------------------------------------
# Modulo principale
# ---------------------------------------------------------------------------
class FigurativeModule:
    """Flag subject/object/modifier terms that fit poorly in their sentence.

    Each candidate term is masked in its sentence, a BERT masked-language
    model proposes replacements, and the term is compared with those
    proposals through the cosine similarity of mean-pooled BERT embeddings.
    A low best similarity is reported as a possible figurative use.

    Tokens, sentences and documents are expected to expose a spaCy-style
    interface (``text``, ``lemma_``, ``pos_``, ``dep_``, ``ent_type_``, ``i``,
    ``head``, ``children``, ``is_punct``, ``sents``).
    """

    # Best-match similarity below which a term is flagged; the lower value
    # applies to terms that look like proper nouns / named entities.
    _PROPN_THRESHOLD = 0.85
    _DEFAULT_THRESHOLD = 0.90
    # Coarse POS tags a syntactic head must carry to be evaluated.
    _HEAD_POS = frozenset({"VERB", "NOUN", "ADJ", "AUX", "PROPN"})
    # Number of mask-filler candidates actually compared by embedding.
    _MAX_CANDIDATES = 10

    def __init__(self, action_vehicle_weight=0.6, use_pmi=True):
        """Create the module and eagerly load the English BERT model.

        Args:
            action_vehicle_weight: when equal to 0, detections whose reason is
                "ACTION ANOMALY" are dropped by ``analyze``; the value is not
                used otherwise in this class.
            use_pmi: try to instantiate the optional PMI scorer (only if its
                import succeeded at module load).
        """
        self.action_vehicle_weight = action_vehicle_weight
        self.TOP_K = 20

        # PMI scorer (optional): a failure to build it only disables the
        # adjustment step, it never prevents the module from working.
        self.pmi = None
        if use_pmi and _PMIScorer is not None:
            try:
                self.pmi = _PMIScorer()
            except Exception as e:
                print(f"[FigurativeModule] PMI non disponibile: {e}")

        # 0 selects the first CUDA device, -1 the CPU.
        self.device = 0 if torch.cuda.is_available() else -1

        # BERT models: English is pre-loaded, other languages are lazy.
        self._bert: dict = {}
        self._load_bert("en")

    # ------------------------------------------------------------------
    # BERT model management
    # ------------------------------------------------------------------
    def _load_bert(self, lang: str) -> None:
        """Load (once) tokenizer, model and fill-mask pipeline for ``lang``.

        Languages missing from ``BERT_MODELS`` fall back to the English
        checkpoint. When the required checkpoint is already in memory under
        another language code, that bundle is reused instead of being loaded
        a second time.
        """
        if lang in self._bert:
            return
        model_name = BERT_MODELS.get(lang, BERT_MODELS["en"])

        shared = next(
            (bundle for bundle in self._bert.values()
             if bundle.get("model_name") == model_name),
            None,
        )
        if shared is not None:
            self._bert[lang] = shared
            return

        print(f">>> Loading BERT model for [{lang}]: {model_name} ...")
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertForMaskedLM.from_pretrained(model_name)
        model.eval()
        if self.device == 0:
            model.to("cuda")
        filler = pipeline("fill-mask", model=model, tokenizer=tokenizer,
                          device=self.device)
        self._bert[lang] = {
            "tokenizer": tokenizer,
            "model": model,
            "mask_filler": filler,
            "model_name": model_name,
        }

    def _bert_for(self, lang: str) -> dict:
        """Return the model bundle for ``lang``, loading it on first use."""
        if lang not in self._bert:
            self._load_bert(lang)
        return self._bert[lang]

    # ------------------------------------------------------------------
    # Embeddings and similarity
    # ------------------------------------------------------------------
    def get_embedding(self, text: str, lang: str = "en") -> np.ndarray:
        """Return one L2-normalised vector for ``text``.

        The vector is the attention-mask-weighted mean of the encoder's last
        hidden state, computed without gradients.
        """
        b = self._bert_for(lang)
        inputs = b["tokenizer"](text, return_tensors="pt",
                                padding=True, truncation=True)
        if self.device == 0:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        with torch.no_grad():
            outputs = b["model"].bert(inputs["input_ids"],
                                      attention_mask=inputs["attention_mask"])
        emb = outputs.last_hidden_state
        mask = inputs["attention_mask"].unsqueeze(-1).expand(emb.size()).float()
        # The clamp guards against a division by zero on an all-zero mask.
        pooled = torch.sum(emb * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)
        pooled = F.normalize(pooled, p=2, dim=1)
        return pooled.cpu().numpy()[0]

    def _cosine_sim(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """Return the cosine similarity of two 1-D vectors as a float."""
        return float(cosine_similarity(v1.reshape(1, -1), v2.reshape(1, -1))[0][0])

    # ------------------------------------------------------------------
    # Placeholders for proper nouns
    # ------------------------------------------------------------------
    def _get_placeholder(self, token, lang: str = "en") -> str:
        """Return the text to embed in place of ``token``.

        Ordinary tokens are returned lower-cased and stripped. Tokens that
        look like proper nouns are replaced by a generic word chosen from
        ``PLACEHOLDER_MAP`` by entity type, with per-language fallbacks.
        """
        if not self._is_unrecognized_subject(token):
            return token.text.lower().strip()
        pm = PLACEHOLDER_MAP.get(lang, PLACEHOLDER_MAP["en"])
        ent = token.ent_type_
        if ent in pm:
            return pm[ent]
        # Fallback: proper noun without a mapped entity type.
        if token.pos_ == "PROPN":
            return pm["DEFAULT_PROPN"]
        return pm["DEFAULT"]

    def _is_unrecognized_subject(self, token) -> bool:
        """Return True when ``token`` looks like a proper noun or entity.

        That is: it is tagged PROPN, or it is capitalised and not the first
        token (``i > 0``), or it carries a non-empty entity type. An empty
        token text counts as not capitalised instead of raising IndexError.
        """
        return (token.pos_ == "PROPN"
                or (token.i > 0 and token.text[:1].isupper())
                or token.ent_type_ != "")

    def _threshold_for(self, token) -> float:
        """Return the similarity threshold that applies to ``token``."""
        if self._is_unrecognized_subject(token):
            return self._PROPN_THRESHOLD
        return self._DEFAULT_THRESHOLD

    # ------------------------------------------------------------------
    # BERT best match (MLM + embedding)
    # ------------------------------------------------------------------
    def calculate_best_match(self, sent_obj, target_token, lang: str = "en") -> float:
        """Score how well ``target_token`` matches what BERT expects there.

        The token is replaced by ``[MASK]`` in the space-joined sentence, the
        fill-mask pipeline returns ``TOP_K`` candidates, the token itself is
        removed from them, and the first ``_MAX_CANDIDATES`` survivors are
        compared with the token (or its placeholder) by embedding similarity.

        Returns:
            The highest similarity found, never below 0.0, or -1.0 when any
            step raises (the error is printed, not propagated).
        """
        try:
            actual_text = target_token.text.lower().strip()
            placeholder = self._get_placeholder(target_token, lang)
            masked_sentence = " ".join(
                "[MASK]" if t.i == target_token.i else t.text
                for t in sent_obj
            )
            b = self._bert_for(lang)
            candidates = b["mask_filler"](masked_sentence, top_k=self.TOP_K)
            cand_texts = [c["token_str"].strip().lower() for c in candidates]
            cand_texts = [c for c in cand_texts if c != actual_text]

            actual_vector = self.get_embedding(placeholder, lang)
            max_similarity = 0.0
            for cand in cand_texts[:self._MAX_CANDIDATES]:
                sim = self._cosine_sim(actual_vector,
                                       self.get_embedding(cand, lang))
                max_similarity = max(max_similarity, sim)
            return max_similarity
        except Exception as e:
            # Deliberate best-effort: callers map the -1.0 sentinel to a
            # neutral score so a model failure never produces a detection.
            print(f"Error in calculate_best_match: {e}")
            return -1.0

    # ------------------------------------------------------------------
    # Pair evaluation (argument / head)
    # ------------------------------------------------------------------
    def evaluate_pair(self, sent, arg, head, lang: str = "en"):
        """Decide whether ``arg`` is anomalous with respect to ``head``.

        Returns:
            A tuple ``(is_anomalous, s1, s2, reason)`` where ``s1``/``s2`` are
            the best-match scores of ``arg`` and ``head`` and ``reason`` is
            one of "PROPN_DESCRIPTION", "COPULA ANOMALY", "ACTION ANOMALY" or
            "LITERAL". Only ``s1`` takes part in the decision.
        """
        s1 = self.calculate_best_match(sent, arg, lang)
        s2 = self.calculate_best_match(sent, head, lang)
        # -1 is the failure sentinel: treat it as a perfect fit.
        if s1 == -1:
            s1 = 1.0
        if s2 == -1:
            s2 = 1.0

        is_propn = self._is_unrecognized_subject(arg)
        copula = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        # NOTE(review): ``analyze`` passes the nominal predicate as ``head``
        # whenever it finds one, so this test is made on the predicate's text
        # rather than on the verb; "PROPN_DESCRIPTION" and "COPULA ANOMALY"
        # look reachable only when a copula head has no predicate child.
        # Confirm whether that is intended before changing it.
        is_copula = head.text.lower() in copula

        if is_propn and is_copula:
            return False, s1, s2, "PROPN_DESCRIPTION"
        if s1 < self._threshold_for(arg):
            reason = "COPULA ANOMALY" if is_copula else "ACTION ANOMALY"
            return True, s1, s2, reason
        return False, s1, s2, "LITERAL"

    # ------------------------------------------------------------------
    # Main analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _sample_sentences(sentences, sample_rate):
        """Return all sentences, or a random sample when ``sample_rate < 1``.

        At least one sentence is sampled when any exist; the sample size is
        clamped to the population so an empty input yields an empty list
        instead of making ``random.sample`` raise ValueError.
        """
        if sample_rate < 1.0:
            n = max(1, int(len(sentences) * sample_rate))
            return random.sample(sentences, min(n, len(sentences)))
        return sentences

    @staticmethod
    def _resolve_pair_head(head, lang, copula_set, pred_deps):
        """Return the token that ``analyze`` pairs with the argument.

        For Italian the syntactic head is used as is (in UD the nominal
        predicate is already the head and the copula is its child). For every
        other language, when the head is a copula or an AUX, the first child
        whose ``dep_`` is in ``pred_deps`` is used; otherwise the head itself.
        """
        if lang == "it":
            return head
        if head.text.lower() in copula_set or head.pos_ == "AUX":
            for child in head.children:
                if child.dep_ in pred_deps:
                    return child
        return head

    def analyze(self, doc, sample_rate: float = 1.0, lang: str = "en") -> dict:
        """Scan ``doc`` and collect the detected anomalous term/head pairs.

        Args:
            doc: parsed document exposing ``sents``.
            sample_rate: fraction of sentences to analyse; values below 1.0
                trigger random sampling.
            lang: language code selecting model and dependency tables
                (unknown codes fall back to the English tables).

        Returns:
            A dict with ``detections`` (list of per-detection dicts),
            ``mds_s`` (detections per analysed sentence), ``mds_w``
            (detections per 1000 analysed non-punctuation tokens), ``total``
            and ``is_sample``.
        """
        # Lazy-load the BERT model for the requested language.
        self._load_bert(lang)
        copula_set = COPULA_VERBS.get(lang, COPULA_VERBS["en"])
        subj_deps = SUBJ_OBJ_DEPS.get(lang, SUBJ_OBJ_DEPS["en"])
        pred_deps = PREDICATE_DEPS.get(lang, PREDICATE_DEPS["en"])

        sampled = self._sample_sentences(list(doc.sents), sample_rate)
        num_sents_analyzed = len(sampled)
        total_words_analyzed = sum(
            1 for s in sampled for t in s if not t.is_punct)

        detections = []
        for sent in sampled:
            for token in sent:
                if token.dep_ not in subj_deps:
                    continue
                head = token.head
                if head.pos_ not in self._HEAD_POS:
                    continue

                pair_head = self._resolve_pair_head(
                    head, lang, copula_set, pred_deps)
                is_meta, s1, s2, reason = self.evaluate_pair(
                    sent, token, pair_head, lang)
                if not is_meta:
                    continue
                if reason == "ACTION ANOMALY" and self.action_vehicle_weight == 0:
                    continue

                head_lemma = pair_head.lemma_.lower()
                # Map the distance below the threshold onto a 50-99 scale.
                diff = max(0, self._threshold_for(token) - s1)
                prob = min(99.0, 50 + diff * 150)
                if prob < MIN_PROBABILITY:
                    continue

                # Borderline detections may be pushed up or down by the PMI
                # scorer, and are dropped if they end up below the minimum.
                pmi_adj = 0.0
                if self.pmi is not None and prob < GRAY_ZONE_UPPER:
                    pmi_adj = self.pmi.adjustment(
                        token.lemma_.lower(), head_lemma, lang)
                    prob = round(min(99.0, prob + pmi_adj), 1)
                    if prob < MIN_PROBABILITY:
                        continue

                detections.append({
                    "sentence": sent.text.strip(),
                    "term": token.text,
                    "term_lemma": token.lemma_.lower(),
                    "head": pair_head.text,
                    "head_lemma": head_lemma,
                    "s1": round(float(s1), 4),
                    "s2": round(float(s2), 4),
                    "reason": reason,
                    "probability": round(prob, 1),
                    "pmi_adj": pmi_adj,
                })

        return {
            "detections": detections,
            "mds_s": len(detections) / num_sents_analyzed if num_sents_analyzed > 0 else 0,
            "mds_w": len(detections) / total_words_analyzed * 1000 if total_words_analyzed > 0 else 0,
            "total": len(detections),
            "is_sample": sample_rate < 1.0,
        }