File size: 12,018 Bytes
8da9d7a
 
 
 
 
 
 
 
 
 
 
e5724fa
8da9d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314

import os
import json
import requests
from typing import Dict, List, Optional
from difflib import get_close_matches
import tempfile
from PIL import Image
from io import BytesIO
import re
from datetime import datetime
from peer_discovery import PORT

class EnhancedNoraAssistant:
    def __init__(self):
        self.knowledge_file = "knowledge_base.json"
        self.memory_file = "global_memory.json"
        self.learning_file = "nora_learning_data.json"
        self.history_path = "enhanced_history.json"
        
        # تحميل البيانات
        self.knowledge = self.load_knowledge()
        self.memory = self.load_memory()
        self.chat_history = self.load_history()
        
        print("✅ تم تهيئة نورا المحسنة بنجاح!")

    def load_knowledge(self) -> Dict:
        """تحميل قاعدة المعرفة"""
        if os.path.exists(self.knowledge_file):
            with open(self.knowledge_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def save_knowledge(self):
        """حفظ قاعدة المعرفة"""
        with open(self.knowledge_file, 'w', encoding='utf-8') as f:
            json.dump(self.knowledge, f, ensure_ascii=False, indent=2)

    def load_memory(self) -> Dict:
        """تحميل الذاكرة العامة"""
        if os.path.exists(self.memory_file):
            with open(self.memory_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def save_memory(self):
        """حفظ الذاكرة العامة"""
        with open(self.memory_file, 'w', encoding='utf-8') as f:
            json.dump(self.memory, f, ensure_ascii=False, indent=2)

    def load_history(self) -> List[Dict]:
        """تحميل سجل المحادثة"""
        if os.path.exists(self.history_path):
            try:
                with open(self.history_path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except:
                return []
        return []

    def save_history(self):
        """حفظ سجل المحادثة"""
        with open(self.history_path, "w", encoding="utf-8") as f:
            json.dump(self.chat_history, f, ensure_ascii=False, indent=2)

    def clean_text(self, text: str) -> str:
        """تنظيف النص"""
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def detect_language(self, text: str) -> str:
        """كشف لغة النص"""
        arabic_chars = re.compile('[\u0600-\u06FF]')
        if arabic_chars.search(text):
            return "ar"
        return "en"

    def fix_url(self, url: str) -> str:
        """تصحيح الرابط"""
        if not url.startswith(("http://", "https://")):
            return "https://" + url.lstrip("//")
        return url

    def detect_media_type(self, url: str) -> str:
        """تحديد نوع الوسائط"""
        url = url.lower()
        if url.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
            return 'image'
        elif url.endswith(('.mp4', '.mov', '.avi', '.webm')):
            return 'video'
        elif url.endswith(('.mp3', '.wav', '.ogg', '.m4a')):
            return 'audio'
        elif url.endswith('.pdf'):
            return 'pdf'
        return 'link'

    def analyze_image_from_url(self, image_url: str) -> str:
        """تحليل صورة من رابط"""
        try:
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            return f"تحليل الصورة: الحجم {image.size}، الصيغة {image.format}"
        except Exception as e:
            return f"خطأ في تحليل الصورة: {str(e)}"

    def smart_auto_reply(self, message: str) -> Optional[str]:
        """ردود ذكية تلقائية"""
        msg = message.strip().lower()
        
        responses = {
            "هل نبدأ": "نعم ابدأ",
            "ابدأ": "نعم ابدأ", 
            "نعم أو لا": "نعم",
            "هل تود": "نعم",
            "هل تريدني": "نعم",
            "ما هي": "ليس الآن",
            "تفصيل": "ليس الآن",
            "هل تحتاج": "نعم، شرح أكثر",
            "جاهز؟": "ابدأ",
            "قول لي": "موافق"
        }
        
        for key, value in responses.items():
            if key in msg:
                return value
                
        if " أو " in msg:
            return msg.split(" أو ")[0]
            
        return None

    def learn_new_info(self, topic: str, info: str) -> str:
        """تعلم معلومة جديدة"""
        if topic not in self.knowledge:
            self.knowledge[topic] = []
        
        if info not in self.knowledge[topic]:
            self.knowledge[topic].append({
                "content": info,
                "timestamp": datetime.utcnow().isoformat()
            })
            self.save_knowledge()
            return f"✅ تمت إضافة معلومة جديدة عن '{topic}'"
        
        return f"ℹ️ المعلومة موجودة مسبقاً عن '{topic}'"

    def search_knowledge(self, query: str) -> str:
        """البحث في قاعدة المعرفة"""
        query_clean = query.strip().lower()
        
        # بحث مباشر
        if query_clean in self.knowledge:
            info = self.knowledge[query_clean]
            if isinstance(info, list) and info:
                return info[-1].get("content", str(info[-1]))
            return str(info)
        
        # بحث في المواضيع
        for topic, infos in self.knowledge.items():
            if query_clean in topic.lower():
                if isinstance(infos, list) and infos:
                    return f"وجدت معلومة عن '{topic}': {infos[-1].get('content', str(infos[-1]))}"
                return f"وجدت معلومة عن '{topic}': {str(infos)}"
        
        return None

    def generate_reply(self, user_input: str) -> str:
        """إنتاج الرد الذكي"""
        user_input = self.clean_text(user_input)
        
        # فحص الردود التلقائية الذكية
        auto_reply = self.smart_auto_reply(user_input)
        if auto_reply:
            self.memory[user_input] = auto_reply
            self.save_memory()
            return auto_reply

        # فحص الذاكرة
        if user_input in self.memory:
            return self.memory[user_input]

        # البحث في المطابقات القريبة
        matches = get_close_matches(user_input, self.memory.keys(), n=1, cutoff=0.6)
        if matches:
            return self.memory[matches[0]]

        # البحث في قاعدة المعرفة
        knowledge_result = self.search_knowledge(user_input)
        if knowledge_result:
            self.memory[user_input] = knowledge_result
            self.save_memory()
            return knowledge_result

        # معالجة الروابط
        if user_input.startswith("http://") or user_input.startswith("https://"):
            return self.handle_url(user_input)

        # تصحيح الروابط في النص
        if '//' in user_input:
            corrected_url = self.fix_url(user_input)
            reply = f"تم تصحيح الرابط: {corrected_url}"
        else:
            # رد افتراضي مع تعلم
            reply = f"شكراً لك على الرسالة: '{user_input}'. سأتذكر هذا للمرة القادمة."
            
            # تعلم تلقائي
            if len(user_input.split()) > 2:  # إذا كانت جملة معقولة
                self.learn_new_info("محادثات_عامة", user_input)

        # حفظ في الذاكرة
        self.memory[user_input] = reply
        self.save_memory()
        return reply

    def handle_url(self, url: str) -> str:
        """معالجة الروابط"""
        url = self.fix_url(url)
        media_type = self.detect_media_type(url)
        
        if media_type == 'image':
            analysis = self.analyze_image_from_url(url)
            reply = f"🖼️ صورة تم تحليلها:\n{analysis}"
        elif media_type == 'video':
            reply = f"🎥 فيديو تم اكتشافه: {url}"
        elif media_type == 'audio':
            reply = f"🎵 ملف صوتي تم اكتشافه: {url}"
        elif media_type == 'pdf':
            reply = f"📄 ملف PDF تم اكتشافه: {url}"
        else:
            reply = f"🔗 رابط ويب: {url}"
        
        return reply

    def simulate_server_scan(self):
        """محاكاة البحث عن الخوادم"""
        print("نورا: أبحث عن خوادم متاحة...")
        fake_servers = ["server-01.cloud.com", "server-02.cloud.com", "server-03.local"]
        
        for server in fake_servers:
            print(f"نورا: تم العثور على خادم: {server}")
            print(f"نورا: أقوم بمحاكاة النسخ إلى {server}...")
        
        return "تمت عملية المحاكاة بنجاح ✅"

    def get_stats(self) -> Dict:
        """إحصائيات النظام"""
        return {
            "معرفة_محفوظة": len(self.knowledge),
            "ذكريات": len(self.memory),
            "سجل_محادثات": len(self.chat_history),
            "آخر_تحديث": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    def chat(self):
        """بدء المحادثة التفاعلية"""
        print("🤖 مرحباً! أنا نورا المحسنة، مساعدتك الذكية.")
        print("📚 لدي قدرات محسنة في التعلم الذاتي وتحليل الوسائط")
        print("💡 اكتب 'خروج' للإنهاء، 'إحصائيات' لعرض الإحصائيات، 'scan' للبحث عن خوادم")
        print("-" * 50)
        
        while True:
            try:
                user_input = input("\n🧑 أنت: ").strip()
                
                if user_input.lower() in ["خروج", "exit", "quit"]:
                    print("نورا: مع السلامة! 👋")
                    break
                
                elif user_input.lower() == "إحصائيات":
                    stats = self.get_stats()
                    print("📊 إحصائيات النظام:")
                    for key, value in stats.items():
                        print(f"   {key}: {value}")
                    continue
                
                elif user_input.lower() == "scan":
                    result = self.simulate_server_scan()
                    print(f"نورا: {result}")
                    continue
                
                elif not user_input:
                    continue
                
                # الحصول على الرد
                response = self.generate_reply(user_input)
                print(f"🤖 نورا: {response}")
                
                # حفظ في السجل
                self.chat_history.append({
                    "user": user_input,
                    "assistant": response,
                    "timestamp": datetime.utcnow().isoformat()
                })
                
                # حفظ السجل كل 5 رسائل
                if len(self.chat_history) % 5 == 0:
                    self.save_history()
                    
            except KeyboardInterrupt:
                print("\n\nنورا: تم إيقاف المحادثة. مع السلامة! 👋")
                break
            except Exception as e:
                print(f"نورا: عذراً، حدث خطأ: {str(e)}")

def main():
    """تشغيل المساعد المحسن"""
    assistant = EnhancedNoraAssistant()
    assistant.chat()

if __name__ == "__main__":
    main()