"""Gradio chat front-end for a pre-trained LSTM + attention seq2seq model.

Why does this look like it is going to train a model? It is not: there is no
training loop and no ``model.fit`` call. The custom classes below exist only
so that ``keras.models.load_model`` can deserialize ``chatbot.keras``.
"""

import os
import pickle

import gradio as gr
import keras
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Concatenate, Input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences

# --- Globals required by the model / decoder --------------------------------
VOCAB_SIZE = 13000
MAX_LEN = 32
LATENT_DIM = 256
temp = 0.5   # not referenced anywhere in this file
topk = 40    # not referenced anywhere in this file
MoE = False

# NOTE(review): every special-token / prompt-tag literal in the source this
# file was recovered from is an empty string, which looks like angle-bracket
# tags were stripped during extraction. The values below reproduce what the
# source shows; confirm them against the tokenizer vocabulary and restore the
# real tags here if they differ.
START_TOKEN = ""
END_TOKEN = ""
OOV_TOKEN = ""
MOE_PROMPT_TAG = ""
DEFAULT_PROMPT_TAG = ""


# --- Registered custom classes (needed to deserialize the saved model) ------
@keras.saving.register_keras_serializable(package="Custom")
class MaskLayer(tf.keras.layers.Layer):
    """Pass through the first input and the first input's mask."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True

    def call(self, inputs):
        """Return ``inputs[0]`` unchanged."""
        return inputs[0]

    def compute_mask(self, inputs, mask=None):
        """Return ``mask[0]`` when a mask is supplied, otherwise ``None``."""
        if mask is not None:
            return mask[0]
        return None


@keras.saving.register_keras_serializable(package="Custom")
class ThresholdEarlyStopping(keras.callbacks.Callback):
    """Config-only callback stub: stores thresholds, defines no hooks here."""

    def __init__(self, loss_thresh=0.2, val_loss_thresh=0.2, verbose=1):
        super().__init__()
        self.loss_thresh = float(loss_thresh)
        self.val_loss_thresh = float(val_loss_thresh)
        self.verbose = verbose

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {
            "loss_thresh": self.loss_thresh,
            "val_loss_thresh": self.val_loss_thresh,
            "verbose": self.verbose,
        }


@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLR(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up from 0 to ``max_lr`` over ``warmup_steps`` steps."""

    def __init__(self, max_lr, warmup_steps):
        super().__init__()
        self.max_lr = float(max_lr)
        self.warmup_steps = float(warmup_steps)

    def __call__(self, step):
        """Return ``max_lr * min(1, step / warmup_steps)``."""
        step = tf.cast(step, tf.float32)
        return self.max_lr * tf.minimum(1.0, step / self.warmup_steps)

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {"max_lr": self.max_lr, "warmup_steps": self.warmup_steps}

    @classmethod
    def from_config(cls, config):
        """Build from a flat config or from one nested under ``"config"``."""
        if "config" in config:
            config = config["config"]
        return cls(**config)


