import os
from typing import Optional, Union, Tuple, Dict, Any, Literal

import numpy as np

try:
    import tensorflow as tf
    from tensorflow.keras import layers, models, optimizers, callbacks
    from tensorflow.keras.models import Model
    from tensorflow.keras.layers import (
        Input, Embedding, Dense, Dropout, GlobalMaxPooling1D,
        Conv1D, LSTM, GRU, Bidirectional, Attention, GlobalAveragePooling1D
    )
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False

try:
    import torch
    import torch.nn as nn
    from torch.nn.utils.rnn import pad_sequence
    from transformers import (
        AutoTokenizer, AutoModel, AutoConfig,
        BertForSequenceClassification, RobertaForSequenceClassification,
        DistilBertForSequenceClassification, Trainer, TrainingArguments
    )
    from transformers.tokenization_utils_base import BatchEncoding
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# Defined only when TensorFlow is importable, so that importing this module
# without TF does not raise a NameError and defeat the TF_AVAILABLE flag.
if TF_AVAILABLE:
    class AttentionLayer(tf.keras.layers.Layer):
        """Additive attention pooling over RNN timesteps."""

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def build(self, input_shape):
            self.W = self.add_weight(
                shape=(input_shape[-1], 1),
                initializer='random_normal',
                trainable=True,
                name='attention_weight'
            )
            self.b = self.add_weight(
                shape=(input_shape[1], 1),
                initializer='zeros',
                trainable=True,
                name='attention_bias'
            )
            super().build(input_shape)

        def call(self, inputs, **kwargs):
            # Score each timestep, softmax over time, then weighted-sum the inputs.
            e = tf.keras.activations.tanh(tf.matmul(inputs, self.W) + self.b)
            e = tf.squeeze(e, axis=-1)
            a = tf.nn.softmax(e, axis=1)
            a = tf.expand_dims(a, axis=-1)
            weighted_input = inputs * a
            return tf.reduce_sum(weighted_input, axis=1)
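
# Shape contract (sketch): AttentionLayer collapses the time axis, e.g.
#   x = tf.random.normal((8, 50, 128))  # (batch, timesteps, features)
#   AttentionLayer()(x)                 # -> shape (8, 128)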

def build_mlp(
    input_dim: int,
    num_classes: int,
    hidden_dims: tuple = (256, 128),
    dropout: float = 0.3,
    activation: str = 'relu'
) -> 'tf.keras.Model':
    """Feed-forward classifier over fixed-size feature vectors (e.g. TF-IDF)."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(input_dim,))
    x = inputs
    for dim in hidden_dims:
        x = Dense(dim, activation=activation)(x)
        x = Dropout(dropout)(x)
    # Binary problems get a single sigmoid unit so binary_crossentropy sees
    # matching shapes; multi-class gets a softmax over num_classes.
    units = num_classes if num_classes > 2 else 1
    outputs = Dense(units, activation='softmax' if num_classes > 2 else 'sigmoid')(x)
    return models.Model(inputs, outputs)
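
# Usage sketch (X_train/y_train are hypothetical, e.g. TF-IDF features of
# shape (n_samples, 5000) with integer class labels; not defined here):
#   model = build_mlp(input_dim=5000, num_classes=3)
#   model = compile_keras_model(model, learning_rate=1e-3, num_classes=3)
#   model.fit(X_train, y_train, validation_split=0.1, epochs=5, batch_size=32)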

def build_kim_cnn(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    filter_sizes: tuple = (3, 4, 5),
    num_filters: int = 100,
    dropout: float = 0.5,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """Kim (2014)-style CNN: parallel convolutions of several filter widths."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        embedding = Embedding(
            vocab_size, embed_dim,
            weights=[pre_embed_matrix],
            trainable=False
        )(inputs)
    else:
        embedding = Embedding(vocab_size, embed_dim)(inputs)
    pooled_outputs = []
    for fs in filter_sizes:
        x = Conv1D(num_filters, fs, activation='relu')(embedding)
        x = GlobalMaxPooling1D()(x)
        pooled_outputs.append(x)
    # Concatenate needs at least two inputs; fall back for a single filter size.
    merged = layers.concatenate(pooled_outputs) if len(pooled_outputs) > 1 else pooled_outputs[0]
    x = Dropout(dropout)(merged)
    units = num_classes if num_classes > 2 else 1
    outputs = Dense(units, activation='softmax' if num_classes > 2 else 'sigmoid')(x)
    return models.Model(inputs, outputs)
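
# Usage sketch with frozen pretrained embeddings (the `embedding_matrix`
# below is a hypothetical (vocab_size, 300) array, e.g. fastText vectors):
#   model = build_kim_cnn(max_len=128, vocab_size=20000, embed_dim=300,
#                         num_classes=2, pre_embed_matrix=embedding_matrix)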

def build_lstm(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    lstm_units: int = 128,
    dropout: float = 0.3,
    bidirectional: bool = False,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """(Bi)LSTM classifier over padded token-id sequences."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix], trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    # recurrent_dropout disables the fast cuDNN kernel: fine for small
    # models on CPU, costly for long sequences on GPU.
    rnn_layer = LSTM(lstm_units, dropout=dropout, recurrent_dropout=dropout)
    if bidirectional:
        x = Bidirectional(rnn_layer)(x)
    else:
        x = rnn_layer(x)
    units = num_classes if num_classes > 2 else 1
    outputs = Dense(units, activation='softmax' if num_classes > 2 else 'sigmoid')(x)
    return models.Model(inputs, outputs)

def build_cnn_lstm(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    filter_size: int = 3,
    num_filters: int = 128,
    lstm_units: int = 64,
    dropout: float = 0.3,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """Conv1D feature extractor followed by an LSTM over the feature maps."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix], trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    x = Conv1D(num_filters, filter_size, activation='relu', padding='same')(x)
    x = LSTM(lstm_units, dropout=dropout)(x)
    units = num_classes if num_classes > 2 else 1
    outputs = Dense(units, activation='softmax' if num_classes > 2 else 'sigmoid')(x)
    return models.Model(inputs, outputs)

def build_birnn_attention(
    max_len: int,
    vocab_size: int,
    embed_dim: int,
    num_classes: int,
    rnn_units: int = 64,
    dropout: float = 0.3,
    pre_embed_matrix: Optional[np.ndarray] = None
) -> 'tf.keras.Model':
    """BiLSTM whose timestep outputs are pooled by the custom AttentionLayer."""
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    inputs = Input(shape=(max_len,))
    if pre_embed_matrix is not None:
        x = Embedding(vocab_size, embed_dim, weights=[pre_embed_matrix], trainable=False)(inputs)
    else:
        x = Embedding(vocab_size, embed_dim)(inputs)
    x = Bidirectional(LSTM(rnn_units, return_sequences=True, dropout=dropout))(x)
    x = AttentionLayer()(x)
    units = num_classes if num_classes > 2 else 1
    outputs = Dense(units, activation='softmax' if num_classes > 2 else 'sigmoid')(x)
    return models.Model(inputs, outputs)
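
# Note: AttentionLayer.build sizes its bias from input_shape[1], so the
# sequence length must be fixed at graph-construction time; hence the
# explicit max_len in every builder above. Quick sketch:
#   model = build_birnn_attention(max_len=128, vocab_size=20000,
#                                 embed_dim=100, num_classes=2)
#   model.summary()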

_RUSSIAN_TRANSFORMERS = {
    "rubert": "DeepPavlov/rubert-base-cased",
    "ruroberta": "sberbank-ai/ruRoberta-large",
    "distilbert-multilingual": "distilbert-base-multilingual-cased"
}

def get_transformer_classifier(
    model_name: str = "rubert",
    num_classes: int = 2,
    problem_type: Literal["single_label", "multi_label"] = "single_label"
) -> Tuple[Any, Any]:
    """Load a pretrained Russian/multilingual transformer with a classification head."""
    if not TORCH_AVAILABLE:
        raise ImportError("PyTorch or transformers not available")
    if model_name not in _RUSSIAN_TRANSFORMERS:
        raise ValueError(f"Unknown model_name. Choose from: {list(_RUSSIAN_TRANSFORMERS.keys())}")
    model_id = _RUSSIAN_TRANSFORMERS[model_name]
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if "roberta" in model_id.lower():
        model = RobertaForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    elif "distilbert" in model_id.lower():
        model = DistilBertForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    else:
        model = BertForSequenceClassification.from_pretrained(
            model_id, num_labels=num_classes
        )
    # problem_type tells transformers which loss to use at forward time:
    # BCEWithLogitsLoss for multi-label, CrossEntropyLoss for single-label.
    if problem_type == "multi_label":
        model.config.problem_type = "multi_label_classification"
    else:
        model.config.problem_type = "single_label_classification"
    return model, tokenizer
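
# Usage sketch (downloads weights from the HuggingFace Hub on first call):
#   model, tokenizer = get_transformer_classifier("rubert", num_classes=2)
#   batch = tokenizer(["пример текста"], truncation=True, padding=True,
#                     max_length=128, return_tensors="pt")
#   with torch.no_grad():
#       logits = model(**batch).logits  # (1, num_classes)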

def quantize_pytorch_model(model: 'torch.nn.Module', backend: str = "qnnpack") -> 'torch.nn.Module':
    """Post-training dynamic quantization of Linear layers to int8.

    Dynamic quantization needs no calibration pass, which makes it the
    practical choice for transformer classifiers; the static prepare/convert
    flow would additionally require running representative inputs through
    the prepared model before conversion.
    """
    if not TORCH_AVAILABLE:
        raise ImportError("PyTorch not available")
    model.eval()
    torch.backends.quantized.engine = backend
    return torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)
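
# Usage sketch for CPU inference ("qnnpack" targets ARM; use
# backend="fbgemm" on x86):
#   model, tokenizer = get_transformer_classifier("rubert")
#   model = quantize_pytorch_model(model, backend="fbgemm")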

def prune_keras_model(model: 'tf.keras.Model', sparsity: float = 0.5) -> 'tf.keras.Model':
    """Wrap a Keras model for magnitude pruning up to `sparsity`."""
    try:
        import tensorflow_model_optimization as tfmot
    except ImportError:
        raise ImportError("Install tensorflow-model-optimization for pruning")
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            # end_step should cover the planned number of training steps;
            # 1000 is a placeholder suited to short fine-tuning runs.
            initial_sparsity=0.0, final_sparsity=sparsity, begin_step=0, end_step=1000
        )
    }
    return tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)
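
# Usage sketch: pruning is applied during training and needs the
# UpdatePruningStep callback; strip the wrappers before saving:
#   import tensorflow_model_optimization as tfmot
#   pruned = compile_keras_model(prune_keras_model(model), num_classes=2)
#   pruned.fit(X, y, epochs=2,
#              callbacks=[tfmot.sparsity.keras.UpdatePruningStep()])
#   final = tfmot.sparsity.keras.strip_pruning(pruned)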

def prepare_keras_inputs(
    texts: list,
    tokenizer=None,
    max_len: int = 128,
    vocab: Optional[dict] = None
) -> np.ndarray:
    """Turn raw texts into padded integer-id matrices for the Keras models."""
    if tokenizer is not None:
        # A HuggingFace tokenizer was supplied: use its ids directly.
        encodings = tokenizer(texts, truncation=True, padding=True, max_length=max_len, return_tensors="np")
        return encodings['input_ids']
    if not TF_AVAILABLE:
        raise ImportError("TensorFlow not available")
    from tensorflow.keras.preprocessing.text import Tokenizer
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    tk = Tokenizer(oov_token="<OOV>")
    if vocab:
        tk.word_index = vocab
    else:
        # NOTE: fitting on the incoming texts means train and test calls
        # build different vocabularies; pass `vocab` to keep them aligned.
        tk.fit_on_texts(texts)
    sequences = tk.texts_to_sequences(texts)
    return pad_sequences(sequences, maxlen=max_len)
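
# Usage sketch: reuse one fitted word_index so train and test ids align
# (train_texts/test_texts are hypothetical lists of strings):
#   from tensorflow.keras.preprocessing.text import Tokenizer
#   tk = Tokenizer(oov_token="<OOV>")
#   tk.fit_on_texts(train_texts)
#   X_train = prepare_keras_inputs(train_texts, max_len=128, vocab=tk.word_index)
#   X_test = prepare_keras_inputs(test_texts, max_len=128, vocab=tk.word_index)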

def compile_keras_model(
    model: 'tf.keras.Model',
    learning_rate: float = 2e-5,
    num_classes: int = 2
):
    """Compile with a loss matching the builders' heads: softmax + sparse
    integer labels for multi-class, a single sigmoid unit for binary."""
    loss = 'sparse_categorical_crossentropy' if num_classes > 2 else 'binary_crossentropy'
    model.compile(
        optimizer=optimizers.Adam(learning_rate=learning_rate),
        loss=loss,
        metrics=['accuracy']
    )
    return model
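
if __name__ == "__main__":
    # Minimal end-to-end sketch on toy data (assumes TensorFlow is
    # installed; the texts and labels below are illustrative only).
    sample_texts = ["отличный фильм", "ужасный сервис", "очень понравилось", "совсем плохо"]
    sample_labels = np.array([1, 0, 1, 0])
    X = prepare_keras_inputs(sample_texts, max_len=16)
    model = build_lstm(max_len=16, vocab_size=1000, embed_dim=32, num_classes=2)
    model = compile_keras_model(model, learning_rate=1e-3, num_classes=2)
    model.fit(X, sample_labels, epochs=1, batch_size=2, verbose=0)
    print("toy predictions:", model.predict(X, verbose=0).ravel())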