File size: 12,244 Bytes
4bc1d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
import os
import re
import random
from typing import List, Dict, Tuple, Any
from collections import defaultdict
import math
from feather import FeatherManager, similarity_score
from train import GrammarRules, PatternExtractor

class ResponseGenerator:
    """Scores trained mini-models against user input and blends their
    candidate responses into one reply.

    Relies on a FeatherManager for model persistence and a
    PatternExtractor for pattern/keyword matching. A rolling context
    window of recent exchanges biases model selection toward the
    ongoing conversation.
    """

    def __init__(self, feather_manager: FeatherManager):
        self.feather_manager = feather_manager
        self.pattern_extractor = PatternExtractor()
        self.grammar_rules = GrammarRules()
        # Mini-model dicts loaded via load_models().
        self.models: List[Dict[str, Any]] = []
        # Rolling (user_input, response) pairs; see max_context_length.
        self.context_window: List[Tuple[str, str]] = []
        self.max_context_length = 10

    def load_models(self) -> bool:
        """Load all trained mini-models.

        Returns:
            True when at least one model was loaded, False otherwise.
        """
        print("Loading mini-models...")
        self.models = self.feather_manager.load_all_models()
        print(f"Loaded {len(self.models)} mini-models")

        if not self.models:
            print("No trained models found! Please run train.py first.")
            return False

        return True

    def calculate_model_scores(self, user_input: str) -> List[Tuple[Dict[str, Any], float]]:
        """Score every loaded model against the input.

        The score combines summed pattern similarity, keyword Jaccard
        overlap (double-weighted), the model's stored confidence
        (multiplicative), a small training-volume bonus (capped at 0.2),
        and a conversation-context bonus.

        Returns:
            (model, score) pairs sorted by score, descending.
        """
        if not self.models:
            return []

        input_pattern = self.pattern_extractor.create_pattern(user_input)
        input_keywords = set(self.pattern_extractor.extract_keywords(user_input))

        model_scores = []

        for model in self.models:
            score = 0.0

            # Accumulate similarity over every stored pattern.
            for pattern in model.get('patterns', []):
                score += self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)

            # Jaccard overlap of keyword sets, weighted twice as heavily
            # as a single pattern match.
            model_keywords = set(model.get('keywords', []))
            if model_keywords and input_keywords:
                keyword_overlap = len(input_keywords.intersection(model_keywords))
                keyword_total = len(input_keywords.union(model_keywords))
                keyword_score = keyword_overlap / keyword_total if keyword_total > 0 else 0
                score += keyword_score * 2

            # Scale by the model's own confidence, then add flat bonuses.
            score *= model.get('confidence', 0.5)
            score += min(0.2, model.get('training_samples', 1) / 100)
            score += self._calculate_context_bonus(user_input, model)

            model_scores.append((model, score))

        model_scores.sort(key=lambda x: x[1], reverse=True)

        return model_scores

    def _calculate_context_bonus(self, user_input: str, model: Dict[str, Any]) -> float:
        """Bonus (capped at 0.3) for models whose patterns resemble recent inputs.

        NOTE: user_input is currently unused; scoring looks only at the
        previous inputs in the context window. Kept for interface stability.
        """
        if not self.context_window:
            return 0.0

        context_bonus = 0.0

        # Only the last three exchanges and the first five patterns count.
        for prev_input, _prev_response in self.context_window[-3:]:
            for pattern in model.get('patterns', [])[:5]:
                context_bonus += similarity_score(prev_input, pattern.strip()) * 0.1

        return min(context_bonus, 0.3)

    def select_top_models(self, model_scores: List[Tuple[Dict[str, Any], float]], top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        """Return up to top_k (model, score) pairs scoring above 0.01.

        When nothing clears the threshold, fall back to a random sample of
        up to three models so the bot can still answer something.
        """
        valid_models = [(model, score) for model, score in model_scores if score > 0.01]

        if not valid_models:
            valid_models = random.sample(model_scores, min(3, len(model_scores)))

        return valid_models[:top_k]

    def generate_responses_from_models(self, user_input: str, top_models: List[Tuple[Dict[str, Any], float]]) -> List[Tuple[str, float]]:
        """Collect weighted candidate responses from the top-scoring models.

        Returns:
            (response_text, weight) pairs; weight blends the model's
            overall score with how well its pattern matched the input.
        """
        responses = []
        input_pattern = self.pattern_extractor.create_pattern(user_input)

        for model, model_score in top_models:
            patterns = model.get('patterns', [])
            model_responses_list = model.get('responses', [])

            # A model with no patterns or no responses cannot contribute.
            if not patterns or not model_responses_list:
                continue

            # Patterns and responses are parallel lists; skip unpaired patterns.
            best_matches = []
            for i, pattern in enumerate(patterns):
                if i < len(model_responses_list):
                    sim = self.pattern_extractor.calculate_pattern_similarity(input_pattern, pattern)
                    if sim > 0.1:
                        best_matches.append((model_responses_list[i], sim))

            best_matches.sort(key=lambda x: x[1], reverse=True)

            # Fall back to one random response (at minimal similarity)
            # when nothing matched, so the model still contributes.
            selected_responses = best_matches[:3] if best_matches else [(random.choice(model_responses_list), 0.1)]

            for response, pattern_sim in selected_responses:
                weight = model_score * (0.7 + pattern_sim * 0.3)
                responses.append((response, weight))

        return responses

    def combine_responses(self, responses: List[Tuple[str, float]]) -> str:
        """Pick one final response via weighted-random choice among
        deduplicated candidates; always '<eos>'-terminated."""
        if not responses:
            return "I'm not sure how to respond to that."

        filtered_responses = [(resp, weight) for resp, weight in responses if weight > 0.05]
        if not filtered_responses:
            filtered_responses = responses[:1]

        # Group near-duplicates by their first three words (case-insensitive).
        response_groups = defaultdict(list)
        for response, weight in filtered_responses:
            key = ' '.join(response.split()[:3]).lower()
            response_groups[key].append((response, weight))

        # Keep only the best-weighted representative of each group.
        best_responses = [max(group, key=lambda x: x[1]) for group in response_groups.values()]

        if len(best_responses) > 1:
            total_weight = sum(weight for _, weight in best_responses)
            if total_weight > 0:
                # Weighted roulette selection via the standard library.
                selected_response = random.choices(
                    [resp for resp, _ in best_responses],
                    weights=[weight for _, weight in best_responses],
                )[0]
            else:
                selected_response = best_responses[0][0]
        else:
            selected_response = best_responses[0][0]

        final_response = selected_response

        if not final_response.endswith('<eos>'):
            final_response += ' <eos>'

        return final_response

    def generate_response(self, user_input: str) -> str:
        """Full pipeline: score models, gather candidates, combine, and
        record the exchange in the context window."""
        if not user_input.strip():
            return "Please say something! <eos>"

        model_scores = self.calculate_model_scores(user_input)

        if not model_scores:
            return "I need to learn more before I can respond properly. <eos>"

        top_models = self.select_top_models(model_scores, top_k=5)

        responses = self.generate_responses_from_models(user_input, top_models)

        final_response = self.combine_responses(responses)

        # Remember the exchange; trim the window to the configured maximum.
        self.context_window.append((user_input, final_response))
        if len(self.context_window) > self.max_context_length:
            self.context_window.pop(0)

        return final_response

    def get_model_statistics(self) -> Dict[str, Any]:
        """Aggregate counts and average confidence over loaded models."""
        if not self.models:
            return {"total_models": 0}

        total_patterns = sum(len(model.get('patterns', [])) for model in self.models)
        total_responses = sum(len(model.get('responses', [])) for model in self.models)
        avg_confidence = sum(model.get('confidence', 0) for model in self.models) / len(self.models)
        total_training_samples = sum(model.get('training_samples', 0) for model in self.models)

        return {
            "total_models": len(self.models),
            "total_patterns": total_patterns,
            "total_responses": total_responses,
            "average_confidence": avg_confidence,
            "total_training_samples": total_training_samples
        }


class AgGPTChat:
    """Console chat front-end for the feather mini-model architecture.

    Wires a FeatherManager to a ResponseGenerator and drives either an
    interactive REPL (chat_loop) or a scripted run (batch_test).
    """

    def __init__(self, models_dir: str = "models"):
        # models_dir: directory the FeatherManager loads mini-models from.
        self.feather_manager = FeatherManager(models_dir)
        self.response_generator = ResponseGenerator(self.feather_manager)
        # One {'user': ..., 'assistant': ...} dict per completed exchange.
        self.conversation_history = []
    
    def initialize(self) -> bool:
        """Print the banner, load models, and report model statistics.

        Returns:
            True when at least one mini-model was loaded.
        """
        print("AgGPT-20 Scalable Feather Architecture Chat")
        print("=" * 50)
        
        success = self.response_generator.load_models()
        if success:
            stats = self.response_generator.get_model_statistics()
            print(f"Model Statistics:")
            print(f"   Mini-models loaded: {stats['total_models']}")
            print(f"   Total patterns: {stats['total_patterns']}")
            print(f"   Total responses: {stats['total_responses']}")
            print(f"   Average confidence: {stats['average_confidence']:.3f}")
            print(f"   Training samples: {stats['total_training_samples']}")
            print("=" * 50)
            print("Chat initialized! Type 'quit' to exit.")
            print("Large context window active - I'll remember our conversation!")
            print()
        
        return success
    
    def chat_loop(self) -> None:
        """Interactive REPL: read user input, print generated responses.

        Recognizes the meta-commands quit/exit/bye/goodbye (terminate),
        stats/statistics (print counters), and clear/reset (drop context).
        Loops until quit or Ctrl-C; other exceptions are reported and the
        loop continues so one bad turn doesn't kill the session.
        """
        if not self.initialize():
            return
        
        while True:
            try:
                user_input = input("You: ").strip()
                
                if not user_input:
                    continue
                
                if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
                    print("AgGPT: Goodbye! Thanks for chatting with me! <eos>")
                    break
                
                if user_input.lower() in ['stats', 'statistics']:
                    stats = self.response_generator.get_model_statistics()
                    print("Current Statistics:")
                    for key, value in stats.items():
                        print(f"   {key}: {value}")
                    continue
                
                if user_input.lower() in ['clear', 'reset']:
                    self.response_generator.context_window = []
                    print("Context cleared!")
                    continue
                
                print("AgGPT: ", end="", flush=True)
                response = self.response_generator.generate_response(user_input)
                
                # Strip the '<eos>' sentinel before showing the reply.
                display_response = response.replace(' <eos>', '').replace('<eos>', '')
                print(display_response)
                print()
                
                self.conversation_history.append({
                    'user': user_input,
                    'assistant': display_response
                })
                
            except KeyboardInterrupt:
                print("\n\nAgGPT: Chat interrupted. Goodbye!")
                break
            except Exception as e:
                # Deliberately broad: keep the REPL alive on any per-turn error.
                print(f"\nError: {e}")
                print("Let me try again...")
                continue
    
    def batch_test(self, test_inputs: List[str]) -> None:
        """Run each input through the generator and print the responses.

        Unlike chat_loop, no meta-commands are handled and nothing is
        appended to conversation_history.
        """
        if not self.initialize():
            return
        
        print("Running batch test...")
        print("=" * 50)
        
        for i, test_input in enumerate(test_inputs, 1):
            print(f"Test {i}: {test_input}")
            response = self.response_generator.generate_response(test_input)
            display_response = response.replace(' <eos>', '').replace('<eos>', '')
            print(f"Response: {display_response}")
            print("-" * 30)


def main():
    """Entry point: run the scripted batch test when invoked as
    `python <script> test`, otherwise start the interactive chat loop."""
    chat = AgGPTChat()

    import sys
    if not (len(sys.argv) > 1 and sys.argv[1] == "test"):
        chat.chat_loop()
        return

    sample_prompts = [
        "hi",
        "hello there",
        "how are you?",
        "what's your favorite color?",
        "tell me a joke",
        "thank you",
        "goodbye",
    ]
    chat.batch_test(sample_prompts)


if __name__ == "__main__":
    main()