"""
src.models.deep.itransformer
=============================
iTransformer family models for SOH prediction (TensorFlow/Keras).
Architectures:
1. iTransformer β€” Feature-wise MHA β†’ Token-wise MHA β†’ Conv1D FF
2. Physics-Informed iTransformer β€” Dual-head with physics branch
3. Dynamic-Graph iTransformer β€” GNN fusion with dynamic adjacency
All models:
- Input: (batch, seq_len, n_features)
- Output: (batch, 1) β€” SOH prediction
"""
from __future__ import annotations
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# ═════════════════════════════════════════════════════════════════════════════
# Building blocks
# ═════════════════════════════════════════════════════════════════════════════
class FeatureWiseMHA(layers.Layer):
"""Feature-wise (inverted) Multi-Head Attention.
Transposes so that features attend to each other across time.
    Input: (B, T, F) → transpose to (B, F, T) → MHA over the F dim → transpose back.
"""
def __init__(self, d_model: int, n_heads: int = 4, dropout: float = 0.1, **kwargs):
super().__init__(**kwargs)
self.mha = layers.MultiHeadAttention(
num_heads=n_heads, key_dim=d_model // n_heads, dropout=dropout,
)
self.norm = layers.LayerNormalization()
self.dropout = layers.Dropout(dropout)
def build(self, input_shape):
super().build(input_shape)
def call(self, x, training=False):
        # x: (B, T, F) → transpose to (B, F, T) for feature-wise attention
x_t = tf.transpose(x, perm=[0, 2, 1]) # (B, F, T)
attn = self.mha(x_t, x_t, training=training)
attn = self.dropout(attn, training=training)
out = self.norm(x_t + attn)
return tf.transpose(out, perm=[0, 2, 1]) # back to (B, T, F)
class TokenWiseMHA(layers.Layer):
"""Token-wise (standard) Multi-Head Attention along time axis."""
def __init__(self, d_model: int, n_heads: int = 4, dropout: float = 0.1, **kwargs):
super().__init__(**kwargs)
self.mha = layers.MultiHeadAttention(
num_heads=n_heads, key_dim=d_model // n_heads, dropout=dropout,
)
self.norm = layers.LayerNormalization()
self.dropout = layers.Dropout(dropout)
def build(self, input_shape):
super().build(input_shape)
def call(self, x, training=False):
attn = self.mha(x, x, training=training)
attn = self.dropout(attn, training=training)
return self.norm(x + attn)
class Conv1DFeedForward(layers.Layer):
"""Conv1D feed-forward network with residual connection."""
def __init__(self, d_model: int, d_ff: int | None = None, dropout: float = 0.1, **kwargs):
super().__init__(**kwargs)
d_ff = d_ff or d_model * 4
self.conv1 = layers.Conv1D(d_ff, kernel_size=1, activation="gelu")
self.conv2 = layers.Conv1D(d_model, kernel_size=1)
self.norm = layers.LayerNormalization()
self.dropout = layers.Dropout(dropout)
def build(self, input_shape):
super().build(input_shape)
def call(self, x, training=False):
ff = self.conv1(x)
ff = self.dropout(ff, training=training)
ff = self.conv2(ff)
ff = self.dropout(ff, training=training)
return self.norm(x + ff)
# ═════════════════════════════════════════════════════════════════════════════
# 1. iTransformer
# ═════════════════════════════════════════════════════════════════════════════
def build_itransformer(
seq_len: int,
n_features: int,
d_model: int = 64,
n_heads: int = 4,
n_blocks: int = 2,
d_ff: int = 256,
dropout: float = 0.1,
) -> keras.Model:
"""Build iTransformer model for SOH prediction.
    Architecture: input → [FeatureWise-MHA → TokenWise-MHA → Conv1D-FF] × N → GAP → Dense → 1
"""
inputs = keras.Input(shape=(seq_len, n_features), name="input_seq")
# Project features to d_model
x = layers.Dense(d_model, name="input_proj")(inputs)
for i in range(n_blocks):
x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"feat_mha_{i}")(x)
x = TokenWiseMHA(d_model, n_heads, dropout, name=f"token_mha_{i}")(x)
x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"conv_ff_{i}")(x)
# Global average pooling
x = layers.GlobalAveragePooling1D(name="gap")(x)
x = layers.Dense(128, activation="relu", name="fc1")(x)
x = layers.Dropout(dropout, name="fc_drop")(x)
output = layers.Dense(1, name="soh_output")(x)
model = keras.Model(inputs, output, name="iTransformer")
return model
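# Example usage (an illustrative sketch, not prescribed by this module: the sequence
# length, feature count, optimizer, and training settings below are assumptions):
#
#   model = build_itransformer(seq_len=100, n_features=6)
#   model.compile(optimizer=keras.optimizers.Adam(1e-3), loss="mae", metrics=["mae"])
#   model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=50)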
# ═════════════════════════════════════════════════════════════════════════════
# 2. Physics-Informed iTransformer
# ═════════════════════════════════════════════════════════════════════════════
def build_physics_itransformer(
seq_len: int,
n_features: int,
d_model: int = 64,
n_heads: int = 4,
n_blocks: int = 2,
d_ff: int = 256,
dropout: float = 0.1,
lambda_phy: float = 0.3,
) -> keras.Model:
"""Physics-Informed iTransformer with dual output heads.
ML Head: iTransformer blocks β†’ Dense β†’ SOH_ml
Physics Head: |cumulative_current| β†’ MLP β†’ SOH_phy
Training loss: L = L_ml + Ξ»_phy Γ— L_phy
"""
inputs = keras.Input(shape=(seq_len, n_features), name="input_seq")
# ── ML Branch (iTransformer) ─────────────────
x = layers.Dense(d_model, name="ml_proj")(inputs)
for i in range(n_blocks):
x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"ml_feat_{i}")(x)
x = TokenWiseMHA(d_model, n_heads, dropout, name=f"ml_token_{i}")(x)
x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"ml_ff_{i}")(x)
x = layers.GlobalAveragePooling1D(name="ml_gap")(x)
x = layers.Dense(128, activation="relu", name="ml_fc")(x)
x = layers.Dropout(dropout)(x)
soh_ml = layers.Dense(1, name="soh_ml")(x)
# ── Physics Branch ───────────────────────────
    # Extract the current feature (index 1) → absolute cumulative sum
current = AbsCumCurrentLayer(name="abs_cum_current")(inputs)
p = layers.GlobalAveragePooling1D(name="phy_gap")(current)
p = layers.Dense(64, activation="relu", name="phy_fc1")(p)
p = layers.Dense(32, activation="relu", name="phy_fc2")(p)
soh_phy = layers.Dense(1, name="soh_phy")(p)
model = keras.Model(inputs, [soh_ml, soh_phy], name="PhysicsInformed_iTransformer")
return model
class AbsCumCurrentLayer(layers.Layer):
"""Extracts current feature (index 1) and computes abs cumulative sum."""
def call(self, x, training=False):
return tf.abs(tf.cumsum(x[:, :, 1:2], axis=1))
def get_config(self):
return super().get_config()
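# Worked example for AbsCumCurrentLayer (illustrative; assumes a feature order of
# [voltage, current, ...] so that index 1 is current):
#   AbsCumCurrentLayer()(tf.constant([[[3.7, 1.0], [3.6, -2.0]]]))
#   -> [[[1.0], [1.0]]]   # cumsum of current [1.0, -2.0] is [1.0, -1.0]; abs gives [1.0, 1.0]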
class PhysicsInformedLoss(keras.losses.Loss):
    """Combined ML + physics loss: L = L_ml + λ_phy × L_phy.

    Keras applies a loss function per model output, so this class expects the two
    heads to be concatenated into a single (batch, 2) tensor with columns
    [soh_ml, soh_phy]. Equivalently, compile the dual-output model with per-output
    MAE and loss_weights={"soh_ml": 1.0, "soh_phy": lambda_phy}.
    """
    def __init__(self, lambda_phy: float = 0.3, **kwargs):
        super().__init__(**kwargs)
        self.lambda_phy = lambda_phy
        self.mae = keras.losses.MeanAbsoluteError()
    def call(self, y_true, y_pred):
        soh_ml = y_pred[..., 0:1]   # ML-head prediction
        soh_phy = y_pred[..., 1:2]  # physics-head prediction
        loss_ml = self.mae(y_true, soh_ml)
        loss_phy = self.mae(y_true, soh_phy)
        return loss_ml + self.lambda_phy * loss_phy
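# One way to train with the combined objective (a sketch; the shapes and lambda_phy
# value are the module defaults, not tuned settings). Keras applies one loss per
# output, so the weighted sum can be expressed with per-output MAE and loss_weights:
#
#   model = build_physics_itransformer(seq_len=100, n_features=6, lambda_phy=0.3)
#   model.compile(
#       optimizer="adam",
#       loss={"soh_ml": "mae", "soh_phy": "mae"},
#       loss_weights={"soh_ml": 1.0, "soh_phy": 0.3},
#   )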
# ═════════════════════════════════════════════════════════════════════════════
# 3. Dynamic-Graph iTransformer
# ═════════════════════════════════════════════════════════════════════════════
class DynamicGraphConv(layers.Layer):
    """Dynamic graph convolution with a correlation-based adjacency matrix.

    Note: the residual connection in ``call`` requires ``d_model`` to equal the
    number of input features (as done in ``build_dynamic_graph_itransformer``).
    """
def __init__(self, d_model: int, **kwargs):
super().__init__(**kwargs)
self.proj = layers.Dense(d_model)
self.norm = layers.LayerNormalization()
def build(self, input_shape):
super().build(input_shape)
def call(self, x, training=False):
"""
        x: (B, T, F); the feature correlation matrix (F, F) serves as the dynamic adjacency.
"""
# Feature-level correlation as adjacency
        # x_t: (B, F, T) → compute (B, F, F) correlation
x_t = tf.transpose(x, perm=[0, 2, 1])
x_norm = x_t - tf.reduce_mean(x_t, axis=-1, keepdims=True)
std = tf.math.reduce_std(x_t, axis=-1, keepdims=True) + 1e-8
x_norm = x_norm / std
adj = tf.matmul(x_norm, x_norm, transpose_b=True) / tf.cast(tf.shape(x)[-2], tf.float32)
adj = tf.nn.softmax(adj, axis=-1) # (B, F, F)
# Graph convolution: aggregate features
x_agg = tf.matmul(adj, x_t) # (B, F, T)
x_agg = tf.transpose(x_agg, perm=[0, 2, 1]) # (B, T, F)
out = self.proj(x_agg)
return self.norm(x + out)
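# Shape sketch for DynamicGraphConv (illustrative; the dimensions are assumptions).
# The residual connection requires the projection width to equal n_features:
#   layer = DynamicGraphConv(d_model=6)
#   y = layer(tf.zeros((2, 100, 6)))   # (batch, seq_len, n_features)
#   # y.shape == (2, 100, 6)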
def build_dynamic_graph_itransformer(
seq_len: int,
n_features: int,
d_model: int = 64,
n_heads: int = 4,
n_blocks: int = 2,
d_ff: int = 256,
dropout: float = 0.1,
) -> keras.Model:
"""Dynamic-Graph iTransformer with GNN-Transformer fusion.
    Architecture: input → DynGraphConv → [FeatureWise-MHA → TokenWise-MHA → Conv1D-FF] × N → GAP → Dense → 1
"""
inputs = keras.Input(shape=(seq_len, n_features))
# Dynamic graph convolution
x = DynamicGraphConv(n_features, name="dyn_graph")(inputs)
# Project to d_model
x = layers.Dense(d_model, name="proj")(x)
for i in range(n_blocks):
x = FeatureWiseMHA(d_model, n_heads, dropout, name=f"dg_feat_{i}")(x)
x = TokenWiseMHA(d_model, n_heads, dropout, name=f"dg_token_{i}")(x)
x = Conv1DFeedForward(d_model, d_ff, dropout, name=f"dg_ff_{i}")(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dense(128, activation="relu")(x)
x = layers.Dropout(dropout)(x)
output = layers.Dense(1)(x)
return keras.Model(inputs, output, name="DynamicGraph_iTransformer")
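if __name__ == "__main__":
    # Minimal smoke test (illustrative only; seq_len, n_features, and batch size are
    # assumptions). Builds each variant on random data and prints the output shapes.
    seq_len, n_feat, batch = 100, 6, 4
    x = tf.random.normal((batch, seq_len, n_feat))
    for builder in (build_itransformer, build_dynamic_graph_itransformer):
        model = builder(seq_len, n_feat)
        print(model.name, model(x).shape)          # expected: (4, 1)
    phys = build_physics_itransformer(seq_len, n_feat)
    soh_ml, soh_phy = phys(x)
    print(phys.name, soh_ml.shape, soh_phy.shape)  # expected: (4, 1) (4, 1)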