| |
| """ |
| CISA-BERTurk-Sentiment: Cross-Individual Sentiment Analysis for Historical Turkish |
| DECA-EBSA (Dual-Encoder Context-Aware Entity-Based Sentiment Analysis) Architecture |
| |
| İzmir Institute of Technology - Digital Humanities and AI Laboratory |
| TÜBİTAK Project No: 323K372 |
| |
| Usage: |
| from transformers import AutoTokenizer |
| from modeling_cisa import CISAModel |
| |
| tokenizer = AutoTokenizer.from_pretrained("dbbiyte/CISA-BERTurk-sentiment") |
| model = CISAModel.from_pretrained("dbbiyte/CISA-BERTurk-sentiment", tokenizer=tokenizer) |
| result = model.predict( |
| text="Ali Bey'in vefatı bizleri elem-i azîme sevk etmişti.", |
| entity_text="Ali Bey", |
| entity_start=0, |
| entity_end=7, |
| ) |
| print(result) # {'sentiment': 2, 'sentiment_label': 'Positive', ...} |
| """ |
|
|
| import os |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from transformers import AutoModel, AutoTokenizer |
| from huggingface_hub import hf_hub_download |
| from dataclasses import dataclass |
| from typing import Optional, List |
|
|
# Human-readable names for the integer class ids, keyed by class index.
# Sentiment ids: 0 = Negative, 1 = Neutral, 2 = Positive.
SENTIMENT_LABELS = dict(enumerate(("Negative", "Neutral", "Positive")))
# Relation ids: 0 = Indirect, 1 = Direct.
RELATION_LABELS = dict(enumerate(("Indirect", "Direct")))
|
|
|
|
| |
| |
| |
|
|
class AdaptiveFocalLoss(nn.Module):
    """Focal loss with an optional extra up-weighting of hard examples.

    The per-sample loss is ``alpha_t * focal_weight * ce`` where ``ce`` is the
    unreduced cross-entropy, ``pt = exp(-ce)`` is the probability the model
    assigns to the target class and ``focal_weight = (1 - pt) ** gamma``.
    When ``difficulty_weight`` is true the focal weight is additionally
    multiplied by ``1 + exp(-2 * pt)``, which is larger for low ``pt``.

    Args:
        alpha: ``None`` (no weighting), a scalar applied to every sample, or
            per-class weights (tensor, list or tuple) indexed by target id.
        gamma: Focusing exponent applied to ``1 - pt``.
        size_average: Return the mean over samples if true, the sum otherwise.
        difficulty_weight: Enable the extra ``1 + exp(-2 * pt)`` factor.
    """

    def __init__(self, alpha=None, gamma=2.0, size_average=True, difficulty_weight=True):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.size_average = size_average
        self.difficulty_weight = difficulty_weight

    def _alpha_for(self, inputs, targets):
        """Return the class-balancing factor for each target (or a scalar)."""
        if self.alpha is None:
            return 1.0
        if isinstance(self.alpha, (float, int)):
            return self.alpha
        if torch.is_tensor(self.alpha):
            # ``alpha`` is a plain attribute, not a buffer, so it does not
            # follow ``module.to(...)``; move it next to the logits here.
            per_class = self.alpha.to(inputs.device)
        else:
            # Lists / tuples of per-class weights are accepted as well.
            per_class = torch.as_tensor(
                self.alpha, dtype=inputs.dtype, device=inputs.device)
        # ``reshape`` (unlike ``view``) also works for non-contiguous targets.
        return per_class.gather(0, targets.reshape(-1))

    def forward(self, inputs, targets):
        """Compute the reduced loss.

        Args:
            inputs: Unnormalised class scores passed to ``F.cross_entropy``.
            targets: Integer class ids, one per sample.

        Returns:
            A scalar tensor: mean or sum of the per-sample losses.
        """
        ce = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce)
        focal_weight = (1 - pt) ** self.gamma
        if self.difficulty_weight:
            focal_weight = focal_weight * (1 + torch.exp(-pt * 2))
        loss = self._alpha_for(inputs, targets) * focal_weight * ce
        return loss.mean() if self.size_average else loss.sum()
