# Hugging Face page residue kept for provenance:
# author "DJF-on-arm", commit b573147 ("Update app.py", verified).
# Why does this look like it is going to train a model? It is not: there is no training loop and no model.fit call!
import gradio as gr
import keras
import tensorflow as tf
import numpy as np
import pickle
import os
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Concatenate
# Required module-level settings.
VOCAB_SIZE: int = 13000   # token ids at or above this are skipped by the repetition penalty
MAX_LEN: int = 32         # padded encoder length; also the default decode budget
LATENT_DIM: int = 256     # the inference inputs are built with LATENT_DIM * 2 features
temp: float = 0.5         # not referenced by the code visible in this file
topk: int = 40            # not referenced by the code visible in this file
MoE: bool = False         # when True, prompts get a <TASK_...> tag before tokenization
# Register the custom classes with Keras serialization.
@keras.saving.register_keras_serializable(package="Custom")
class MaskLayer(tf.keras.layers.Layer):
    """Pass-through layer that keeps only the first input and the first mask.

    ``call`` returns ``inputs[0]`` unchanged and ``compute_mask`` returns
    ``mask[0]`` (or ``None`` when no mask was supplied).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Declare that this layer forwards masks to downstream layers.
        self.supports_masking = True

    def call(self, inputs):
        """Return the first element of ``inputs``."""
        primary = inputs[0]
        return primary

    def compute_mask(self, inputs, mask=None):
        """Return the first element of ``mask``, or ``None`` if there is no mask."""
        return None if mask is None else mask[0]
@keras.saving.register_keras_serializable(package="Custom")
class ThresholdEarlyStopping(keras.callbacks.Callback):
    """Callback that only stores loss / validation-loss thresholds.

    No training hooks are defined here; the class records its settings and
    reports them through ``get_config``.
    """

    _CONFIG_FIELDS = ("loss_thresh", "val_loss_thresh", "verbose")

    def __init__(self, loss_thresh=0.2, val_loss_thresh=0.2, verbose=1):
        super().__init__()
        # Thresholds are normalised to float; verbosity is stored as given.
        self.loss_thresh = float(loss_thresh)
        self.val_loss_thresh = float(val_loss_thresh)
        self.verbose = verbose

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {field: getattr(self, field) for field in self._CONFIG_FIELDS}
@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLR(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up schedule: ramps from 0 to ``max_lr`` and then stays flat.

    Args:
        max_lr: Learning rate reached once warm-up is over.
        warmup_steps: Number of steps over which the rate ramps up.
    """

    def __init__(self, max_lr, warmup_steps):
        super().__init__()
        self.max_lr = float(max_lr)
        self.warmup_steps = float(warmup_steps)

    def __call__(self, step):
        """Return ``max_lr * min(1, step / warmup_steps)`` for the given step."""
        progress = tf.cast(step, tf.float32) / self.warmup_steps
        return self.max_lr * tf.minimum(1.0, progress)

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {"max_lr": self.max_lr, "warmup_steps": self.warmup_steps}

    @classmethod
    def from_config(cls, config):
        """Rebuild the schedule from either a flat or a ``{"config": ...}`` dict."""
        # Accept the nested form by unwrapping its inner "config" entry.
        params = config["config"] if "config" in config else config
        return cls(**params)
