| | import numpy as np |
| | import tensorflow as tf |
| | from tensorflow.keras.utils import to_categorical |
| | from tensorflow.keras.layers import Layer, Dense, Dropout, LayerNormalization, MultiHeadAttention |
| | from tensorflow.keras.models import Sequential |
| | from sklearn.preprocessing import LabelEncoder |
| | import gradio as gr |
| | import os |
| |
|
| | |
class EnhancedTransformerBlock(Layer):
    """Transformer encoder block followed by an extra single-head attention pass.

    The block applies, in order: multi-head self-attention, a two-layer
    feed-forward network (each followed by dropout, a residual connection and
    layer normalisation), and finally a single-head self-attention whose
    residual sum is normalised again.

    Args:
        embed_dim: Width of the feed-forward output; also used as ``key_dim``
            for both attention layers.
        num_heads: Number of heads of the first attention layer.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate used after attention and after the feed-forward net.
        **kwargs: Forwarded to ``Layer.__init__``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Constructor arguments are stored so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        # Sub-layers are created in the same order as before on purpose.
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = Sequential(
            [
                Dense(ff_dim, activation="relu"),
                Dense(embed_dim),
            ]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.self_attention = MultiHeadAttention(num_heads=1, key_dim=embed_dim)

    def call(self, inputs, training=False):
        """Run the block on ``inputs``; ``training`` toggles the dropout layers."""
        # Attention sub-block: attend, drop, add residual, normalise.
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        normed_attention = self.layernorm1(inputs + attended)

        # Feed-forward sub-block: project, drop, add residual, normalise.
        projected = self.dropout2(self.ffn(normed_attention), training=training)
        normed_ffn = self.layernorm2(normed_attention + projected)

        # Extra single-head attention pass over the block output.
        # NOTE(review): layernorm2 is applied a second time here, so both
        # normalisation points share the same weights -- confirm this is intended.
        refined = self.self_attention(normed_ffn, normed_ffn)
        return self.layernorm2(normed_ffn + refined)

    def get_config(self):
        """Return the base layer config extended with this block's constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        }
| |
|
| | |
# Number of most recent outcomes fed to the model for one prediction.
sequence_length = 10

# Seed history of observed outcomes; the Gradio callbacks append to it.
data = ["Single big", "Double big", "Double big", "Single small", "Single big",
        "Double small", "Single big", "Double small", "Single small", "Single small",
        "Single big", "Single small", "triple", "single big", "double small", "double small", "double small", "double small"]
encoder = LabelEncoder()

# Load the trained model and its label vocabulary; fall back to a small
# untrained stand-in so the demo UI still starts when the files are missing.
try:
    model = tf.keras.models.load_model(
        "enhanced_adaptive_model.keras",
        custom_objects={'EnhancedTransformerBlock': EnhancedTransformerBlock},
    )
    # NOTE(security): allow_pickle=True unpickles arbitrary objects from the
    # file -- only load a label file that comes from a trusted source.
    encoder.classes_ = np.load('label_encoder_classes.npy', allow_pickle=True)
except (OSError, ValueError) as load_error:
    # FileNotFoundError is an OSError (raised by np.load); depending on the
    # Keras version, load_model reports a missing file as OSError or ValueError.
    print("Model or encoder classes file not found. Using a dummy model and encoder for demonstration.")
    print(f"Reason: {load_error}")

    fallback_labels = ['Single small', 'Single big', 'Double small', 'Double big', 'Triple']
    encoder.classes_ = np.array(fallback_labels)

    # The seed history contains differently-cased spellings ("triple",
    # "single big", ...). Map them onto the fallback vocabulary so that
    # encoder.transform() does not reject the history on the first prediction.
    canonical_by_fold = {label.casefold(): label for label in fallback_labels}
    data = [canonical_by_fold.get(item.casefold(), item) for item in data]

    # One output unit per known class, so argmax always maps back to a label;
    # compiled so that the "Update Model" action can call fit() on it.
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(sequence_length,)),
        tf.keras.layers.Dense(len(fallback_labels), activation='softmax'),
    ])
    model.compile(optimizer="adam", loss="categorical_crossentropy")
| |
|
def update_data(data, new_outcome, max_length=None):
    """Append ``new_outcome`` to ``data`` and keep only the newest entries.

    The list is modified in place and also returned.

    Args:
        data: Mutable list holding the outcome history.
        new_outcome: Outcome to record at the end of the history.
        max_length: Maximum number of entries to keep. Defaults to the
            module-level ``sequence_length`` when omitted.

    Returns:
        The same ``data`` list, holding at most ``max_length`` entries.

    Raises:
        ValueError: If ``max_length`` is negative.
    """
    if max_length is None:
        max_length = sequence_length
    if max_length < 0:
        raise ValueError("max_length must not be negative")

    data.append(new_outcome)

    # Drop everything older than the window in one step. A single pop(0)
    # would never shrink a history that started out longer than the window.
    overflow = len(data) - max_length
    if overflow > 0:
        del data[:overflow]
    return data
