from typing import Any, Optional, Tuple, Literal

import numpy as np

try:
    import tensorflow as tf
    from tensorflow.keras import layers, models, optimizers
    from tensorflow.keras.layers import (
        Input, Embedding, Dense, Dropout, GlobalMaxPooling1D,
        Conv1D, LSTM, Bidirectional
    )
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False

try:
    import torch
    import torch.nn as nn
    from transformers import (
        AutoTokenizer,
        BertForSequenceClassification,
        RobertaForSequenceClassification,
        DistilBertForSequenceClassification,
    )
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False


if TF_AVAILABLE:
    class AttentionLayer(tf.keras.layers.Layer):
        """Additive attention pooling: scores each time step, softmax-normalizes
        the scores, and returns the attention-weighted sum of the sequence."""

        def build(self, input_shape):
            self.W = self.add_weight(
                shape=(input_shape[-1], 1),
                initializer='random_normal',
                trainable=True,
                name='attention_weight'
            )
            # Bias per time step; assumes a fixed sequence length at build time.
            self.b = self.add_weight(
                shape=(input_shape[1], 1),
                initializer='zeros',
                trainable=True,
                name='attention_bias'
            )
            super().build(input_shape)

        def call(self, inputs, **kwargs):
            e = tf.keras.activations.tanh(tf.matmul(inputs, self.W) + self.b)
            e = tf.squeeze(e, axis=-1)
            a = tf.nn.softmax(e, axis=1)
            a = tf.expand_dims(a, axis=-1)
            return tf.reduce_sum(inputs * a, axis=1)


def _classification_head(num_classes: int) -> Tuple[int, str]:
    """Output-layer configuration: a single sigmoid unit for binary tasks,
    softmax over num_classes otherwise. Kept consistent with the losses
    selected in compile_keras_model."""
    if num_classes > 2:
        return num_classes, 'softmax'
    return 1, 'sigmoid'


def build_mlp(
    input_dim: int,
    num_classes: int,
    hidden_dims: Tuple[int, ...] = (256, 128),
    dropout: float = 0.3,
    activation: str = 'relu'
) -> 'tf.keras.Model':
    """Multi-layer perceptron over pre-computed feature vectors."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(input_dim,))
    x = inputs
    for dim in hidden_dims:
        x = Dense(dim, activation=activation)(x)
        x = Dropout(dropout)(x)
    units, out_activation = _classification_head(num_classes)
    outputs = Dense(units, activation=out_activation)(x)
    return models.Model(inputs, outputs)


def build_kim_cnn(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    filter_sizes: Tuple[int, ...] = (3, 4, 5),
    num_filters: int = 100,
    dropout: float = 0.5,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """Kim-style text CNN: parallel convolutions with several filter widths,
    each max-pooled over time, then concatenated."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        # Frozen pre-trained embeddings (e.g. word2vec / fastText).
        embedding = Embedding(
            vocab_size, embed_dim,
            weights=[pre_embed_matrix],
            trainable=False
        )(inputs)
    else:
        embedding = Embedding(vocab_size, embed_dim)(inputs)
    pooled_outputs = []
    for fs in filter_sizes:
        x = Conv1D(num_filters, fs, activation='relu')(embedding)
        x = GlobalMaxPooling1D()(x)
        pooled_outputs.append(x)
    merged = layers.concatenate(pooled_outputs)
    x = Dropout(dropout)(merged)
    units, out_activation = _classification_head(num_classes)
    outputs = Dense(units, activation=out_activation)(x)
    return models.Model(inputs, outputs)


def build_lstm(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    lstm_units: int = 128,
    dropout: float = 0.3,
    bidirectional: bool = False,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """(Bi)LSTM classifier over token embeddings."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix],
                      trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    # Note: recurrent_dropout disables the fused cuDNN LSTM kernel.
    rnn_layer = LSTM(lstm_units, dropout=dropout, recurrent_dropout=dropout)
    if bidirectional:
        x = Bidirectional(rnn_layer)(x)
    else:
        x = rnn_layer(x)
    units, out_activation = _classification_head(num_classes)
    outputs = Dense(units, activation=out_activation)(x)
    return models.Model(inputs, outputs)
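# Illustrative usage sketch (not part of the original API): builds and
# compiles the Kim CNN. The sequence length, vocabulary size, and embedding
# dimension below are assumed example values, not taken from any dataset.
def _example_kim_cnn() -> 'tf.keras.Model':
    model = build_kim_cnn(max_len=128, vocab_size=20000, embed_dim=300,
                          num_classes=3)
    # compile_keras_model (defined below) picks the matching loss.
    return compile_keras_model(model, learning_rate=1e-3, num_classes=3)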
def build_cnn_lstm(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    filter_size: int = 3,
    num_filters: int = 128,
    lstm_units: int = 64,
    dropout: float = 0.3,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """Convolutional feature extractor followed by an LSTM."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix],
                      trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    x = Conv1D(num_filters, filter_size, activation='relu', padding='same')(x)
    x = LSTM(lstm_units, dropout=dropout)(x)
    units, out_activation = _classification_head(num_classes)
    outputs = Dense(units, activation=out_activation)(x)
    return models.Model(inputs, outputs)


def build_birnn_attention(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    rnn_units: int = 64,
    dropout: float = 0.3,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """BiLSTM whose per-step outputs are pooled by AttentionLayer."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix],
                      trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    x = Bidirectional(LSTM(rnn_units, return_sequences=True,
                           dropout=dropout))(x)
    x = AttentionLayer()(x)
    units, out_activation = _classification_head(num_classes)
    outputs = Dense(units, activation=out_activation)(x)
    return models.Model(inputs, outputs)


_RUSSIAN_TRANSFORMERS = {
    "rubert": "DeepPavlov/rubert-base-cased",
    "ruroberta": "sberbank-ai/ruRoberta-large",
    "distilbert-multilingual": "distilbert-base-multilingual-cased",
}


def get_transformer_classifier(
    model_name: str = "rubert",
    num_classes: int = 2,
    problem_type: Literal["single_label", "multi_label"] = "single_label"
) -> Tuple[Any, Any]:
    """Load a pre-trained Russian or multilingual transformer with a fresh
    sequence-classification head, together with its tokenizer."""
    if not TORCH_AVAILABLE:
        raise ImportError("PyTorch or transformers not available")
    if model_name not in _RUSSIAN_TRANSFORMERS:
        raise ValueError(
            f"Unknown model_name. Choose from: {list(_RUSSIAN_TRANSFORMERS.keys())}"
        )
    model_id = _RUSSIAN_TRANSFORMERS[model_name]
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if "roberta" in model_id.lower():
        model = RobertaForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    elif "distilbert" in model_id.lower():
        model = DistilBertForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    else:
        model = BertForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    if problem_type == "multi_label":
        model.config.problem_type = "multi_label_classification"
    else:
        model.config.problem_type = "single_label_classification"
    return model, tokenizer
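# Illustrative usage sketch (assumed example, not part of the original API):
# load the RuBERT classifier, tokenize a placeholder batch, and run a
# forward pass to obtain logits.
def _example_transformer_logits():
    model, tokenizer = get_transformer_classifier("rubert", num_classes=2)
    batch = tokenizer(["example text"], truncation=True, padding=True,
                      max_length=128, return_tensors="pt")
    with torch.no_grad():
        logits = model(**batch).logits  # shape: (batch_size, num_classes)
    return logits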
def quantize_pytorch_model(model: 'torch.nn.Module',
                           backend: str = "qnnpack") -> 'torch.nn.Module':
    """Post-training dynamic quantization of the model's nn.Linear layers.

    Dynamic quantization quantizes weights ahead of time and activations on
    the fly, so no calibration pass is required."""
    if not TORCH_AVAILABLE:
        raise ImportError("PyTorch not available")
    model.eval()
    torch.backends.quantized.engine = backend
    return torch.quantization.quantize_dynamic(
        model, {nn.Linear}, dtype=torch.qint8
    )


def prune_keras_model(model: 'tf.keras.Model',
                      sparsity: float = 0.5) -> 'tf.keras.Model':
    """Wrap a Keras model for magnitude pruning. The returned model still has
    to be fine-tuned (with tfmot's UpdatePruningStep callback) to actually
    reach the target sparsity."""
    try:
        import tensorflow_model_optimization as tfmot
    except ImportError:
        raise ImportError("Install tensorflow-model-optimization for pruning")
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0,
            final_sparsity=sparsity,
            begin_step=0,
            end_step=1000  # should cover the planned fine-tuning steps
        )
    }
    return tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)


def prepare_keras_inputs(
    texts: list,
    tokenizer=None,
    max_len: int = 128,
    vocab: Optional[dict] = None
) -> np.ndarray:
    """Turn raw texts into a fixed-length integer matrix, using either a
    Hugging Face tokenizer or Keras' own Tokenizer."""
    if tokenizer is not None:
        # padding='max_length' guarantees the (batch, max_len) shape that the
        # Keras builders expect, regardless of the batch contents.
        encodings = tokenizer(texts, truncation=True, padding='max_length',
                              max_length=max_len, return_tensors="np")
        return encodings['input_ids']
    from tensorflow.keras.preprocessing.text import Tokenizer
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    tk = Tokenizer(oov_token="<OOV>")
    if vocab:
        tk.word_index = vocab
    else:
        tk.fit_on_texts(texts)
    sequences = tk.texts_to_sequences(texts)
    return pad_sequences(sequences, maxlen=max_len)


def compile_keras_model(
    model: 'tf.keras.Model',
    learning_rate: float = 2e-5,
    num_classes: int = 2
):
    """Compile with a loss matching the head from _classification_head:
    integer labels for multiclass, a single probability for binary."""
    loss = ('sparse_categorical_crossentropy' if num_classes > 2
            else 'binary_crossentropy')
    model.compile(
        optimizer=optimizers.Adam(learning_rate=learning_rate),
        loss=loss,
        metrics=['accuracy']
    )
    return model
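# Illustrative end-to-end sketch (assumed placeholder data and
# hyperparameters): vectorize texts with the Keras tokenizer path, build a
# BiLSTM, compile it, and run one training epoch.
if __name__ == "__main__":
    sample_texts = ["first example", "second example"]  # placeholder corpus
    sample_labels = np.array([0, 1])
    X = prepare_keras_inputs(sample_texts, max_len=32)
    demo_model = build_lstm(max_len=32, vocab_size=10000, embed_dim=64,
                            num_classes=2, bidirectional=True)
    demo_model = compile_keras_model(demo_model, learning_rate=1e-3,
                                     num_classes=2)
    demo_model.fit(X, sample_labels, epochs=1, batch_size=2)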