Spaces:
Running
Running
| """ | |
| src.models.deep.itransformer | |
| ============================= | |
| iTransformer family models for SOH prediction (TensorFlow/Keras). | |
| Architectures: | |
| 1. iTransformer β Feature-wise MHA β Token-wise MHA β Conv1D FF | |
| 2. Physics-Informed iTransformer β Dual-head with physics branch | |
| 3. Dynamic-Graph iTransformer β GNN fusion with dynamic adjacency | |
| All models: | |
| - Input: (batch, seq_len, n_features) | |
| - Output: (batch, 1) β SOH prediction | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from tensorflow.keras import layers | |
# ─────────────────────────────────────────────────────────────────────────────
# Building blocks
# ─────────────────────────────────────────────────────────────────────────────
class FeatureWiseMHA(layers.Layer):
    """Feature-wise (inverted) Multi-Head Attention.

    Transposes the input so that *features* attend to each other across
    time (the iTransformer trick):

        (B, T, F) → transpose → (B, F, T) → MHA over the F axis → transpose back.

    Args:
        d_model: Model width; per-head key dim is ``d_model // n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout rate used inside MHA and on the attention output.
    """

    def __init__(self, d_model: int, n_heads: int = 4, dropout: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        # Constructor args are stored so get_config() can round-trip this
        # layer through model save/load (the original was not serializable).
        self.d_model = d_model
        self.n_heads = n_heads
        self.dropout_rate = dropout
        self.mha = layers.MultiHeadAttention(
            num_heads=n_heads, key_dim=d_model // n_heads, dropout=dropout,
        )
        self.norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout)

    def call(self, x, training=False):
        # x: (B, T, F) → (B, F, T) so attention mixes the feature axis.
        x_t = tf.transpose(x, perm=[0, 2, 1])
        attn = self.mha(x_t, x_t, training=training)
        attn = self.dropout(attn, training=training)
        # Post-norm residual, then restore the (B, T, F) layout.
        out = self.norm(x_t + attn)
        return tf.transpose(out, perm=[0, 2, 1])

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "n_heads": self.n_heads,
            "dropout": self.dropout_rate,
        })
        return config
class TokenWiseMHA(layers.Layer):
    """Token-wise (standard) Multi-Head Attention along the time axis.

    Pre-transpose-free self-attention over (B, T, F) with a post-norm
    residual connection.

    Args:
        d_model: Model width; per-head key dim is ``d_model // n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout rate used inside MHA and on the attention output.
    """

    def __init__(self, d_model: int, n_heads: int = 4, dropout: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        # Stored for get_config() so the layer survives model save/load.
        self.d_model = d_model
        self.n_heads = n_heads
        self.dropout_rate = dropout
        self.mha = layers.MultiHeadAttention(
            num_heads=n_heads, key_dim=d_model // n_heads, dropout=dropout,
        )
        self.norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout)

    def call(self, x, training=False):
        attn = self.mha(x, x, training=training)
        attn = self.dropout(attn, training=training)
        # Post-norm residual.
        return self.norm(x + attn)

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "n_heads": self.n_heads,
            "dropout": self.dropout_rate,
        })
        return config
class Conv1DFeedForward(layers.Layer):
    """Position-wise feed-forward network built from 1×1 Conv1D layers.

    Expands to ``d_ff`` with GELU, projects back to ``d_model``, and adds a
    post-norm residual connection.

    Args:
        d_model: Output width (must match the input feature width for the
            residual add).
        d_ff: Hidden width; defaults to ``4 * d_model`` when omitted.
        dropout: Dropout rate applied after each convolution.
    """

    def __init__(self, d_model: int, d_ff: int | None = None, dropout: float = 0.1, **kwargs):
        super().__init__(**kwargs)
        # Stored for get_config() so the layer survives model save/load.
        self.d_model = d_model
        self.d_ff = d_ff or d_model * 4
        self.dropout_rate = dropout
        self.conv1 = layers.Conv1D(self.d_ff, kernel_size=1, activation="gelu")
        self.conv2 = layers.Conv1D(d_model, kernel_size=1)
        self.norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout)

    def call(self, x, training=False):
        ff = self.conv1(x)
        ff = self.dropout(ff, training=training)
        ff = self.conv2(ff)
        ff = self.dropout(ff, training=training)
        return self.norm(x + ff)

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "d_ff": self.d_ff,
            "dropout": self.dropout_rate,
        })
        return config
# ─────────────────────────────────────────────────────────────────────────────
# 1. iTransformer
# ─────────────────────────────────────────────────────────────────────────────
def build_itransformer(
    seq_len: int,
    n_features: int,
    d_model: int = 64,
    n_heads: int = 4,
    n_blocks: int = 2,
    d_ff: int = 256,
    dropout: float = 0.1,
) -> keras.Model:
    """Build the baseline iTransformer model for SOH prediction.

    Architecture:
        input → [FeatureWise-MHA → TokenWise-MHA → Conv1D-FF] × n_blocks
              → GAP → Dense(128) → Dense(1)

    Args:
        seq_len: Number of timesteps per window.
        n_features: Number of input channels.
        d_model: Transformer width after the input projection.
        n_heads: Attention heads per MHA layer.
        n_blocks: Number of stacked attention/FF blocks.
        d_ff: Hidden width of the Conv1D feed-forward network.
        dropout: Dropout rate used throughout.

    Returns:
        Uncompiled ``keras.Model`` mapping (B, seq_len, n_features) → (B, 1).
    """
    inputs = keras.Input(shape=(seq_len, n_features), name="input_seq")
    # Project raw features up to the transformer width.
    x = layers.Dense(d_model, name="input_proj")(inputs)
    for i in range(n_blocks):
        x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"feat_mha_{i}")(x)
        x = TokenWiseMHA(d_model, n_heads, dropout, name=f"token_mha_{i}")(x)
        x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"conv_ff_{i}")(x)
    # Pool over time, then regress a single SOH value.
    x = layers.GlobalAveragePooling1D(name="gap")(x)
    x = layers.Dense(128, activation="relu", name="fc1")(x)
    x = layers.Dropout(dropout, name="fc_drop")(x)
    output = layers.Dense(1, name="soh_output")(x)
    return keras.Model(inputs, output, name="iTransformer")
# ─────────────────────────────────────────────────────────────────────────────
# 2. Physics-Informed iTransformer
# ─────────────────────────────────────────────────────────────────────────────
def build_physics_itransformer(
    seq_len: int,
    n_features: int,
    d_model: int = 64,
    n_heads: int = 4,
    n_blocks: int = 2,
    d_ff: int = 256,
    dropout: float = 0.1,
    lambda_phy: float = 0.3,
) -> keras.Model:
    """Physics-Informed iTransformer with dual output heads.

    ML head:      iTransformer blocks → Dense → SOH_ml
    Physics head: |cumulative current| → MLP → SOH_phy
    Training loss: L = L_ml + λ_phy · L_phy (see ``PhysicsInformedLoss``).

    Args:
        seq_len: Number of timesteps per window.
        n_features: Number of input channels.
        d_model: Transformer width of the ML branch.
        n_heads: Attention heads per MHA layer.
        n_blocks: Number of stacked attention/FF blocks.
        d_ff: Hidden width of the Conv1D feed-forward network.
        dropout: Dropout rate used throughout.
        lambda_phy: Physics-loss weight. The model itself does not use it;
            it is exposed as ``model.lambda_phy`` so the training code can
            forward it to ``PhysicsInformedLoss`` (previously this argument
            was accepted but silently ignored).

    Returns:
        Uncompiled ``keras.Model`` with outputs ``[soh_ml, soh_phy]``.
    """
    inputs = keras.Input(shape=(seq_len, n_features), name="input_seq")

    # ── ML branch (iTransformer) ─────────────────────────────────────────
    x = layers.Dense(d_model, name="ml_proj")(inputs)
    for i in range(n_blocks):
        x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"ml_feat_{i}")(x)
        x = TokenWiseMHA(d_model, n_heads, dropout, name=f"ml_token_{i}")(x)
        x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"ml_ff_{i}")(x)
    x = layers.GlobalAveragePooling1D(name="ml_gap")(x)
    x = layers.Dense(128, activation="relu", name="ml_fc")(x)
    x = layers.Dropout(dropout, name="ml_drop")(x)
    soh_ml = layers.Dense(1, name="soh_ml")(x)

    # ── Physics branch ───────────────────────────────────────────────────
    # |cumsum(current)| approximates charge throughput; the current signal
    # is assumed at feature index 1 (AbsCumCurrentLayer's default).
    current = AbsCumCurrentLayer(name="abs_cum_current")(inputs)
    p = layers.GlobalAveragePooling1D(name="phy_gap")(current)
    p = layers.Dense(64, activation="relu", name="phy_fc1")(p)
    p = layers.Dense(32, activation="relu", name="phy_fc2")(p)
    soh_phy = layers.Dense(1, name="soh_phy")(p)

    model = keras.Model(inputs, [soh_ml, soh_phy], name="PhysicsInformed_iTransformer")
    # Expose the weight for the trainer; does not affect the graph.
    model.lambda_phy = lambda_phy
    return model
class AbsCumCurrentLayer(layers.Layer):
    """Extracts one feature channel and returns its absolute cumulative sum.

    Used as the physics-branch input: |cumsum(current)| over the window
    approximates cumulative charge throughput.

    Args:
        feature_idx: Column of the input that holds the current signal.
            Defaults to 1, matching the previously hard-coded index.
    """

    def __init__(self, feature_idx: int = 1, **kwargs):
        super().__init__(**kwargs)
        self.feature_idx = feature_idx

    def call(self, x, training=False):
        # Slice with idx:idx+1 to keep a trailing channel axis: (B, T, 1).
        idx = self.feature_idx
        return tf.abs(tf.cumsum(x[:, :, idx:idx + 1], axis=1))

    def get_config(self):
        config = super().get_config()
        config["feature_idx"] = self.feature_idx
        return config
class PhysicsInformedLoss(keras.losses.Loss):
    """Combined ML + physics loss: L = L_ml + λ_phy · L_phy.

    NOTE(review): ``call`` unpacks ``y_pred_list`` as a 2-sequence
    ``(soh_ml, soh_phy)``. Keras' built-in training loop passes one tensor
    per output head, so this loss appears intended for manual use inside a
    custom train step — confirm against the trainer before relying on
    ``model.compile(loss=...)``.

    Args:
        lambda_phy: Weight on the physics-head MAE term.
    """

    def __init__(self, lambda_phy: float = 0.3, **kwargs):
        super().__init__(**kwargs)
        self.lambda_phy = lambda_phy
        self.mae = keras.losses.MeanAbsoluteError()

    def call(self, y_true, y_pred_list):
        soh_ml, soh_phy = y_pred_list
        loss_ml = self.mae(y_true, soh_ml)
        loss_phy = self.mae(y_true, soh_phy)
        return loss_ml + self.lambda_phy * loss_phy

    def get_config(self):
        # Previously missing: without it, lambda_phy was lost on save/load.
        config = super().get_config()
        config["lambda_phy"] = self.lambda_phy
        return config
# ─────────────────────────────────────────────────────────────────────────────
# 3. Dynamic-Graph iTransformer
# ─────────────────────────────────────────────────────────────────────────────
class DynamicGraphConv(layers.Layer):
    """Dynamic graph convolution with a correlation-based adjacency.

    Builds a per-sample feature-feature adjacency from the Pearson-style
    correlation of the input channels, aggregates features through it, and
    adds a post-norm residual connection.

    Args:
        d_model: Output width of the projection. Must equal the input
            feature width F for the residual ``x + out`` to broadcast
            (callers in this file pass ``n_features``).
    """

    def __init__(self, d_model: int, **kwargs):
        super().__init__(**kwargs)
        # Stored for get_config() so the layer survives model save/load.
        self.d_model = d_model
        self.proj = layers.Dense(d_model)
        self.norm = layers.LayerNormalization()

    def call(self, x, training=False):
        """x: (B, T, F) → feature correlation (B, F, F) used as adjacency."""
        # Standardize each feature series over time: (B, F, T).
        x_t = tf.transpose(x, perm=[0, 2, 1])
        x_norm = x_t - tf.reduce_mean(x_t, axis=-1, keepdims=True)
        std = tf.math.reduce_std(x_t, axis=-1, keepdims=True) + 1e-8  # avoid div-by-zero
        x_norm = x_norm / std
        # Correlation over T samples, row-softmaxed into attention weights.
        adj = tf.matmul(x_norm, x_norm, transpose_b=True) / tf.cast(tf.shape(x)[-2], tf.float32)
        adj = tf.nn.softmax(adj, axis=-1)  # (B, F, F)
        # Graph convolution: mix feature series through the adjacency.
        x_agg = tf.matmul(adj, x_t)                    # (B, F, T)
        x_agg = tf.transpose(x_agg, perm=[0, 2, 1])    # (B, T, F)
        out = self.proj(x_agg)
        return self.norm(x + out)

    def get_config(self):
        config = super().get_config()
        config["d_model"] = self.d_model
        return config
def build_dynamic_graph_itransformer(
    seq_len: int,
    n_features: int,
    d_model: int = 64,
    n_heads: int = 4,
    n_blocks: int = 2,
    d_ff: int = 256,
    dropout: float = 0.1,
) -> keras.Model:
    """Dynamic-Graph iTransformer with GNN-Transformer fusion.

    Architecture:
        input → DynGraphConv → [FeatureWise-MHA → TokenWise-MHA → Conv1D-FF]
              × n_blocks → GAP → Dense(128) → Dense(1)

    Args:
        seq_len: Number of timesteps per window.
        n_features: Number of input channels.
        d_model: Transformer width after the projection.
        n_heads: Attention heads per MHA layer.
        n_blocks: Number of stacked attention/FF blocks.
        d_ff: Hidden width of the Conv1D feed-forward network.
        dropout: Dropout rate used throughout.

    Returns:
        Uncompiled ``keras.Model`` mapping (B, seq_len, n_features) → (B, 1).
    """
    # Named input/head layers for consistency with the other builders.
    inputs = keras.Input(shape=(seq_len, n_features), name="input_seq")
    # Graph conv is applied in the raw feature space, so its width is
    # n_features (required for its internal residual connection).
    x = DynamicGraphConv(n_features, name="dyn_graph")(inputs)
    x = layers.Dense(d_model, name="proj")(x)
    for i in range(n_blocks):
        x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"dg_feat_{i}")(x)
        x = TokenWiseMHA(d_model, n_heads, dropout, name=f"dg_token_{i}")(x)
        x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"dg_ff_{i}")(x)
    x = layers.GlobalAveragePooling1D(name="gap")(x)
    x = layers.Dense(128, activation="relu", name="fc1")(x)
    x = layers.Dropout(dropout, name="fc_drop")(x)
    output = layers.Dense(1, name="soh_output")(x)
    return keras.Model(inputs, output, name="DynamicGraph_iTransformer")