| import random |
| import re |
| from data import corpus |
| from transform import base_dict |
| from typing import List, Dict, Set, Optional, Any |
| from tests import * |
|
|
class AgGPT9m:
    """Retrieval-style responder backed by a fixed question/answer corpus.

    The corpus is a single string of entries separated by ``<|endoftext|>``.
    Inside an entry, a line starting with ``user:`` holds the question and a
    line starting with ``ai:`` holds the answer. A query is expanded into
    several synonym-substituted variations, each variation is compared with
    every stored question using a normalised edit-distance similarity, and the
    answer of the best-scoring question is returned when its score reaches the
    requested threshold.
    """

    # Splits text into words, whitespace runs and single punctuation marks.
    # The capture group keeps the separators in the result, so joining the
    # tokens with "" reproduces the original text exactly.
    _TOKEN_SPLIT = re.compile(r'(\s+|[.,;:!?])')

    def __init__(self):
        """Build the synonym table and attach the module-level corpus text."""
        self.synonym_dict: Dict[str, List[str]] = self._build_synonym_dict()
        self.corpus: str = corpus

    def get_ai_response(
        self,
        user_input: str,
        threshold: float = 0.3,
        num_variations: int = 10,
        use_synonyms_in_response: bool = True,
        synonym_chance: float = 0.25
    ) -> str:
        """Return the corpus answer whose question best matches ``user_input``.

        Args:
            user_input: The text typed by the user.
            threshold: Minimum similarity (0.0 to 1.0) the best match must
                reach; below it a fixed fallback sentence is returned.
            num_variations: Number of random synonym variations of the input
                to try in addition to the input itself.
            use_synonyms_in_response: Whether to randomly substitute synonyms
                in the returned text.
            synonym_chance: Probability of substituting each eligible word of
                the returned text.

        Returns:
            The formatted answer, a fallback sentence when nothing matches
            well enough, or a prompt sentence when the input is empty, blank
            or not a string.
        """
        # Blank input can never match a question, so answer it the same way
        # as empty input instead of scanning the whole corpus.
        if not isinstance(user_input, str) or not user_input.strip():
            return self._format_response("I sense the silence between words. What would you like to explore?", False, 0.0)

        input_variations = self._generate_input_variations(user_input, num_variations)
        pairs = self._get_pairs()

        best_match: Optional[Dict[str, str]] = None
        best_score: float = 0.0

        for pair in pairs:
            question = pair['question']
            # The variation set always contains the raw input, so max() never
            # receives an empty sequence.
            pair_score = max(
                self._calculate_similarity(variation, question)
                for variation in input_variations
            )
            if pair_score > best_score:
                best_score = pair_score
                best_match = pair
                # Similarity never exceeds 1.0 and only a strictly better
                # score replaces the current match, so nothing later can win.
                if best_score >= 1.0:
                    break

        if best_match is not None and best_score >= threshold:
            response = best_match['answer']
        else:
            response = "The patterns of your inquiry dance beyond my current understanding. Perhaps you could illuminate your thoughts differently?"

        return self._format_response(response, use_synonyms=use_synonyms_in_response, synonym_chance=synonym_chance)

    def _get_pairs(self) -> List[Dict[str, str]]:
        """Return the parsed corpus, re-parsing only when ``self.corpus`` changed."""
        cached = getattr(self, '_pairs_cache', None)
        if cached is None or cached[0] != self.corpus:
            cached = (self.corpus, self._parse_corpus(self.corpus))
            self._pairs_cache = cached
        return cached[1]

    def _build_synonym_dict(self) -> Dict[str, List[str]]:
        """Expand the imported ``base_dict`` into a bidirectional synonym table."""
        return self._expand_synonym_groups(base_dict)

    @staticmethod
    def _expand_synonym_groups(groups: Dict[str, List[str]]) -> Dict[str, List[str]]:
        """Map every word of every group to the other words it shares a group with.

        A word that occurs in several groups accumulates the synonyms of all of
        them (first-seen order, no duplicates) rather than keeping only the
        last group processed.

        Args:
            groups: Mapping of a head word to its list of synonyms.

        Returns:
            A mapping in which each head word and each synonym is a key.
        """
        expanded: Dict[str, List[str]] = {}
        for key, synonyms in groups.items():
            members = [key] + list(synonyms)
            for word in members:
                known = expanded.setdefault(word, [])
                for other in members:
                    if other != word and other not in known:
                        known.append(other)
        return expanded

    def _generate_input_variations(self, text: str, num_variations: int) -> Set[str]:
        """Return ``text`` plus up to ``num_variations`` random synonym rewrites.

        Each rewrite replaces every known word with a random synonym with
        probability 0.5. Duplicate rewrites collapse because the result is a
        set, and the unmodified ``text`` is always a member.
        """
        variations: Set[str] = {text}
        for _ in range(num_variations):
            variations.add(self._apply_synonyms(text, 0.5))
        return variations

    def _parse_corpus(self, corpus_text: str) -> List[Dict[str, str]]:
        """Parse corpus text into ``{'question': ..., 'answer': ...}`` dicts.

        Entries are separated by ``<|endoftext|>``. Within an entry the last
        ``user:`` line and the last ``ai:`` line win; lines with neither
        prefix are ignored. Entries missing either part are skipped.
        """
        pairs: List[Dict[str, str]] = []

        for entry in corpus_text.split('<|endoftext|>'):
            user_question = ''
            ai_response = ''

            for raw_line in entry.split('\n'):
                line = raw_line.strip()
                if line.startswith('user:'):
                    user_question = line[len('user:'):].strip()
                elif line.startswith('ai:'):
                    ai_response = line[len('ai:'):].strip()

            if user_question and ai_response:
                pairs.append({'question': user_question, 'answer': ai_response})

        return pairs

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """Return a case-insensitive similarity score between 0.0 and 1.0.

        The score is ``1 - levenshtein_distance / max(len)``, computed on the
        lower-cased, stripped strings. Equal strings score 1.0; if exactly one
        of them is empty the score is 0.0.
        """
        s1 = str1.lower().strip()
        s2 = str2.lower().strip()

        if s1 == s2:
            return 1.0

        len1, len2 = len(s1), len(s2)
        if len1 == 0 or len2 == 0:
            return 0.0

        # Only the previous row of the edit-distance table is ever read, so
        # two rolling rows give the same distance as the full matrix.
        previous = list(range(len1 + 1))
        for j, char2 in enumerate(s2, start=1):
            current = [j]
            for i, char1 in enumerate(s1, start=1):
                cost = 0 if char1 == char2 else 1
                current.append(min(previous[i] + 1,          # skip a char of s2
                                   current[i - 1] + 1,       # skip a char of s1
                                   previous[i - 1] + cost))  # match / substitute
            previous = current

        return 1.0 - (previous[len1] / max(len1, len2))

    def _apply_synonyms(self, text: str, chance: float) -> str:
        """Randomly replace known words in ``text`` with one of their synonyms.

        Args:
            text: Text to rewrite; whitespace and punctuation are preserved.
            chance: Probability of replacing each word found in
                ``self.synonym_dict``.

        Returns:
            The rewritten text. A replacement gets an upper-case first letter
            when the word it replaces started with one.
        """
        if not text:
            return text

        tokens = self._TOKEN_SPLIT.split(text)

        for index, token in enumerate(tokens):
            word = token.lower().strip()
            if not word:
                # Whitespace runs and the empty strings re.split emits between
                # adjacent separators are never substitution candidates.
                continue

            candidates = self.synonym_dict.get(word)
            if candidates and random.random() < chance:
                replacement = random.choice(candidates)
                if token[0].isupper():
                    # Upper-case only the first character; str.capitalize()
                    # would also lower-case the rest of the replacement.
                    replacement = replacement[:1].upper() + replacement[1:]
                tokens[index] = replacement

        return "".join(tokens)

    def _format_response(self, text: str, use_synonyms: bool, synonym_chance: float) -> str:
        """Optionally apply synonyms, then normalise casing, ending and spacing.

        The result starts with an upper-case character, has an upper-case
        ASCII letter after each ``.``, ``!`` or ``?`` followed by whitespace,
        ends with ``.``, ``!`` or ``?`` (a ``.`` is appended if needed) and
        contains only single spaces. Blank text yields ``""``.
        """
        result = text

        if use_synonyms:
            result = self._apply_synonyms(result, synonym_chance)

        result = result.strip()
        if not result:
            return ""

        result = result[0].upper() + result[1:]
        result = re.sub(r'([.!?]\s+)([a-z])', lambda m: m.group(1) + m.group(2).upper(), result)

        if not re.search(r'[.!?]$', result):
            result += '.'

        result = re.sub(r'\s+', ' ', result).strip()

        return result

    def ask(self, prompt: str) -> str:
        """Answer ``prompt`` with a 0.5 threshold, 100 variations and every
        eligible word of the reply replaced by a synonym."""
        response = self.get_ai_response(
            prompt,
            threshold=0.5,
            num_variations=100,
            use_synonyms_in_response=True,
            synonym_chance=1.0
        )
        return response

    def direct_ask(self, prompt: str) -> str:
        """Answer ``prompt`` with a 0.5 threshold and 100 variations, returning
        the stored answer without synonym substitution."""
        response = self.get_ai_response(
            prompt,
            threshold=0.5,
            num_variations=100,
            use_synonyms_in_response=False,
            synonym_chance=0.0
        )
        return response
|
|
if __name__ == "__main__":
    # Script entry point: run the test routine. `test` is not defined in this
    # file; presumably it is supplied by the wildcard import from `tests`.
    test()
|
|