# AgGPT-16 / AgGPT16.py
import math
import random
import re
import os
import pandas as pd
from collections import defaultdict, Counter
from AgGPT_Feather import save_model, load_model
class AgGPT16:
    """Backoff n-gram language model with weighted random sampling.

    Tokens are mapped to integer ids; n-gram counts of orders 2..max_n are
    consulted from longest context downward, backing off to the unigram
    distribution when no longer context matches. Id 0 doubles as the
    unknown-word id and the "stop generating" sentinel.
    """

    def __init__(self, model_file='AgGPT16.feather', max_n=5, output_length=50):
        """Load saved tables from *model_file* if present, else train from `corpus`."""
        self.model_name = 'AgGPT16'
        self.model_file = model_file
        self.max_n = max_n                  # largest n-gram order to build/use
        self.output_length = output_length  # soft cap on generated tokens
        self.vocabulary = set()
        self.word_to_id = {}
        self.id_to_word = {}
        self.vocab_size = 0
        self.models = self._load_or_train()

    def _build_vocab_mapping(self):
        """Assign deterministic integer ids to the current vocabulary (sorted order)."""
        if self.vocabulary:
            vocab_list = sorted(self.vocabulary)
            self.word_to_id = {word: i for i, word in enumerate(vocab_list)}
            self.id_to_word = dict(enumerate(vocab_list))
            self.vocab_size = len(vocab_list)

    def _words_to_ids(self, words):
        # Unknown words map to id 0, which also serves as the stop sentinel.
        return [self.word_to_id.get(word, 0) for word in words]

    def _ids_to_words(self, ids):
        return [self.id_to_word.get(i, '<UNK>') for i in ids]

    @staticmethod
    def _tokenize(text):
        """Lowercase and split into special tokens (<|...|>), words, or punctuation."""
        tokens = re.findall(r"<\|[\w\s]*\|>|\w+|[^\w\s]", text.lower())
        return [token.strip() for token in tokens if token.strip()]

    def _build_models(self, corpus_text):
        """Count unigrams plus every n-gram order 2..max_n over *corpus_text*.

        Returns a mapping: {1: Counter(unigram ids),
                            n: {prefix_tuple: Counter(suffix ids)} for n >= 2}.
        """
        print("Tokenizing...")
        words = self._tokenize(corpus_text)
        self.vocabulary = set(words)
        self._build_vocab_mapping()
        word_ids = self._words_to_ids(words)
        models = defaultdict(lambda: defaultdict(Counter))
        models[1] = Counter(word_ids)  # unigram fallback distribution
        print("Building n-grams...")
        for n in range(2, self.max_n + 1):
            for i in range(len(word_ids) - n + 1):
                prefix = tuple(word_ids[i: i + n - 1])
                suffix = word_ids[i + n - 1]
                models[n][prefix][suffix] += 1
        return models

    @staticmethod
    def _sample_weighted(counter):
        """Draw one key from a {key: count} mapping, weighted by count.

        Returns None when the counts sum to zero or less.
        """
        ids = list(counter.keys())
        weights = list(counter.values())
        total_weight = sum(weights)
        if total_weight <= 0:
            return None
        r = random.random() * total_weight  # random() < 1, so r < total_weight
        cumulative = 0
        for i, weight in enumerate(weights):
            cumulative += weight
            if r <= cumulative:
                return ids[i]
        return ids[-1]  # unreachable in practice; guards float rounding

    def _predict_next_id(self, id_sequence):
        """Predict the next token id for *id_sequence*; 0 means no prediction."""
        if not id_sequence:
            return 0
        # Back off from the longest usable context down to bigrams.
        max_n = min(self.max_n, len(id_sequence) + 1)
        for n in range(max_n, 1, -1):
            if len(id_sequence) >= n - 1:
                prefix = tuple(id_sequence[-(n - 1):])
                # .get(n) avoids inserting empty entries into the defaultdict
                # and is safe if a loaded model is a plain dict.
                order_table = self.models.get(n)
                candidates = order_table.get(prefix) if order_table else None
                if candidates:
                    choice = self._sample_weighted(candidates)
                    if choice is not None:
                        return choice
        # Unigram fallback when no n-gram context matched.
        unigrams = self.models.get(1)
        if unigrams:
            choice = self._sample_weighted(unigrams)
            if choice is not None:
                return choice
        return 0

    def train(self, corpus_text):
        """Build the n-gram tables from *corpus_text* and persist them to disk."""
        print(f'Training {self.model_name}...')
        # Collapse all whitespace/newlines so the corpus is one token stream.
        cleaned_corpus = re.sub(r'[\r\n\s]+', ' ', corpus_text.strip())
        self.models = self._build_models(cleaned_corpus)
        save_model(self.models, self.model_file, self.word_to_id, self.id_to_word)
        print(f'Training complete. Vocabulary: {self.vocab_size} words')

    def _load_or_train(self):
        """Return saved model tables when model_file exists; otherwise train."""
        if os.path.exists(self.model_file):
            result = load_model(self.model_file)
            if isinstance(result, tuple) and len(result) == 3:
                models, word_to_id, id_to_word = result
                self.word_to_id = word_to_id
                self.id_to_word = id_to_word
                self.vocabulary = set(word_to_id.keys())
                self.vocab_size = len(self.vocabulary)
                return models
            # Legacy save format: only the raw tables were stored
            # (vocab mappings stay empty in that case).
            return result
        # No saved model: train from the bundled corpus module.
        from corpus import corpus
        self.train(corpus)
        return self.models

    def generate_response(self, input_text):
        """Generate a continuation of *input_text* by sampling token-by-token."""
        tokens = self._tokenize(input_text)  # _tokenize lowercases already
        if not tokens:
            return "Please say something."
        input_ids = self._words_to_ids(tokens)
        generated_ids = []
        # Keep at most the last 20 ids as rolling context.
        current_ids = input_ids[-20:] if len(input_ids) > 20 else input_ids
        for i in range(min(self.output_length, 80)):
            next_id = self._predict_next_id(current_ids)
            if next_id == 0:  # stop sentinel
                break
            generated_ids.append(next_id)
            current_ids.append(next_id)
            if len(current_ids) > 20:
                current_ids = current_ids[-20:]
            # Stop when the last 3 generated tokens are all the same (loop).
            if len(generated_ids) >= 3 and len(set(generated_ids[-3:])) == 1:
                break
            end_token_id = self.word_to_id.get('<|endoftext|>', -1)
            if end_token_id != -1 and next_id == end_token_id:
                break
            # After ~10 tokens, allow ending on sentence punctuation.
            if i > 10:
                period_id = self.word_to_id.get('.', -1)
                exclaim_id = self.word_to_id.get('!', -1)
                question_id = self.word_to_id.get('?', -1)
                if next_id in [period_id, exclaim_id, question_id]:
                    break
        if not generated_ids:
            return "I'm not sure how to respond."
        response_words = self._ids_to_words(generated_ids)
        response = ' '.join(response_words)
        response = re.sub(r'\s+', ' ', response)
        # Re-attach punctuation to the preceding word and drop end markers.
        response = re.sub(r'\s+([,.!?;:])', r'\1', response)
        response = re.sub(r'<\|endoftext\|>', '', response)
        if response and response[0].islower():
            response = response[0].upper() + response[1:]
        return response.strip()
def ask(prompt: str) -> str:
    """Return the model's reply to *prompt*, lazily creating the model once."""
    text = prompt.strip()
    if not text:
        return "Please ask me something!"
    formatted_prompt = "user: " + text + " ai: "
    # Cache a single model instance on the function object itself.
    if not hasattr(ask, 'model'):
        ask.model = AgGPT16()
    reply = ask.model.generate_response(formatted_prompt)
    # Trim anything after an end-of-text marker, then drop a leading role prefix.
    if '<|endoftext|>' in reply:
        reply = reply.split('<|endoftext|>')[0]
    reply = re.sub(r'^\s*(ai|user)\s*:\s*', '', reply, flags=re.IGNORECASE).strip()
    if len(reply) >= 2:
        return reply
    # Model produced nothing usable: answer with a canned fallback.
    return random.choice([
        "Could you rephrase that?",
        "Tell me more.",
        "I'm not sure I understand.",
        "Let me think about that.",
    ])
if __name__ == "__main__":
while True:
user_input = input("You: ")
if user_input.lower() in {'exit', 'quit'}:
print("Goodbye!")
break
reply = ask(user_input)
print(f"AI: {reply}")