# AgX-2 / turbo.py
# AGofficial's picture
# Upload 8 files
# 767f47f verified
import math
import random
import re
import pickle
import os
from data import corpus
from agxpre import correct_grammar
class AgX:
    """Back-off n-gram chat model with a toy attention pipeline attached.

    Next-word prediction is driven entirely by n-gram tables built from the
    training corpus (orders ``minNgram`` .. ``maxNgram``).  The attention,
    positional-encoding and feed-forward helpers are pure-Python toy
    implementations; their output is computed but does not influence the
    predicted word.

    The trained tables are cached with ``pickle`` in ``model_file``.  An
    existing cache file always takes precedence over retraining, so delete
    the file to rebuild the tables after changing the corpus or this code.
    """

    # Punctuation removed from BOTH the training corpus and user input, so
    # that contexts built from either side can actually match each other.
    _PUNCTUATION = re.compile(r'[.,!?]')
    _LINE_BREAKS = re.compile(r'[\r\n]+')
    # Token that terminates generation when predicted.
    _END_OF_TEXT = '<|endoftext|>'

    def __init__(self, model_name='AgX-2', max_length=10000, model_file='AgX_model.pkl'):
        """Load the cached n-gram tables, or train and cache them.

        Args:
            model_name: Display name used in console output.
            max_length: Maximum number of words generated per reply.
            model_file: Path of the pickle cache holding the n-gram tables.
        """
        self.ModelName = model_name
        self.max_length = max_length
        self.user = 'user'
        self.ai = 'ai'
        self.minNgram = 1
        self.maxNgram = 5
        # ANSI colour escape sequences used for console output.
        self.RED = '\033[91m'
        self.GREEN = '\033[92m'
        self.BLUE = '\033[94m'
        self.RESET = '\033[0m'
        self.model_file = model_file
        if os.path.exists(self.model_file):
            print(f'{self.RED}Loading saved model from {self.model_file}...{self.RESET}')
            self.ngram_models = self.load_model()
        else:
            print(f'{self.RED}No saved model found. Training new model...{self.RESET}')
            # `corpus` is the module-level name imported from `data`.
            self.ngram_models = self.train_model(corpus)
            self.save_model()

    def mat_mul(self, A, B):
        """Return the matrix product ``A @ B`` for list-of-lists matrices."""
        if not A:
            return []
        inner = len(B)
        width = len(B[0])
        return [
            [sum(row[k] * B[k][j] for k in range(inner)) for j in range(width)]
            for row in A
        ]

    def softmax(self, x):
        """Return the softmax of the sequence ``x`` (``[]`` for empty input)."""
        if not x:
            return []
        # Subtract the maximum once for numerical stability; computing it
        # inside the comprehension would make this quadratic.
        peak = max(x)
        exps = [math.exp(v - peak) for v in x]
        total = sum(exps)
        return [e / total for e in exps]

    def self_attention(self, Q, K, V):
        """Dot-product attention (unscaled) over list-of-lists matrices.

        Returns one output row per row of ``Q``; each row is the
        softmax-weighted sum of the rows of ``V``.  Returns ``[]`` if any
        input is empty.
        """
        if not Q or not K or not V:
            return []
        scores = [
            [sum(q * k for q, k in zip(q_row, k_row)) for k_row in K]
            for q_row in Q
        ]
        weights = [self.softmax(row) for row in scores]
        width = len(V[0])
        # One output row per *query* (the weights matrix has len(Q) rows).
        return [
            [sum(w * v_row[j] for w, v_row in zip(row_weights, V)) for j in range(width)]
            for row_weights in weights
        ]

    def multi_head_attention(self, Q, K, V, num_heads):
        """Run attention per feature slice and concatenate the heads.

        The feature dimension is split into ``num_heads`` contiguous slices
        (clamped to ``1 .. d_model`` so that no head is empty, with any
        remainder spread across heads).  Head outputs are concatenated along
        the feature axis, so the result has ``len(Q)`` rows of ``d_model``
        values each.  Returns ``[]`` if any input is empty.
        """
        if not Q or not K or not V:
            return []
        d_model = len(Q[0])
        if d_model == 0:
            return [[] for _ in Q]
        num_heads = max(1, min(num_heads, d_model))
        # Slice boundaries; identical to equal-size heads when divisible.
        bounds = [head * d_model // num_heads for head in range(num_heads + 1)]
        combined = [[] for _ in Q]
        for start, stop in zip(bounds, bounds[1:]):
            head_output = self.self_attention(
                [row[start:stop] for row in Q],
                [row[start:stop] for row in K],
                [row[start:stop] for row in V],
            )
            for merged_row, head_row in zip(combined, head_output):
                merged_row.extend(head_row)
        return combined

    def positional_encoding(self, seq_len, d_model):
        """Return a ``seq_len`` x ``d_model`` sinusoidal position table.

        Even feature indices use sine and odd ones cosine; the frequency
        exponent is ``i / d_model`` for feature index ``i``.
        """
        encoding = []
        for pos in range(seq_len):
            row = []
            for i in range(d_model):
                angle = pos / (10000 ** (i / d_model))
                row.append(math.sin(angle) if i % 2 == 0 else math.cos(angle))
            encoding.append(row)
        return encoding

    def add_positional_encoding(self, embeddings, positional_encodings):
        """Return ``embeddings`` with the matching position values added."""
        return [
            [val + positional_encodings[i][j] for j, val in enumerate(row)]
            for i, row in enumerate(embeddings)
        ]

    def feed_forward_network(self, x, hidden_dim=10, output_dim=10):
        """Apply a fixed two-layer ReLU network to every row of ``x``.

        The weights are constants, not learned: the first layer is a
        (rectangular) identity followed by ReLU, the second layer is all
        ones, and both biases are zero.  Returns ``[]`` for empty ``x``.
        """
        if not x:
            return []
        input_dim = len(x[0])
        W1 = [[1 if i == j else 0 for j in range(hidden_dim)] for i in range(input_dim)]
        b1 = [0] * hidden_dim
        W2 = [[1 for _ in range(output_dim)] for _ in range(hidden_dim)]
        b2 = [0] * output_dim
        hidden = [
            [max(0, sum(row[k] * W1[k][j] for k in range(input_dim)) + b1[j])
             for j in range(hidden_dim)]
            for row in x
        ]
        return [
            [sum(h_row[k] * W2[k][j] for k in range(hidden_dim)) + b2[j]
             for j in range(output_dim)]
            for h_row in hidden
        ]

    def tokenize(self, text):
        """Lower-case ``text`` and split it on whitespace."""
        return text.lower().split()

    def embed_tokens(self, tokens, dim=3):
        """Return a fresh random ``dim``-vector for every token.

        The vectors are drawn anew on each call; they do not depend on the
        token text.
        """
        return [[random.random() for _ in range(dim)] for _ in tokens]

    def build_ngram_models(self, corpus, min_n=1, max_n=5):
        """Build n-gram continuation tables for orders ``min_n`` .. ``max_n``.

        Returns a dict keyed ``"<n>gram_model"``; each value maps a context
        (the ``n - 1`` preceding words joined by spaces, ``''`` for unigrams)
        to the list of words observed after it, duplicates included so that
        a uniform random choice follows the observed frequencies.
        """
        ngram_models = {}
        words = self.tokenize(corpus)
        for n in range(min_n, max_n + 1):
            model = {}
            # A sequence of L words contains L - n + 1 windows of size n;
            # the "+ 1" keeps the final window of the corpus.
            for i in range(len(words) - n + 1):
                context = ' '.join(words[i:i + n - 1])
                model.setdefault(context, []).append(words[i + n - 1])
            ngram_models[f"{n}gram_model"] = model
        return ngram_models

    def predict_next_word(self, text, models):
        """Pick a next word for ``text``, backing off from long to short contexts.

        Tries the highest n-gram order first and falls back to shorter
        contexts, ending with the context-free unigram table.  Returns ``''``
        when no table has a continuation.
        """
        words = self.tokenize(text)
        for n in range(self.maxNgram, self.minNgram - 1, -1):
            context_len = n - 1
            if len(words) < context_len:
                continue
            # Slice from an explicit start index: words[-0:] would be the
            # whole list, which breaks the unigram (empty-context) lookup.
            context = ' '.join(words[len(words) - context_len:])
            candidates = models.get(f"{n}gram_model", {}).get(context)
            if candidates:
                return random.choice(candidates)
        return ''

    def predict_next_word_with_attention(self, text):
        """Run the toy attention pipeline, then return the n-gram prediction.

        NOTE(review): the attention / feed-forward result is computed but is
        not used to choose the word; the return value comes solely from
        ``predict_next_word``.
        """
        tokens = self.tokenize(text)
        if tokens:  # the matrix helpers need at least one row
            d_model = 3
            embeddings = self.embed_tokens(tokens, d_model)
            positional_encodings = self.positional_encoding(len(tokens), d_model)
            encoded_embeddings = self.add_positional_encoding(embeddings, positional_encodings)
            num_heads = 1 if len(tokens) > 25 else max(1, len(tokens))
            attention_output = self.multi_head_attention(
                encoded_embeddings, encoded_embeddings, encoded_embeddings, num_heads
            )
            self.feed_forward_network(attention_output)
        return self.predict_next_word(text, self.ngram_models)

    def clean_user_input(self, text):
        """Lower-case ``text`` and strip the punctuation removed in training."""
        return self._PUNCTUATION.sub('', text.lower())

    def print_progress(self, progress, total):
        """Redraw a 40-character progress bar on the current console line."""
        percent = (progress / total) * 100
        bar_length = 40
        filled_length = int(bar_length * progress // total)
        bar = '|' * filled_length + '-' * (bar_length - filled_length)
        print(f'{self.RED}\r[{bar}] {percent:.2f}% Complete{self.RESET}', end='')

    def train_model(self, corpus):
        """Clean ``corpus`` and build the n-gram tables, printing progress.

        Line breaks become spaces and the characters ``. , ! ?`` are removed
        before the tables are built for orders ``minNgram`` .. ``maxNgram``.
        """
        print(f'{self.RED}\nTraining for {self.ModelName} has begun.{self.RESET}')
        cleaned_corpus = self._LINE_BREAKS.sub(' ', corpus.strip())
        self.print_progress(0, 3)
        cleaned_corpus = self._PUNCTUATION.sub('', cleaned_corpus)
        self.print_progress(1, 3)
        ngram_models = self.build_ngram_models(cleaned_corpus, self.minNgram, self.maxNgram)
        self.print_progress(2, 3)
        self.print_progress(3, 3)
        print(f'{self.RED}\nTraining complete.{self.RESET}')
        return ngram_models

    def save_model(self):
        """Pickle the current n-gram tables to ``model_file``."""
        with open(self.model_file, 'wb') as f:
            pickle.dump(self.ngram_models, f)
        print(f'{self.RED}Model saved to {self.model_file}{self.RESET}')

    def load_model(self):
        """Return the n-gram tables unpickled from ``model_file``."""
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file.  Only load model files that this program wrote itself.
        with open(self.model_file, 'rb') as f:
            return pickle.load(f)

    def predict_sentence_with_attention(self, input_text, output_length):
        """Generate up to ``output_length`` words continuing ``input_text``.

        Generation stops early at the end-of-text token or when no
        continuation exists.  Returns only the generated words, without the
        (cleaned) prompt.
        """
        cleaned_input = self.clean_user_input(input_text)
        sentence = cleaned_input
        for _ in range(output_length):
            prediction = self.predict_next_word_with_attention(sentence)
            # An empty prediction cannot change the context, so every later
            # step would be empty as well; stop instead of spinning.
            if not prediction or prediction == self._END_OF_TEXT:
                break
            sentence += ' ' + prediction
        # The sentence always starts with the cleaned prompt; drop it.
        return sentence[len(cleaned_input):].strip()

    def remove_duplicates(self, text):
        """Keep only the first occurrence of each word, preserving order."""
        return ' '.join(dict.fromkeys(text.split()))

    def AskAgGPT8TURBO(self, input_text):
        """Return the model's reply to a single user message.

        The message is wrapped as ``"user: <message>\\nai: "``, a continuation
        is generated, speaker labels are removed from it and repeated words
        are dropped.
        """
        prompt = self.user + ": " + str(input_text).lower() + "\n" + self.ai + ": "
        raw_response = str(self.predict_sentence_with_attention(prompt, self.max_length))
        response = raw_response.replace(self.user + ": ", "").replace(self.ai + ": ", "")
        return self.remove_duplicates(response)

    def run(self):
        """Interactive console loop; ends on ``exit`` or end of input."""
        while True:
            try:
                input_text = input(f'{self.GREEN}\nType a message (type exit to leave): {self.RESET}')
            except EOFError:
                # stdin closed (e.g. piped input exhausted): leave cleanly.
                break
            if input_text.strip().lower() == 'exit':
                break
            print(f"{self.BLUE}{self.ModelName}: {self.RESET}", end="")
            response = self.AskAgGPT8TURBO(input_text)
            # `correct_grammar` is the module-level name imported from `agxpre`.
            response = correct_grammar(response)
            print(f"{self.BLUE}{response}{self.RESET}")
if __name__ == "__main__":
    # Start the interactive chat loop only when executed as a script,
    # never as a side effect of importing this module.
    chatbot = AgX()
    chatbot.run()