@keras.saving.register_keras_serializable(package="Custom")
class WarmUpLRWrapper(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Schedule returning ``initial_lr + base_schedule(step)``."""

    def __init__(self, base_schedule, initial_lr):
        super().__init__()
        if isinstance(base_schedule, dict):
            # A dict means we are being rebuilt from a saved config.
            try:
                self.base_schedule = tf.keras.utils.deserialize_keras_object(
                    base_schedule
                )
            except Exception:
                # Deliberate best-effort: fall back to a WarmUpLR built from
                # whatever fields the config carries.
                cfg = base_schedule.get("config", base_schedule)
                self.base_schedule = WarmUpLR(
                    max_lr=cfg.get("max_lr", 0.01),
                    warmup_steps=cfg.get("warmup_steps", 500),
                )
        else:
            self.base_schedule = base_schedule
        self.initial_lr = float(initial_lr)

    def __call__(self, step):
        """Return ``initial_lr`` plus the wrapped schedule's value."""
        step_f = tf.cast(step, tf.float32)
        return self.initial_lr + self.base_schedule(step_f)

    def get_config(self):
        """Return the serialized base schedule and ``initial_lr``."""
        return {
            "base_schedule": tf.keras.layers.serialize(self.base_schedule),
            "initial_lr": self.initial_lr,
        }

    @classmethod
    def from_config(cls, config):
        """Build from a flat config or from one nested under ``"config"``."""
        if "config" in config:
            config = config["config"]
        return cls(
            base_schedule=config.get("base_schedule"),
            initial_lr=config.get("initial_lr", 0.0),
        )


@keras.saving.register_keras_serializable(package="Custom")
class SmoothRepPenalty(keras.callbacks.Callback):
    """Config-only callback stub: stores its settings, defines no hooks here."""

    def __init__(self, threshold=1.5, base_penalty=1.0, max_penalty=2.0,
                 adapt_rate=0.05):
        super().__init__()
        self.threshold = threshold
        self.base_penalty = base_penalty
        self.max_penalty = max_penalty
        self.adapt_rate = adapt_rate

    def get_config(self):
        """Return the constructor arguments as a plain dict."""
        return {
            "threshold": self.threshold,
            "base_penalty": self.base_penalty,
            "max_penalty": self.max_penalty,
            "adapt_rate": self.adapt_rate,
        }


@keras.saving.register_keras_serializable(package="Custom")
class MathSymbologgerbutlogingenalty(keras.callbacks.Callback):
    """Config-only callback stub; only ``penalty`` is stored.

    ``tokenizer`` and ``math_symbols`` are accepted and ignored.
    """

    def __init__(self, tokenizer=None, math_symbols=None, penalty=0.1):
        super().__init__()
        self.penalty = float(penalty)

    def get_config(self):
        """Return the stored penalty as a plain dict."""
        return {"penalty": self.penalty}


@keras.saving.register_keras_serializable(package="Custom")
class SymbolCheckPenalty(keras.callbacks.Callback):
    """Config-only callback stub.

    ``tokenizer`` and ``symbol_checks`` are accepted and ignored.
    """

    def __init__(self, tokenizer=None, max_len=64, symbol_checks=None,
                 penalty_factor=0.05, check_loss_thresh=1.5,
                 check_val_loss_thresh=3.0):
        super().__init__()
        self.max_len = int(max_len)
        self.penalty_factor = float(penalty_factor)
        self.check_loss_thresh = check_loss_thresh
        self.check_val_loss_thresh = check_val_loss_thresh

    def get_config(self):
        """Return the stored settings as a plain dict."""
        return {
            "max_len": self.max_len,
            "penalty_factor": self.penalty_factor,
            "check_loss_thresh": self.check_loss_thresh,
            "check_val_loss_thresh": self.check_val_loss_thresh,
        }


# --- Load or die -------------------------------------------------------------
# Note: "chatbot.keras" and "tokenizer.pkl" are uploaded alongside this script.
try:
    # SECURITY: pickle executes arbitrary code on load; only ever point this
    # at a tokenizer file you produced yourself.
    with open("tokenizer.pkl", "rb") as f:
        TOK = pickle.load(f)
    WORD2IDX = TOK.word_index
    IDX2WORD = {i: w for w, i in WORD2IDX.items()}
    print("Tokenizer loaded successfully.")
except Exception as e:
    print(f"Failed to load tokenizer (Did you upload tokenizer.pkl?): {e}")
    # Nothing below can work without the tokenizer; fail here with the real
    # cause instead of a later NameError.
    raise

try:
    custom_objects = {
        "WarmUpLR": WarmUpLR,
        "WarmUpLRWrapper": WarmUpLRWrapper,
        "MaskLayer": MaskLayer,
        "ThresholdEarlyStopping": ThresholdEarlyStopping,
        "SmoothRepPenalty": SmoothRepPenalty,
        "MathSymbologgerbutlogingenalty": MathSymbologgerbutlogingenalty,
        "SymbolCheckPenalty": SymbolCheckPenalty,
        "tf": tf,
    }
    # compile=False is required.
    model = keras.models.load_model(
        "chatbot.keras", custom_objects=custom_objects, compile=False
    )
    print("Model loaded successfully.")
except Exception as e:
    print(f"Failed to load model (dev note: is chatbot.keras uploaded yet): {e}")
    # Same reasoning as above: build_inference_models(model) would otherwise
    # crash with an unrelated NameError.
    raise


# --- Inference ---------------------------------------------------------------
def build_inference_models(trained_model):
    """Split the trained seq2seq model into encoder and one-step decoder.

    Layers are looked up by name ("enc_emb", "enc_lstm", "dec_emb",
    "dec_lstm", "bahdanau_attention", "dec_bn", "dec_dense") and re-wired.

    Args:
        trained_model: The loaded Keras model; ``trained_model.input[0]`` is
            used as the encoder input.

    Returns:
        ``(inf_enc, inf_dec)`` where ``inf_enc`` outputs
        ``[enc_seq, state_h, state_c, enc_mask]`` and ``inf_dec`` maps
        ``[token, enc_seq, enc_mask, h, c]`` to ``[logits, h, c]``.
    """
    print("Building compiled inference models with Masking Support...")

    enc_inp = trained_model.input[0]
    enc_emb_layer = trained_model.get_layer("enc_emb")
    enc_lstm_layer = trained_model.get_layer("enc_lstm")
    dec_emb_layer = trained_model.get_layer("dec_emb")
    dec_lstm_layer = trained_model.get_layer("dec_lstm")
    att_layer = trained_model.get_layer("bahdanau_attention")
    dec_bn_layer = trained_model.get_layer("dec_bn")
    dec_dense_layer = trained_model.get_layer("dec_dense")

    # Encoder: embedding -> LSTM. The LSTM output is unpacked as a sequence
    # plus four states (presumably forward/backward h and c of a
    # bidirectional layer -- confirm against the training script).
    enc_emb_out = enc_emb_layer(enc_inp)
    enc_mask = enc_emb_layer.compute_mask(enc_inp)
    enc_lstm_out = enc_lstm_layer(enc_emb_out)
    enc_seq = enc_lstm_out[0]
    fh, fc, bh, bc = enc_lstm_out[1:]
    s_h = Concatenate()([fh, bh])
    s_c = Concatenate()([fc, bc])
    inf_enc = Model(
        inputs=enc_inp,
        outputs=[enc_seq, s_h, s_c, enc_mask],
        name="inference_encoder",
    )

    # Decoder: consumes one token per call and carries the LSTM state.
    d_token = Input(shape=(1,), dtype='int32', name="inf_dec_token")
    e_seq_in = Input(shape=(MAX_LEN, LATENT_DIM * 2), name="inf_enc_seq")
    e_mask_in = Input(shape=(MAX_LEN,), dtype='bool', name="inf_enc_mask")
    d_h_in = Input(shape=(LATENT_DIM * 2,), name="inf_dec_h")
    d_c_in = Input(shape=(LATENT_DIM * 2,), name="inf_dec_c")

    d_emb = dec_emb_layer(d_token)
    d_mask = dec_emb_layer.compute_mask(d_token)
    dec_out, d_h, d_c = dec_lstm_layer(d_emb, initial_state=[d_h_in, d_c_in])
    context = att_layer([dec_out, e_seq_in], mask=[d_mask, e_mask_in])
    dec_concat = Concatenate()([dec_out, context])
    dec_bn_out = dec_bn_layer(dec_concat)
    dec_logits = dec_dense_layer(dec_bn_out)
    inf_dec = Model(
        [d_token, e_seq_in, e_mask_in, d_h_in, d_c_in],
        [dec_logits, d_h, d_c],
    )
    return inf_enc, inf_dec


INF_ENCODER, INF_DECODER = build_inference_models(model)


def fast_decode_step(token, e_seq, h, c, decoder_model, e_mask=None):
    """Run one decoder step and return ``(logits, new_h, new_c)``.

    ``token`` is converted to an int32 tensor and ``e_seq``/``h``/``c`` to
    float32 tensors; ``e_mask`` is forwarded to the model as given.
    """
    token = tf.convert_to_tensor(token, dtype=tf.int32)
    e_seq = tf.convert_to_tensor(e_seq, dtype=tf.float32)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)
    return decoder_model([token, e_seq, e_mask, h, c], training=False)