| |
|
def enhanced_predict_next(model, data, sequence_length, encoder, n_samples=100):
    """Predict the next outcome and estimate how much the prediction varies.

    The last ``sequence_length`` entries of ``data`` are encoded and passed
    through ``model`` ``n_samples`` times with ``training=True``; the mean of
    those outputs selects the label and their standard deviation is averaged
    into a single uncertainty value.

    Args:
        model: Callable invoked as ``model(batch, training=True)`` that
            returns one row of class scores.
        data: Sequence of outcome labels, oldest first.
        sequence_length: Number of trailing outcomes used as model input.
        encoder: Object providing ``transform`` and ``inverse_transform``.
        n_samples: Number of forward passes to average over.

    Returns:
        A ``(predicted_label, uncertainty)`` tuple.

    Raises:
        ValueError: If ``n_samples`` is below 1 or ``data`` holds fewer than
            ``sequence_length`` outcomes.
    """
    if n_samples < 1:
        raise ValueError("n_samples must be at least 1")

    window = data[-sequence_length:]
    if len(window) < sequence_length:
        raise ValueError(
            f"Need at least {sequence_length} outcomes to predict, got {len(window)}."
        )

    encoded = np.asarray(encoder.transform(window)).reshape((1, sequence_length))

    # training=True is passed on every call so that layers which behave
    # stochastically in training mode produce a different output each pass.
    samples = np.stack(
        [np.asarray(model(encoded, training=True)) for _ in range(n_samples)]
    )

    mean_prediction = samples.mean(axis=0)
    std_prediction = samples.std(axis=0)

    predicted_label = encoder.inverse_transform([np.argmax(mean_prediction)])
    uncertainty = np.mean(std_prediction)

    return predicted_label[0], uncertainty
| |
|
def gradio_predict(outcome):
    """Gradio callback: record ``outcome`` and report the predicted next one.

    Returns a status string in every case: a rejection message for labels the
    encoder does not know, a "not enough data" hint while the history is
    shorter than ``sequence_length``, or the prediction with its uncertainty.
    """
    global data

    label_is_known = outcome in encoder.classes_
    if not label_is_known:
        return "Invalid outcome. Please try again."

    # The entered outcome becomes part of the history before predicting.
    data = update_data(data, outcome)

    missing = sequence_length - len(data)
    if missing > 0:
        return f"Not enough data to make a prediction. Please enter {missing} more outcomes."

    next_label, spread = enhanced_predict_next(model, data, sequence_length, encoder)
    return f'Predicted next outcome: {next_label} (Uncertainty: {spread:.4f})'
| |
|
def gradio_update(actual_next):
    """Gradio callback: record ``actual_next`` and train the model on it.

    The outcomes observed *before* ``actual_next`` form the training input
    and ``actual_next`` itself is the training target. The outcome is added
    to the history whether or not a training step can be run.

    Args:
        actual_next: Label of the outcome that actually occurred.

    Returns:
        A status string: a rejection message for unknown labels, a
        "not enough data" hint, or a confirmation that the model was updated.
    """
    global data, model

    if actual_next not in encoder.classes_:
        return "Invalid outcome. Please try again."

    # Snapshot the input window BEFORE recording the new outcome; building it
    # afterwards would place the target inside its own input sequence.
    context = data[-sequence_length:]
    data = update_data(data, actual_next)

    if len(context) < sequence_length:
        remaining = sequence_length - len(context)
        return f"Not enough data to update the model. Please enter {remaining} more outcomes."

    new_X = np.asarray(encoder.transform(context)).reshape((1, sequence_length))

    # Encode the target as a 1-element array so the one-hot result has shape
    # (1, num_classes) and lines up with the single input row; a scalar would
    # produce shape (num_classes,), which fit() cannot pair with one sample.
    encoded_actual_next = encoder.transform([actual_next])
    new_y = to_categorical(encoded_actual_next, num_classes=len(encoder.classes_))

    model.fit(new_X, new_y, epochs=1, verbose=0)
    return "Model updated with new data."
| |
|
| | |
# Gradio front end: one row to request a prediction, one row to feed back the
# outcome that actually occurred.
demo = gr.Blocks()

with demo:
    gr.Markdown("## Enhanced Outcome Prediction Model")
    gr.Markdown(f"Enter a sequence of {sequence_length} outcomes to get started.")
    gr.Markdown(f"Valid outcomes: {', '.join(encoder.classes_)}")

    # Prediction row: text input, trigger button, result field.
    with gr.Row():
        outcome_input = gr.Textbox(label="Enter an outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Prediction")

    # Feedback row: text input, trigger button, status field.
    with gr.Row():
        actual_input = gr.Textbox(label="Enter actual next outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")

    # Wire each button to its callback.
    predict_button.click(
        fn=gradio_predict,
        inputs=outcome_input,
        outputs=predicted_output,
    )
    update_button.click(
        fn=gradio_update,
        inputs=actual_input,
        outputs=update_output,
    )

demo.launch()