@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLRWrapper(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Schedule that adds a constant offset to another learning-rate schedule.

    Args:
        base_schedule: Either a schedule object (any callable taking a step)
            or its serialized dict form, which is deserialized on construction.
        initial_lr: Constant added to whatever ``base_schedule`` returns.
    """

    def __init__(self, base_schedule, initial_lr):
        super().__init__()
        if isinstance(base_schedule, dict):
            try:
                self.base_schedule = tf.keras.utils.deserialize_keras_object(base_schedule)
            except Exception:
                # Deliberate best-effort fallback: if generic deserialization
                # fails, rebuild a WarmUpLR from whatever fields are present,
                # using 0.01 / 500 when they are missing.
                cfg = base_schedule.get("config", base_schedule)
                self.base_schedule = WarmUpLR(
                    max_lr=cfg.get("max_lr", 0.01),
                    warmup_steps=cfg.get("warmup_steps", 500),
                )
        else:
            self.base_schedule = base_schedule
        self.initial_lr = float(initial_lr)

    def __call__(self, step):
        """Return ``initial_lr + base_schedule(step)`` with ``step`` cast to float32."""
        step_f = tf.cast(step, tf.float32)
        return self.initial_lr + self.base_schedule(step_f)

    def get_config(self):
        """Return a dict with the serialized base schedule and the offset."""
        return {
            # The wrapped object is a learning-rate schedule, so use the
            # schedule serializer rather than the layer serializer.
            "base_schedule": tf.keras.optimizers.schedules.serialize(self.base_schedule),
            "initial_lr": self.initial_lr,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild the wrapper from either a flat or a ``{"config": ...}`` dict."""
        if "config" in config:
            config = config["config"]
        return cls(
            base_schedule=config.get("base_schedule"),
            initial_lr=config.get("initial_lr", 0.0),
        )
@keras.saving.register_keras_serializable(package="Custom")
class SmoothRepPenalty(keras.callbacks.Callback):
    """Callback that only stores repetition-penalty settings.

    No training hooks are defined here; the values are kept as given and
    exposed through ``get_config``.
    """

    _CONFIG_FIELDS = ("threshold", "base_penalty", "max_penalty", "adapt_rate")

    def __init__(self, threshold=1.5, base_penalty=1.0, max_penalty=2.0, adapt_rate=0.05):
        super().__init__()
        self.threshold, self.base_penalty = threshold, base_penalty
        self.max_penalty, self.adapt_rate = max_penalty, adapt_rate

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {field: getattr(self, field) for field in self._CONFIG_FIELDS}
@keras.saving.register_keras_serializable(package="Custom")
class MathSymbologgerbutlogingenalty(keras.callbacks.Callback):
    """Callback that only stores a single ``penalty`` value.

    ``tokenizer`` and ``math_symbols`` are accepted by the constructor but
    are not stored or used anywhere in this class.
    """

    def __init__(self, tokenizer=None, math_symbols=None, penalty=0.1):
        super().__init__()
        self.penalty = float(penalty)

    def get_config(self):
        """Return the stored penalty as a plain dict."""
        config = {"penalty": self.penalty}
        return config
@keras.saving.register_keras_serializable(package="Custom")
class SymbolCheckPenalty(keras.callbacks.Callback):
    """Callback that only stores symbol-check penalty settings.

    ``tokenizer`` and ``symbol_checks`` are accepted by the constructor but
    are not stored or used anywhere in this class. No training hooks are
    defined here.
    """

    _CONFIG_FIELDS = (
        "max_len",
        "penalty_factor",
        "check_loss_thresh",
        "check_val_loss_thresh",
    )

    def __init__(self, tokenizer=None, max_len=64, symbol_checks=None, penalty_factor=0.05, check_loss_thresh=1.5, check_val_loss_thresh=3.0):
        super().__init__()
        # max_len / penalty_factor are normalised; the thresholds are kept as given.
        self.max_len = int(max_len)
        self.penalty_factor = float(penalty_factor)
        self.check_loss_thresh = check_loss_thresh
        self.check_val_loss_thresh = check_val_loss_thresh

    def get_config(self):
        """Return the stored settings as a plain dict."""
        return {field: getattr(self, field) for field in self._CONFIG_FIELDS}
# Load or die: nothing below can run without both the tokenizer and the model,
# so a failure is logged and then re-raised instead of surfacing later as an
# unrelated NameError.
# Note: "chatbot.keras" and "tokenizer.pkl" are uploaded alongside this script.
try:
    # SECURITY: pickle.load executes arbitrary code embedded in the file.
    # Only ship a tokenizer.pkl that you produced yourself.
    with open("tokenizer.pkl", "rb") as f:
        TOK = pickle.load(f)
    WORD2IDX = TOK.word_index
    IDX2WORD = {i: w for w, i in WORD2IDX.items()}
    print("Tokenizer loaded successfully.")
except Exception as e:
    print(f"Failed to load tokenizer (Did you upload tokenizer.pkl?): {e}")
    raise

try:
    custom_objects = {
        "WarmUpLR": WarmUpLR,
        "WarmUpLRWrapper": WarmUpLRWrapper,
        "MaskLayer": MaskLayer,
        "ThresholdEarlyStopping": ThresholdEarlyStopping,
        "SmoothRepPenalty": SmoothRepPenalty,
        "MathSymbologgerbutlogingenalty": MathSymbologgerbutlogingenalty,
        "SymbolCheckPenalty": SymbolCheckPenalty,
        "tf": tf,
    }
    # compile=False is required: only inference is performed here.
    model = keras.models.load_model("chatbot.keras", custom_objects=custom_objects, compile=False)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Failed to load model (dev note: is chatbot.keras uploaded yet): {e}")
    raise
# Inference
def build_inference_models(trained_model):
    """Rewire the trained model's layers into separate encoder/decoder models.

    The layers are looked up by name ("enc_emb", "enc_lstm", "dec_emb",
    "dec_lstm", "bahdanau_attention", "dec_bn", "dec_dense") and reused, so
    the returned models share weights with ``trained_model``.

    Args:
        trained_model: Loaded Keras model whose first input is the encoder
            token sequence.

    Returns:
        ``(encoder, decoder)`` where the encoder maps token ids to
        ``[enc_seq, state_h, state_c, enc_mask]`` and the decoder maps
        ``[token, enc_seq, enc_mask, h, c]`` to ``[logits, h, c]``.
    """
    print("Building compiled inference models with Masking Support...")
    encoder_input = trained_model.input[0]

    fetch = trained_model.get_layer
    enc_embedding = fetch("enc_emb")
    enc_lstm = fetch("enc_lstm")
    dec_embedding = fetch("dec_emb")
    dec_lstm = fetch("dec_lstm")
    attention = fetch("bahdanau_attention")
    dec_batch_norm = fetch("dec_bn")
    dec_dense = fetch("dec_dense")

    # --- Encoder ---------------------------------------------------------
    encoder_embedded = enc_embedding(encoder_input)
    encoder_mask = enc_embedding.compute_mask(encoder_input)
    encoder_seq, *encoder_states = enc_lstm(encoder_embedded)
    # Presumably forward h/c followed by backward h/c of a bidirectional
    # LSTM -- TODO confirm against the training script.
    fwd_h, fwd_c, bwd_h, bwd_c = encoder_states
    state_h = Concatenate()([fwd_h, bwd_h])
    state_c = Concatenate()([fwd_c, bwd_c])
    encoder_model = Model(
        inputs=encoder_input,
        outputs=[encoder_seq, state_h, state_c, encoder_mask],
        name="inference_encoder",
    )

    # --- Single-step decoder --------------------------------------------
    token_in = Input(shape=(1,), dtype='int32', name="inf_dec_token")
    enc_seq_in = Input(shape=(MAX_LEN, LATENT_DIM*2), name="inf_enc_seq")
    enc_mask_in = Input(shape=(MAX_LEN,), dtype='bool', name="inf_enc_mask")
    h_in = Input(shape=(LATENT_DIM*2,), name="inf_dec_h")
    c_in = Input(shape=(LATENT_DIM*2,), name="inf_dec_c")

    token_embedded = dec_embedding(token_in)
    token_mask = dec_embedding.compute_mask(token_in)
    step_out, h_out, c_out = dec_lstm(token_embedded, initial_state=[h_in, c_in])
    # Masks are passed explicitly because the encoder sequence arrives
    # through a plain Input here.
    context = attention([step_out, enc_seq_in], mask=[token_mask, enc_mask_in])
    merged = Concatenate()([step_out, context])
    normalised = dec_batch_norm(merged)
    logits = dec_dense(normalised)
    decoder_model = Model(
        [token_in, enc_seq_in, enc_mask_in, h_in, c_in],
        [logits, h_out, c_out],
    )
    return encoder_model, decoder_model
# Build the encoder/decoder inference models once at import time; reply()
# reads INF_ENCODER and INF_DECODER on every request.
INF_ENCODER, INF_DECODER = build_inference_models(model)
def fast_decode_step(token, e_seq, h, c, decoder_model, e_mask=None):
    """Run ``decoder_model`` for a single decoding step.

    Args:
        token: Current decoder token id(s); converted to an int32 tensor.
        e_seq: Encoder output sequence; converted to a float32 tensor.
        h: Decoder hidden state; converted to a float32 tensor.
        c: Decoder cell state; converted to a float32 tensor.
        decoder_model: Callable taking ``[token, e_seq, e_mask, h, c]``.
        e_mask: Optional boolean mask over encoder positions. When omitted,
            every position is treated as valid.

    Returns:
        Whatever ``decoder_model`` returns when called with ``training=False``.
    """
    token = tf.convert_to_tensor(token, dtype=tf.int32)
    e_seq = tf.convert_to_tensor(e_seq, dtype=tf.float32)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)
    if e_mask is None:
        # The decoder's input list has no slot for "no mask", so passing None
        # through would fail. Build an all-True mask over the first two axes
        # of e_seq instead (assumes e_seq is (batch, steps, features)).
        e_mask = tf.ones(tf.shape(e_seq)[:2], dtype=tf.bool)
    else:
        e_mask = tf.cast(e_mask, tf.bool)
    return decoder_model([token, e_seq, e_mask, h, c], training=False)