def _penalize_logits(logits, prev_tokens, end_token, oov_token, rep_penalty):
    """Apply the end-token, repetition and OOV penalties to one logit row."""
    # Discourage ending while the hypothesis is still very short.
    if len(prev_tokens) < 4:
        logits = tf.tensor_scatter_nd_update(
            logits, [[end_token]], [logits[end_token] - 20.0]
        )

    # Repetition penalty: shrink positive logits, push negative ones lower.
    seen = [t for t in set(prev_tokens) if t < VOCAB_SIZE]
    if seen:
        updates = []
        for t in seen:
            val = logits[t]
            updates.append(val / rep_penalty if val > 0 else val * rep_penalty)
        logits = tf.tensor_scatter_nd_update(logits, [[t] for t in seen], updates)

    # Always make the out-of-vocabulary token unattractive.
    return tf.tensor_scatter_nd_update(
        logits, [[oov_token]], [logits[oov_token] - 15.0]
    )


def _length_normalized(score, n_tokens, length_penalty):
    """Return ``score / n_tokens ** length_penalty``."""
    return score / (n_tokens ** length_penalty)


def reply(text, max_decode_len=MAX_LEN, rep_penalty=1.3, beam_width=3,
          length_penalty=0.7, temperature=0.7):
    """Generate a response to ``text`` with beam search.

    Args:
        text: User message.
        max_decode_len: Maximum number of decoding steps.
        rep_penalty: Factor applied to logits of tokens already emitted.
        beam_width: Number of live hypotheses kept per step.
        length_penalty: Exponent used to length-normalise beam scores.
        temperature: Softmax temperature (clamped to at least 1e-6).

    Returns:
        The decoded response as a space-joined string; ``""`` when ``text``
        is empty after stripping.
    """
    text = text.strip()
    if not text:
        # Nothing to encode: avoid feeding an all-padding sequence.
        return ""

    # NOTE(review): the prompt-tag literals here were unrecoverable from the
    # source (see the constants at the top of the file); with empty tags this
    # branch leaves the text effectively unchanged.
    if MoE:
        if not text.startswith(MOE_PROMPT_TAG):
            text = f"{MOE_PROMPT_TAG} {text}"
    else:
        text = f"{DEFAULT_PROMPT_TAG} {text}"

    clean_text = text.lower().strip()
    seq = TOK.texts_to_sequences([clean_text])
    enc_in = pad_sequences(seq, maxlen=MAX_LEN, padding='post', dtype='int32')

    e_seq, h, c, e_mask = INF_ENCODER(enc_in, training=False)
    h = tf.convert_to_tensor(h, dtype=tf.float32)
    c = tf.convert_to_tensor(c, dtype=tf.float32)

    start_token = WORD2IDX.get(START_TOKEN, 1)
    end_token = WORD2IDX.get(END_TOKEN, 2)
    oov_token = WORD2IDX.get(OOV_TOKEN, 3)
    # A hypothesis is finished once it emits padding (0), OOV or end.
    terminal_tokens = (0, oov_token, end_token)

    beams = [{'score': 0.0, 'tokens': [start_token], 'h': h, 'c': c}]
    completed_beams = []

    for _ in range(max_decode_len):
        new_candidates = []
        for beam in beams:
            current_token = tf.constant([[beam['tokens'][-1]]], dtype=tf.int32)
            logits_tensor, new_h, new_c = fast_decode_step(
                current_token, e_seq, beam['h'], beam['c'], INF_DECODER,
                e_mask=e_mask,
            )
            logits = _penalize_logits(
                logits_tensor[0, -1, :], beam['tokens'], end_token, oov_token,
                rep_penalty,
            )

            safe_temp = max(temperature, 1e-6)
            log_probs = tf.nn.log_softmax(logits / safe_temp)
            top_k_log_probs, top_k_indices = tf.nn.top_k(log_probs, k=beam_width)

            for j in range(beam_width):
                token_id = int(top_k_indices[j].numpy())
                step_score = float(top_k_log_probs[j].numpy())
                new_candidates.append({
                    'score': beam['score'] + step_score,
                    'tokens': beam['tokens'] + [token_id],
                    'h': new_h,
                    'c': new_c,
                })

        new_candidates = sorted(
            new_candidates, key=lambda x: x['score'], reverse=True
        )

        beams = []
        for candidate in new_candidates:
            if candidate['tokens'][-1] in terminal_tokens:
                candidate['norm_score'] = _length_normalized(
                    candidate['score'], len(candidate['tokens']), length_penalty
                )
                completed_beams.append(candidate)
            elif len(beams) < beam_width:
                beams.append(candidate)
            if len(beams) == beam_width:
                break

        if not beams:
            break

    if not completed_beams:
        # No hypothesis terminated: score the live beams instead.
        for b in beams:
            b['norm_score'] = _length_normalized(
                b['score'], len(b['tokens']), length_penalty
            )
            completed_beams.append(b)

    best_beam = max(completed_beams, key=lambda x: x['norm_score'])
    decoded_tokens = best_beam['tokens'][1:]  # drop the start token
    if decoded_tokens and decoded_tokens[-1] in terminal_tokens:
        decoded_tokens = decoded_tokens[:-1]

    response_words = [IDX2WORD.get(t, "") for t in decoded_tokens]
    skipped = (START_TOKEN, END_TOKEN, OOV_TOKEN, "", None)
    clean_words = [w for w in response_words if w not in skipped]
    return " ".join(clean_words).strip()


# --- Gradio ------------------------------------------------------------------
def respond(message, history, max_tokens, temperature, top_p):
    """Gradio chat callback: yield the model's reply to ``message``.

    ``history`` and ``top_p`` are accepted to match the UI inputs but are not
    used by the beam-search decoder.
    """
    response = reply(
        message,
        # Slider values may arrive as floats; range() needs an int.
        max_decode_len=int(max_tokens),
        temperature=float(temperature),
        rep_penalty=1.3,
    )
    yield response


with gr.Blocks() as demo:
    with gr.Sidebar():
        gr.LoginButton()
        gr.Markdown("### Model Info\nIAMAM v1.1.0\n40M Parameters\nLSTM + Attention")
    gr.ChatInterface(
        fn=respond,
        title="A Math AutoBiography maker!",
        description="Try asking it to make a autobiography on 2 + 2! (Runs IAMAM v1.1.0)",
        additional_inputs=[
            gr.Slider(minimum=1, maximum=MAX_LEN, value=MAX_LEN, step=1,
                      label="Max new tokens"),
            gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1,
                      label="Temperature"),
            gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05,
                      label="Top-p"),
        ],
    )

if __name__ == "__main__":
    demo.launch()