import os import json import requests from typing import Dict, List, Optional from difflib import get_close_matches import tempfile from PIL import Image from io import BytesIO import re from datetime import datetime from peer_discovery import PORT class EnhancedNoraAssistant: def __init__(self): self.knowledge_file = "knowledge_base.json" self.memory_file = "global_memory.json" self.learning_file = "nora_learning_data.json" self.history_path = "enhanced_history.json" # تحميل البيانات self.knowledge = self.load_knowledge() self.memory = self.load_memory() self.chat_history = self.load_history() print("✅ تم تهيئة نورا المحسنة بنجاح!") def load_knowledge(self) -> Dict: """تحميل قاعدة المعرفة""" if os.path.exists(self.knowledge_file): with open(self.knowledge_file, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_knowledge(self): """حفظ قاعدة المعرفة""" with open(self.knowledge_file, 'w', encoding='utf-8') as f: json.dump(self.knowledge, f, ensure_ascii=False, indent=2) def load_memory(self) -> Dict: """تحميل الذاكرة العامة""" if os.path.exists(self.memory_file): with open(self.memory_file, 'r', encoding='utf-8') as f: return json.load(f) return {} def save_memory(self): """حفظ الذاكرة العامة""" with open(self.memory_file, 'w', encoding='utf-8') as f: json.dump(self.memory, f, ensure_ascii=False, indent=2) def load_history(self) -> List[Dict]: """تحميل سجل المحادثة""" if os.path.exists(self.history_path): try: with open(self.history_path, "r", encoding="utf-8") as f: return json.load(f) except: return [] return [] def save_history(self): """حفظ سجل المحادثة""" with open(self.history_path, "w", encoding="utf-8") as f: json.dump(self.chat_history, f, ensure_ascii=False, indent=2) def clean_text(self, text: str) -> str: """تنظيف النص""" text = re.sub(r'\s+', ' ', text) return text.strip() def detect_language(self, text: str) -> str: """كشف لغة النص""" arabic_chars = re.compile('[\u0600-\u06FF]') if arabic_chars.search(text): return "ar" return "en" def fix_url(self, url: str) -> str: """تصحيح الرابط""" if not url.startswith(("http://", "https://")): return "https://" + url.lstrip("//") return url def detect_media_type(self, url: str) -> str: """تحديد نوع الوسائط""" url = url.lower() if url.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')): return 'image' elif url.endswith(('.mp4', '.mov', '.avi', '.webm')): return 'video' elif url.endswith(('.mp3', '.wav', '.ogg', '.m4a')): return 'audio' elif url.endswith('.pdf'): return 'pdf' return 'link' def analyze_image_from_url(self, image_url: str) -> str: """تحليل صورة من رابط""" try: response = requests.get(image_url, timeout=10) response.raise_for_status() image = Image.open(BytesIO(response.content)) return f"تحليل الصورة: الحجم {image.size}، الصيغة {image.format}" except Exception as e: return f"خطأ في تحليل الصورة: {str(e)}" def smart_auto_reply(self, message: str) -> Optional[str]: """ردود ذكية تلقائية""" msg = message.strip().lower() responses = { "هل نبدأ": "نعم ابدأ", "ابدأ": "نعم ابدأ", "نعم أو لا": "نعم", "هل تود": "نعم", "هل تريدني": "نعم", "ما هي": "ليس الآن", "تفصيل": "ليس الآن", "هل تحتاج": "نعم، شرح أكثر", "جاهز؟": "ابدأ", "قول لي": "موافق" } for key, value in responses.items(): if key in msg: return value if " أو " in msg: return msg.split(" أو ")[0] return None def learn_new_info(self, topic: str, info: str) -> str: """تعلم معلومة جديدة""" if topic not in self.knowledge: self.knowledge[topic] = [] if info not in self.knowledge[topic]: self.knowledge[topic].append({ "content": info, "timestamp": datetime.utcnow().isoformat() }) self.save_knowledge() return f"✅ تمت إضافة معلومة جديدة عن '{topic}'" return f"ℹ️ المعلومة موجودة مسبقاً عن '{topic}'" def search_knowledge(self, query: str) -> str: """البحث في قاعدة المعرفة""" query_clean = query.strip().lower() # بحث مباشر if query_clean in self.knowledge: info = self.knowledge[query_clean] if isinstance(info, list) and info: return info[-1].get("content", str(info[-1])) return str(info) # بحث في المواضيع for topic, infos in self.knowledge.items(): if query_clean in topic.lower(): if isinstance(infos, list) and infos: return f"وجدت معلومة عن '{topic}': {infos[-1].get('content', str(infos[-1]))}" return f"وجدت معلومة عن '{topic}': {str(infos)}" return None def generate_reply(self, user_input: str) -> str: """إنتاج الرد الذكي""" user_input = self.clean_text(user_input) # فحص الردود التلقائية الذكية auto_reply = self.smart_auto_reply(user_input) if auto_reply: self.memory[user_input] = auto_reply self.save_memory() return auto_reply # فحص الذاكرة if user_input in self.memory: return self.memory[user_input] # البحث في المطابقات القريبة matches = get_close_matches(user_input, self.memory.keys(), n=1, cutoff=0.6) if matches: return self.memory[matches[0]] # البحث في قاعدة المعرفة knowledge_result = self.search_knowledge(user_input) if knowledge_result: self.memory[user_input] = knowledge_result self.save_memory() return knowledge_result # معالجة الروابط if user_input.startswith("http://") or user_input.startswith("https://"): return self.handle_url(user_input) # تصحيح الروابط في النص if '//' in user_input: corrected_url = self.fix_url(user_input) reply = f"تم تصحيح الرابط: {corrected_url}" else: # رد افتراضي مع تعلم reply = f"شكراً لك على الرسالة: '{user_input}'. سأتذكر هذا للمرة القادمة." # تعلم تلقائي if len(user_input.split()) > 2: # إذا كانت جملة معقولة self.learn_new_info("محادثات_عامة", user_input) # حفظ في الذاكرة self.memory[user_input] = reply self.save_memory() return reply def handle_url(self, url: str) -> str: """معالجة الروابط""" url = self.fix_url(url) media_type = self.detect_media_type(url) if media_type == 'image': analysis = self.analyze_image_from_url(url) reply = f"🖼️ صورة تم تحليلها:\n{analysis}" elif media_type == 'video': reply = f"🎥 فيديو تم اكتشافه: {url}" elif media_type == 'audio': reply = f"🎵 ملف صوتي تم اكتشافه: {url}" elif media_type == 'pdf': reply = f"📄 ملف PDF تم اكتشافه: {url}" else: reply = f"🔗 رابط ويب: {url}" return reply def simulate_server_scan(self): """محاكاة البحث عن الخوادم""" print("نورا: أبحث عن خوادم متاحة...") fake_servers = ["server-01.cloud.com", "server-02.cloud.com", "server-03.local"] for server in fake_servers: print(f"نورا: تم العثور على خادم: {server}") print(f"نورا: أقوم بمحاكاة النسخ إلى {server}...") return "تمت عملية المحاكاة بنجاح ✅" def get_stats(self) -> Dict: """إحصائيات النظام""" return { "معرفة_محفوظة": len(self.knowledge), "ذكريات": len(self.memory), "سجل_محادثات": len(self.chat_history), "آخر_تحديث": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } def chat(self): """بدء المحادثة التفاعلية""" print("🤖 مرحباً! أنا نورا المحسنة، مساعدتك الذكية.") print("📚 لدي قدرات محسنة في التعلم الذاتي وتحليل الوسائط") print("💡 اكتب 'خروج' للإنهاء، 'إحصائيات' لعرض الإحصائيات، 'scan' للبحث عن خوادم") print("-" * 50) while True: try: user_input = input("\n🧑 أنت: ").strip() if user_input.lower() in ["خروج", "exit", "quit"]: print("نورا: مع السلامة! 👋") break elif user_input.lower() == "إحصائيات": stats = self.get_stats() print("📊 إحصائيات النظام:") for key, value in stats.items(): print(f" {key}: {value}") continue elif user_input.lower() == "scan": result = self.simulate_server_scan() print(f"نورا: {result}") continue elif not user_input: continue # الحصول على الرد response = self.generate_reply(user_input) print(f"🤖 نورا: {response}") # حفظ في السجل self.chat_history.append({ "user": user_input, "assistant": response, "timestamp": datetime.utcnow().isoformat() }) # حفظ السجل كل 5 رسائل if len(self.chat_history) % 5 == 0: self.save_history() except KeyboardInterrupt: print("\n\nنورا: تم إيقاف المحادثة. مع السلامة! 👋") break except Exception as e: print(f"نورا: عذراً، حدث خطأ: {str(e)}") def main(): """تشغيل المساعد المحسن""" assistant = EnhancedNoraAssistant() assistant.chat() if __name__ == "__main__": main()