|
|
|
|
class TurkishLinguisticFeatures(nn.Module):
    """Derive a 64-dimensional feature vector from a text representation.

    The input is passed through self-attention and a residual linear
    projection, then through three small MLP heads (64, 32 and 48 outputs)
    whose concatenation (144 values) is fused down to 64 values.
    """

    def __init__(self, hidden_size):
        super().__init__()

        def projection_head(mid_width, out_width):
            # Linear -> GELU -> Dropout -> Linear, shared by all three heads.
            return nn.Sequential(
                nn.Linear(hidden_size, mid_width),
                nn.GELU(),
                nn.Dropout(0.1),
                nn.Linear(mid_width, out_width),
            )

        # num_heads is fixed at 8, so hidden_size must be divisible by 8.
        self.adjective_noun_attention = nn.MultiheadAttention(
            hidden_size, num_heads=8, dropout=0.1, batch_first=True)
        self.historical_word_projection = nn.Linear(hidden_size, hidden_size)
        self.respect_pattern_detector = projection_head(hidden_size // 2, 64)
        self.formality_detector = projection_head(hidden_size // 4, 32)
        self.morphological_analyzer = projection_head(hidden_size // 2, 48)
        # 144 = 64 + 32 + 48, the concatenated widths of the three heads.
        self.linguistic_fusion = nn.Sequential(
            nn.Linear(144, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(128, 64),
        )

    def forward(self, text_repr, entity_repr):
        """Return the fused feature vector for ``text_repr``.

        Args:
            text_repr: Text representation; a length-1 sequence axis is
                inserted at dim 1 for the attention call and removed again
                (assumes shape (batch, hidden_size) - TODO confirm).
            entity_repr: Accepted for interface compatibility; not used.

        Returns:
            Tensor whose last dimension is 64.
        """
        as_sequence = text_repr.unsqueeze(1)
        attended, _ = self.adjective_noun_attention(
            as_sequence, as_sequence, as_sequence)
        attended = attended.squeeze(1)
        combined = attended + self.historical_word_projection(attended)
        head_outputs = [
            self.respect_pattern_detector(combined),
            self.formality_detector(combined),
            self.morphological_analyzer(combined),
        ]
        return self.linguistic_fusion(torch.cat(head_outputs, dim=-1))
|
|
|
|
class EnhancedEntityContextAttention(nn.Module):
    """Combine three entity-conditioned views of a token sequence.

    The views are (1) attention from the entity query over all tokens,
    (2) the same attention weights rescaled by distance to the entity span,
    and (3) attention restricted to a window around the span. A learned
    softmax gate mixes the three views.
    """

    def __init__(self, hidden_size, num_heads=12, dropout=0.1):
        super().__init__()
        self.entity_context_attention = nn.MultiheadAttention(
            hidden_size, num_heads, dropout=dropout, batch_first=True)
        # Learned position table; sequences longer than 512 cannot be indexed.
        self.position_embedding = nn.Embedding(512, hidden_size)
        self.local_context_attention = nn.MultiheadAttention(
            hidden_size, 8, dropout=dropout, batch_first=True)
        # Scores the three views from their concatenation.
        self.hierarchical_attention = nn.Sequential(
            nn.Linear(hidden_size * 3, hidden_size // 2),
            nn.Tanh(),
            nn.Linear(hidden_size // 2, 3),
        )
        self.layer_norm1 = nn.LayerNorm(hidden_size)
        self.layer_norm2 = nn.LayerNorm(hidden_size)

    def _pos_weights(self, entity_positions, seq_len, device):
        """Build per-token multipliers around each entity span.

        Tokens inside the span (inclusive end) get 3.0, up to three tokens on
        either side get 2.0 and everything else gets 0.5.

        Returns:
            Float tensor of shape (len(entity_positions), seq_len).
        """
        weights = torch.ones(len(entity_positions), seq_len, device=device)
        for row, (start, end) in enumerate(entity_positions):
            near_lo = max(0, start - 3)
            near_hi = min(seq_len, end + 4)
            # Assignment order matters when the slices overlap; keep it fixed.
            weights[row, start:end + 1] = 3.0
            weights[row, near_lo:start] = 2.0
            weights[row, end + 1:near_hi] = 2.0
            weights[row, :near_lo] = 0.5
            weights[row, near_hi:] = 0.5
        return weights

    def forward(self, entity_repr, text_sequence, entity_positions, attention_mask):
        """Attend from the entity representation over the text tokens.

        Args:
            entity_repr: One query vector per sample.
            text_sequence: 3-D token states (batch, seq_len, hidden).
            entity_positions: Per-sample (start, end) token indices, end
                inclusive.
            attention_mask: Non-zero for real tokens; inverted to obtain the
                key padding mask of the global attention.

        Returns:
            Tuple of the layer-normalised mixed representation and the
            global attention weights.
        """
        batch, seq_len, _ = text_sequence.shape
        device = text_sequence.device

        positions = torch.arange(seq_len, device=device).unsqueeze(0).expand(batch, -1)
        tokens = self.layer_norm1(text_sequence + self.position_embedding(positions))
        proximity = self._pos_weights(entity_positions, seq_len, device)

        # View 1: global attention, padding excluded.
        query = entity_repr.unsqueeze(1)
        padding = ~attention_mask.bool()
        global_ctx, global_weights = self.entity_context_attention(
            query, tokens, tokens, key_padding_mask=padding)
        global_ctx = global_ctx.squeeze(1)

        # View 2: the same weights rescaled by proximity (not renormalised).
        rescaled = global_weights.squeeze(1) * proximity
        weighted_ctx = torch.bmm(rescaled.unsqueeze(1), tokens).squeeze(1)

        # View 3: attention inside a +/-5 token window around the span.
        local_parts = []
        for row, (start, end) in enumerate(entity_positions):
            window = tokens[row, max(0, start - 5):min(seq_len, end + 6)].unsqueeze(0)
            if window.size(1) == 0:
                # Empty window: fall back to the raw entity vector.
                local_parts.append(entity_repr[row:row + 1])
                continue
            attended, _ = self.local_context_attention(
                query[row:row + 1], window, window)
            local_parts.append(attended.squeeze(1))
        local_ctx = torch.cat(local_parts, 0)

        gate_logits = self.hierarchical_attention(
            torch.cat([global_ctx, weighted_ctx, local_ctx], -1))
        gates = F.softmax(gate_logits.view(-1, 3), -1)
        mixed = (gates[:, 0:1] * global_ctx
                 + gates[:, 1:2] * weighted_ctx
                 + gates[:, 2:3] * local_ctx)
        return self.layer_norm2(mixed), global_weights
|
|
|
|
class ContextualSentimentEncoder(nn.Module):
    """Bi-LSTM context encoder with attention pooling and a 2-way head."""

    def __init__(self, hidden_size):
        super().__init__()
        half = hidden_size // 2
        # Bidirectional with hidden_size // 2 units per direction, so the
        # output width is 2 * (hidden_size // 2).
        self.context_lstm = nn.LSTM(
            hidden_size, half, num_layers=2,
            bidirectional=True, dropout=0.1, batch_first=True)
        self.sentiment_pooling = nn.MultiheadAttention(hidden_size, 8, batch_first=True)
        self.context_type_classifier = nn.Sequential(
            nn.Linear(hidden_size, half),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(half, 2),
        )

    def forward(self, context_sequence, entity_position_mask):
        """Encode the sequence and classify its context type.

        Args:
            context_sequence: Token states fed to the LSTM (batch first).
            entity_position_mask: Optional per-token mask; when given, the
                LSTM outputs are multiplied by it before averaging.

        Returns:
            Tuple of the attention-pooled context vector and the 2-class
            context-type logits.
        """
        encoded, _ = self.context_lstm(context_sequence)
        if entity_position_mask is None:
            summary = encoded.mean(1)
        else:
            keep = entity_position_mask.unsqueeze(-1).float()
            # NOTE(review): the mean divides by the full sequence length, not
            # by the number of masked-in tokens. Left as is because existing
            # weights would have been trained with this behaviour - confirm
            # before changing.
            summary = (encoded * keep).mean(1)
        # The summary vector queries every LSTM output (no padding mask).
        attended, _ = self.sentiment_pooling(summary.unsqueeze(1), encoded, encoded)
        return attended.squeeze(1), self.context_type_classifier(summary)
|
|
|
|
| |
| |
| |
|
|
class PositionAwareDualEncoderEBSA(nn.Module):
    """Dual-encoder entity-based sentiment model.

    Architecture identical to the training code (per the original note).
    One pretrained encoder reads the full text and a second one reads the
    entity input; their outputs are combined with entity-context attention,
    linguistic features and a contextual encoder, then classified by a
    sentiment head and a relation head.
    """

    def __init__(self, model_name='dbmdz/bert-base-turkish-cased',
                 num_sentiment_labels=3, dropout_rate=0.1,
                 use_r_drop=True, stochastic_depth_rate=0.1):
        super().__init__()
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Two independent copies of the same pretrained backbone.
        self.text_encoder = AutoModel.from_pretrained(model_name, add_pooling_layer=False)
        self.entity_encoder = AutoModel.from_pretrained(model_name, add_pooling_layer=False)
        self.config = self.text_encoder.config

        # Hyper-parameters are stored on the module; forward() does not read them.
        self.dropout_rate = dropout_rate
        self.use_r_drop = use_r_drop
        self.clip_grad_norm = 1.0
        self.stochastic_depth_rate = stochastic_depth_rate
        hidden = self.config.hidden_size

        def classification_head(num_classes):
            # Input is the fused vector plus the 2 context-type probabilities.
            return nn.Sequential(
                nn.Linear(hidden + 2, hidden // 2),
                nn.LayerNorm(hidden // 2),
                nn.GELU(),
                nn.Dropout(dropout_rate),
                nn.Linear(hidden // 2, num_classes),
            )

        self.enhanced_attention = EnhancedEntityContextAttention(hidden, 12, dropout_rate)
        self.turkish_linguistic = TurkishLinguisticFeatures(hidden)
        self.contextual_encoder = ContextualSentimentEncoder(hidden)

        # position_embedding, entity_position_proj, layer_norm3 and dropout
        # are not used by forward(); they stay so parameter names keep
        # matching saved state dicts.
        self.position_embedding = nn.Embedding(512, hidden)
        self.entity_position_proj = nn.Linear(hidden, hidden)

        self.layer_norm1 = nn.LayerNorm(hidden)
        self.layer_norm2 = nn.LayerNorm(hidden)
        self.layer_norm3 = nn.LayerNorm(hidden * 2)
        self.dropout = nn.Dropout(dropout_rate)

        # Fuses text CLS, entity CLS, cross attention (3 * hidden) and the
        # 64-dimensional linguistic features.
        self.enhanced_fusion = nn.Sequential(
            nn.Linear(hidden * 3 + 64, hidden * 2),
            nn.LayerNorm(hidden * 2),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden * 2, hidden),
            nn.LayerNorm(hidden),
        )

        self.sentiment_classifier = classification_head(num_sentiment_labels)
        self.relation_classifier = classification_head(2)

        self.label_smoothing = 0.1
        num_layers = len(self.text_encoder.encoder.layer)
        # Linearly increasing per-layer drop probabilities (stored only).
        self.layer_drop_probs = [
            stochastic_depth_rate * i / num_layers for i in range(num_layers)
        ]

    def _weighted_layers(self, encoder, input_ids, attn_mask):
        """Run ``encoder`` and blend its last four hidden states.

        Returns:
            Tuple of the 0.1/0.2/0.3/0.4 weighted sum of the last four
            hidden states and the encoder's ``last_hidden_state``.
        """
        outputs = encoder(
            input_ids=input_ids, attention_mask=attn_mask, output_hidden_states=True)
        top_four = torch.stack(outputs.hidden_states[-4:])
        mix = torch.tensor([0.1, 0.2, 0.3, 0.4], device=input_ids.device).view(4, 1, 1, 1)
        blended = (top_four * mix).sum(0)
        return blended, outputs.last_hidden_state

    def _entity_repr(self, text_out, entity_positions, position_mask):
        """Average the token states selected by each sample's position mask.

        Falls back to the first token state when a mask selects nothing.
        ``entity_positions`` is accepted but not used.
        """
        pooled = []
        for i, sample_states in enumerate(text_out):
            selector = torch.tensor(
                position_mask[i], device=text_out.device, dtype=torch.bool)
            selected = sample_states[selector]
            if selected.size(0) > 0:
                pooled.append(selected.mean(0))
            else:
                pooled.append(sample_states[0])
        return torch.stack(pooled)

    def forward(self, text_input_ids, text_attention_mask,
                entity_input_ids, entity_attention_mask,
                entity_positions, position_mask,
                sentiment_label=None, relation_label=None, **kwargs):
        """Compute sentiment and relation logits (and the loss when labelled).

        Args:
            text_input_ids, text_attention_mask: Tokenised full text.
            entity_input_ids, entity_attention_mask: Tokenised entity input.
            entity_positions: Per-sample (start, end) token indices.
            position_mask: Per-sample 0/1 sequences marking entity tokens.
            sentiment_label, relation_label: Optional targets; the loss is
                computed only when both are provided.
            **kwargs: Accepted and ignored.

        Returns:
            Dict with keys ``loss``, ``sentiment_logits``,
            ``relation_logits``, ``attention_weights`` and
            ``context_type_logits``.
        """
        device = text_input_ids.device
        text_states, _ = self._weighted_layers(
            self.text_encoder, text_input_ids, text_attention_mask)
        entity_states, _ = self._weighted_layers(
            self.entity_encoder, entity_input_ids, entity_attention_mask)

        # The first-token state serves as the summary of each input.
        text_vec = text_states[:, 0, :]
        entity_vec = entity_states[:, 0, :]

        attended, attention_weights = self.enhanced_attention(
            entity_vec, text_states, entity_positions, text_attention_mask)
        linguistic = self.turkish_linguistic(text_vec, entity_vec)

        mask_tensor = torch.stack(
            [torch.tensor(sample_mask, device=device) for sample_mask in position_mask])
        _, context_logits = self.contextual_encoder(text_states, mask_tensor)

        # L2-normalise the three views before layer norm and fusion.
        text_vec = F.normalize(text_vec, p=2, dim=1)
        entity_vec = F.normalize(entity_vec, p=2, dim=1)
        attended = F.normalize(attended, p=2, dim=1)

        entity_vec = self.layer_norm1(entity_vec)
        attended = self.layer_norm2(attended + entity_vec)

        fused = self.enhanced_fusion(
            torch.cat([text_vec, entity_vec, attended, linguistic], 1))
        context_probs = F.softmax(context_logits, -1)
        head_input = torch.cat([fused, context_probs], 1)

        sentiment_logits = self.sentiment_classifier(head_input)
        relation_logits = self.relation_classifier(head_input)

        loss = None
        if sentiment_label is not None and relation_label is not None:
            criterion = AdaptiveFocalLoss(alpha=0.25, gamma=2.0, difficulty_weight=True)
            sentiment_loss = criterion(sentiment_logits, sentiment_label)
            relation_loss = criterion(relation_logits, relation_label)
            loss = sentiment_loss + relation_loss

        return {
            'loss': loss,
            'sentiment_logits': sentiment_logits,
            'relation_logits': relation_logits,
            'attention_weights': attention_weights,
            'context_type_logits': context_logits,
        }
|
|
|
|
| |
| |
| |
|
|
class CISAModel:
    """User-facing inference wrapper around ``PositionAwareDualEncoderEBSA``.

    Does not require ``AutoModel`` registration or ``trust_remote_code``.

    Example::

        from modeling_cisa import CISAModel
        model = CISAModel.from_pretrained("dbbiyte/CISA-BERTurk-sentiment")
        result = model.predict("Ali Bey'in vefatı...", "Ali Bey", 0, 7)
    """

    # Names for the integer class ids returned by ``predict``.
    SENTIMENT_LABELS = {0: "Negative", 1: "Neutral", 2: "Positive"}
    RELATION_LABELS = {0: "Indirect", 1: "Direct"}

    def __init__(self, model: "PositionAwareDualEncoderEBSA", tokenizer):
        """Wrap an already-built model and its tokenizer.

        Args:
            model: The network; inputs are moved to the device of its first
                parameter.
            tokenizer: Tokenizer that supports ``return_offsets_mapping``.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = next(model.parameters()).device

    @classmethod
    def from_pretrained(cls, repo_id: str = "dbbiyte/CISA-BERTurk-sentiment",
                        device: Optional[torch.device] = None,
                        tokenizer=None):
        """Load the model and tokenizer from a Hugging Face repository.

        ``pytorch_model.bin`` is downloaded and loaded directly as a state
        dict; missing and unexpected keys are printed rather than raised.

        Args:
            repo_id: Hub repository holding the tokenizer and weights.
            device: Target device; defaults to CUDA when available, else CPU.
            tokenizer: Optional pre-loaded tokenizer. When omitted it is
                loaded from ``repo_id``.

        Returns:
            A ready-to-use ``CISAModel`` in eval mode.
        """
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        if tokenizer is None:
            print(f"Tokenizer yükleniyor: {repo_id}")
            tokenizer = AutoTokenizer.from_pretrained(repo_id)

        print("Model mimarisi oluşturuluyor...")
        core = PositionAwareDualEncoderEBSA(
            model_name='dbmdz/bert-base-turkish-cased',
            num_sentiment_labels=3,
            dropout_rate=0.1,
            use_r_drop=False,
            stochastic_depth_rate=0.1,
        )

        print("Ağırlıklar indiriliyor...")
        weights_path = hf_hub_download(repo_id=repo_id, filename="pytorch_model.bin")
        # NOTE(security): torch.load unpickles the file. On PyTorch versions
        # where ``weights_only`` defaults to False, a malicious repository
        # could execute code here; consider passing ``weights_only=True``.
        state_dict = torch.load(weights_path, map_location=device)

        missing, unexpected = core.load_state_dict(state_dict, strict=False)
        if missing:
            print(f"  Eksik key'ler  : {missing}")
        if unexpected:
            print(f"  Beklenmeyen key: {unexpected}")

        core.to(device).eval()
        print("Model hazır.")
        return cls(core, tokenizer)

    @staticmethod
    def _locate_entity_tokens(offsets, entity_start, entity_end):
        """Map a character span to the inclusive token span that covers it.

        A token belongs to the entity when its character range overlaps
        ``[entity_start, entity_end)``. Tokens with an empty range (special
        and padding tokens report ``(0, 0)``) are ignored.

        Args:
            offsets: Iterable of ``(char_start, char_end)`` pairs, one per token.
            entity_start: First character of the entity.
            entity_end: One past the last character of the entity.

        Returns:
            ``(first_token, last_token)``, both inclusive, or ``(0, 0)`` when
            no token overlaps the span (for example, the entity was truncated).
        """
        first = last = None
        for idx, (tok_start, tok_end) in enumerate(offsets):
            if tok_end <= tok_start:
                continue
            if tok_start < entity_end and tok_end > entity_start:
                if first is None:
                    first = idx
                last = idx
        if first is None:
            return 0, 0
        return first, last

    @torch.no_grad()
    def predict(self, text: str, entity_text: str,
                entity_start: int, entity_end: int,
                max_length: int = 256) -> dict:
        """Predict sentiment and relation for one entity mention.

        Args:
            text: The full text passage.
            entity_text: The person name as it appears in the text.
            entity_start: Character offset where the entity starts in ``text``.
            entity_end: Character offset where the entity ends (exclusive).
            max_length: Maximum token length of the text input (default 256);
                the entity input uses half of it.

        Returns:
            {
                "sentiment":       int,   # 0=Negative 1=Neutral 2=Positive
                "sentiment_label": str,
                "sentiment_probs": list[float],
                "relation":        int,   # 0=Indirect 1=Direct
                "relation_label":  str,
                "relation_probs":  list[float],
            }
        """
        txt_tok = self.tokenizer(
            text, padding='max_length', truncation=True,
            max_length=max_length, return_tensors="pt",
            return_token_type_ids=False, return_offsets_mapping=True)
        # The offsets are needed here only; the model does not accept them.
        offset_map = txt_tok.pop("offset_mapping")[0].tolist()

        # Entity input: the name followed by up to 1800 characters after it.
        # NOTE(review): the tokenizer is called with its default special-token
        # handling, so the literal [CLS]/[SEP] markers below are presumably
        # duplicated. Kept unchanged because it determines the model input -
        # confirm against the training code.
        context = text[entity_end: min(len(text), entity_end + 1800)]
        ent_input = f"[CLS] {entity_text} [SEP] {context} [SEP]"
        ent_tok = self.tokenizer(
            ent_input, padding='max_length', truncation=True,
            max_length=max_length // 2, return_tensors="pt",
            return_token_type_ids=False)

        # Overlap-based alignment keeps the span ordered even when the entity
        # is cut by truncation or its boundaries fall between tokens.
        s_tok, e_tok = self._locate_entity_tokens(offset_map, entity_start, entity_end)
        pos_mask = [1 if s_tok <= idx <= e_tok else 0 for idx in range(max_length)]

        inputs = {
            "text_input_ids": txt_tok["input_ids"].to(self.device),
            "text_attention_mask": txt_tok["attention_mask"].to(self.device),
            "entity_input_ids": ent_tok["input_ids"].to(self.device),
            "entity_attention_mask": ent_tok["attention_mask"].to(self.device),
            "entity_positions": [[s_tok, e_tok]],
            "position_mask": [pos_mask],
        }

        out = self.model(**inputs)

        s_probs = F.softmax(out["sentiment_logits"], -1)[0].cpu().tolist()
        r_probs = F.softmax(out["relation_logits"], -1)[0].cpu().tolist()
        s_pred = int(torch.argmax(out["sentiment_logits"], -1).item())
        r_pred = int(torch.argmax(out["relation_logits"], -1).item())

        return {
            "sentiment": s_pred,
            "sentiment_label": self.SENTIMENT_LABELS[s_pred],
            "sentiment_probs": s_probs,
            "relation": r_pred,
            "relation_label": self.RELATION_LABELS[r_pred],
            "relation_probs": r_probs,
        }