| import math |
| import random |
| import re |
| import os |
| import pandas as pd |
| from collections import defaultdict, Counter |
| from AgGPT_Feather import save_model, load_model |
|
|
class AgGPT16:
    """N-gram (Markov chain) text generator with longest-context backoff.

    Builds frequency tables for n-grams of order 1..max_n over a tokenized
    corpus and generates text by sampling the next token from the longest
    matching context, backing off to shorter contexts and finally to the
    unigram distribution.
    """

    def __init__(self, model_file='AgGPT16.feather', max_n=5, output_length=50):
        # model_file: path handed to save_model/load_model (AgGPT_Feather).
        self.model_name = 'AgGPT16'
        self.model_file = model_file
        self.max_n = max_n                  # highest n-gram order stored
        self.output_length = output_length  # soft cap on generated tokens
        self.vocabulary = set()
        self.word_to_id = {}
        self.id_to_word = {}
        self.vocab_size = 0
        self.models = self._load_or_train()

    def _build_vocab_mapping(self):
        """Assign stable integer ids to the vocabulary (sorted order)."""
        if self.vocabulary:
            vocab_list = sorted(self.vocabulary)
            self.word_to_id = {word: i for i, word in enumerate(vocab_list)}
            self.id_to_word = {i: word for i, word in enumerate(vocab_list)}
            self.vocab_size = len(vocab_list)

    def _words_to_ids(self, words):
        """Map words to ids; unknown words collapse to id 0.

        NOTE(review): unknown words share id 0 with the first vocabulary
        entry (lossy); kept for compatibility with previously saved models.
        """
        return [self.word_to_id.get(word, 0) for word in words]

    def _ids_to_words(self, ids):
        """Map ids back to words; ids without a mapping become '<UNK>'."""
        return [self.id_to_word.get(token_id, '<UNK>') for token_id in ids]

    @staticmethod
    def _tokenize(text):
        """Lowercase *text* and split into special tokens (<|...|>), words,
        and single punctuation characters."""
        tokens = re.findall(r"<\|[\w\s]*\|>|\w+|[^\w\s]", text.lower())
        return [token.strip() for token in tokens if token.strip()]

    @staticmethod
    def _weighted_choice(counter):
        """Sample one key from a Counter/dict proportionally to its counts.

        Returns None when the table is missing, empty, or has no positive
        weight, so callers can distinguish "no prediction" from a valid id.
        """
        if not counter:
            return None
        ids = list(counter.keys())
        weights = list(counter.values())
        if sum(weights) <= 0:
            return None
        return random.choices(ids, weights=weights, k=1)[0]

    def _build_models(self, corpus_text):
        """Count unigram and (prefix -> next-id) n-gram frequencies."""
        print("Tokenizing...")
        words = self._tokenize(corpus_text)
        self.vocabulary = set(words)
        self._build_vocab_mapping()

        word_ids = self._words_to_ids(words)
        models = defaultdict(lambda: defaultdict(Counter))
        models[1] = Counter(word_ids)  # unigram fallback distribution

        print("Building n-grams...")
        for n in range(2, self.max_n + 1):
            for i in range(len(word_ids) - n + 1):
                prefix = tuple(word_ids[i:i + n - 1])
                suffix = word_ids[i + n - 1]
                models[n][prefix][suffix] += 1

        return models

    def _predict_next_id(self, id_sequence):
        """Sample the next token id using longest-context backoff.

        Returns None when no prediction can be made.  (Bug fix: the old
        sentinel 0 collided with the id of the first vocabulary word,
        truncating generation whenever that word was legitimately sampled.)
        """
        if not id_sequence:
            return None

        # Longest usable order given how much context we actually have.
        max_n = min(self.max_n, len(id_sequence) + 1)
        for n in range(max_n, 1, -1):
            table = self.models.get(n)  # .get: loaded models may be plain dicts
            if not table:
                continue
            prefix = tuple(id_sequence[-(n - 1):])
            choice = self._weighted_choice(table.get(prefix))
            if choice is not None:
                return choice

        # Back off to the unigram distribution as a last resort.
        return self._weighted_choice(self.models.get(1))

    def train(self, corpus_text):
        """Tokenize the corpus, build the n-gram tables, and persist them."""
        print(f'Training {self.model_name}...')
        # Collapse all runs of whitespace so n-grams span line breaks.
        cleaned_corpus = re.sub(r'[\r\n\s]+', ' ', corpus_text.strip())
        self.models = self._build_models(cleaned_corpus)
        save_model(self.models, self.model_file, self.word_to_id, self.id_to_word)
        print(f'Training complete. Vocabulary: {self.vocab_size} words')

    def _load_or_train(self):
        """Load a saved model if present; otherwise train from corpus.corpus."""
        if os.path.exists(self.model_file):
            result = load_model(self.model_file)
            # Newer save format also returns the vocabulary mappings.
            if isinstance(result, tuple) and len(result) == 3:
                models, word_to_id, id_to_word = result
                self.word_to_id = word_to_id
                self.id_to_word = id_to_word
                self.vocabulary = set(word_to_id.keys())
                self.vocab_size = len(self.vocabulary)
                return models
            return result
        # Lazy import: the corpus is only needed for first-time training.
        from corpus import corpus
        self.train(corpus)
        return self.models

    def generate_response(self, input_text):
        """Generate a continuation of *input_text*, token by token."""
        tokens = self._tokenize(input_text)  # _tokenize lowercases already
        if not tokens:
            return "Please say something."

        input_ids = self._words_to_ids(tokens)
        generated_ids = []
        current_ids = input_ids[-20:]  # sliding 20-token context window

        for step in range(min(self.output_length, 80)):
            next_id = self._predict_next_id(current_ids)
            if next_id is None:  # no prediction available -> stop
                break

            generated_ids.append(next_id)
            current_ids.append(next_id)
            if len(current_ids) > 20:
                current_ids = current_ids[-20:]

            # Stop on degenerate repetition (same token three times running).
            if len(generated_ids) >= 3 and len(set(generated_ids[-3:])) == 1:
                break

            # Honor an explicit end-of-text token when the vocab has one.
            end_token_id = self.word_to_id.get('<|endoftext|>', -1)
            if end_token_id != -1 and next_id == end_token_id:
                break

            # After a minimum length, end on sentence-final punctuation.
            if step > 10:
                stop_ids = {self.word_to_id.get(p, -1) for p in ('.', '!', '?')}
                stop_ids.discard(-1)
                if next_id in stop_ids:
                    break

        if not generated_ids:
            return "I'm not sure how to respond."

        response = ' '.join(self._ids_to_words(generated_ids))
        response = re.sub(r'\s+', ' ', response)
        response = re.sub(r'\s+([,.!?;:])', r'\1', response)  # attach punctuation
        response = response.replace('<|endoftext|>', '')

        # Capitalize the first character for presentability.
        if response and response[0].islower():
            response = response[0].upper() + response[1:]

        return response.strip()
|
|
def ask(prompt: str) -> str:
    """Answer *prompt* using a lazily created AgGPT16 instance cached on
    the function object, with light post-processing of the reply."""
    cleaned = prompt.strip()
    if not cleaned:
        return "Please ask me something!"

    # Build the model once and reuse it across calls.
    if not hasattr(ask, 'model'):
        ask.model = AgGPT16()
    reply = ask.model.generate_response("user: " + cleaned + " ai: ")

    # Keep only the text before any end-of-text marker.
    marker = '<|endoftext|>'
    if marker in reply:
        reply = reply.split(marker)[0]

    # Drop a leading role prefix like "ai:" / "user:".
    reply = re.sub(r'^\s*(ai|user)\s*:\s*', '', reply, flags=re.IGNORECASE).strip()

    if not reply or len(reply) < 2:
        reply = random.choice([
            "Could you rephrase that?",
            "Tell me more.",
            "I'm not sure I understand.",
            "Let me think about that.",
        ])
    return reply
|
|
if __name__ == "__main__":
    # Interactive chat loop; typing 'exit' or 'quit' ends the session.
    while (user_input := input("You: ")).lower() not in {'exit', 'quit'}:
        print(f"AI: {ask(user_input)}")
    print("Goodbye!")
|
|