def _prepare_prompt(text):
    """Strip, optionally tag, and lowercase the raw user text."""
    text = text.strip()
    # Task tags are only added when the MoE flag is on and the caller has
    # not already supplied one.
    if MoE and not text.startswith("<TASK_"):
        if any(ch in text for ch in "0123456789+-*/="):
            text = f"<TASK_MATH> {text}"
        else:
            text = f"<TASK_CHAT> {text}"
    return text.lower().strip()


def _adjust_logits(logits, history, end_token, oov_token, rep_penalty):
    """Apply the end-token, repetition and OOV penalties to one step's logits."""
    # Make ending very unlikely while the beam holds fewer than four tokens
    # (the start token counts as one of them).
    if len(history) < 4:
        logits = tf.tensor_scatter_nd_update(logits, [[end_token]], [logits[end_token] - 20.0])

    # Repetition penalty: shrink positive logits and push negative ones
    # further down for every token already in the beam. Done in one
    # vectorised pass instead of one tensor read per token.
    seen = [t for t in set(history) if t < VOCAB_SIZE]
    if seen:
        seen_logits = tf.gather(logits, seen)
        penalised = tf.where(seen_logits > 0, seen_logits / rep_penalty, seen_logits * rep_penalty)
        logits = tf.tensor_scatter_nd_update(logits, [[t] for t in seen], penalised)

    # Strongly discourage emitting the out-of-vocabulary token.
    return tf.tensor_scatter_nd_update(logits, [[oov_token]], [logits[oov_token] - 15.0])


