| |
| import gradio as gr |
| import keras |
| import tensorflow as tf |
| import numpy as np |
| import pickle |
| import os |
| from tensorflow.keras.preprocessing.sequence import pad_sequences |
| from tensorflow.keras.models import Model |
| from tensorflow.keras.layers import Input, Concatenate |
|
|
| |
# Module-level settings read by the inference code in this file.
VOCAB_SIZE: int = 13_000  # reply() only applies its repetition penalty to token ids below this
MAX_LEN: int = 32  # encoder padding length, default decode budget, and UI slider maximum
LATENT_DIM: int = 256  # inference inputs are declared with LATENT_DIM * 2 features
temp: float = 0.5  # not referenced anywhere else in the visible source
topk: int = 40  # not referenced anywhere else in the visible source
MoE: bool = False  # when True, reply() prefixes untagged inputs with a <TASK_...> tag
|
|
| |
@keras.saving.register_keras_serializable(package="Custom")
class MaskLayer(tf.keras.layers.Layer):
    """Pass-through layer that forwards the first of its inputs.

    The layer takes an indexable collection of inputs, returns element 0
    unchanged, and reports element 0 of the incoming mask collection as its
    output mask (or no mask when none was supplied).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Opt in to Keras mask propagation for this layer.
        self.supports_masking = True

    def call(self, inputs):
        """Return ``inputs[0]`` untouched."""
        primary = inputs[0]
        return primary

    def compute_mask(self, inputs, mask=None):
        """Return the mask belonging to the first input, or ``None``."""
        return None if mask is None else mask[0]
|
|
@keras.saving.register_keras_serializable(package="Custom")
class ThresholdEarlyStopping(keras.callbacks.Callback):
    """Configuration holder for a loss-threshold early-stopping callback.

    As defined here the class only stores its settings and round-trips them
    through ``get_config`` / ``from_config``; no callback hooks are
    overridden, so it has no effect on training by itself.

    Args:
        loss_thresh: Training-loss threshold (stored as ``float``).
        val_loss_thresh: Validation-loss threshold (stored as ``float``).
        verbose: Verbosity flag, stored unchanged.
    """

    def __init__(self, loss_thresh=0.2, val_loss_thresh=0.2, verbose=1):
        super().__init__()
        self.loss_thresh = float(loss_thresh)
        self.val_loss_thresh = float(val_loss_thresh)
        self.verbose = verbose

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {
            "loss_thresh": self.loss_thresh,
            "val_loss_thresh": self.val_loss_thresh,
            "verbose": self.verbose,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild the callback from ``get_config`` output.

        Accepts either the bare kwargs dict or the nested
        ``{"config": {...}}`` form, mirroring ``WarmUpLR.from_config``.
        """
        return cls(**config.get("config", config))
