| import math | |
| import random | |
| import re | |
| import pickle | |
| import os | |
| from data import corpus | |
| from agxpre import correct_grammar | |
class AgX:
    """Tiny ngram chat model wrapped in a decorative attention pipeline.

    On construction it loads a pickled model cache from disk when present,
    otherwise it trains fresh ngram models from the bundled corpus and
    saves them for next time.
    """

    def __init__(self, model_name='AgX-2', max_length=10000, model_file='AgX_model.pkl'):
        """Set up configuration, then load or train the ngram models.

        model_name: display name shown in the REPL prompt.
        max_length: maximum number of words generated per reply.
        model_file: path of the pickled ngram-model cache.
        """
        self.ModelName = model_name
        self.max_length = max_length
        # Chat-transcript role tags used when building prompts.
        self.user = 'user'
        self.ai = 'ai'
        # Back-off range of ngram orders consulted by predict_next_word.
        self.minNgram = 1
        self.maxNgram = 5
        # ANSI colour escape codes for terminal output.
        self.RED, self.GREEN, self.BLUE, self.RESET = (
            '\033[91m', '\033[92m', '\033[94m', '\033[0m'
        )
        self.model_file = model_file
        if not os.path.exists(self.model_file):
            print(f'{self.RED}No saved model found. Training new model...{self.RESET}')
            self.ngram_models = self.train_model(corpus)
            self.save_model()
        else:
            # NOTE(review): pickle.load executes arbitrary code from the
            # file — only load model files from a trusted source.
            print(f'{self.RED}Loading saved model from {self.model_file}...{self.RESET}')
            self.ngram_models = self.load_model()