def _with_norm_score(beam, length_penalty):
    """Attach the length-normalised score to ``beam`` and return it."""
    beam['norm_score'] = beam['score'] / (len(beam['tokens']) ** length_penalty)
    return beam


def reply(text, max_decode_len=MAX_LEN, rep_penalty=1.3, beam_width=3, length_penalty=0.7, temperature=0.7):
    """Generate a response to ``text`` with beam search over the decoder.

    Args:
        text: Raw user message.
        max_decode_len: Maximum number of decoding steps. Coerced to int so
            float values (e.g. from a UI slider) are accepted.
        rep_penalty: Divisor/multiplier applied to logits of tokens already
            present in a beam.
        beam_width: Number of beams kept per step. Coerced to int.
        length_penalty: Exponent applied to the token count when normalising
            a beam's score.
        temperature: Softmax temperature; clamped to at least 1e-6.

    Returns:
        The decoded words of the best-scoring beam joined by single spaces.
    """
    max_decode_len = int(max_decode_len)
    beam_width = int(beam_width)

    prompt = _prepare_prompt(text)
    seq = TOK.texts_to_sequences([prompt])
    enc_in = pad_sequences(seq, maxlen=MAX_LEN, padding='post', dtype='int32')
    e_seq, h, c, e_mask = INF_ENCODER(enc_in, training=False)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)

    start_token = WORD2IDX.get("<start>", 1)
    end_token = WORD2IDX.get("<end>", 2)
    oov_token = WORD2IDX.get("<oov>", 3)
    # A beam is finished once it emits padding, OOV or the end token.
    terminal_tokens = (0, oov_token, end_token)
    safe_temp = max(temperature, 1e-6)

    beams = [{'score': 0.0, 'tokens': [start_token], 'h': h, 'c': c}]
    completed_beams = []
    for _ in range(max_decode_len):
        candidates = []
        for beam in beams:
            current_token = tf.constant([[beam['tokens'][-1]]], dtype=tf.int32)
            logits_tensor, new_h, new_c = fast_decode_step(
                current_token, e_seq, beam['h'], beam['c'], INF_DECODER, e_mask=e_mask
            )
            logits = _adjust_logits(
                logits_tensor[0, -1, :], beam['tokens'], end_token, oov_token, rep_penalty
            )
            log_probs = tf.nn.log_softmax(logits / safe_temp)
            top_scores, top_ids = tf.nn.top_k(log_probs, k=beam_width)
            # Pull both arrays to the host once rather than once per entry.
            for token_id, step_score in zip(top_ids.numpy(), top_scores.numpy()):
                candidates.append({
                    'score': beam['score'] + float(step_score),
                    'tokens': beam['tokens'] + [int(token_id)],
                    'h': new_h,
                    'c': new_c,
                })

        candidates.sort(key=lambda cand: cand['score'], reverse=True)
        beams = []
        for candidate in candidates:
            if candidate['tokens'][-1] in terminal_tokens:
                completed_beams.append(_with_norm_score(candidate, length_penalty))
            elif len(beams) < beam_width:
                beams.append(candidate)
                # Stop scanning as soon as the active set is full.
                if len(beams) == beam_width:
                    break
        if not beams:
            break

    # Nothing terminated within the budget: fall back to the active beams.
    if not completed_beams:
        completed_beams = [_with_norm_score(b, length_penalty) for b in beams]

    best_beam = max(completed_beams, key=lambda b: b['norm_score'])
    decoded_tokens = best_beam['tokens'][1:]  # drop the start token
    if decoded_tokens and decoded_tokens[-1] in terminal_tokens:
        decoded_tokens = decoded_tokens[:-1]
    words = (IDX2WORD.get(t, "") for t in decoded_tokens)
    return " ".join(w for w in words if w not in ("<start>", "<end>", "", None)).strip()