|
|
@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLR(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up schedule capped at ``max_lr``.

    The rate is ``max_lr * min(1, step / warmup_steps)``: it grows linearly
    with the step and stays at ``max_lr`` once ``step >= warmup_steps``.

    NOTE(review): ``warmup_steps == 0`` divides by zero in ``__call__``;
    callers are presumably expected to pass a positive value.
    """

    def __init__(self, max_lr, warmup_steps):
        super().__init__()
        self.max_lr = float(max_lr)
        self.warmup_steps = float(warmup_steps)

    def __call__(self, step):
        """Return the learning rate for ``step``."""
        progress = tf.cast(step, tf.float32) / self.warmup_steps
        return self.max_lr * tf.minimum(1.0, progress)

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return dict(max_lr=self.max_lr, warmup_steps=self.warmup_steps)

    @classmethod
    def from_config(cls, config):
        """Build from a kwargs dict or from a nested ``{"config": {...}}`` dict."""
        kwargs = config.get("config", config)
        return cls(**kwargs)
|
|
@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLRWrapper(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Schedule that adds a constant ``initial_lr`` to another schedule.

    Args:
        base_schedule: Either a schedule object (called with the float step)
            or its serialized dict form. A dict is deserialized through
            Keras first; if that fails, a ``WarmUpLR`` is rebuilt from the
            dict's ``max_lr`` / ``warmup_steps`` entries (defaults 0.01 and
            500).
        initial_lr: Constant offset added to the base schedule's value.
    """

    def __init__(self, base_schedule, initial_lr):
        super().__init__()
        if isinstance(base_schedule, dict):
            try:
                self.base_schedule = tf.keras.utils.deserialize_keras_object(base_schedule)
            except Exception:
                # Best-effort fallback: treat the dict as a WarmUpLR config,
                # whether it is bare or nested under "config".
                cfg = base_schedule.get("config", base_schedule)
                self.base_schedule = WarmUpLR(
                    max_lr=cfg.get("max_lr", 0.01),
                    warmup_steps=cfg.get("warmup_steps", 500),
                )
        else:
            self.base_schedule = base_schedule
        self.initial_lr = float(initial_lr)

    def __call__(self, step):
        """Return ``initial_lr + base_schedule(step)`` with ``step`` cast to float32."""
        step_f = tf.cast(step, tf.float32)
        return self.initial_lr + self.base_schedule(step_f)

    def get_config(self):
        """Return a config dict whose ``base_schedule`` entry is serialized.

        The schedule serializer is used here (rather than the layer
        serializer) because ``base_schedule`` is a learning-rate schedule.
        """
        return {
            "base_schedule": tf.keras.optimizers.schedules.serialize(self.base_schedule),
            "initial_lr": self.initial_lr,
        }

    @classmethod
    def from_config(cls, config):
        """Build from a kwargs dict or from a nested ``{"config": {...}}`` dict."""
        if "config" in config:
            config = config["config"]
        return cls(
            base_schedule=config.get("base_schedule"),
            initial_lr=config.get("initial_lr", 0.0),
        )
|
|
@keras.saving.register_keras_serializable(package="Custom")
class SmoothRepPenalty(keras.callbacks.Callback):
    """Configuration holder for a repetition-penalty callback.

    Only the four settings are stored; no callback hooks are overridden in
    this definition, so it does nothing during training by itself.

    Args:
        threshold: Stored unchanged.
        base_penalty: Stored unchanged.
        max_penalty: Stored unchanged.
        adapt_rate: Stored unchanged.
    """

    def __init__(self, threshold=1.5, base_penalty=1.0, max_penalty=2.0, adapt_rate=0.05):
        super().__init__()
        self.threshold = threshold
        self.base_penalty = base_penalty
        self.max_penalty = max_penalty
        self.adapt_rate = adapt_rate

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {
            "threshold": self.threshold,
            "base_penalty": self.base_penalty,
            "max_penalty": self.max_penalty,
            "adapt_rate": self.adapt_rate,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild from ``get_config`` output (bare or nested under ``"config"``)."""
        return cls(**config.get("config", config))
|
|
@keras.saving.register_keras_serializable(package="Custom")
class MathSymbologgerbutlogingenalty(keras.callbacks.Callback):
    """Configuration holder for a math-symbol penalty callback.

    Only ``penalty`` is kept. ``tokenizer`` and ``math_symbols`` are accepted
    for signature compatibility but are discarded, and no callback hooks are
    overridden in this definition.

    Args:
        tokenizer: Ignored.
        math_symbols: Ignored.
        penalty: Penalty value, stored as ``float``.
    """

    def __init__(self, tokenizer=None, math_symbols=None, penalty=0.1):
        super().__init__()
        self.penalty = float(penalty)

    def get_config(self):
        """Return the serializable settings (``penalty`` only)."""
        return {"penalty": self.penalty}

    @classmethod
    def from_config(cls, config):
        """Rebuild from ``get_config`` output (bare or nested under ``"config"``)."""
        return cls(**config.get("config", config))
|
|
@keras.saving.register_keras_serializable(package="Custom")
class SymbolCheckPenalty(keras.callbacks.Callback):
    """Configuration holder for a symbol-check penalty callback.

    ``tokenizer`` and ``symbol_checks`` are accepted for signature
    compatibility but are discarded; no callback hooks are overridden in
    this definition.

    Args:
        tokenizer: Ignored.
        max_len: Stored as ``int``.
        symbol_checks: Ignored.
        penalty_factor: Stored as ``float``.
        check_loss_thresh: Stored unchanged.
        check_val_loss_thresh: Stored unchanged.
    """

    def __init__(
        self,
        tokenizer=None,
        max_len=64,
        symbol_checks=None,
        penalty_factor=0.05,
        check_loss_thresh=1.5,
        check_val_loss_thresh=3.0,
    ):
        super().__init__()
        self.max_len = int(max_len)
        self.penalty_factor = float(penalty_factor)
        self.check_loss_thresh = check_loss_thresh
        self.check_val_loss_thresh = check_val_loss_thresh

    def get_config(self):
        """Return the serializable settings as a plain dict."""
        return {
            "max_len": self.max_len,
            "penalty_factor": self.penalty_factor,
            "check_loss_thresh": self.check_loss_thresh,
            "check_val_loss_thresh": self.check_val_loss_thresh,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild from ``get_config`` output (bare or nested under ``"config"``)."""
        return cls(**config.get("config", config))
|
|
| |
| |
# Load the pickled tokenizer and derive the word<->index lookup tables.
# The three names are bound up front so that a failed load leaves them
# defined (empty) instead of causing a NameError later in the module.
# NOTE(security): pickle.load can execute arbitrary code from the file; only
# ship a tokenizer.pkl that you produced yourself.
TOK = None
WORD2IDX = {}
IDX2WORD = {}
try:
    with open("tokenizer.pkl", "rb") as f:
        TOK = pickle.load(f)
    WORD2IDX = TOK.word_index
    IDX2WORD = {i: w for w, i in WORD2IDX.items()}
    print("Tokenizer loaded successfully.")
except Exception as e:
    print(f"Failed to load tokenizer (Did you upload tokenizer.pkl?): {e}")
|
|
# Load the trained Keras model without compiling it. `model` is bound to None
# first so that a failed load leaves the name defined.
model = None
try:
    # Names the saved file may reference; every custom class declared in this
    # module is listed so none has to rely on registration alone.
    custom_objects = {
        "WarmUpLR": WarmUpLR,
        "WarmUpLRWrapper": WarmUpLRWrapper,
        "MaskLayer": MaskLayer,
        "ThresholdEarlyStopping": ThresholdEarlyStopping,
        "SmoothRepPenalty": SmoothRepPenalty,
        "MathSymbologgerbutlogingenalty": MathSymbologgerbutlogingenalty,
        "SymbolCheckPenalty": SymbolCheckPenalty,
        "tf": tf,
    }

    model = keras.models.load_model("chatbot.keras", custom_objects=custom_objects, compile=False)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Failed to load model (dev note: is chatbot.keras uploaded yet): {e}")
|
|
| |
def build_inference_models(trained_model):
    """Assemble step-wise encoder and decoder models from a trained model.

    Layers are looked up by name on ``trained_model`` and re-wired into two
    functional models that share the trained weights.

    Args:
        trained_model: Model whose first input is the encoder input and that
            contains layers named ``enc_emb``, ``enc_lstm``, ``dec_emb``,
            ``dec_lstm``, ``bahdanau_attention``, ``dec_bn`` and
            ``dec_dense``.

    Returns:
        ``(inference_encoder, inference_decoder)``. The encoder maps the
        encoder input to ``[sequence output, h, c, encoder mask]``; the
        decoder maps ``[token, encoder sequence, encoder mask, h, c]`` to
        ``[dec_dense output, new h, new c]``.
    """
    print("Building compiled inference models with Masking Support...")

    encoder_input = trained_model.input[0]
    enc_embed = trained_model.get_layer("enc_emb")
    enc_rnn = trained_model.get_layer("enc_lstm")
    dec_embed = trained_model.get_layer("dec_emb")
    dec_rnn = trained_model.get_layer("dec_lstm")
    attention = trained_model.get_layer("bahdanau_attention")
    dec_norm = trained_model.get_layer("dec_bn")
    dec_proj = trained_model.get_layer("dec_dense")

    # ---- encoder ----
    src_embedded = enc_embed(encoder_input)
    src_mask = enc_embed.compute_mask(encoder_input)

    # enc_lstm must yield five values: the sequence output followed by two
    # (h, c) pairs — presumably the forward and backward halves of a
    # bidirectional LSTM; confirm against the training code.
    rnn_outputs = enc_rnn(src_embedded)
    src_sequence = rnn_outputs[0]
    fwd_h, fwd_c, bwd_h, bwd_c = rnn_outputs[1:]
    init_h = Concatenate()([fwd_h, bwd_h])
    init_c = Concatenate()([fwd_c, bwd_c])

    inference_encoder = Model(
        inputs=encoder_input,
        outputs=[src_sequence, init_h, init_c, src_mask],
        name="inference_encoder",
    )

    # ---- single-step decoder ----
    token_in = Input(shape=(1,), dtype='int32', name="inf_dec_token")
    sequence_in = Input(shape=(MAX_LEN, LATENT_DIM*2), name="inf_enc_seq")
    mask_in = Input(shape=(MAX_LEN,), dtype='bool', name="inf_enc_mask")
    h_in = Input(shape=(LATENT_DIM*2,), name="inf_dec_h")
    c_in = Input(shape=(LATENT_DIM*2,), name="inf_dec_c")

    token_embedded = dec_embed(token_in)
    token_mask = dec_embed.compute_mask(token_in)

    step_out, h_out, c_out = dec_rnn(token_embedded, initial_state=[h_in, c_in])
    # The attention layer receives both masks explicitly because the encoder
    # sequence arrives through a plain Input that carries no mask of its own.
    attended = attention([step_out, sequence_in], mask=[token_mask, mask_in])
    merged = Concatenate()([step_out, attended])
    normalised = dec_norm(merged)
    step_logits = dec_proj(normalised)

    inference_decoder = Model(
        [token_in, sequence_in, mask_in, h_in, c_in],
        [step_logits, h_out, c_out],
    )
    return inference_encoder, inference_decoder
|
|
# Build the step-wise inference models once at import time. `model` may be
# unbound or None when loading failed earlier; in that case both names are
# bound to None so the module still imports and the failure is reported.
try:
    _trained_model = model
except NameError:
    _trained_model = None

if _trained_model is None:
    INF_ENCODER = INF_DECODER = None
    print("Inference models not built: no trained model is available.")
else:
    INF_ENCODER, INF_DECODER = build_inference_models(_trained_model)
|
|
def fast_decode_step(token, e_seq, h, c, decoder_model, e_mask=None):
    """Run a single decoder step and return ``decoder_model``'s outputs.

    Args:
        token: Current decoder token id(s); converted to an int32 tensor.
        e_seq: Encoder sequence output; converted to a float32 tensor.
        h: Decoder hidden state; converted to a float32 tensor.
        c: Decoder cell state; converted to a float32 tensor.
        decoder_model: Callable taking ``[token, e_seq, e_mask, h, c]``.
        e_mask: Boolean encoder mask. When omitted, an all-True mask over the
            first two axes of ``e_seq`` is used (assumes ``e_seq`` is
            ``(batch, time, features)``, as the inference decoder in this
            file declares it).

    Returns:
        Whatever ``decoder_model`` returns when called with ``training=False``.
    """
    token = tf.convert_to_tensor(token, dtype=tf.int32)
    e_seq = tf.convert_to_tensor(e_seq, dtype=tf.float32)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)
    if e_mask is None:
        # None cannot be fed to a model input; treat every encoder position
        # as valid instead.
        e_mask = tf.fill(tf.shape(e_seq)[:2], True)
    return decoder_model([token, e_seq, e_mask, h, c], training=False)
|
|
def _penalize_logits(logits, prev_tokens, end_token, oov_token, rep_penalty):
    """Apply the per-step logit adjustments for one beam and return new logits."""
    # Discourage stopping while the beam holds fewer than four tokens
    # (the start token counts as one of them).
    if len(prev_tokens) < 4:
        logits = tf.tensor_scatter_nd_update(logits, [[end_token]], [logits[end_token] - 20.0])

    # Repetition penalty on every distinct, in-vocabulary token already in the
    # beam: positive logits are divided, non-positive ones multiplied, so both
    # move toward "less likely". Done in one gather/where instead of a
    # per-token eager loop.
    seen = sorted({t for t in prev_tokens if t < VOCAB_SIZE})
    if seen:
        seen_vals = tf.gather(logits, seen)
        penalized = tf.where(seen_vals > 0, seen_vals / rep_penalty, seen_vals * rep_penalty)
        logits = tf.tensor_scatter_nd_update(logits, [[t] for t in seen], penalized)

    # Strongly discourage emitting the out-of-vocabulary token.
    return tf.tensor_scatter_nd_update(logits, [[oov_token]], [logits[oov_token] - 15.0])


def reply(text, max_decode_len=MAX_LEN, rep_penalty=1.3, beam_width=3, length_penalty=0.7, temperature=0.7):
    """Generate a response to ``text`` with beam search over the inference models.

    Args:
        text: User input. Surrounding whitespace is stripped and the text is
            lower-cased before tokenization; empty input yields ``""``.
        max_decode_len: Maximum number of decode steps (coerced to ``int``).
        rep_penalty: Factor applied to logits of tokens already in a beam.
        beam_width: Number of live beams kept per step, and the number of
            expansions taken from each beam.
        length_penalty: Exponent ``p`` in ``score / len(tokens) ** p`` used to
            rank finished beams.
        temperature: Divides the logits before ``log_softmax``; floored at
            ``1e-6``.

    Returns:
        The decoded words joined by single spaces, without start/end markers.
    """
    text = text.strip()
    if not text:
        # Nothing to encode; avoid feeding an all-padding sequence to the model.
        return ""

    if MoE and not text.startswith("<TASK_"):
        looks_like_math = any(ch in text for ch in "0123456789+-*/=")
        text = f"<TASK_MATH> {text}" if looks_like_math else f"<TASK_CHAT> {text}"

    # A UI slider may deliver the budget as a float; range() requires an int.
    max_decode_len = int(max_decode_len)

    clean_text = text.lower().strip()
    seq = TOK.texts_to_sequences([clean_text])
    enc_in = pad_sequences(seq, maxlen=MAX_LEN, padding='post', dtype='int32')
    e_seq, h, c, e_mask = INF_ENCODER(enc_in, training=False)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)
    start_token = WORD2IDX.get("<start>", 1)
    end_token = WORD2IDX.get("<end>", 2)
    oov_token = WORD2IDX.get("<oov>", 3)

    # A beam is finished once it emits padding (0), OOV, or the end token.
    stop_tokens = {0, oov_token, end_token}
    safe_temp = max(temperature, 1e-6)

    beams = [{'score': 0.0, 'tokens': [start_token], 'h': h, 'c': c}]
    completed_beams = []

    for _ in range(max_decode_len):
        new_candidates = []
        for beam in beams:
            current_token = tf.constant([[beam['tokens'][-1]]], dtype=tf.int32)
            logits_tensor, new_h, new_c = fast_decode_step(
                current_token, e_seq, beam['h'], beam['c'], INF_DECODER, e_mask=e_mask
            )
            logits = _penalize_logits(
                logits_tensor[0, -1, :], beam['tokens'], end_token, oov_token, rep_penalty
            )
            log_probs = tf.nn.log_softmax(logits / safe_temp)
            top_log_probs, top_ids = tf.nn.top_k(log_probs, k=beam_width)

            # Pull both result vectors to Python once instead of per element.
            for token_id, step_score in zip(top_ids.numpy().tolist(), top_log_probs.numpy().tolist()):
                new_candidates.append({
                    'score': beam['score'] + step_score,
                    'tokens': beam['tokens'] + [token_id],
                    'h': new_h,
                    'c': new_c,
                })

        new_candidates.sort(key=lambda cand: cand['score'], reverse=True)

        beams = []
        for candidate in new_candidates:
            if candidate['tokens'][-1] in stop_tokens:
                length_norm = len(candidate['tokens']) ** length_penalty
                candidate['norm_score'] = candidate['score'] / length_norm
                completed_beams.append(candidate)
            elif len(beams) < beam_width:
                beams.append(candidate)
            if len(beams) == beam_width:
                break
        if not beams:
            break

    if not completed_beams:
        # No beam terminated within the budget: rank the live ones instead.
        for beam in beams:
            beam['norm_score'] = beam['score'] / (len(beam['tokens']) ** length_penalty)
            completed_beams.append(beam)

    if not completed_beams:
        # Only reachable when no candidates were ever produced (e.g. beam_width == 0).
        return ""

    best_beam = max(completed_beams, key=lambda cand: cand['norm_score'])
    decoded_tokens = best_beam['tokens'][1:]  # drop the leading start token

    if decoded_tokens and decoded_tokens[-1] in stop_tokens:
        decoded_tokens = decoded_tokens[:-1]

    response_words = [IDX2WORD.get(t, "") for t in decoded_tokens]
    clean_words = [w for w in response_words if w not in ["<start>", "<end>", "", None]]

    return " ".join(clean_words).strip()