| def mat_mul(self, A, B): | |
| result = [] | |
| for i in range(len(A)): | |
| result.append([sum(A[i][k] * B[k][j] for k in range(len(B))) for j in range(len(B[0]))]) | |
| return result | |
| def softmax(self, x): | |
| exp_x = [math.exp(v - max(x)) for v in x] | |
| sum_exp_x = sum(exp_x) | |
| return [e / sum_exp_x for e in exp_x] | |
| def self_attention(self, Q, K, V): | |
| scores = [[sum(Q[i][idx] * K[j][idx] for idx in range(len(Q[i]))) for j in range(len(K))] for i in range(len(Q))] | |
| attention_weights = [self.softmax(row) for row in scores] | |
| output = [[sum(attention_weights[i][k] * V[k][j] for k in range(len(V))) for j in range(len(V[0]))] for i in range(len(V))] | |
| return output | |
| def multi_head_attention(self, Q, K, V, num_heads): | |
| d_model = len(Q[0]) | |
| head_size = d_model // num_heads | |
| outputs = [] | |
| for head in range(num_heads): | |
| q_head = [row[head * head_size:(head + 1) * head_size] for row in Q] | |
| k_head = [row[head * head_size:(head + 1) * head_size] for row in K] | |
| v_head = [row[head * head_size:(head + 1) * head_size] for row in V] | |
| attention_output = self.self_attention(q_head, k_head, v_head) | |
| outputs.extend(attention_output) | |
| return outputs | |
| def positional_encoding(self, seq_len, d_model): | |
| encoding = [[math.sin(pos / (10000 ** (i / d_model))) if i % 2 == 0 else math.cos(pos / (10000 ** (i / d_model))) for i in range(d_model)] for pos in range(seq_len)] | |
| return encoding | |
| def add_positional_encoding(self, embeddings, positional_encodings): | |
| return [[val + positional_encodings[i][j] for j, val in enumerate(row)] for i, row in enumerate(embeddings)] | |
| def feed_forward_network(self, x): | |
| input_dim = len(x[0]) | |
| hidden_dim = 10 | |
| output_dim = 10 | |
| W1 = [[1 if i == j else 0 for j in range(hidden_dim)] for i in range(input_dim)] | |
| b1 = [0] * hidden_dim | |
| W2 = [[1 for _ in range(output_dim)] for _ in range(hidden_dim)] | |
| b2 = [0] * output_dim | |
| hidden = [[max(0, sum(x[i][k] * W1[k][j] for k in range(len(W1))) + b1[j]) for j in range(hidden_dim)] for i in range(len(x))] | |
| output = [[sum(hidden[i][k] * W2[k][j] for k in range(len(W2))) + b2[j] for j in range(output_dim)] for i in range(len(hidden))] | |
| return output | |
| def tokenize(self, text): | |
| return text.lower().split() | |
| def embed_tokens(self, tokens): | |
| return [[random.random() for _ in range(3)] for _ in tokens] | |
| def build_ngram_models(self, corpus, min_n=1, max_n=5): | |
| ngram_models = {} | |
| words = self.tokenize(corpus) | |
| for n in range(min_n, max_n + 1): | |
| model = {} | |
| for i in range(len(words) - n): | |
| context = ' '.join(words[i:i+n-1]) | |
| next_word = words[i+n-1] | |
| if context not in model: | |
| model[context] = [] | |
| model[context].append(next_word) | |
| ngram_models[f"{n}gram_model"] = model | |
| return ngram_models | |
| def predict_next_word(self, text, models): | |
| words = self.tokenize(text) | |
| for n in range(self.maxNgram, self.minNgram - 1, -1): | |
| if len(words) >= n - 1: | |
| context = ' '.join(words[-(n-1):]) | |
| model = models.get(f"{n}gram_model", {}) | |
| if context in model: | |
| return random.choice(model[context]) | |
| return '' | |
| def predict_next_word_with_attention(self, text): | |
| tokens = self.tokenize(text) | |
| d_model = 3 | |
| embeddings = self.embed_tokens(tokens) | |
| positional_encodings = self.positional_encoding(len(tokens), d_model) | |
| encoded_embeddings = self.add_positional_encoding(embeddings, positional_encodings) | |
| num_heads = 1 if len(tokens) > 25 else max(1, len(tokens)) | |
| attention_output = self.multi_head_attention(encoded_embeddings, encoded_embeddings, encoded_embeddings, num_heads) | |
| ff_output = self.feed_forward_network(attention_output) | |
| ngram_prediction = self.predict_next_word(text, self.ngram_models) | |
| return ngram_prediction | |
| def clean_user_input(self, text): | |
| return text.lower() | |
| def print_progress(self, progress, total): | |
| percent = (progress / total) * 100 | |
| bar_length = 40 | |
| filled_length = int(bar_length * progress // total) | |
| bar = '|' * filled_length + '-' * (bar_length - filled_length) | |
| print(f'{self.RED}\r[{bar}] {percent:.2f}% Complete{self.RESET}', end='') | |
| def train_model(self, corpus): | |
| print(f'{self.RED}\nTraining for {self.ModelName} has begun.{self.RESET}') | |
| cleaned_corpus = re.sub(r'[\r\n]+', ' ', corpus.strip()) | |
| self.print_progress(0, 3) | |
| cleaned_corpus = re.sub(r'[.,!?]', '', cleaned_corpus) | |
| self.print_progress(1, 3) | |
| ngram_models = self.build_ngram_models(cleaned_corpus) | |
| self.print_progress(2, 3) | |
| self.print_progress(3, 3) | |
| print(f'{self.RED}\nTraining complete.{self.RESET}') | |
| return ngram_models | |
| def save_model(self): | |
| with open(self.model_file, 'wb') as f: | |
| pickle.dump(self.ngram_models, f) | |
| print(f'{self.RED}Model saved to {self.model_file}{self.RESET}') | |
| def load_model(self): | |
| with open(self.model_file, 'rb') as f: | |
| return pickle.load(f) | |
| def predict_sentence_with_attention(self, input_text, output_length): | |
| cleaned_input = self.clean_user_input(input_text) | |
| sentence = cleaned_input | |
| for _ in range(output_length): | |
| prediction = self.predict_next_word_with_attention(sentence) | |
| if prediction == '<|endoftext|>': | |
| break | |
| sentence += ' ' + prediction | |
| if cleaned_input in sentence: | |
| sentence = sentence.replace(cleaned_input, '', 1).strip() | |
| return sentence | |
| def remove_duplicates(self, text): | |
| words = text.split() | |
| unique_words = list(dict.fromkeys(words)) | |
| return ' '.join(unique_words) | |
| def AskAgGPT8TURBO(self, input_text): | |
| input_text = str(input_text).lower() | |
| raw_response = self.predict_sentence_with_attention(self.user + ": " + input_text.lower() + "\n" + self.ai + ": ", self.max_length) | |
| raw_response = str(raw_response) | |
| response = raw_response.replace(self.user + ": ", "").replace(self.ai + ": ", "") | |
| response = self.remove_duplicates(response) | |
| return response | |
| def run(self): | |
| while True: | |
| input_text = input(f'{self.GREEN}\nType a message (type exit to leave): {self.RESET}') | |
| if input_text.lower() == 'exit': | |
| break | |
| print(f"{self.BLUE}{self.ModelName}: {self.RESET}", end="") | |
| response = self.AskAgGPT8TURBO(input_text) | |
| response = correct_grammar(response) | |
| print(f"{self.BLUE}{response}{self.RESET}") | |
if __name__ == "__main__":
    # Script entry point: build (or load) the model and start the REPL.
    AgX().run()