# Gradio UI
def respond(message, history, max_tokens, temperature, top_p):
    """Gradio chat callback: yield one generated reply for ``message``.

    Args:
        message: Latest user message.
        history: Prior conversation; accepted for the callback signature but
            not used.
        max_tokens: Decode budget from the UI. Coerced to int because slider
            values may arrive as floats.
        temperature: Softmax temperature forwarded to ``reply``.
        top_p: Accepted for the callback signature but not used; decoding is
            beam search and does no nucleus sampling.

    Yields:
        The full response string, once.
    """
    response = reply(
        message,
        max_decode_len=int(max_tokens),
        temperature=temperature,
        rep_penalty=1.3,
    )
    yield response
# NOTE(review): the original indentation was lost; the nesting below (login
# button and model info in the sidebar, chat in the main area) is the most
# plausible reading -- confirm against the deployed layout.
with gr.Blocks() as demo:
    with gr.Sidebar():
        gr.LoginButton()
        gr.Markdown("### Model Info\nIAMAM v1.1.0\n40M Parameters\nLSTM + Attention")
    gr.ChatInterface(
        fn=respond,
        title="A Math AutoBiography maker!",
        description="Try asking it to make a autobiography on 2 + 2! (Runs IAMAM v1.1.0)",
        # Order must match respond(message, history, max_tokens, temperature, top_p).
        additional_inputs=[
            # Default is tied to MAX_LEN so it can never exceed the slider maximum.
            gr.Slider(minimum=1, maximum=MAX_LEN, value=MAX_LEN, step=1, label="Max new tokens"),
            gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
            gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"),
        ],
    )

if __name__ == "__main__":
    demo.launch()