| |
def respond(message, history, max_tokens, temperature, top_p):
    """Chat callback: generate one reply to ``message`` and yield it.

    ``max_tokens`` and ``temperature`` are forwarded to ``reply``; the
    repetition penalty is fixed at 1.3. ``history`` and ``top_p`` are part of
    the callback signature but are not used by this function.
    """
    generation_kwargs = {
        "max_decode_len": max_tokens,
        "temperature": temperature,
        "rep_penalty": 1.3,
    }
    yield reply(message, **generation_kwargs)
|
|
# Gradio UI: a sidebar with login and model info, plus the chat interface.
# NOTE(review): nesting was reconstructed from flattened source — confirm the
# Markdown panel belongs inside the sidebar.
with gr.Blocks() as demo:
    with gr.Sidebar():
        gr.LoginButton()
        gr.Markdown("### Model Info\nIAMAM v1.1.0\n40M Parameters\nLSTM + Attention")

    gr.ChatInterface(
        fn=respond,
        title="A Math AutoBiography maker!",
        description="Try asking it to make a autobiography on 2 + 2! (Runs IAMAM v1.1.0)",
        # Passed to respond() after (message, history), in this order.
        additional_inputs=[
            # Default tracks MAX_LEN so it can never exceed the slider maximum.
            gr.Slider(minimum=1, maximum=MAX_LEN, value=MAX_LEN, step=1, label="Max new tokens"),
            gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
            # NOTE: respond() receives this value but does not use it.
            gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"),
        ],
    )


if __name__ == "__main__":
    demo.launch()