yusr9
/

RADAR / modeling_radar.py
yusr9's picture
Duplicate from MahmoodAnaam/RADAR
d3ad200
Raw
History Blame Contribute Delete
8.74 kB
from collections import OrderedDict
from dataclasses import dataclass
from typing import Optional
import numpy as np
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModel, PreTrainedModel
from transformers.utils import ModelOutput
from .configuration_radar import RADARConfig
# ---------------------------------------------------------------------------
# Cross-Attention Fusion Layer
# ---------------------------------------------------------------------------
class CrossAttentionFusion(torch.nn.Module):
"""
Fuses semantic (ARSE) and stylometric (SLPE) representations.
The semantic embedding attends to the projected style embedding,
then the attended output is concatenated with the original style
features and passed through a projection MLP.
"""
def __init__(
self,
semantic_dim: int,
style_dim: int,
style_proj_dim: int,
fusion_dim: int,
num_heads: int = 8,
dropout: float = 0.1,
):
super().__init__()
# Project style features to semantic_dim for attention compatibility
self.style_to_semantic = torch.nn.Linear(style_proj_dim, semantic_dim)
# Multi-head cross-attention: query = semantic, key/value = projected style
self.cross_attn = torch.nn.MultiheadAttention(
embed_dim=semantic_dim,
num_heads=num_heads,
dropout=dropout,
batch_first=True,
)
self.attn_norm = torch.nn.LayerNorm(semantic_dim)
# Fusion MLP: concat attended semantic + raw style projection → fusion_dim
self.fusion_mlp = torch.nn.Sequential(
torch.nn.Linear(semantic_dim + style_proj_dim, fusion_dim),
torch.nn.LayerNorm(fusion_dim),
torch.nn.GELU(),
torch.nn.Dropout(dropout),
)
def forward(
self, h_semantic: torch.Tensor, h_style_proj: torch.Tensor
) -> torch.Tensor:
"""
Parameters
----------
h_semantic : [B, semantic_dim] CLS embedding from ARSE
h_style_proj : [B, style_proj_dim] projected stylometric features
Returns
-------
h_fused : [B, fusion_dim]
"""
# Project style to match semantic dimension for attention
style_as_semantic = self.style_to_semantic(h_style_proj) # [B, semantic_dim]
# Add sequence dimension: [B, 1, dim]
query = h_semantic.unsqueeze(1)
kv = style_as_semantic.unsqueeze(1)
# Cross-attention: semantic query attends to style key/value
attended, _ = self.cross_attn(query, kv, kv)
attended = attended.squeeze(1) # [B, semantic_dim]
# Residual connection + norm
attended = self.attn_norm(attended + h_semantic)
# Concatenate attended semantic + original style projection
h_concat = torch.cat(
[attended, h_style_proj], dim=-1
) # [B, semantic_dim + style_proj_dim]
# Project to fusion dimension
h_fused = self.fusion_mlp(h_concat) # [B, fusion_dim]
return h_fused
# ---------------------------------------------------------------------------
# RADAR Model
# ---------------------------------------------------------------------------
@dataclass
class RADAROutput(ModelOutput):
"""
Output dataclass for RADARModel forward pass.
"""
logits: torch.FloatTensor = None
h_fused: Optional[torch.FloatTensor] = None # exposed for auxiliary losses
class RADARModel(PreTrainedModel):
"""
Full RADAR detection model.
Components:
1. ARSE: ModernBERT-large encoder → h_semantic (hidden_size)
2. SLPE: Learnable projection of 38 stylometric features → h_style (128)
3. CrossAttentionFusion: h_semantic × h_style → h_fused (512)
Forward returns a dict with keys:
- logits raw logit from primary head
"""
config_class = RADARConfig
all_tied_weights_keys = OrderedDict()
base_model_prefix = "encoder"
supports_gradient_checkpointing = True
def __init__(self, config: RADARConfig):
super().__init__(config)
# --- ARSE: Semantic Encoder ---
encoder_config = AutoConfig.from_pretrained(config.base_model_name)
self.encoder = AutoModel.from_config(encoder_config)
hidden_size = encoder_config.hidden_size
# --- SLPE: Stylometric Feature Projection ---
self.style_proj = nn.Sequential(
nn.Linear(config.style_dim, config.style_proj_dim),
nn.LayerNorm(config.style_proj_dim),
nn.GELU(),
nn.Dropout(config.dropout),
)
# --- Cross-Attention Fusion ---
self.fusion = CrossAttentionFusion(
semantic_dim=hidden_size,
style_dim=config.style_dim,
style_proj_dim=config.style_proj_dim,
fusion_dim=config.fusion_dim,
num_heads=config.num_attention_heads,
dropout=config.dropout,
)
# --- UACC: Dual-Head Classifier ---
self.classifier = nn.Linear(config.fusion_dim, 1)
self.post_init()
def _init_weights(self, module: nn.Module) -> None:
if isinstance(module, nn.Linear):
nn.init.trunc_normal_(module.weight, std=0.02)
if module.bias is not None:
nn.init.zeros_(module.bias)
elif isinstance(module, nn.LayerNorm):
nn.init.ones_(module.weight)
nn.init.zeros_(module.bias)
def get_encoder(self) -> nn.Module:
return self.encoder
def encode(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
style_features: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
Encode text (and optionally style features) into a fused representation.
Returns h_fused : [B, fusion_dim]
"""
# Run transformer encoder
encoder_output = self.encoder(
input_ids=input_ids,
attention_mask=attention_mask,
)
# Use CLS token representation
h_semantic = encoder_output.last_hidden_state[:, 0, :] # [B, hidden_size]
if style_features is not None:
# Project stylometric features
h_style_proj = self.style_proj(style_features) # [B, style_proj_dim]
# Fuse
h_fused = self.fusion(h_semantic, h_style_proj) # [B, fusion_dim]
else:
# If no style features provided, project semantic directly
B, H = h_semantic.shape
dummy_style = h_semantic.new_zeros(B, self.config.style_proj_dim)
h_fused = self.fusion(h_semantic, dummy_style)
return h_fused
def forward(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
style_features: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
**kwargs,
) -> RADAROutput:
"""
Forward pass.
Parameters
----------
input_ids : [B, seq_len]
attention_mask : [B, seq_len]
style_features : [B, 38] optional stylometric features
labels : [B] float labels 0.0 / 1.0 for loss computation
Returns
-------
RADAROutput with fields:
- loss (if labels provided) weighted BCE on calibrated score
- logits raw logit from primary head (before sigmoid)
- score calibrated probability score
- h_fused fused representation (exposed for auxiliary losses)
"""
h_fused = self.encode(input_ids, attention_mask, style_features)
# Primary classification logit
logit = self.classifier(h_fused).squeeze(-1) # [B]
return RADAROutput(
logits=logit,
h_fused=h_fused, # exposed for AIT triplet / invariance losses
)
@classmethod
def from_pretrained_encoder(
cls, encoder_name: str = "answerdotai/ModernBERT-large", **kwargs
) -> "RADARModel":
"""
Create a RADAR model and load pretrained encoder weights.
All other weights are randomly initialized.
"""
config = RADARConfig(base_model_name=encoder_name, **kwargs)
model = cls(config)
encoder = AutoModel.from_pretrained(encoder_name)
model.encoder.load_state_dict(encoder.state_dict(), strict=False)
return model