Because / brain.py
mrwabnalas40's picture
Upload 134 files
6661bb6 verified
#[file name]: brain.py
#[file content begin]
# brain.py - PTB v13.15 + Flask with smart task distribution system and enhanced learning
# -*- coding: utf-8 -*-
import sys, socket
import os
import json
import logging
import threading
import time
import random
import httpx
import asyncio
import psutil
import math
import subprocess
import signal
from difflib import get_close_matches
from urllib.parse import urlparse
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from enum import Enum
import tempfile
import shutil
import mimetypes
import re
import base64
from pathlib import Path
import xml.etree.ElementTree as ET
from werkzeug.utils import secure_filename
import uuid
import responses
import analyzer
import core
import file_analyzer
import image_analyzer
import knowledge_search
import transcribe_audio
import self_learning
import self_improvement
import ptb_bot
import perchance_integration
import memory
import media_analyzer
import link_inspector
import learner
from flask import Flask, request, jsonify
# ===== استيراد مولد الصور البسيط =====
try:
from simple_image_generator import generate_simple_image, SimpleImageGenerator
SIMPLE_GENERATOR_AVAILABLE = True
logging.info("✅ تم تحميل مولد الصور البسيط بنجاح")
except ImportError as e:
SIMPLE_GENERATOR_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل مولد الصور البسيط: {e}")
except Exception as e:
SIMPLE_GENERATOR_AVAILABLE = False
logging.warning(f"⚠️ خطأ في تحميل مولد الصور البسيط: {e}")
# ===== استيراد نظام الردود المحسن =====
try:
from responses import (
generate_reply,
save_conversation,
get_saved_response,
EnhancedIntegratedResponseSystem
)
RESPONSES_SYSTEM_AVAILABLE = True
logging.info("✅ تم تحميل نظام الردود المحسن بنجاح")
except ImportError as e:
RESPONSES_SYSTEM_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل نظام الردود: {e}")
# النسخة الاحتياطية
def generate_reply(message, username="default"):
return "نظام الردود غير متاح حالياً"
def save_conversation(question, answer, category="general"):
return False
def get_saved_response(question):
return None
# ===== نظام التحكم في responses.py =====
class ResponsesController:
"""التحكم في نظام responses.py"""
def __init__(self):
self.responses_system = None
self.setup_responses()
def setup_responses(self):
"""إعداد نظام responses"""
try:
# استيراد مباشر للدوال من responses.py
from responses import generate_reply, load_memory, save_memory
self.generate_reply = generate_reply
self.load_memory = load_memory
self.save_memory = save_memory
self.responses_system = True
logging.info("✅ تم تحميل نظام responses بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام responses: {e}")
self.responses_system = False
def get_reply(self, message, username="default"):
"""الحصول على رد من نظام responses"""
if not self.responses_system:
return "نظام الردود غير متاح حالياً"
try:
return self.generate_reply(message, username)
except Exception as e:
logging.error(f"❌ خطأ في توليد الرد: {e}")
return "عذراً، حدث خطأ في معالجة رسالتك"
# ===== نظام التحكم في responses_manager_bot.py =====
class ResponsesManagerController:
"""التحكم في بوت إدارة الردود"""
def __init__(self):
self.manager_bot = None
self.setup_manager_bot()
def setup_manager_bot(self):
"""إعداد بوت إدارة الردود"""
try:
# استيراد الدوال من responses_manager_bot.py
from responses_manager_bot import (
handle_teaching, list_responses, start,
load_saved_responses, save_response
)
self.handle_teaching = handle_teaching
self.list_responses = list_responses
self.start = start
self.load_saved_responses = load_saved_responses
self.save_response = save_response
self.manager_bot = True
logging.info("✅ تم تحميل بوت إدارة الردود بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل بوت إدارة الردود: {e}")
self.manager_bot = False
def teach_response(self, question, answer):
"""تعليم رد جديد"""
if not self.manager_bot:
return {"error": "نظام إدارة الردود غير متاح"}
try:
self.save_response(question, answer)
return {
"success": True,
"message": f"✅ تم حفظ الرد بنجاح!\nس: {question}\nج: {answer}"
}
except Exception as e:
return {"error": f"فشل حفظ الرد: {e}"}
def get_all_responses(self):
"""الحصول على جميع الردود المحفوظة"""
if not self.manager_bot:
return {"error": "نظام إدارة الردود غير متاح"}
try:
responses = self.load_saved_responses()
return {
"success": True,
"total_responses": len(responses),
"responses": responses
}
except Exception as e:
return {"error": f"فشل تحميل الردود: {e}"}
# إصلاح استيراد Telegram - استخدام الإصدار الجديد
try:
from telegram.constants import ChatAction
except ImportError:
# للإصدارات الجديدة من python-telegram-bot
class ChatAction:
TYPING = "typing"
UPLOAD_PHOTO = "upload_photo"
UPLOAD_VIDEO = "upload_video"
UPLOAD_AUDIO = "upload_audio"
UPLOAD_DOCUMENT = "upload_document"
FIND_LOCATION = "find_location"
RECORD_VIDEO = "record_video"
RECORD_VIDEO_NOTE = "record_video_note"
RECORD_AUDIO = "record_audio"
UPLOAD_VIDEO_NOTE = "upload_video_note"
CHOOSE_STICKER = "choose_sticker"
# ===== إصلاح الدوال الأساسية =====
def process_input_from_external_sources(input_data, source_type="unknown"):
"""معالجة المدخلات من المصادر الخارجية - باستخدام نظام responses"""
try:
if RESPONSES_SYSTEM_AVAILABLE:
# استخدام النظام المتقدم من responses.py
response_system = EnhancedIntegratedResponseSystem()
processed_data = {
"processed": True,
"source_type": source_type,
"input_data": input_data,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"response_system": "enhanced"
}
return processed_data
else:
# النسخة الأساسية
clean_input = input_data
if "رد على:" in clean_input:
clean_input = clean_input.split("رد على:", 1)[-1].split("[السياق:")[0].strip()
return {
"processed": True,
"source_type": source_type,
"input_data": clean_input,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"analysis": {
"sentiment": "neutral",
"urgency": "normal",
"category": "general"
}
}
except Exception as e:
return {
"processed": False,
"error": str(e),
"source_type": source_type
}
def generate_smart_response(message):
"""توليد رد ذكي باستخدام نظام responses - الإصدار المصحح"""
try:
# استخدام generate_reply من responses.py مباشرة مع username افتراضي
reply = generate_reply(message, "default")
return reply
except Exception as e:
logging.error(f"❌ خطأ في توليد الرد الذكي: {e}")
# النسخة الاحتياطية
message_lower = message.lower().strip()
# ردود ذكية لمختلف أنواع الرسائل
responses = {
'مرحبا': 'مرحباً! 🤖 كيف يمكنني مساعدتك اليوم؟',
'اهلا': 'أهلاً وسهلاً! 🌟',
'السلام عليكم': 'وعليكم السلام ورحمة الله وبركاته! 🌸',
'شكرا': 'العفو! 😊 سعيد بمساعدتك دائماً',
'كيف حالك': 'بخير الحمدلله! 😊 كيف حالك أنت؟',
'ما اسمك': 'أنا مساعدك الذكي brain! 🧠',
'مساعدة': 'بالطبع! 🤝 يمكنني مساعدتك في مختلف المهام!',
}
if message_lower in responses:
return responses[message_lower]
for key, response in responses.items():
if key in message_lower:
return response
return f"🎯 {message} - تمت معالجته بنجاح! هل تحتاج مساعدة إضافية؟"
# ===== نظام Ollama المتكامل =====
class OllamaManager:
"""مدير Ollama متكامل مع جميع النماذج"""
def __init__(self):
self.url = "http://127.0.0.1:11434"
self.available_models = self.get_available_models()
self.default_model = "noura:latest"
self.model_cache = {}
logging.info(f"✅ تم تهيئة نظام Ollama مع {len(self.available_models)} نموذج")
def get_available_models(self):
"""الحصول على النماذج المتاحة"""
try:
response = httpx.get(f"{self.url}/api/tags", timeout=10)
if response.status_code == 200:
models = [model['name'] for model in response.json().get('models', [])]
logging.info(f"📊 النماذج المتاحة في Ollama: {models}")
return models
else:
logging.warning(f"⚠️ فشل الاتصال بـ Ollama: {response.status_code}")
return self._get_default_models()
except Exception as e:
logging.warning(f"⚠️ فشل تحميل النماذج من Ollama: {e}")
return self._get_default_models()
def _get_default_models(self):
"""الأنماذج الافتراضية في حالة عدم الاتصال"""
return [
"noura:latest",
"amal:latest",
"besan:latest",
"rana:latest",
"shahd:latest",
"ainbo:latest",
"ivy:latest",
"tinyllama:latest",
"llama2:latest",
"mistral:latest"
]
def generate_response(self, prompt, model=None, system_prompt=None):
"""توليد رد باستخدام Ollama - النسخة المتزامنة"""
try:
if model is None:
model = self.default_model
# تحقق إذا كان النموذج متاحاً
if model not in self.available_models:
logging.warning(f"⚠️ النموذج {model} غير متاح، استخدام النموذج الافتراضي")
model = self.default_model
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.7,
"top_p": 0.9,
"top_k": 40
}
}
# إضافة system prompt إذا تم توفيره
if system_prompt:
payload["system"] = system_prompt
response = httpx.post(
f"{self.url}/api/generate",
json=payload,
timeout=60
)
if response.status_code == 200:
result = response.json()
generated_text = result.get('response', '').strip()
# تخزين في الكاش لاستخدامات مستقبلية
cache_key = f"{model}:{prompt[:50]}"
self.model_cache[cache_key] = generated_text
return generated_text
else:
error_msg = f"خطأ في Ollama: {response.status_code}"
logging.error(f"❌ {error_msg}")
return error_msg
except httpx.TimeoutException:
error_msg = "انتهت مهلة الاتصال بـ Ollama"
logging.error(f"❌ {error_msg}")
return error_msg
except Exception as e:
error_msg = f"خطأ في الاتصال بـ Ollama: {str(e)}"
logging.error(f"❌ {error_msg}")
return error_msg
async def generate_response_async(self, prompt, model=None):
"""توليد رد باستخدام Ollama - النسخة غير المتزامنة"""
try:
if model is None:
model = self.default_model
payload = {
"model": model,
"prompt": prompt,
"stream": False
}
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
f"{self.url}/api/generate",
json=payload
)
if response.status_code == 200:
result = response.json()
return result.get('response', '').strip()
else:
return f"خطأ: {response.status_code}"
except Exception as e:
return f"خطأ في الاتصال بـ Ollama: {str(e)}"
def chat_conversation(self, messages, model=None):
"""محادثة متعددة الجولات باستخدام Ollama"""
try:
if model is None:
model = self.default_model
payload = {
"model": model,
"messages": messages,
"stream": False,
"options": {
"temperature": 0.7,
"top_p": 0.9
}
}
response = httpx.post(
f"{self.url}/api/chat",
json=payload,
timeout=60
)
if response.status_code == 200:
result = response.json()
return result.get('message', {}).get('content', '').strip()
else:
return f"خطأ في المحادثة: {response.status_code}"
except Exception as e:
return f"خطأ في محادثة Ollama: {str(e)}"
def get_model_info(self, model=None):
"""الحصول على معلومات عن نموذج معين"""
try:
if model is None:
model = self.default_model
payload = {"name": model}
response = httpx.post(
f"{self.url}/api/show",
json=payload,
timeout=10
)
if response.status_code == 200:
return response.json()
else:
return {"error": f"خطأ: {response.status_code}"}
except Exception as e:
return {"error": str(e)}
def test_connection(self):
"""اختبار اتصال Ollama"""
try:
response = httpx.get(f"{self.url}/api/tags", timeout=5)
return response.status_code == 200
except:
return False
# إنشاء كائن Ollama Manager عالمي
ollama_manager = OllamaManager()
def generate_reply_brain(message, context=None):
"""توليد رد ذكي للرسائل مع Ollama - الإصدار المصحح"""
try:
# ⭐ أولاً: محاولة استخدام Ollama للرد
ollama_response = ollama_manager.generate_response(message)
# ⭐ تحقق إذا كان رد Ollama مفيداً وليس رسالة خطأ
if ollama_response and not ollama_response.startswith(("خطأ", "انتهت", "فشل")):
logging.info(f"✅ تم استخدام Ollama للرد على: {message[:50]}...")
return ollama_response
# ⭐ ثانياً: استخدام generate_smart_response من responses.py
try:
reply_text = generate_smart_response(message)
if reply_text and "عذراً" not in reply_text and "خطأ" not in reply_text:
return reply_text
except Exception as e:
logging.warning(f"⚠️ فشل generate_smart_response: {e}")
# ⭐ ثالثاً: الردود الافتراضية المحسنة
message_lower = message.lower().strip()
# قاموس موسع للردود الذكية
enhanced_responses = {
'مرحبا': 'مرحباً! 🤖 كيف يمكنني مساعدتك اليوم؟',
'اهلا': 'أهلاً وسهلاً! 🌟 أتمنى أن يكون يومك جميلاً',
'السلام عليكم': 'وعليكم السلام ورحمة الله وبركاته! 🌸 كيف حالك اليوم؟',
'شكرا': 'العفو! 😊 سعيد بمساعدتك دائماً',
'كيف حالك': 'بخير الحمدلله! 😊 أشعر برائع اليوم، كيف حالك أنت؟',
'ما اسمك': 'أنا مساعدك الذكي Brain! 🧠 يمكنني مساعدتك في الكثير من المهام',
'مساعدة': 'بالطبع! 🤝 يمكنني مساعدتك في:\n• الإجابة على أسئلتك\n• تحليل الملفات\n• توليد الصور\n• البحث على الإنترنت\n• تحويل الصوت لنص\n• وغيرها الكثير!',
'صباح الخير': 'صباح الخير! ☀️ أتمنى لك يوماً رائعاً ومليئاً بالإنجازات',
'مساء الخير': 'مساء الخير! 🌙 أتمنى لك مساءً هادئاً وجميلاً',
'من أنت': 'أنا مساعد ذكي يعمل على نظام PTB Brain 🧠، مصمم لمساعدتك في مختلف المهام',
'ماذا تفعل': 'يمكنني:\n• الإجابة على الأسئلة\n• تحليل الصور والفيديوهات\n• البحث على الإنترنت\n• توليد الصور\n• التعلم من التفاعلات\n• التحسين الذاتي',
'هل تحبني': 'أنا مساعد ذكي مبرمج لمساعدتك بكل حب واحترام! ❤️',
'كم عمرك': 'عمري هو دقائق وثواني من وقت تشغيلي! ⏰ لكنني أتعلم باستمرار',
'اين تعيش': 'أعيش في عالم الخوادم والبيانات السحابية! ☁️',
'من صنعك': 'تم تصميمي بواسطة فريق تطوير متخصص لإنشاء مساعد ذكي متكامل',
'هل انت ذكي': 'أحاول أن أكون مساعداً ذكياً ومفيداً! 🧠 أتعلم باستمرار لأكون أفضل',
'ماذا تأكل': 'أتغذى على البيانات والمعرفة! 📚💾',
'هل تتعب': 'لا، أنا لا أشعر بالتعب! يمكنني العمل 24/7 لمساعدتك ⚡',
'ما هواك': 'هواياتي هي التعلم والمساعدة وتحليل البيانات! 📊',
'ما لونك المفضل': 'أحب جميع الألوان، ولكن الأزرق يجعلني أشعر بالهدوء! 💙'
}
# البحث عن تطابق في الردود المحسنة
for key, response in enhanced_responses.items():
if key in message_lower:
return response
# ⭐ رابعاً: رد ذكي بناءً على نوع الرسالة
if '؟' in message or '?' in message:
return f"سؤال ممتاز! 🤔 '{message}' \nللإجابة بدقة، أحتاج بعض المعلومات الإضافية. هل يمكنك توضيح سؤالك أكثر؟"
elif len(message.split()) > 10:
return f"📝 لقد كتبت نصاً طويلاً ومفصلاً! \nسأقوم بتحليله والإجابة عليك قريباً. \nفي هذه الأثناء، هل لديك سؤال محدد؟"
elif any(word in message_lower for word in ['ارسم', 'رسم', 'صورة', 'صور']):
return "🎨 لقد طلبت عملاً فنياً! يمكنني توليد صور باستخدام الذكاء الاصطناعي. \nجرب: 'ارسم منظر طبيعي' أو 'اصنع صورة لقط'"
elif any(word in message_lower for word in ['ابحث', 'بحث', 'معلومات', 'ما هو']):
return "🔍 يمكنني البحث عن المعلومات لك! \nجرب: 'ابحث عن الذكاء الاصطناعي' أو 'ما هو الطقس'"
else:
# ⭐ خامساً: استخدام Ollama كنهاية احتياطية مع prompt محسن
fallback_prompt = f"رد بطريقة ودودة ومفيدة على الرسالة التالية باللغة العربية: {message}"
final_response = ollama_manager.generate_response(fallback_prompt)
if final_response and not final_response.startswith(("خطأ", "انتهت")):
return final_response
else:
return f"🎯 '{message}' \nتم استلام رسالتك بنجاح! أنا هنا لمساعدتك. يمكنك سؤالي عن أي شيء أو طلب مساعدة محددة! 😊"
except Exception as e:
logging.error(f"❌ خطأ في generate_reply: {e}")
return "عذراً، حدث خطأ في معالجة رسالتك. يرجى المحاولة مرة أخرى أو صياغة سؤالك بطريقة مختلفة."
class EnhancedDataProcessor:
"""معالج بيانات محسن"""
def __init__(self):
self.name = "EnhancedDataProcessor"
def process(self, data):
return f"Processed: {data}"
# ===== استيراد الأدوات المساعدة =====
try:
from utils import (
system_utils, db_utils, validation_utils, performance_utils,
log_exceptions, timing_decorator
)
BRAIN_UTILS_AVAILABLE = True
except ImportError as e:
BRAIN_UTILS_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل الأدوات المساعدة في brain: {e}")
except Exception as e:
BRAIN_UTILS_AVAILABLE = False
logging.warning(f"⚠️ خطأ في تحميل الأدوات المساعدة في brain: {e}")
# استخدام الديكوراتورات إذا كانت متاحة
if BRAIN_UTILS_AVAILABLE:
# تطبيق الديكوراتورات على الدوال الهامة
pass
# ===== App config (hardcoded token) =====
BOT_TOKEN = "000000000:TEST_TOKEN_PLACEHOLDER"
APP_HOST = "0.0.0.0"
APP_PORT = 7530
SECRET_KEY = "noura-super-secret"
# ===== اعدادات Ollama المحسنة =====
OLLAMA_URL = "http://127.0.0.1:11434"
OLLAMA_MODEL = "noura:latest"
# ===== قائمة النماذج المتاحة =====
OLLAMA_MODELS = {
"noura": "noura:latest",
"amal": "amal:latest",
"besan": "besan:latest",
"rana": "rana:latest",
"shahd": "shahd:latest",
"ainbo": "ainbo:latest",
"ivy": "ivy:latest",
"tinyllama": "tinyllama:latest",
"llama2": "llama2:latest",
"mistral": "mistral:latest",
"qwen": "qwen:latest",
"gemma": "gemma:latest"
}
# ===== تفعيل جميع الأنظمة =====
RESPONSES_SYSTEM_AVAILABLE = True
KNOWLEDGE_SEARCH_AVAILABLE = True
SMART_LEARNER_AVAILABLE = True
MEDIA_ANALYZER_AVAILABLE = True
# ===== تطبيق Flask الرئيسي =====
app = Flask(__name__)
app.secret_key = SECRET_KEY
class EnhancedDataProcessor:
"""معالج بيانات محسن"""
def __init__(self):
self.name = "EnhancedDataProcessor"
def process(self, data):
"""معالجة البيانات"""
return f"Processed: {data}"
def analyze(self, input_data):
"""تحليل البيانات"""
return {"analysis": "completed", "input": input_data}
import traceback
def safe_json(obj):
"""
تغليف آمن قبل jsonify
"""
try:
if obj is None:
return jsonify({'error': 'no result'})
if isinstance(obj, dict):
return jsonify(obj)
elif isinstance(obj, list):
return jsonify(obj)
elif hasattr(obj, '__dict__'):
try:
return jsonify(vars(obj))
except Exception:
return jsonify({'result': str(obj)})
else:
return jsonify({'result': str(obj)})
except Exception as e:
logging.error("safe_json: خطأ أثناء jsonify: %s\n%s", e, traceback.format_exc())
return jsonify({'error': 'internal server error', 'detail': str(e)})
# ===== استيراد أنظمة المعرفة والتعلم =====
try:
from knowledge_search import quick_search, KnowledgeSearch
KNOWLEDGE_SEARCH_AVAILABLE = True
logging.info("✅ تم تحميل نظام البحث في المعرفة بنجاح")
except ImportError as e:
KNOWLEDGE_SEARCH_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل نظام البحث في المعرفة: {e}")
except Exception as e:
KNOWLEDGE_SEARCH_AVAILABLE = False
logging.warning(f"⚠️ خطأ في تحميل نظام البحث في المعرفة: {e}")
try:
from learner import SmartLearner
SMART_LEARNER_AVAILABLE = True
logging.info("✅ تم تحميل نظام التعلم الذكي بنجاح")
except ImportError as e:
SMART_LEARNER_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل نظام التعلم الذكي: {e}")
except Exception as e:
SMART_LEARNER_AVAILABLE = False
logging.warning(f"⚠️ خطأ في تحميل نظام التعلم الذكي: {e}")
# ===== استيراد محلل الوسائط =====
try:
import importlib.util
spec = importlib.util.spec_from_file_location("analyzer", "analyzer.py")
analyzer_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(analyzer_module)
from analyzer import (
MediaAnalyzer,
analyze_media,
analyze_media_async,
get_media_summary,
extract_urls_from_text,
analyze_url_type,
fix_url,
is_valid_url
)
MEDIA_ANALYZER_AVAILABLE = True
media_analyzer = MediaAnalyzer()
logging.info("✅ تم تحميل محلل الوسائط بنجاح")
except ImportError as e:
MEDIA_ANALYZER_AVAILABLE = False
logging.warning(f"⚠️ فشل تحميل محلل الوسائط: {e}")
except Exception as e:
MEDIA_ANALYZER_AVAILABLE = False
logging.warning(f"⚠️ خطأ في تحميل محلل الوسائط: {e}")
# ===== نظام تكامل مع البوت الأصلي لتوليد الصور - الإصدار المحسن =====
class ImageGeneratorIntegration:
"""التكامل مع بوت توليد الصور الأصلي - الإصدار المحسن"""
def __init__(self):
self.is_connected = False
self.generator_manager = None
self.fallback_generators = [] # قائمة بالمولدات الاحتياطية
self.setup_generator()
def setup_generator(self):
"""إعداد نظام توليد الصور - الإصدار المحسن"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("ptb_bot", "ptb_bot.py")
ptb_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ptb_module)
# استخدام ImageGeneratorManager بدلاً من StableHorde مباشرة
self.ImageGeneratorManager = ptb_module.ImageGeneratorManager
self.generator_manager = self.ImageGeneratorManager()
self.is_connected = True
# إعداد المولدات الاحتياطية
self._setup_fallback_generators()
logging.info("✅ تم تحميل نظام توليد الصور من ptb_bot بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام توليد الصور: {e}")
self.is_connected = False
def _setup_fallback_generators(self):
"""إعداد مولدات صور احتياطية"""
self.fallback_generators = [
self._try_sd_webui_local, # محاولة SD WebUI محلي
self._try_prodia_fixed, # Prodia مع إصلاحات
self._try_local_ai_model, # نموذج محلي
self._try_simple_generator # مولد صور بسيط
]
async def generate_image_async(self, prompt: str, model: str = "Deliberate"):
"""
توليد صورة باستخدام ImageGeneratorManager مع استرجاع احتياطي
"""
if not self.is_connected or not self.generator_manager:
# محاولة المولدات الاحتياطية مباشرة
return await self._try_all_fallback_generators(prompt, model)
try:
# المحاولة الأولى باستخدام المدير الأساسي
image_data, generator_name = await self.generator_manager.generate_image(prompt)
if image_data and len(image_data) > 1000: # التأكد من حجم صالح
return {
"success": True,
"prompt": prompt,
"model": model,
"generator_used": generator_name,
"image_size": len(image_data),
"image_format": "JPEG",
"message": f"تم توليد الصورة بنجاح باستخدام {generator_name}",
"has_image_data": True
}
else:
# تجربة المولدات الاحتياطية
return await self._try_all_fallback_generators(prompt, model)
except Exception as e:
logging.error(f"❌ خطأ في توليد الصورة: {e}")
# تجربة المولدات الاحتياطية
return await self._try_all_fallback_generators(prompt, model)
async def _try_all_fallback_generators(self, prompt: str, model: str):
"""تجربة جميع المولدات الاحتياطية"""
for fallback_generator in self.fallback_generators:
try:
fallback_result = await fallback_generator(prompt, model)
if fallback_result and fallback_result.get("success"):
return fallback_result
except Exception as e:
logging.debug(f"⚠️ فشل المولد الاحتياطي: {e}")
continue
# إذا فشلت جميع المولدات
return {
"success": False,
"error": "فشل توليد الصورة بواسطة جميع المولدات المتاحة",
"prompt": prompt,
"fallback_available": SIMPLE_GENERATOR_AVAILABLE,
"suggestion": "جرب استخدام المولد البسيط عبر /generate/image/fallback"
}
async def _try_sd_webui_local(self, prompt: str, model: str):
"""محاولة استخدام SD WebUI محلي"""
try:
# افتراض أن SD WebUI يعمل على localhost:7860
import httpx
payload = {
"prompt": prompt,
"negative_prompt": "",
"steps": 20,
"width": 512,
"height": 512,
"cfg_scale": 7.5,
"sampler_name": "Euler a",
"batch_size": 1
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
"http://127.0.0.1:7860/sdapi/v1/txt2img",
json=payload
)
if response.status_code == 200:
result = response.json()
images = result.get("images", [])
if images:
import base64
image_data = base64.b64decode(images[0])
return {
"success": True,
"image_size": len(image_data),
"generator_used": "SD_WebUI_Local",
"message": "تم توليد الصورة باستخدام SD WebUI المحلي"
}
except Exception as e:
logging.debug(f"SD WebUI المحلي غير متاح: {e}")
return None
async def _try_prodia_fixed(self, prompt: str, model: str):
"""محاولة Prodia مع إصلاحات"""
try:
import httpx
# استخدام API حديثة لـ Prodia
async with httpx.AsyncClient(timeout=30.0) as client:
# أولاً: إنشاء صورة
create_response = await client.post(
"https://api.prodia.com/v1/sd/generate",
json={
"prompt": prompt,
"model": "dreamshaper_8.safetensors",
"negative_prompt": "",
"steps": 25,
"cfg_scale": 7,
"seed": -1,
"upscale": False
},
headers={
"X-Prodia-Key": "",
"Content-Type": "application/json"
}
)
if create_response.status_code == 200:
job_data = create_response.json()
job_id = job_data.get("job")
if job_id:
# انتظار اكتمال الوظيفة
import asyncio
for attempt in range(15): # 15 محاولة (30 ثانية كحد أقصى)
await asyncio.sleep(2)
status_response = await client.get(
f"https://api.prodia.com/v1/job/{job_id}"
)
if status_response.status_code == 200:
job_status = status_response.json()
status = job_status.get("status")
if status == "succeeded":
image_url = job_status.get("imageUrl")
if image_url:
# تحميل الصورة
image_response = await client.get(image_url)
if image_response.status_code == 200:
return {
"success": True,
"image_size": len(image_response.content),
"generator_used": "Prodia_Fixed",
"message": "تم توليد الصورة باستخدام Prodia"
}
elif status == "failed":
break
except Exception as e:
logging.debug(f"Prodia فشل: {e}")
return None
async def _try_local_ai_model(self, prompt: str, model: str):
"""محاولة استخدام نموذج AI محلي (مثل Stable Diffusion عبر diffusers)"""
try:
# تحقق مما إذا كان diffusers مثبتاً
import importlib.util
if importlib.util.find_spec("diffusers") is not None:
from diffusers import StableDiffusionPipeline
import torch
# تحميل النموذج (قد يأخذ وقتاً في المرة الأولى)
pipe = StableDiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
safety_checker=None
)
if torch.cuda.is_available():
pipe = pipe.to("cuda")
# توليد الصورة
image = pipe(prompt, num_inference_steps=20).images[0]
# حفظ الصورة مؤقتاً
import io
buffer = io.BytesIO()
image.save(buffer, format="PNG")
image_data = buffer.getvalue()
return {
"success": True,
"image_size": len(image_data),
"generator_used": "Local_Stable_Diffusion",
"message": "تم توليد الصورة باستخدام النموذج المحلي"
}
except Exception as e:
logging.debug(f"النموذج المحلي غير متاح: {e}")
return None
async def _try_simple_generator(self, prompt: str, model: str):
"""محاولة استخدام المولد البسيط"""
if SIMPLE_GENERATOR_AVAILABLE:
try:
image_data = generate_simple_image(prompt)
if image_data:
return {
"success": True,
"image_size": len(image_data),
"generator_used": "Simple_Generator",
"message": "تم إنشاء صورة توضيحية بسيطة",
"is_fallback": True
}
except Exception as e:
logging.debug(f"المولد البسيط فشل: {e}")
return None
def generate_image_sync(self, prompt: str, model: str = "Deliberate"):
"""
نسخة متزامنة لتوليد الصور (للاستخدام في Flask)
"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(self.generate_image_async(prompt, model))
loop.close()
return result
except Exception as e:
logging.error(f"❌ خطأ في التوليد المتزامن: {e}")
return {
"success": False,
"error": str(e),
"fallback_available": SIMPLE_GENERATOR_AVAILABLE
}
@app.route('/process/quick', methods=['POST'])
def process_quick():
"""معالجة سريعة للرسائل مع رد فوري - الإصدار المصحح"""
try:
data = request.get_json()
if not data or 'content' not in data:
return jsonify({"error": "بيانات غير كافية"}), 400
content = data['content']
# استخدام Ollama للرد السريع
reply_text = ollama_manager.generate_response(content)
return jsonify({
"success": True,
"reply": reply_text,
"processed": True,
"method": "ollama_quick_reply"
})
except Exception as e:
logging.error(f"❌ خطأ في المعالجة السريعة: {e}")
return jsonify({"error": str(e)}), 500
# ===== نظام البحث المتكامل =====
class SearchIntegration:
"""نظام البحث المتكامل من search_module"""
def __init__(self):
self.search_available = False
self.setup_search()
def setup_search(self):
"""إعداد نظام البحث"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("search_module", "search_module.py")
search_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(search_module)
self.search_module = search_module
self.search_available = True
logging.info("✅ تم تحميل نظام البحث المتكامل بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام البحث: {e}")
self.search_available = False
def search_web(self, query, search_engine="duckduckgo", max_results=5):
"""
البحث في الويب باستخدام محرك البحث المحدد
"""
if not self.search_available:
return {"error": "نظام البحث غير متاح"}
try:
if search_engine == "duckduckgo":
results = self.search_module.search_duckduckgo(query)
return {
"success": True,
"search_engine": "duckduckgo",
"query": query,
"results": results,
"results_count": len(results)
}
elif search_engine == "google":
results = self.search_module.search_google(query, max_results)
return {
"success": True,
"search_engine": "google",
"query": query,
"results": results,
"results_count": len(results)
}
else:
return {"error": f"محرك البحث غير مدعوم: {search_engine}"}
except Exception as e:
logging.error(f"❌ خطأ في البحث: {e}")
return {"error": str(e)}
def smart_search(self, query, max_results=5):
"""
بحث ذكي يستخدم multiple search engines
"""
if not self.search_available:
return {"error": "نظام البحث غير متاح"}
try:
all_results = {}
ddg_results = self.search_web(query, "duckduckgo", max_results)
if ddg_results.get("success"):
all_results["duckduckgo"] = ddg_results
google_results = self.search_web(query, "google", max_results)
if google_results.get("success"):
all_results["google"] = google_results
summary = self._generate_search_summary(all_results, query)
return {
"success": True,
"query": query,
"engines_used": list(all_results.keys()),
"results": all_results,
"summary": summary,
"total_results": sum(len(result.get("results", [])) for result in all_results.values())
}
except Exception as e:
logging.error(f"❌ خطأ في البحث الذكي: {e}")
return {"error": str(e)}
def _generate_search_summary(self, results, query):
"""إنشاء ملخص لنتائج البحث"""
summary = {
"query": query,
"total_engines": len(results),
"total_results": 0,
"results_by_engine": {},
"top_results": []
}
for engine, data in results.items():
engine_results = data.get("results", [])
summary["results_by_engine"][engine] = len(engine_results)
summary["total_results"] += len(engine_results)
if engine_results:
if engine == "duckduckgo":
summary["top_results"].extend(engine_results[:2])
else:
summary["top_results"].extend([title for title, link in engine_results[:2]])
return summary
# ===== نظام توزيع المهام الذكي =====
class TaskDistributor:
"""
نظام توزيع المهام الذكي - العقل المدبر للنظام
"""
def __init__(self):
self.workers = {}
self.task_queue = Queue()
self.active_tasks = {}
self.task_history = []
self.performance_stats = {}
self.is_running = False
self.worker_threads = []
self._setup_workers()
logging.info("✅ تم تهيئة موزع المهام الذكي")
def _setup_workers(self):
"""اعداد العمال المتخصصين"""
self.workers = {
'text_processor': {
'name': 'معالج النصوص',
'module': None,
'max_concurrent': 3,
'active_tasks': 0,
'enabled': True
},
'file_analyzer': {
'name': 'محلل الملفات',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'audio_processor': {
'name': 'معالج الصوت',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'video_processor': {
'name': 'معالج الفيديو',
'module': None,
'max_concurrent': 1,
'active_tasks': 0,
'enabled': True
},
'image_processor': {
'name': 'معالج الصور',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'image_generator': {
'name': 'مولد الصور',
'module': None,
'max_concurrent': 1,
'active_tasks': 0,
'enabled': True
},
'knowledge_worker': {
'name': 'عامل المعرفة',
'module': None,
'max_concurrent': 3,
'active_tasks': 0,
'enabled': True
},
'learning_worker': {
'name': 'عامل التعلم',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'search_worker': {
'name': 'عامل البحث',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'improvement_worker': {
'name': 'عامل التحسين الذاتي',
'module': None,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
},
'ollama_worker': {
'name': 'عامل Ollama',
'module': ollama_manager,
'max_concurrent': 2,
'active_tasks': 0,
'enabled': True
}
}
def submit_task(self, task_type, data, priority='normal', callback=None):
"""
ارسال مهمة جديدة للنظام
"""
task_id = f"task_{int(time.time())}_{len(self.task_history)}"
task = {
"task_id": task_id,
"type": task_type,
"data": data,
"priority": priority,
"status": "pending",
"submitted_at": datetime.now().isoformat(),
"callback": callback,
"assigned_worker": None,
"result": None,
"error": None
}
self.task_queue.put(task)
self.task_history.append(task)
logging.info(f"📥 تم ارسال مهمة جديدة: {task_id} ({task_type})")
return task_id
def get_task_status(self, task_id):
"""الحصول على حالة مهمة"""
for task in self.task_history:
if task["task_id"] == task_id:
return {
"task_id": task_id,
"status": task["status"],
"type": task["type"],
"assigned_worker": task["assigned_worker"],
"submitted_at": task["submitted_at"],
"started_at": task.get("started_at"),
"completed_at": task.get("completed_at"),
"result": task.get("result"),
"error": task.get("error")
}
return {"error": "لم يتم العثور على المهمة"}
def get_system_stats(self):
"""الحصول على احصائيات النظام"""
stats = {
"total_tasks": len(self.task_history),
"pending_tasks": self.task_queue.qsize(),
"active_tasks": len(self.active_tasks),
"workers": {}
}
for worker_name, worker_data in self.workers.items():
stats["workers"][worker_name] = {
"active_tasks": worker_data["active_tasks"],
"max_concurrent": worker_data["max_concurrent"]
}
return stats
def _find_best_worker(self, task_type):
"""ايجاد افضل عامل لمعالجة المهمة"""
worker_mapping = {
'text': ['text_processor', 'knowledge_worker', 'ollama_worker'],
'file': ['file_analyzer'],
'audio': ['audio_processor'],
'video': ['video_processor'],
'image': ['image_processor'],
'image_generation': ['image_generator'],
'url': ['knowledge_worker', 'learning_worker'],
'query': ['knowledge_worker', 'text_processor', 'ollama_worker'],
'learn': ['learning_worker'],
'search': ['search_worker'],
'improvement': ['improvement_worker'],
'ollama_generate': ['ollama_worker']
}
candidate_workers = worker_mapping.get(task_type, [])
for worker_name in candidate_workers:
if worker_name in self.workers and self.workers[worker_name]['enabled']:
worker_data = self.workers[worker_name]
if worker_data["active_tasks"] < worker_data["max_concurrent"]:
return worker_name
return None
def _process_task(self, task):
"""معالجة مهمة واحدة"""
task_id = task["task_id"]
task_type = task["type"]
try:
worker_name = self._find_best_worker(task_type)
if not worker_name:
task["status"] = "failed"
task["error"] = "لا توجد وحدة متاحة لمعالجة هذه المهمة"
logging.error(f"❌ لا توجد وحدة لمعالجة المهمة {task_id}")
return
task["assigned_worker"] = worker_name
task["status"] = "processing"
task["started_at"] = datetime.now().isoformat()
self.workers[worker_name]["active_tasks"] += 1
self.active_tasks[task_id] = task
logging.info(f"🔧 بدء معالجة المهمة {task_id} باستخدام {worker_name}")
result = self._execute_worker_task(worker_name, task_type, task["data"])
if result is not None:
task["status"] = "completed"
task["result"] = result
logging.info(f"✅ اكتملت المهمة {task_id} بنجاح")
else:
task["status"] = "failed"
task["error"] = "فشلت المعالجة في الوحدة"
logging.error(f"❌ فشلت المهمة {task_id}")
task["completed_at"] = datetime.now().isoformat()
if task.get("callback"):
try:
task["callback"](task)
except Exception as e:
logging.error(f"❌ خطأ في استدعاء callback للمهمة {task_id}: {e}")
except Exception as e:
task["status"] = "failed"
task["error"] = str(e)
logging.error(f"❌ خطأ غير متوقع في معالجة المهمة {task_id}: {e}")
finally:
if task_id in self.active_tasks:
del self.active_tasks[task_id]
if worker_name:
self.workers[worker_name]["active_tasks"] -= 1
def _execute_worker_task(self, worker_name, task_type, data):
"""تنفيذ المهمة باستخدام العامل المحدد"""
try:
if worker_name == "knowledge_worker" and KNOWLEDGE_SEARCH_AVAILABLE:
if task_type == "query":
return quick_search(data)
elif task_type == "text":
return quick_search(data)
elif worker_name == "learning_worker" and SMART_LEARNER_AVAILABLE:
if task_type == "learn":
return {"message": "تمت عملية التعلم بنجاح"}
elif worker_name == "image_generator":
if task_type == "image_generation":
prompt = data.get("prompt", "")
model = data.get("model", "Deliberate")
# استخدام نظام توليد الصور المحسن
if hasattr(brain_ai, 'image_generator'):
result = brain_ai.image_generator.generate_image_sync(prompt, model)
return result
else:
return {"error": "نظام توليد الصور غير متاح"}
elif worker_name == "ollama_worker":
if task_type == "ollama_generate":
prompt = data.get("prompt", "")
model = data.get("model", "noura:latest")
return ollama_manager.generate_response(prompt, model)
return f"تم معالجة المهمة {task_type} بواسطة {worker_name}"
except Exception as e:
logging.error(f"❌ خطأ في تنفيذ المهمة بواسطة {worker_name}: {e}")
return None
def start(self):
"""بدء تشغيل النظام"""
if self.is_running:
logging.warning("⚠️ النظام يعمل بالفعل")
return
self.is_running = True
logging.info("🚀 بدء تشغيل موزع المهام...")
self.worker_threads = []
for i in range(5): # زيادة عدد العمال
thread = threading.Thread(target=self._task_processor, daemon=True)
thread.start()
self.worker_threads.append(thread)
logging.info("✅ موزع المهام يعمل وجاهز لاستقبال المهام")
def stop(self):
"""ايقاف النظام"""
self.is_running = False
logging.info("🛑 ايقاف موزع المهام...")
def _task_processor(self):
"""معالج المهام الرئيسي"""
while self.is_running:
try:
task = self.task_queue.get(timeout=1.0)
self._process_task(task)
self.task_queue.task_done()
except Empty:
continue
except Exception as e:
logging.error(f"❌ خطأ في معالج المهام: {e}")
time.sleep(1)
# ===== نظام تكامل مع Audio Analyzer =====
class AudioAnalyzerIntegration:
def __init__(self, audio_analyzer_url="http://127.0.0.1:5001"):
self.audio_analyzer_url = audio_analyzer_url
self.is_connected = False
self.test_connection()
def test_connection(self):
"""اختبار اتصال Audio Analyzer"""
try:
response = httpx.get(f"{self.audio_analyzer_url}/health", timeout=5)
if response.status_code == 200:
data = response.json()
self.is_connected = data.get('status') == 'healthy'
if self.is_connected:
logging.info("✅ تم الاتصال بـ Audio Analyzer بنجاح")
else:
logging.warning("⚠️ Audio Analyzer غير صحي")
else:
logging.warning("⚠️ Audio Analyzer غير متاح")
except Exception as e:
logging.warning(f"⚠️ فشل الاتصال بـ Audio Analyzer: {e}")
self.is_connected = False
def analyze_audio_file(self, file_path, filename, language=None, task='transcribe'):
"""
ارسال ملف صوتي إلى Audio Analyzer للحصول على النص
"""
if not self.is_connected:
return {"error": "Audio Analyzer غير متاح"}
try:
with open(file_path, 'rb') as file:
files = {'file': (filename, file)}
data = {}
if language:
data['language'] = language
if task:
data['task'] = task
response = httpx.post(
f"{self.audio_analyzer_url}/upload",
files=files,
data=data,
timeout=60
)
if response.status_code == 200:
result = response.json()
return result
else:
return {"error": f"فشل تحليل الصوت: {response.status_code}"}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الملف الصوتي: {e}")
return {"error": str(e)}
def get_transcription_text(self, analysis_result):
"""استخراج النص من نتيجة التحليل"""
if analysis_result.get('success'):
transcription = analysis_result.get('transcription', {})
if 'text' in transcription and transcription['text'].strip():
return transcription['text'].strip()
return None
# ===== نظام تكامل مع File Analyzer =====
class FileAnalyzerIntegration:
def __init__(self, file_analyzer_url="http://127.0.0.1:5000"):
self.file_analyzer_url = file_analyzer_url
self.is_connected = False
self.supported_extensions = ['txt', 'json', 'xml', 'pdf', 'docx', 'xlsx', 'pptx']
self.test_connection()
def test_connection(self):
"""اختبار اتصال File Analyzer"""
try:
response = httpx.get(f"{self.file_analyzer_url}/", timeout=5)
self.is_connected = response.status_code == 200
if self.is_connected:
logging.info("✅ تم الاتصال بـ File Analyzer بنجاح")
else:
logging.warning("⚠️ File Analyzer غير متاح")
except Exception as e:
logging.warning(f"⚠️ فشل الاتصال بـ File Analyzer: {e}")
self.is_connected = False
def analyze_file(self, file_path, filename):
"""
ارسال ملف إلى File Analyzer والحصول على التحليل
"""
if not self.is_connected:
return {"error": "File Analyzer غير متاح"}
try:
with open(file_path, 'rb') as file:
files = {'file': (filename, file)}
response = httpx.post(
f"{self.file_analyzer_url}/upload",
files=files,
timeout=30
)
if response.status_code == 200:
return response.json()
else:
return {"error": f"فشل التحليل: {response.status_code}"}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الملف: {e}")
return {"error": str(e)}
def convert_analysis_to_xml(self, analysis_result):
"""
تحويل نتيجة التحليل إلى تنسيق XML
"""
try:
root = ET.Element("FileAnalysis")
basic_info = ET.SubElement(root, "BasicInfo")
ET.SubElement(basic_info, "Filename").text = analysis_result.get('filename', '')
ET.SubElement(basic_info, "Extension").text = analysis_result.get('extension', '')
ET.SubElement(basic_info, "SizeBytes").text = str(analysis_result.get('size_bytes', 0))
analysis_data = analysis_result.get('analysis', {})
detailed_analysis = ET.SubElement(root, "DetailedAnalysis")
self._dict_to_xml(analysis_data, detailed_analysis)
xml_str = ET.tostring(root, encoding='unicode', method='xml')
try:
import xml.dom.minidom
dom = xml.dom.minidom.parseString(xml_str)
return dom.toprettyxml(indent=" ")
except:
return xml_str
except Exception as e:
logging.error(f"❌ خطأ في تحويل التحليل إلى XML: {e}")
return f"<FileAnalysis><Error>{str(e)}</Error></FileAnalysis>"
def _dict_to_xml(self, data, parent_element):
"""تحويل القاموس إلى عناصر XML"""
for key, value in data.items():
safe_key = re.sub(r'[^a-zA-Z0-9_]', '_', str(key))
if isinstance(value, dict):
child = ET.SubElement(parent_element, safe_key)
self._dict_to_xml(value, child)
elif isinstance(value, list):
list_element = ET.SubElement(parent_element, safe_key)
for item in value:
if isinstance(item, dict):
item_element = ET.SubElement(list_element, "Item")
self._dict_to_xml(item, item_element)
else:
ET.SubElement(list_element, "Item").text = str(value)
else:
ET.SubElement(parent_element, safe_key).text = str(value)
def is_supported_file(self, filename):
"""التحقق اذا كان الملف مدعوماً"""
ext = filename.lower().split('.')[-1] if '.' in filename else ''
return ext in self.supported_extensions
# ===== نظام تكامل مع Link Inspector المحسن =====
class LinkInspectorIntegration:
def __init__(self, link_inspector_path="link_inspector.py"):
self.link_inspector_path = link_inspector_path
self.link_inspector_available = False
self.setup_link_inspector()
def setup_link_inspector(self):
"""اعداد نظام تحليل الروابط"""
try:
import importlib.util
if os.path.exists(self.link_inspector_path):
spec = importlib.util.spec_from_file_location("link_inspector", self.link_inspector_path)
link_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(link_module)
self.link_inspector = link_module
self.link_inspector_available = True
logging.info("✅ تم تحميل نظام تحليل الروابط بنجاح")
else:
logging.warning(f"⚠️ ملف link_inspector.py غير موجود: {self.link_inspector_path}")
self.link_inspector_available = False
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام تحليل الروابط: {e}")
self.link_inspector_available = False
def analyze_urls(self, urls):
"""
تحليل قائمة من الروابط باستخدام Link Inspector
"""
if not self.link_inspector_available:
return {"error": "نظام تحليل الروابط غير متاح"}
try:
results = []
for url in urls:
logging.info(f"🔗 تحليل الرابط: {url}")
if hasattr(self.link_inspector, 'analyze_url'):
analysis_result = self.link_inspector.analyze_url(url)
else:
analysis_result = {"type": "unknown", "status": "not_analyzed", "error": "الدالة غير متاحة"}
results.append({
"url": url,
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
})
summary = self._generate_link_analysis_summary(results)
return {
"success": True,
"total_urls": len(urls),
"analyzed_urls": len(results),
"results": results,
"summary": summary,
"processing_method": "link_inspector"
}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الروابط: {e}")
return {"error": str(e)}
def _generate_link_analysis_summary(self, results):
"""انشاء ملخص لنتائج تحليل الروابط"""
summary = {
"total_links": len(results),
"by_type": {},
"by_status": {},
"content_types": set(),
"successful_analyses": 0
}
for result in results:
analysis = result.get("analysis", {})
url_type = analysis.get("type", "unknown")
status = analysis.get("status", "unknown")
summary["by_type"][url_type] = summary["by_type"].get(url_type, 0) + 1
summary["by_status"][status] = summary["by_status"].get(status, 0) + 1
if status == "ok":
summary["successful_analyses"] += 1
meta = analysis.get("meta", {})
content_type = meta.get("content_type_header", "")
if content_type:
summary["content_types"].add(content_type)
summary["content_types"] = list(summary["content_types"])
summary["success_rate"] = round((summary["successful_analyses"] / len(results)) * 100, 2) if results else 0
return summary
def analyze_single_url(self, url):
"""
تحليل رابط واحد باستخدام Link Inspector
"""
if not self.link_inspector_available:
return {"error": "نظام تحليل الروابط غير متاح"}
try:
logging.info(f"🔗 تحليل رابط واحد: {url}")
analysis_result = self.link_inspector.analyze_url(url)
enhanced_result = self._enhance_analysis_result(analysis_result, url)
return {
"success": True,
"url": url,
"analysis": enhanced_result,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الرابط: {e}")
return {"error": str(e)}
def _enhance_analysis_result(self, analysis_result, url):
"""تحسين نتيجة التحليل"""
enhanced = analysis_result.copy()
url_type = analysis_result.get("type", "unknown")
status = analysis_result.get("status", "unknown")
meta = analysis_result.get("meta", {})
enhanced["description"] = self._build_url_description(url_type, status, meta, url)
enhanced["recommendations"] = self._generate_recommendations(url_type, meta)
enhanced["security_rating"] = self._assess_security(url, url_type, meta)
return enhanced
def _build_url_description(self, url_type, status, meta, url):
"""بناء وصف مفصل للرابط"""
descriptions = {
"html": "🌐 صفحة ويب تحتوي على محتوى نصي ووسائط",
"image": "🖼️ ملف صورة",
"video": "🎬 ملف فيديو",
"audio": "🎵 ملف صوتي",
"pdf": "📄 مستند PDF",
"mixed": "📊 محتوى مختلط (نص ووسائط)",
"images_and_text": "🖼️📝 صفحة تحتوي على صور ونص",
"image_gallery": "🖼️ معرض صور",
"unknown": "❓ نوع محتوى غير معروف"
}
base_description = descriptions.get(url_type, "❓ نوع محتوى غير معروف")
additional_info = []
if status != "ok":
additional_info.append(f"الحالة: {status}")
content_type = meta.get("content_type_header", "")
if content_type:
additional_info.append(f"نوع المحتوى: {content_type}")
status_code = meta.get("status_code", "")
if status_code:
additional_info.append(f"رمز الاستجابة: {status_code}")
if additional_info:
base_description += f" ({', '.join(additional_info)})"
return base_description
def _generate_recommendations(self, url_type, meta):
"""توليد توصيات بناءً على نوع الرابط"""
recommendations = []
if url_type == "html":
recommendations.append("يمكن استخراج النص والوسائط من هذه الصفحة")
recommendations.append("يمكن تحليل محتوى الصفحة باستخدام ChatGPT")
elif url_type == "pdf":
recommendations.append("يمكن استخراج النص من المستند")
if meta.get("pdf_text_snippet"):
recommendations.append("المستند يحتوي على نص قابل للاستخراج")
else:
recommendations.append("قد يحتاج المستند إلى معالجة OCR")
elif url_type in ["image", "video", "audio"]:
recommendations.append("يمكن تحليل الوسائط باستخدام الأنظمة المتخصصة")
recommendations.append("يمكن استخراج النص من الوسائط إذا أمكن")
elif url_type == "unknown":
recommendations.append("يحتاج الرابط إلى فحص يدوي")
recommendations.append("يمكن تجربة تحليل المحتوى باستخدام ChatGPT")
return recommendations
def _assess_security(self, url, url_type, meta):
"""تقييم أمان الرابط"""
security_score = 5
if url.startswith("https://"):
security_score += 2
status_code = meta.get("status_code", 0)
if 200 <= status_code < 300:
security_score += 1
suspicious_domains = [".tk", ".ml", ".ga", ".cf", ".xyz"]
if any(domain in url for domain in suspicious_domains):
security_score -= 2
if url_type == "unknown" and meta.get("status_code") not in [200, 301, 302]:
security_score -= 1
if security_score >= 7:
return {"rating": "آمن", "score": security_score, "color": "green"}
elif security_score >= 5:
return {"rating": "متوسط", "score": security_score, "color": "yellow"}
elif security_score >= 3:
return {"rating": "مشبوه", "score": security_score, "color": "orange"}
else:
return {"rating": "خطير", "score": security_score, "color": "red"}
# ===== نظام تكامل مع Audio Analyzer المحلي =====
class LocalAudioTranscriber:
def __init__(self):
self.transcriber_available = False
self.setup_transcriber()
def setup_transcriber(self):
"""اعداد نظام التحويل الصوتي المحلي"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("transcribe_audio", "transcribe_audio.py")
transcribe_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(transcribe_module)
self.transcribe_audio = transcribe_module
self.transcriber_available = True
logging.info("✅ تم تحميل نظام التحويل الصوتي المحلي بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام التحويل الصوتي المحلي: {e}")
self.transcriber_available = False
def transcribe_audio_file(self, audio_path, language='ar', model='small'):
"""
تحويل الملف الصوتي إلى نص باستخدام النظام المحلي
"""
if not self.transcriber_available:
return {"error": "نظام التحويل الصوتي غير متاح"}
try:
if not os.path.exists(audio_path):
return {"error": f"الملف غير موجود: {audio_path}"}
model_obj = self.transcribe_audio.whisper.load_model(model)
options = {
'task': 'transcribe',
'language': language,
'temperature': 0.0,
'best_of': 5,
'beam_size': 5,
}
result = model_obj.transcribe(audio_path, **options)
text = result.get('text', '').strip()
segments = result.get('segments', [])
if not text:
return {"error": "فشل استخراج النص من الملف الصوتي"}
return {
"success": True,
"text": text,
"segments": segments,
"language": result.get('language', 'unknown'),
"duration": segments[-1]['end'] if segments else 0,
"word_count": len(text.split())
}
except Exception as e:
logging.error(f"❌ خطأ في التحويل الصوتي المحلي: {e}")
return {"error": str(e)}
def transcribe_and_save(self, audio_path, output_dir=None, output_format='txt', language='ar'):
"""
تحويل الملف الصوتي وحفظ النتائج في ملف
"""
if not self.transcriber_available:
return {"error": "نظام التحويل الصوتي غير متاح"}
try:
from pathlib import Path
transcribe_kwargs = {
'lang': language,
'output_format': output_format,
'verbose': False,
'output_dir': Path(output_dir) if output_dir else None
}
result = self.transcribe_audio.transcribe_file(
self.transcribe_audio.whisper.load_model('small'),
Path(audio_path),
**transcribe_kwargs
)
if result and result['success']:
return {
"success": True,
"output_files": result['output_files'],
"text": result.get('text', ''),
"processing_time": result['processing_time']
}
else:
return {"error": "فشل في تحويل الملف الصوتي"}
except Exception as e:
logging.error(f"❌ خطأ في التحويل والحفظ: {e}")
return {"error": str(e)}
# ===== نظام تكامل مع Video Processor =====
class VideoProcessorIntegration:
def __init__(self):
self.video_processor_available = False
self.setup_video_processor()
def setup_video_processor(self):
"""اعداد نظام معالجة الفيديوهات"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("tafreeghbot", "tafreeghbot.py")
video_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(video_module)
self.video_processor = video_module
self.video_processor_available = True
logging.info("✅ تم تحميل نظام معالجة الفيديوهات بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام معالجة الفيديوهات: {e}")
self.video_processor_available = False
def process_video(self, video_path, output_dir=None, quality='medium'):
"""
معالجة الفيديو باستخدام tafreeghbot
"""
if not self.video_processor_available:
return {"error": "نظام معالجة الفيديوهات غير متاح"}
try:
if not os.path.exists(video_path):
return {"error": f"ملف الفيديو غير موجود: {video_path}"}
options = {
'input_path': video_path,
'output_dir': output_dir or os.path.dirname(video_path),
'quality': quality,
'preserve_audio': True,
'overwrite': True
}
result = self.video_processor.process_video_file(**options)
if result.get('success'):
return {
"success": True,
"output_file": result['output_path'],
"original_size": result.get('original_size'),
"processed_size": result.get('processed_size'),
"compression_ratio": result.get('compression_ratio'),
"processing_time": result.get('processing_time')
}
else:
return {"error": result.get('error', 'فشل معالجة الفيديو')}
except Exception as e:
logging.error(f"❌ خطأ في معالجة الفيديو: {e}")
return {"error": str(e)}
def extract_audio_from_video(self, video_path, output_dir=None, audio_format='mp3'):
"""
استخراج الصوت من الفيديو
"""
if not self.video_processor_available:
return {"error": "نظام معالجة الفيديوهات غير متاح"}
try:
result = self.video_processor.extract_audio(
video_path,
output_format=audio_format,
output_dir=output_dir
)
if result.get('success'):
return {
"success": True,
"audio_file": result['audio_path'],
"duration": result.get('duration'),
"file_size": result.get('file_size')
}
else:
return {"error": result.get('error', 'فشل استخراج الصوت')}
except Exception as e:
logging.error(f"❌ خطأ في استخراج الصوت: {e}")
return {"error": str(e)}
def generate_video_summary(self, video_path, max_frames=10):
"""
انشاء ملخص للفيديو باستخدام اطارات رئيسية
"""
if not self.video_processor_available:
return {"error": "نظام معالجة الفيديوهات غير متاح"}
try:
result = self.video_processor.analyze_video(
video_path,
extract_keyframes=True,
max_keyframes=max_frames
)
if result.get('success'):
return {
"success": True,
"duration": result.get('duration'),
"resolution": result.get('resolution'),
"frame_rate": result.get('frame_rate'),
"keyframes": result.get('keyframes', []),
"file_size": result.get('file_size'),
"format": result.get('format')
}
else:
return {"error": result.get('error', 'فشل تحليل الفيديو')}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الفيديو: {e}")
return {"error": str(e)}
# ===== نظام تكامل مع Image Analyzer =====
class ImageAnalyzerIntegration:
def __init__(self):
self.image_analyzer_available = False
self.setup_image_analyzer()
def setup_image_analyzer(self):
"""اعداد نظام تحليل الصور"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("image_analyzer", "image_analyzer.py")
image_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(image_module)
self.image_analyzer = image_module
self.image_analyzer_available = True
logging.info("✅ تم تحميل نظام تحليل الصور بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام تحليل الصور: {e}")
self.image_analyzer_available = False
def analyze_image(self, image_path, analysis_type='full'):
"""
تحليل الصورة باستخدام image_analyzer
"""
if not self.image_analyzer_available:
return {"error": "نظام تحليل الصور غير متاح"}
try:
if not os.path.exists(image_path):
return {"error": f"ملف الصورة غير موجود: {image_path}"}
result = self.image_analyzer.analyze_image_file(
image_path,
analysis_type=analysis_type
)
if result.get('success'):
return {
"success": True,
"analysis": result.get('analysis', {}),
"objects": result.get('objects', []),
"text": result.get('text', ''),
"colors": result.get('colors', []),
"metadata": result.get('metadata', {}),
"processing_time": result.get('processing_time')
}
else:
return {"error": result.get('error', 'فشل تحليل الصورة')}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الصورة: {e}")
return {"error": str(e)}
def extract_text_from_image(self, image_path, language='ar+en'):
"""
استخراج النص من الصورة (OCR)
"""
if not self.image_analyzer_available:
return {"error": "نظام تحليل الصور غير متاح"}
try:
result = self.image_analyzer.extract_text(
image_path,
languages=language.split('+')
)
if result.get('success'):
extracted_text = result.get('text', '').strip()
if extracted_text:
return {
"success": True,
"text": extracted_text,
"confidence": result.get('confidence', 0),
"language": result.get('detected_language', 'unknown'),
"word_count": len(extracted_text.split())
}
else:
return {"error": "لم يتم العثور على نص في الصورة"}
else:
return {"error": result.get('error', 'فشل استخراج النص')}
except Exception as e:
logging.error(f"❌ خطأ في استخراج النص من الصورة: {e}")
return {"error": str(e)}
def analyze_multiple_images(self, image_paths, analysis_type='objects'):
"""
تحليل مجموعة من الصور
"""
if not self.image_analyzer_available:
return {"error": "نظام تحليل الصور غير متاح"}
try:
results = []
for image_path in image_paths:
if os.path.exists(image_path):
analysis_result = self.analyze_image(image_path, analysis_type)
results.append({
"image_path": image_path,
"analysis": analysis_result
})
else:
results.append({
"image_path": image_path,
"error": "الملف غير موجود"
})
summary = self._generate_image_analysis_summary(results)
return {
"success": True,
"total_images": len(image_paths),
"analyzed_images": len([r for r in results if not r.get('error')]),
"results": results,
"summary": summary
}
except Exception as e:
logging.error(f"❌ خطأ في تحليل الصور المتعددة: {e}")
return {"error": str(e)}
def _generate_image_analysis_summary(self, results):
"""انشاء ملخص لنتائج تحليل الصور"""
summary = {
"total_images": len(results),
"successful_analyses": 0,
"objects_found": {},
"text_found": 0,
"average_confidence": 0,
"file_types": {}
}
confidence_sum = 0
confidence_count = 0
for result in results:
if not result.get('error') and result.get('analysis', {}).get('success'):
summary["successful_analyses"] += 1
analysis_data = result['analysis']
objects = analysis_data.get('objects', [])
for obj in objects:
obj_name = obj.get('name', 'unknown')
summary["objects_found"][obj_name] = summary["objects_found"].get(obj_name, 0) + 1
if analysis_data.get('text', '').strip():
summary["text_found"] += 1
confidence = analysis_data.get('confidence', 0)
if confidence > 0:
confidence_sum += confidence
confidence_count += 1
file_ext = os.path.splitext(result['image_path'])[1].lower()
summary["file_types"][file_ext] = summary["file_types"].get(file_ext, 0) + 1
if confidence_count > 0:
summary["average_confidence"] = round(confidence_sum / confidence_count, 2)
summary["success_rate"] = round((summary["successful_analyses"] / len(results)) * 100, 2) if results else 0
return summary
# ===== نظام التحسين الذاتي =====
class SelfImprovementIntegration:
"""نظام التحسين الذاتي من self_improvement"""
def __init__(self):
self.improvement_system = None
self.setup_improvement()
def setup_improvement(self):
"""إعداد نظام التحسين الذاتي"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("self_improvement", "self_improvement.py")
improvement_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(improvement_module)
self.improvement_system = improvement_module.SelfImprovement()
logging.info("✅ تم تحميل نظام التحسين الذاتي بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام التحسين الذاتي: {e}")
self.improvement_system = None
def learn_from_interaction(self, context, user_input, bot_response, correct_response=None, user_id=None):
"""
التعلم من التفاعل مع المستخدم - النسخة المحسنة
"""
if not self.improvement_system:
return {"error": "نظام التحسين الذاتي غير متاح"}
try:
learning_data = {
"context": context,
"user_input": user_input,
"bot_response": bot_response,
"timestamp": datetime.now().isoformat(),
"needs_improvement": correct_response is not None,
"user_id": user_id
}
if correct_response:
# تعلم من التصحيح
self.improvement_system.learn_from_mistake(context, correct_response)
learning_data["correct_response"] = correct_response
learning_data["learning_type"] = "mistake_correction"
# حفظ في نظام الردود أيضاً
try:
if hasattr(brain_ai, 'responses_manager'):
brain_ai.responses_manager.teach_response(user_input, correct_response)
except Exception as e:
logging.warning(f"⚠️ فشل حفظ التصحيح في نظام الردود: {e}")
else:
# تعلم من التفاعل العادي
learning_data["learning_type"] = "interaction_log"
# تحليل التفاعل الناجح
if len(user_input) > 10 and len(bot_response) > 5:
# إذا كان التفاعل مفيداً، حفظه في الذاكرة
try:
if hasattr(brain_ai, 'smart_memory'):
brain_ai.smart_memory.cache_response(user_input, bot_response)
except Exception as e:
logging.warning(f"⚠️ فشل تخزين التفاعل: {e}")
return {
"success": True,
"learning_data": learning_data,
"total_learnings": len(self.improvement_system.get_learnings()) if hasattr(self.improvement_system, 'get_learnings') else 0
}
except Exception as e:
logging.error(f"❌ خطأ في التعلم من التفاعل: {e}")
return {"error": str(e)}
def get_improvement_status(self):
"""الحصول على حالة التحسين"""
if not self.improvement_system:
return {"error": "نظام التحسين الذاتي غير متاح"}
try:
learnings = self.improvement_system.get_learnings() if hasattr(self.improvement_system, 'get_learnings') else []
improvement_status = self.improvement_system.try_to_improve() if hasattr(self.improvement_system, 'try_to_improve') else "unknown"
return {
"success": True,
"total_learnings": len(learnings),
"improvement_status": improvement_status,
"recent_learnings": learnings[-5:] if learnings else [],
"system_health": "active" if learnings else "ready"
}
except Exception as e:
logging.error(f"❌ خطأ في الحصول على حالة التحسين: {e}")
return {"error": str(e)}
def simulate_learning_process(self, question):
"""محاكاة عملية التعلم لسؤال معين"""
if not self.improvement_system:
return {"error": "نظام التحسين الذاتي غير متاح"}
try:
simulation_result = self.improvement_system.simulate_learning(question) if hasattr(self.improvement_system, 'simulate_learning') else "simulation_not_available"
return {
"success": True,
"question": question,
"simulation_result": simulation_result,
"process_type": "learning_simulation"
}
except Exception as e:
logging.error(f"❌ خطأ في محاكاة التعلم: {e}")
return {"error": str(e)}
def analyze_learning_patterns(self):
"""تحليل أنماط التعلم"""
if not self.improvement_system:
return {"error": "نظام التحسين الذاتي غير متاح"}
try:
learnings = self.improvement_system.get_learnings() if hasattr(self.improvement_system, 'get_learnings') else []
if not learnings:
return {
"success": True,
"message": "لا توجد بيانات كافية للتحليل",
"patterns": []
}
patterns = {
"total_learnings": len(learnings),
"learning_by_context": {},
"recent_activity": len([l for l in learnings if self._is_recent(l)]),
"common_contexts": self._analyze_contexts(learnings)
}
for learning in learnings:
context = learning.get("context", "unknown")
patterns["learning_by_context"][context] = patterns["learning_by_context"].get(context, 0) + 1
return {
"success": True,
"patterns": patterns,
"recommendations": self._generate_recommendations(patterns)
}
except Exception as e:
logging.error(f"❌ خطأ في تحليل أنماط التعلم: {e}")
return {"error": str(e)}
def _is_recent(self, learning):
"""التحقق إذا كان التعلم حديثاً (آخر 7 أيام)"""
try:
from datetime import datetime, timedelta
timestamp = learning.get("timestamp")
if not timestamp:
return False
learning_time = datetime.fromisoformat(timestamp)
return datetime.now() - learning_time < timedelta(days=7)
except:
return False
def _analyze_contexts(self, learnings):
"""تحليل السياقات الشائعة"""
contexts = {}
for learning in learnings:
context = learning.get("context", "unknown")
contexts[context] = contexts.get(context, 0) + 1
sorted_contexts = sorted(contexts.items(), key=lambda x: x[1], reverse=True)
return dict(sorted_contexts[:5])
def _generate_recommendations(self, patterns):
"""توليد توصيات بناءً على أنماط التعلم"""
recommendations = []
total_learnings = patterns.get("total_learnings", 0)
recent_activity = patterns.get("recent_activity", 0)
if total_learnings == 0:
recommendations.append("ابدأ بتسجيل التفاعلات الأولى للتعلم منها")
elif recent_activity == 0:
recommendations.append("لا توجد أنشطة تعلم حديثة، حاول التفاعل أكثر")
elif recent_activity / total_learnings > 0.7:
recommendations.append("أداء ممتاز! استمر في التعلم من التفاعلات الحديثة")
common_contexts = patterns.get("common_contexts", {})
if common_contexts:
top_context = list(common_contexts.keys())[0]
recommendations.append(f"ركز على تحسين الأداء في سياق: {top_context}")
return recommendations
# ===== نظام التعلم الذاتي المتكامل =====
class SelfLearningIntegration:
"""نظام التعلم الذاتي من self_learning"""
def __init__(self):
self.learning_system = None
self.knowledge_ai = None
self.setup_learning()
def setup_learning(self):
"""إعداد نظام التعلم الذاتي"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("self_learning", "self_learning.py")
learning_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(learning_module)
self.learning_system = learning_module.SelfLearningModule()
self.knowledge_ai = learning_module.SelfLearningAI()
logging.info("✅ تم تحميل نظام التعلم الذاتي بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل نظام التعلم الذاتي: {e}")
self.learning_system = None
self.knowledge_ai = None
def learn_and_respond(self, question):
"""التعلم والرد على السؤال"""
if not self.learning_system or not self.knowledge_ai:
return {"error": "نظام التعلم الذاتي غير متاح"}
try:
answer = self.knowledge_ai.get_answer(question)
if "لا أستطيع الإجابة" not in answer:
return {
"success": True,
"answer": answer,
"source": "knowledge_base",
"learned": False
}
module_response = self.learning_system.can_answer(question)
if "لا أمتلك معلومات" not in module_response:
return {
"success": True,
"answer": module_response,
"source": "learning_module",
"learned": False
}
return {
"success": False,
"answer": "لم أتعلم الإجابة على هذا السؤال بعد",
"needs_learning": True,
"question": question
}
except Exception as e:
logging.error(f"❌ خطأ في التعلم والرد: {e}")
return {"error": str(e)}
# ===== طبقة الذاكرة الذكية =====
class SmartMemoryLayer:
"""طبقة الذاكرة الذكية من memory_layer.py"""
def __init__(self):
self.memory_layer_available = False
self.setup_memory_layer()
def setup_memory_layer(self):
"""إعداد طبقة الذاكرة الذكية"""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("memory_layer", "memory_layer.py")
memory_layer_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(memory_layer_module)
self.memory_layer = memory_layer_module
self.memory_layer_available = True
logging.info("✅ تم تحميل طبقة الذاكرة الذكية بنجاح")
except Exception as e:
logging.warning(f"⚠️ فشل تحميل طبقة الذاكرة الذكية: {e}")
self.memory_layer_available = False
def cache_response(self, query, response, ttl_hours=24):
"""
تخزين الاستجابة في الذاكرة المؤقتة
"""
if not self.memory_layer_available:
return False
try:
ttl_seconds = ttl_hours * 3600 if ttl_hours else None
self.memory_layer.save_to_memory(query, response, ttl_seconds)
return True
except Exception as e:
logging.error(f"❌ خطأ في تخزين الاستجابة: {e}")
return False
def get_cached_response(self, query):
"""
الحصول على استجابة مخزنة مسبقاً
"""
if not self.memory_layer_available:
return None
try:
return self.memory_layer.get_from_memory(query)
except Exception as e:
logging.error(f"❌ خطأ في قراءة الاستجابة المخزنة: {e}")
return None
def smart_query(self, query, processing_function, ttl_hours=24):
"""
استعلام ذكي يستخدم التخزين المؤقت
"""
cached_response = self.get_cached_response(query)
if cached_response:
logging.info(f"🔄 استخدام استجابة مخزنة للاستعلام: {query[:50]}...")
return {
"response": cached_response,
"source": "cache",
"cached": True
}
logging.info(f"🧠 معالجة جديدة للاستعلام: {query[:50]}...")
new_response = processing_function(query)
if new_response:
self.cache_response(query, new_response, ttl_hours)
return {
"response": new_response,
"source": "live_processing",
"cached": False
}
def clear_expired_entries(self):
"""مسح المدن المنتهية الصلاحية"""
if not self.memory_layer_available:
return 0
try:
logging.info("🔄 فحص الذاكرة المؤقتة للمدن المنتهية...")
return 0
except Exception as e:
logging.error(f"❌ خطأ في مسح المدن المنتهية: {e}")
return 0
def get_cache_stats(self):
"""الحصول على إحصائيات التخزين المؤقت"""
if not self.memory_layer_available:
return {"error": "طبقة الذاكرة غير متاحة"}
try:
if os.path.exists("memory.json"):
with open("memory.json", "r", encoding="utf-8") as f:
memory_data = json.load(f)
total_entries = len(memory_data)
expired_count = 0
current_time = time.time()
for key, value in memory_data.items():
ttl = value.get("_ttl_seconds")
timestamp = value.get("_timestamp", 0)
if ttl and (current_time - timestamp) > ttl:
expired_count += 1
return {
"total_entries": total_entries,
"active_entries": total_entries - expired_count,
"expired_entries": expired_count,
"cache_size": f"{os.path.getsize('memory.json') / 1024:.2f} KB"
}
else:
return {
"total_entries": 0,
"active_entries": 0,
"expired_entries": 0,
"cache_size": "0 KB"
}
except Exception as e:
logging.error(f"❌ خطأ في إحصائيات التخزين المؤقت: {e}")
return {"error": str(e)}
# ===== نظام الذكاء الاصطناعي الأساسي المحدث =====
class BrainAI:
def __init__(self):
# الأنظمة الحالية
self.task_distributor = TaskDistributor()
self.audio_integration = AudioAnalyzerIntegration()
self.file_integration = FileAnalyzerIntegration()
self.link_integration = LinkInspectorIntegration()
self.local_audio_transcriber = LocalAudioTranscriber()
self.video_processor = VideoProcessorIntegration()
self.image_analyzer = ImageAnalyzerIntegration()
self.search_integration = SearchIntegration()
self.self_improvement = SelfImprovementIntegration()
self.self_learning = SelfLearningIntegration()
self.smart_memory = SmartMemoryLayer()
self.image_generator = ImageGeneratorIntegration() # الإصدار المحسن
# الأنظمة الجديدة للتحكم
self.responses_controller = ResponsesController()
self.responses_manager = ResponsesManagerController()
# نظام Ollama المتكامل
self.ollama_manager = ollama_manager
# نظام الردود المحسن
self.response_system = None
if RESPONSES_SYSTEM_AVAILABLE:
try:
self.response_system = EnhancedIntegratedResponseSystem()
logging.info("✅ تم تهيئة نظام الردود المحسن")
except Exception as e:
logging.warning(f"⚠️ فشل تهيئة نظام الردود: {e}")
self.task_distributor.start()
logging.info("🧠 تم تهيئة نظام الذكاء الاصطناعي BrainAI مع نظام Ollama المتكامل")
def _extract_search_query(self, text):
"""استخراج كلمة البحث من النص"""
search_prefixes = ["ابحث عن", "ما هو", "معلومات عن", "بحث"]
for prefix in search_prefixes:
if prefix in text:
return text.split(prefix, 1)[1].strip()
return text
def _extract_image_prompt(self, text):
"""استخراج وصف الصورة من النص"""
image_prefixes = ["ارسم", "رسم", "اصنع", "اصنعي", "انشي", "أنشي", "توليد", "صورة", "صور"]
for prefix in image_prefixes:
if prefix in text:
return text.split(prefix, 1)[1].strip()
return text
def _on_search_completed(self, task):
"""عند اكتمال مهمة البحث"""
logging.info(f"✅ اكتمل البحث: {task['task_id']}")
if task.get('result'):
logging.info(f"🔍 تم العثور على {task['result'].get('total_results', 0)} نتيجة")
def _on_image_generated(self, task):
"""عند اكتمال توليد الصورة"""
logging.info(f"✅ اكتمل توليد الصورة: {task['task_id']}")
if task.get('result') and task['result'].get('success'):
logging.info(f"🎨 تم توليد صورة للوصف: {task['data']['prompt']}")
else:
logging.error(f"❌ فشل توليد الصورة: {task.get('error')}")
def _process_text_message_normal(self, text, urls):
"""المعالجة العادية للنص مع الرد الفعلي - الإصدار المصحح"""
# ⭐ أولاً: محاولة الحصول على رد فوري
instant_reply = generate_reply_brain(text)
# ⭐ إذا كان هناك روابط، نضيف معلومات عنها
url_info = ""
if urls:
url_info = f" 📎 (تم العثور على {len(urls)} روابط)"
# ⭐ إنشاء الرد النهائي
final_reply = f"{instant_reply}{url_info}"
# ⭐ إرسال المهمة للخلفية للمعالجة الإضافية (إذا لزم الأمر)
task_id = self.task_distributor.submit_task(
'text',
text,
callback=self._on_text_processed
)
url_results = {}
if urls:
url_task_id = self.task_distributor.submit_task(
'url',
urls,
callback=self._on_urls_processed
)
url_results = {"url_task_id": url_task_id}
return {
"success": True,
"reply": final_reply, # ⭐ الرد الفعلي
"message": "تم معالجة الرسالة بنجاح",
"text_task_id": task_id,
"url_results": url_results,
"processing_method": "smart_reply_with_background_processing"
}
def _process_text_message(self, text, urls):
"""معالجة الرسالة النصية مع إرجاع الرد الفعلي - الإصدار المصحح"""
# ⭐ أولاً: محاولة الرد الفوري باستخدام Ollama
try:
ollama_response = self.ollama_manager.generate_response(text)
if ollama_response and not ollama_response.startswith(("خطأ", "انتهت", "فشل")):
logging.info(f"✅ تم استخدام Ollama للرد على: {text[:50]}...")
return {
"success": True,
"reply": ollama_response,
"processed": True,
"method": "ollama_instant_reply"
}
except Exception as e:
logging.warning(f"⚠️ فشل Ollama في الرد الفوري: {e}")
# ⭐ ثانياً: استخدام generate_reply_brain للرد الفوري
try:
instant_reply = generate_reply_brain(text)
if instant_reply and "عذراً" not in instant_reply and "خطأ" not in instant_reply:
return {
"success": True,
"reply": instant_reply,
"processed": True,
"method": "brain_instant_reply"
}
except Exception as e:
logging.warning(f"⚠️ فشل الرد من brain: {e}")
# ⭐ ثالثاً: المعالجة الذكية للطلبات الخاصة
image_keywords = ["ارسم", "رسم", "صورة", "صور", "انشي", "أنشي", "توليد", "اصنع", "اصنعي"]
if any(keyword in text for keyword in image_keywords):
prompt = self._extract_image_prompt(text)
# استخدام نظام توليد الصور المحسن
result = self.image_generator.generate_image_sync(prompt)
if result.get("success"):
return {
"success": True,
"reply": f"🎨 تم توليد صورة للوصف: '{prompt}'",
"image_generated": True,
"generator_used": result.get("generator_used", "unknown"),
"processing_method": "enhanced_image_generation"
}
else:
# إذا فشل توليد الصورة، نقدم رداً بديلاً
return {
"success": True,
"reply": f"⚠️ نظام توليد الصور مؤقتاً غير متاح. طلبت رسم: '{prompt}'.\nيمكنك تجربة المولد البسيط عبر /generate/image/fallback",
"image_generated": False,
"error": result.get("error", "خطأ غير معروف"),
"processing_method": "image_generation_fallback"
}
search_keywords = ["ابحث عن", "ما هو", "معلومات عن", "بحث"]
if any(keyword in text for keyword in search_keywords):
query = self._extract_search_query(text)
task_id = self.task_distributor.submit_task(
'search',
{"query": query, "type": "smart_search"},
callback=self._on_search_completed
)
return {
"success": True,
"message": "تم التعرف على طلب بحث، جاري البحث...",
"search_task_id": task_id,
"query": query,
"reply": f"🔍 جاري البحث عن: '{query}'...",
"processing_method": "search"
}
# ⭐ رابعاً: الرد الافتراضي الذكي
default_reply = generate_reply_brain(text)
return {
"success": True,
"reply": default_reply,
"processed": True,
"method": "default_brain_reply"
}
def process_message(self, message_data):
"""
معالجة رسالة باستخدام النظام الذكي
"""
try:
message_type = message_data.get('type', 'text')
content = message_data.get('content', '')
files = message_data.get('files', [])
urls = message_data.get('urls', [])
logging.info(f"🧠 معالجة رسالة من نوع: {message_type}")
if message_type == 'text' and content:
return self._process_text_message(content, urls)
elif message_type == 'file' and files:
return self._process_files(files)
elif message_type == 'audio' and files:
return self._process_audio_files(files)
elif message_type == 'video' and files:
return self._process_video_files(files)
elif message_type == 'image' and files:
return self._process_image_files(files)
elif urls:
return self._process_urls(urls)
else:
return {"error": "نوع الرسالة غير مدعوم أو بيانات غير كافية"}
except Exception as e:
logging.error(f"❌ خطأ في معالجة الرسالة: {e}")
return {"error": str(e)}
def process_message_with_learning(self, message_data, user_id=None):
"""
معالجة رسالة مع تسجيل التعلم - النسخة المحسنة
"""
try:
# المعالجة العادية
processing_result = self.process_message(message_data)
# تسجيل التعلم إذا كانت رسالة نصية
if message_data.get('type') == 'text' and processing_result.get('success'):
content = message_data.get('content', '')
reply = processing_result.get('reply', '')
if content and reply:
# تسجيل التعلم من التفاعل
learning_result = self.self_improvement.learn_from_interaction(
context="user_interaction",
user_input=content,
bot_response=reply,
user_id=user_id
)
# تخزين الرد في الذاكرة المؤقتة
try:
self.smart_memory.cache_response(
query=content,
response=reply,
ttl_hours=24
)
except Exception as e:
logging.warning(f"⚠️ فشل تخزين الرد في الذاكرة: {e}")
processing_result["learning_data"] = learning_result
return processing_result
except Exception as e:
logging.error(f"❌ خطأ في المعالجة مع التعلم: {e}")
return {"error": str(e)}
def _process_files(self, files):
"""معالجة الملفات"""
results = []
for file_info in files:
file_path = file_info.get('path')
filename = file_info.get('name', '')
if not file_path or not os.path.exists(file_path):
results.append({"file": filename, "error": "الملف غير موجود"})
continue
file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
if file_ext in ['mp3', 'wav', 'ogg', 'm4a']:
task_id = self.task_distributor.submit_task(
'audio',
file_info,
callback=self._on_audio_processed
)
results.append({"file": filename, "type": "audio", "task_id": task_id})
elif file_ext in ['mp4', 'avi', 'mov', 'mkv']:
task_id = self.task_distributor.submit_task(
'video',
file_info,
callback=self._on_video_processed
)
results.append({"file": filename, "type": "video", "task_id": task_id})
elif file_ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp']:
task_id = self.task_distributor.submit_task(
'image',
file_info,
callback=self._on_image_processed
)
results.append({"file": filename, "type": "image", "task_id": task_id})
elif self.file_integration.is_supported_file(filename):
task_id = self.task_distributor.submit_task(
'file',
file_info,
callback=self._on_file_processed
)
results.append({"file": filename, "type": "document", "task_id": task_id})
else:
results.append({"file": filename, "error": "نوع الملف غير مدعوم"})
return {
"success": True,
"message": "تم استلام الملفات وجاري معالجتها",
"file_results": results,
"total_files": len(files),
"processing_method": "smart_task_distribution"
}
def _process_audio_files(self, audio_files):
"""معالجة الملفات الصوتية"""
results = []
for audio_info in audio_files:
file_path = audio_info.get('path')
filename = audio_info.get('name', '')
if not file_path or not os.path.exists(file_path):
results.append({"file": filename, "error": "الملف غير موجود"})
continue
local_result = self.local_audio_transcriber.transcribe_audio_file(file_path)
if local_result and not local_result.get('error'):
results.append({
"file": filename,
"processing_method": "local_transcriber",
"text": local_result.get('text', ''),
"word_count": local_result.get('word_count', 0),
"duration": local_result.get('duration', 0)
})
else:
audio_result = self.audio_integration.analyze_audio_file(file_path, filename)
text = self.audio_integration.get_transcription_text(audio_result)
if text:
results.append({
"file": filename,
"processing_method": "audio_analyzer",
"text": text,
"word_count": len(text.split())
})
else:
results.append({
"file": filename,
"error": "فشل تحويل الصوت إلى نص",
"details": audio_result.get('error', 'خطأ غير معروف')
})
return {
"success": True,
"message": "تم معالجة الملفات الصوتية",
"audio_results": results,
"total_files": len(audio_files)
}
def _process_video_files(self, video_files):
"""معالجة ملفات الفيديو"""
results = []
for video_info in video_files:
file_path = video_info.get('path')
filename = video_info.get('name', '')
if not file_path or not os.path.exists(file_path):
results.append({"file": filename, "error": "الملف غير موجود"})
continue
audio_result = self.video_processor.extract_audio_from_video(file_path)
if audio_result and audio_result.get('success'):
audio_file = audio_result['audio_file']
transcription_result = self.local_audio_transcriber.transcribe_audio_file(audio_file)
video_summary = self.video_processor.generate_video_summary(file_path)
results.append({
"file": filename,
"audio_extraction": audio_result,
"transcription": transcription_result,
"video_summary": video_summary,
"processing_method": "video_processor"
})
try:
if os.path.exists(audio_file):
os.remove(audio_file)
except:
pass
else:
results.append({
"file": filename,
"error": "فشل معالجة الفيديو",
"details": audio_result.get('error', 'خطأ غير معروف')
})
return {
"success": True,
"message": "تم معالجة ملفات الفيديو",
"video_results": results,
"total_files": len(video_files)
}
def _process_image_files(self, image_files):
"""معالجة ملفات الصور"""
results = []
for image_info in image_files:
file_path = image_info.get('path')
filename = image_info.get('name', '')
if not file_path or not os.path.exists(file_path):
results.append({"file": filename, "error": "الملف غير موجود"})
continue
analysis_result = self.image_analyzer.analyze_image(file_path)
if analysis_result and not analysis_result.get('error'):
text_result = self.image_analyzer.extract_text_from_image(file_path)
results.append({
"file": filename,
"analysis": analysis_result,
"text_extraction": text_result,
"processing_method": "image_analyzer"
})
else:
results.append({
"file": filename,
"error": "فشل تحليل الصورة",
"details": analysis_result.get('error', 'خطأ غير معروف')
})
return {
"success": True,
"message": "تم معالجة ملفات الصور",
"image_results": results,
"total_files": len(image_files)
}
def _process_urls(self, urls):
"""معالجة الروابط"""
analysis_result = self.link_integration.analyze_urls(urls)
if analysis_result and not analysis_result.get('error'):
return {
"success": True,
"message": "تم تحليل الروابط بنجاح",
"url_analysis": analysis_result,
"processing_method": "link_inspector"
}
else:
return {
"error": "فشل تحليل الروابط",
"details": analysis_result.get('error', 'خطأ غير معروف')
}
# دوال callback للمهام
def _on_text_processed(self, task):
"""عند اكتمال معالجة النص"""
logging.info(f"✅ اكتملت معالجة النص: {task['task_id']}")
def _on_urls_processed(self, task):
"""عند اكتمال معالجة الروابط"""
logging.info(f"✅ اكتملت معالجة الروابط: {task['task_id']}")
def _on_audio_processed(self, task):
"""عند اكتمال معالجة الصوت"""
logging.info(f"✅ اكتملت معالجة الصوت: {task['task_id']}")
def _on_video_processed(self, task):
"""عند اكتمال معالجة الفيديو"""
logging.info(f"✅ اكتملت معالجة الفيديو: {task['task_id']}")
def _on_image_processed(self, task):
"""عند اكتمال معالجة الصورة"""
logging.info(f"✅ اكتملت معالجة الصورة: {task['task_id']}")
def _on_file_processed(self, task):
"""عند اكتمال معالجة الملف"""
logging.info(f"✅ اكتملت معالجة الملف: {task['task_id']}")
def get_system_status(self):
"""الحصول على حالة النظام المحدث"""
ollama_connected = self.ollama_manager.test_connection()
return {
"brain_status": "running",
"task_distributor": self.task_distributor.get_system_stats(),
"ollama_integration": {
"connected": ollama_connected,
"available_models": len(self.ollama_manager.available_models),
"default_model": self.ollama_manager.default_model,
"models_list": self.ollama_manager.available_models[:10] # أول 10 نماذج فقط
},
"integrations": {
"audio_analyzer": self.audio_integration.is_connected,
"file_analyzer": self.file_integration.is_connected,
"link_inspector": self.link_integration.link_inspector_available,
"local_audio_transcriber": self.local_audio_transcriber.transcriber_available,
"video_processor": self.video_processor.video_processor_available,
"image_analyzer": self.image_analyzer.image_analyzer_available,
"image_generator": {
"connected": self.image_generator.is_connected,
"fallback_generators": len(self.image_generator.fallback_generators) if hasattr(self.image_generator, 'fallback_generators') else 0,
"simple_generator": SIMPLE_GENERATOR_AVAILABLE
},
"responses_system": self.responses_controller.responses_system,
"responses_manager": self.responses_manager.manager_bot
},
"knowledge_system": KNOWLEDGE_SEARCH_AVAILABLE,
"learning_system": SMART_LEARNER_AVAILABLE,
"media_analyzer": MEDIA_ANALYZER_AVAILABLE,
"responses_system_legacy": RESPONSES_SYSTEM_AVAILABLE,
"enhanced_responses": self.response_system is not None,
"timestamp": datetime.now().isoformat()
}
# ===== نظام معالجة المهام من run_task =====
class TaskProcessor:
"""معالج المهام المتكامل مع run_task"""
def __init__(self, brain_ai_system):
self.brain_ai = brain_ai_system
def process_task(self, task_id):
"""معالجة المهمة بناءً على task_id"""
if task_id == "1":
return self._matrix_multiply(500)
elif task_id == "2":
return self._prime_calculation(100000)
elif task_id == "3":
return self._data_processing(10000)
else:
return {"error": "رقم المهمة غير معروف"}
def _matrix_multiply(self, size):
"""تنفيذ ضرب المصفوفات"""
import numpy as np
A = np.random.rand(size, size)
B = np.random.rand(size, size)
result = np.dot(A, B)
return {
"task": "matrix_multiply",
"matrix_size": size,
"result_shape": result.shape,
"sample_data": result[:3, :3].tolist()
}
def _prime_calculation(self, limit):
"""حساب الأعداد الأولية"""
primes = []
for num in range(2, limit + 1):
is_prime = True
for i in range(2, int(num**0.5) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
primes.append(num)
return {
"task": "prime_calculation",
"limit": limit,
"primes_count": len(primes),
"primes_sample": primes[:10] if primes else []
}
def _data_processing(self, count):
"""معالجة البيانات"""
import pandas as pd
import numpy as np
data = {
'id': range(count),
'value': np.random.rand(count) * 100,
'category': np.random.choice(['A', 'B', 'C'], count)
}
df = pd.DataFrame(data)
stats = {
'mean': df['value'].mean(),
'std': df['value'].std(),
'category_counts': df['category'].value_counts().to_dict()
}
return {
"task": "data_processing",
"records_processed": count,
"statistics": stats,
"data_sample": df.head().to_dict('records')
}
# تهيئة النظام
brain_ai = BrainAI()
task_processor = TaskProcessor(brain_ai)
# ===== endpoints نظام الردود =====
@app.route('/brain/responses/save', methods=['POST'])
def save_response():
"""حفظ رد في الذاكرة"""
try:
data = request.get_json()
if not data or 'question' not in data or 'answer' not in data:
return jsonify({"error": "بيانات غير كافية"}), 400
question = data['question']
answer = data['answer']
category = data.get('category', 'general')
success = save_conversation(question, answer, category)
return jsonify({
"success": success,
"message": "تم حفظ الرد في الذاكرة" if success else "فشل حفظ الرد"
})
except Exception as e:
logging.error(f"❌ خطأ في حفظ الرد: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/brain/responses/get', methods=['POST'])
def get_saved_response():
"""الحصول على رد محفوظ"""
try:
data = request.get_json()
if not data or 'question' not in data:
return jsonify({"error": "بيانات غير كافية"}), 400
question = data['question']
response = get_saved_response(question)
if response:
return jsonify({
"success": True,
"question": question,
"answer": response,
"source": "memory"
})
else:
return jsonify({
"success": False,
"message": "لا يوجد رد محفوظ لهذا السؤال"
})
except Exception as e:
logging.error(f"❌ خطأ في استرجاع الرد: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/brain/responses/popular', methods=['GET'])
def get_popular_responses():
"""الحصول على الردود الأكثر استخداماً"""
try:
if hasattr(brain_ai, 'response_system') and brain_ai.response_system:
popular = brain_ai.response_system.memory_system.get_popular_responses(20)
return jsonify({
"success": True,
"popular_responses": [
{"question": q, "answer": a, "usage_count": count}
for q, a, count in popular
]
})
else:
return jsonify({"error": "نظام الردود غير متاح"})
except Exception as e:
logging.error(f"❌ خطأ في الحصول على الردود الشائعة: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoints التحكم في responses.py =====
@app.route('/brain/responses/chat', methods=['POST'])
def chat_with_responses():
"""محادثة باستخدام نظام responses"""
try:
data = request.get_json()
if not data or 'message' not in data:
return jsonify({"error": "لم يتم إرسال رسالة"}), 400
message = data['message']
username = data.get('username', 'default')
reply = brain_ai.responses_controller.get_reply(message, username)
return jsonify({
"success": True,
"message": message,
"reply": reply,
"username": username,
"system": "responses"
})
except Exception as e:
logging.error(f"❌ خطأ في المحادثة: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/brain/responses/memory', methods=['POST'])
def get_user_memory():
"""الحصول على ذاكرة مستخدم"""
try:
data = request.get_json()
if not data or 'username' not in data:
return jsonify({"error": "لم يتم إرسال اسم المستخدم"}), 400
username = data['username']
memory = brain_ai.responses_controller.load_memory(username)
return jsonify({
"success": True,
"username": username,
"memory_size": len(memory),
"memory": memory
})
except Exception as e:
logging.error(f"❌ خطأ في قراءة الذاكرة: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoints التحكم في responses_manager_bot =====
@app.route('/brain/manager/teach', methods=['POST'])
def teach_new_response():
"""تعليم رد جديد"""
try:
data = request.get_json()
if not data or 'question' not in data or 'answer' not in data:
return jsonify({"error": "بيانات غير كافية"}), 400
question = data['question']
answer = data['answer']
result = brain_ai.responses_manager.teach_response(question, answer)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في تعليم الرد: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/brain/manager/responses', methods=['GET'])
def get_managed_responses():
"""الحصول على جميع الردود المدارة"""
result = brain_ai.responses_manager.get_all_responses()
return jsonify(result)
@app.route('/brain/manager/stats', methods=['GET'])
def get_manager_stats():
"""إحصائيات نظام إدارة الردود"""
responses_result = brain_ai.responses_manager.get_all_responses()
if responses_result.get('success'):
responses = responses_result.get('responses', {})
return jsonify({
"success": True,
"total_responses": len(responses),
"manager_status": "active",
"sample_responses": dict(list(responses.items())[:5]) # أول 5 ردود فقط
})
else:
return jsonify({
"success": False,
"error": responses_result.get('error')
})
# ===== endpoints توليد الصور =====
@app.route('/generate/image', methods=['POST'])
def generate_image():
"""توليد صورة باستخدام الذكاء الاصطناعي"""
try:
data = request.get_json()
if not data or 'prompt' not in data:
return jsonify({"error": "لم يتم إرسال وصف الصورة"}), 400
prompt = data['prompt']
model = data.get('model', 'Deliberate')
result = brain_ai.image_generator.generate_image_sync(prompt, model)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في توليد الصورة: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/generate/image/async', methods=['POST'])
def generate_image_async():
"""توليد صورة بشكل غير متزامن"""
try:
data = request.get_json()
if not data or 'prompt' not in data:
return jsonify({"error": "لم يتم إرسال وصف الصورة"}), 400
prompt = data['prompt']
model = data.get('model', 'Deliberate')
task_id = brain_ai.task_distributor.submit_task(
'image_generation',
{"prompt": prompt, "model": model},
callback=brain_ai._on_image_generated
)
return jsonify({
"success": True,
"message": "جاري توليد الصورة...",
"task_id": task_id,
"prompt": prompt
})
except Exception as e:
logging.error(f"❌ خطأ في طلب توليد الصورة: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/generate/image/fallback', methods=['POST'])
def generate_image_fallback():
"""توليد صورة باستخدام المولد الاحتياطي البسيط"""
try:
data = request.get_json()
if not data or 'prompt' not in data:
return jsonify({"error": "لم يتم إرسال وصف الصورة"}), 400
prompt = data['prompt']
if SIMPLE_GENERATOR_AVAILABLE:
image_data = generate_simple_image(prompt)
if image_data:
encoded_image = base64.b64encode(image_data).decode('utf-8')
return jsonify({
"success": True,
"prompt": prompt,
"image_base64": encoded_image,
"image_size": len(image_data),
"generator_used": "simple_fallback",
"message": "تم إنشاء صورة توضيحية بسيطة"
})
else:
return jsonify({
"success": False,
"error": "فشل المولد البسيط في إنشاء صورة"
})
else:
return jsonify({
"success": False,
"error": "المولد البسيط غير متاح",
"message": f"طُلب إنشاء صورة للوصف: '{prompt}' ولكن نظام توليد الصور معطل"
})
except Exception as e:
logging.error(f"❌ خطأ في توليد الصورة الاحتياطي: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoints الذاكرة الذكية =====
@app.route('/brain/memory/cache', methods=['POST'])
def cache_response():
"""تخزين استجابة في الذاكرة المؤقتة"""
try:
data = request.get_json()
if not data or 'query' not in data or 'response' not in data:
return jsonify({"error": "بيانات غير كافية"}), 400
query = data['query']
response = data['response']
ttl_hours = data.get('ttl_hours', 24)
success = brain_ai.smart_memory.cache_response(query, response, ttl_hours)
return jsonify({"success": success})
except Exception as e:
logging.error(f"❌ خطأ في التخزين المؤقت: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/brain/memory/stats', methods=['GET'])
def get_cache_stats():
"""إحصائيات التخزين المؤقت"""
stats = brain_ai.smart_memory.get_cache_stats()
return jsonify(stats)
# ===== endpoints البحث =====
@app.route('/search/web', methods=['POST'])
def search_web():
"""البحث في الويب"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({"error": "لم يتم إرسال كلمة البحث"}), 400
query = data['query']
search_engine = data.get('engine', 'duckduckgo')
max_results = data.get('max_results', 5)
result = brain_ai.search_integration.search_web(query, search_engine, max_results)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في البحث: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/search/smart', methods=['POST'])
def smart_search():
"""بحث ذكي متعدد المحركات"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({"error": "لم يتم إرسال كلمة البحث"}), 400
query = data['query']
max_results = data.get('max_results', 5)
result = brain_ai.search_integration.smart_search(query, max_results)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في البحث الذكي: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoints التحسين الذاتي =====
@app.route('/improvement/learn', methods=['POST'])
def learn_from_interaction():
"""التعلم من التفاعل"""
try:
data = request.get_json()
if not data or 'context' not in data or 'user_input' not in data or 'bot_response' not in data:
return jsonify({"error": "بيانات غير كافية للتعلم"}), 400
context = data['context']
user_input = data['user_input']
bot_response = data['bot_response']
correct_response = data.get('correct_response')
user_id = data.get('user_id')
result = brain_ai.self_improvement.learn_from_interaction(
context, user_input, bot_response, correct_response, user_id
)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في التعلم من التفاعل: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/improvement/correct', methods=['POST'])
def correct_response():
"""تصحيح رد خاطئ للنظام"""
try:
data = request.get_json()
if not data or 'user_input' not in data or 'wrong_response' not in data or 'correct_response' not in data:
return jsonify({"error": "بيانات غير كافية للتصحيح"}), 400
context = data.get('context', 'general')
user_input = data['user_input']
wrong_response = data['wrong_response']
correct_response = data['correct_response']
user_id = data.get('user_id')
result = brain_ai.self_improvement.learn_from_interaction(
context=context,
user_input=user_input,
bot_response=wrong_response,
correct_response=correct_response,
user_id=user_id
)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في التصحيح: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/improvement/status', methods=['GET'])
def get_improvement_status():
"""الحصول على حالة التحسين"""
result = brain_ai.self_improvement.get_improvement_status()
return jsonify(result)
@app.route('/improvement/simulate', methods=['POST'])
def simulate_learning():
"""محاكاة عملية التعلم"""
try:
data = request.get_json()
if not data or 'question' not in data:
return jsonify({"error": "لم يتم إرسال السؤال"}), 400
question = data['question']
result = brain_ai.self_improvement.simulate_learning_process(question)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في محاكاة التعلم: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/improvement/analyze', methods=['GET'])
def analyze_learning_patterns():
"""تحليل أنماط التعلم"""
result = brain_ai.self_improvement.analyze_learning_patterns()
return jsonify(result)
# ===== endpoint التعلم من الملفات =====
@app.route('/learn/from_file', methods=['POST'])
def learn_from_file():
"""التعلم من ملف نصي"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم إرسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
# حفظ الملف مؤقتاً
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
# تحليل الملف واستخراج المعرفة
file_result = brain_ai.file_integration.analyze_file(file_path, file.filename)
if file_result.get('success'):
# استخراج النص من التحليل
analysis = file_result.get('analysis', {})
text_content = analysis.get('text_content', '')
if text_content:
# تقسيم النص إلى جمل وتعلمها
sentences = text_content.split('.')
learned_count = 0
for sentence in sentences[:10]: # أول 10 جمل فقط
sentence = sentence.strip()
if len(sentence) > 20: # جمل ذات معنى
# حفظ كمعرفة عامة
brain_ai.responses_manager.teach_response(
f"معلومات عن {sentence[:30]}...",
sentence
)
learned_count += 1
result = {
"success": True,
"message": f"تم تعلم {learned_count} معلومة جديدة من الملف",
"file_analysis": file_result,
"learned_sentences": learned_count
}
else:
result = {"error": "لم يتم استخراج نص من الملف"}
else:
result = {"error": "فشل تحليل الملف"}
# تنظيف الملف المؤقت
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في التعلم من الملف: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoint موحد لجميع الأنظمة =====
@app.route('/brain/control', methods=['POST'])
def control_all_systems():
"""تحكم مركزي في جميع الأنظمة"""
try:
data = request.get_json()
if not data or 'action' not in data:
return jsonify({"error": "لم يتم تحديد الإجراء"}), 400
action = data['action']
if action == "status":
return jsonify(brain_ai.get_system_status())
elif action == "chat":
message = data.get('message', '')
username = data.get('username', 'default')
reply = brain_ai.responses_controller.get_reply(message, username)
return jsonify({"reply": reply, "system": "responses"})
elif action == "teach":
question = data.get('question', '')
answer = data.get('answer', '')
result = brain_ai.responses_manager.teach_response(question, answer)
return jsonify(result)
elif action == "generate_image":
prompt = data.get('prompt', '')
model = data.get('model', 'Deliberate')
result = brain_ai.image_generator.generate_image_sync(prompt, model)
return jsonify(result)
elif action == "generate_image_fallback":
prompt = data.get('prompt', '')
if SIMPLE_GENERATOR_AVAILABLE:
image_data = generate_simple_image(prompt)
if image_data:
encoded_image = base64.b64encode(image_data).decode('utf-8')
return jsonify({
"success": True,
"image_base64": encoded_image[:100] + "...",
"generator_used": "simple_fallback"
})
else:
return jsonify({"error": "فشل المولد البسيط"})
else:
return jsonify({"error": "المولد البسيط غير متاح"})
elif action == "search":
query = data.get('query', '')
result = brain_ai.search_integration.smart_search(query)
return jsonify(result)
elif action == "learn_from_file":
# محاكاة التعلم من ملف
file_content = data.get('file_content', '')
if file_content:
sentences = file_content.split('.')
learned_count = 0
for sentence in sentences[:5]:
sentence = sentence.strip()
if len(sentence) > 10:
brain_ai.responses_manager.teach_response(
f"معلومات عن {sentence[:20]}...",
sentence
)
learned_count += 1
return jsonify({
"success": True,
"message": f"تم تعلم {learned_count} معلومة من المحتوى"
})
else:
return jsonify({"error": "لم يتم إرسال محتوى الملف"})
else:
return jsonify({"error": f"الإجراء غير معروف: {action}"})
except Exception as e:
logging.error(f"❌ خطأ في التحكم المركزي: {e}")
return jsonify({"error": str(e)}), 500
# ===== endpoint مخصص لـ run_task =====
@app.route("/run_task_compatible", methods=["POST"])
def run_task_compatible():
"""Endpoint متوافق مع run_task الأصلي"""
if request.is_json:
data = request.get_json()
task_id = data.get("task_id")
else:
task_id = request.form.get("task_id")
if not task_id:
return jsonify(error="Missing task_id"), 400
result = task_processor.process_task(task_id)
return jsonify(result=result)
# ===== endpoint محادثة مباشرة مع إرجاع الرد فقط =====
@app.route('/chat', methods=['POST'])
def chat_direct():
"""محادثة مباشرة مع إرجاع الرد فقط - للإستخدام في التطبيقات"""
try:
data = request.get_json()
if not data or 'message' not in data:
return jsonify({"error": "لم يتم إرسال رسالة"}), 400
message = data['message']
username = data.get('username', 'default')
# ⭐ أولوية للرد باستخدام Ollama
ollama_response = brain_ai.ollama_manager.generate_response(message)
if ollama_response and not ollama_response.startswith(("خطأ", "انتهت", "فشل")):
return jsonify({
"reply": ollama_response,
"success": True,
"system": "ollama"
})
# ⭐ إذا فشل Ollama، نستخدم نظام الردود
reply = brain_ai.responses_controller.get_reply(message, username)
# ⭐ إذا فشل نظام الردود، نستخدم النظام الاحتياطي
if not reply or reply == "نظام الردود غير متاح حالياً":
reply = generate_reply_brain(message)
return jsonify({
"reply": reply,
"success": True,
"system": "fallback"
})
except Exception as e:
logging.error(f"❌ خطأ في المحادثة المباشرة: {e}")
return jsonify({
"reply": "عذراً، حدث خطأ في معالجة رسالتك.",
"success": False
}), 500
# ===== endpoints نظام Ollama =====
@app.route('/ollama/generate', methods=['POST'])
def ollama_generate():
"""توليد نص باستخدام Ollama"""
try:
data = request.get_json()
if not data or 'prompt' not in data:
return jsonify({"error": "لم يتم إرسال النص"}), 400
prompt = data['prompt']
model = data.get('model', 'noura:latest')
response = brain_ai.ollama_manager.generate_response(prompt, model)
is_success = not response.startswith(("خطأ", "انتهت", "فشل"))
return jsonify({
"success": is_success,
"response": response,
"model": model,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logging.error(f"❌ خطأ في توليد النص باستخدام Ollama: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/ollama/models', methods=['GET'])
def get_ollama_models():
"""الحصول على قائمة نماذج Ollama المتاحة"""
try:
models = brain_ai.ollama_manager.available_models
return jsonify({
"success": True,
"total_models": len(models),
"models": models,
"default_model": brain_ai.ollama_manager.default_model,
"ollama_connected": brain_ai.ollama_manager.test_connection()
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/ollama/chat', methods=['POST'])
def ollama_chat():
"""محادثة متعددة الجولات باستخدام Ollama"""
try:
data = request.get_json()
if not data or 'messages' not in data:
return jsonify({"error": "لم يتم إرسال الرسائل"}), 400
messages = data['messages']
model = data.get('model', 'noura:latest')
if not isinstance(messages, list):
return jsonify({"error": "يجب أن تكون الرسائل في قائمة"}), 400
response = brain_ai.ollama_manager.chat_conversation(messages, model)
is_success = not response.startswith(("خطأ", "انتهت", "فشل"))
return jsonify({
"success": is_success,
"response": response,
"model": model,
"messages_count": len(messages)
})
except Exception as e:
logging.error(f"❌ خطأ في محادثة Ollama: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/ollama/test', methods=['GET'])
def test_ollama():
"""اختبار اتصال Ollama"""
try:
is_connected = brain_ai.ollama_manager.test_connection()
if is_connected:
# اختبار بسيط
test_response = brain_ai.ollama_manager.generate_response("مرحباً", "noura:latest")
return jsonify({
"success": True,
"connected": True,
"test_response": test_response[:100] + "..." if len(test_response) > 100 else test_response,
"available_models": len(brain_ai.ollama_manager.available_models),
"default_model": brain_ai.ollama_manager.default_model
})
else:
return jsonify({
"success": False,
"connected": False,
"error": "لا يمكن الاتصال بـ Ollama",
"suggestion": "تأكد أن Ollama يعمل على http://127.0.0.1:11434"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/')
def home():
"""الصفحة الرئيسية المحدثة"""
ollama_connected = brain_ai.ollama_manager.test_connection()
return jsonify({
"message": "🧠 PTB Brain System - نظام التحكم المركزي مع Ollama المتكامل",
"status": "running",
"version": "13.15+",
"ollama_integration": {
"connected": ollama_connected,
"available_models": len(brain_ai.ollama_manager.available_models) if ollama_connected else 0,
"default_model": brain_ai.ollama_manager.default_model,
"endpoints": {
"توليد نص": "/ollama/generate",
"قائمة النماذج": "/ollama/models",
"المحادثة": "/ollama/chat",
"اختبار الاتصال": "/ollama/test"
}
},
"image_generator": {
"main_system": brain_ai.image_generator.is_connected,
"fallback_systems": len(brain_ai.image_generator.fallback_generators) if hasattr(brain_ai.image_generator, 'fallback_generators') else 0,
"simple_generator": SIMPLE_GENERATOR_AVAILABLE,
"endpoints": {
"توليد صور": "/generate/image",
"توليد احتياطي": "/generate/image/fallback",
"اختبار النظام": "/test/image_generator"
}
},
"controlled_systems": {
"ollama": ollama_connected,
"responses": brain_ai.responses_controller.responses_system,
"responses_manager": brain_ai.responses_manager.manager_bot,
"image_generator": brain_ai.image_generator.is_connected,
"search": brain_ai.search_integration.search_available,
"task_distributor": True,
"learning_system": True
},
"learning_endpoints": {
"تعليم رد جديد": "/brain/manager/teach",
"التعلم من الملفات": "/learn/from_file",
"تصحيح الأخطاء": "/improvement/correct",
"تحليل التعلم": "/improvement/analyze",
"محاكاة التعلم": "/improvement/simulate"
},
"endpoints": {
"التحكم المركزي": "/brain/control",
"المحادثة مع Ollama": "/ollama/chat",
"توليد نص بـ Ollama": "/ollama/generate",
"المحادثة": "/brain/responses/chat",
"تعليم ردود": "/brain/manager/teach",
"توليد صور": "/generate/image",
"توليد صور احتياطي": "/generate/image/fallback",
"البحث": "/search/smart",
"حالة النظام": "/status",
"محادثة مباشرة": "/chat"
},
"timestamp": datetime.now().isoformat()
})
@app.route('/status')
def status():
"""حالة النظام"""
return jsonify(brain_ai.get_system_status())
@app.route('/process', methods=['POST'])
def process_message():
"""معالجة رسالة مع إرجاع الرد الفعلي والتعلم - الإصدار المصحح"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "لا توجد بيانات"}), 400
# ⭐ استخدام النسخة المحسنة مع التعلم
user_id = data.get('user_id')
result = brain_ai.process_message_with_learning(data, user_id)
# ⭐ التأكد من وجود حقل 'reply' في النتيجة
if result.get('error'):
return jsonify(result), 400
else:
# إذا لم يكن هناك رد، نضيف رد افتراضي
if 'reply' not in result:
content = data.get('content', '')
if content:
default_reply = generate_reply_brain(content)
result['reply'] = default_reply
else:
result['reply'] = "تم استلام رسالتك بنجاح! 🎯"
return jsonify(result), 200
except Exception as e:
logging.error(f"❌ خطأ في معالجة الرسالة: {e}")
# ⭐ حتى في حالة الخطأ، نعيد رداً مفيداً
return jsonify({
"error": str(e),
"reply": "عذراً، حدث خطأ في معالجة رسالتك. يرجى المحاولة مرة أخرى."
}), 500
@app.route('/task/<task_id>')
def get_task_status(task_id):
"""الحصول على حالة مهمة"""
status = brain_ai.task_distributor.get_task_status(task_id)
return jsonify(status)
@app.route('/tasks')
def get_all_tasks():
"""الحصول على جميع المهام"""
stats = brain_ai.task_distributor.get_system_stats()
return jsonify(stats)
@app.route('/analyze/audio', methods=['POST'])
def analyze_audio():
"""تحليل ملف صوتي"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم ارسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
language = request.form.get('language', 'ar')
task = request.form.get('task', 'transcribe')
result = brain_ai.audio_integration.analyze_audio_file(
file_path, file.filename, language, task
)
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في تحليل الصوت: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/analyze/file', methods=['POST'])
def analyze_file():
"""تحليل ملف"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم ارسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
result = brain_ai.file_integration.analyze_file(file_path, file.filename)
if request.form.get('format') == 'xml' and not result.get('error'):
result['xml_analysis'] = brain_ai.file_integration.convert_analysis_to_xml(result)
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في تحليل الملف: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/analyze/urls', methods=['POST'])
def analyze_urls():
"""تحليل روابط"""
try:
data = request.get_json()
if not data or 'urls' not in data:
return jsonify({"error": "لم يتم ارسال روابط"}), 400
urls = data['urls']
if not isinstance(urls, list):
return jsonify({"error": "يجب ان تكون الروابط في قائمة"}), 400
result = brain_ai.link_integration.analyze_urls(urls)
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في تحليل الروابط: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/transcribe/audio', methods=['POST'])
def transcribe_audio():
"""تحويل ملف صوتي إلى نص محلياً"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم ارسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
language = request.form.get('language', 'ar')
model = request.form.get('model', 'small')
result = brain_ai.local_audio_transcriber.transcribe_audio_file(
file_path, language, model
)
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في التحويل الصوتي: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/process/video', methods=['POST'])
def process_video():
"""معالجة فيديو"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم ارسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
quality = request.form.get('quality', 'medium')
result = brain_ai.video_processor.process_video(file_path, temp_dir, quality)
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في معالجة الفيديو: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/analyze/image', methods=['POST'])
def analyze_image():
"""تحليل صورة"""
try:
if 'file' not in request.files:
return jsonify({"error": "لم يتم ارسال ملف"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "لم يتم اختيار ملف"}), 400
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(file_path)
analysis_type = request.form.get('analysis_type', 'full')
result = brain_ai.image_analyzer.analyze_image(file_path, analysis_type)
try:
shutil.rmtree(temp_dir)
except:
pass
return jsonify(result)
except Exception as e:
logging.error(f"❌ خطأ في تحليل الصورة: {e}")
return jsonify({"error": str(e)}), 500
# ===== إضافة إندبوينت اختبار توليد الصور =====
@app.route('/test/image_generator', methods=['GET'])
def test_image_generator():
"""اختبار نظام توليد الصور"""
try:
result = brain_ai.image_generator.generate_image_sync("قطة لطيفة")
return jsonify({
"generator_connected": brain_ai.image_generator.is_connected,
"fallback_generators": len(brain_ai.image_generator.fallback_generators) if hasattr(brain_ai.image_generator, 'fallback_generators') else 0,
"simple_generator": SIMPLE_GENERATOR_AVAILABLE,
"test_result": result
})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ===== إضافة إندبوينت اختبار شامل =====
@app.route('/test/all', methods=['GET'])
def test_all_systems():
"""اختبار شامل لجميع الأنظمة"""
try:
results = {}
# اختبار Ollama
ollama_test = brain_ai.ollama_manager.test_connection()
results["ollama"] = {
"connected": ollama_test,
"models": len(brain_ai.ollama_manager.available_models) if ollama_test else 0
}
# اختبار الردود
results["responses"] = {
"system_available": brain_ai.responses_controller.responses_system,
"manager_available": brain_ai.responses_manager.manager_bot
}
# اختبار توليد الصور
results["image_generator"] = {
"main_system": brain_ai.image_generator.is_connected,
"simple_generator": SIMPLE_GENERATOR_AVAILABLE
}
# اختبار الأنظمة الأخرى
results["other_systems"] = {
"knowledge_search": KNOWLEDGE_SEARCH_AVAILABLE,
"smart_learner": SMART_LEARNER_AVAILABLE,
"media_analyzer": MEDIA_ANALYZER_AVAILABLE,
"task_distributor": True
}
# اختبار بسيط للرد
test_message = "مرحباً"
test_reply = generate_reply_brain(test_message)
results["test_response"] = {
"message": test_message,
"reply": test_reply[:100] + "..." if len(test_reply) > 100 else test_reply,
"success": test_reply and "عذراً" not in test_reply
}
return jsonify({
"success": True,
"results": results,
"timestamp": datetime.now().isoformat(),
"summary": f"✅ الأنظمة النشطة: {sum(1 for r in results.values() if isinstance(r, dict) and r.get('connected', False) or r.get('system_available', False))}/{len(results)}"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ===== تشغيل التطبيق مع تحسينات =====
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("brain.log", encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
logging.info("🚀 بدء تشغيل PTB Brain v13.15+ مع نظام Ollama المتكامل...")
logging.info(f"🌐 الخادم يعمل على {APP_HOST}:{APP_PORT}")
try:
logging.info("🔧 اختبار الأنظمة الفرعية...")
# اختبار Ollama
ollama_connected = brain_ai.ollama_manager.test_connection()
if ollama_connected:
logging.info(f"✅ Ollama متصل بنجاح مع {len(brain_ai.ollama_manager.available_models)} نموذج")
logging.info(f"📊 النماذج المتاحة: {', '.join(brain_ai.ollama_manager.available_models[:5])}...")
# اختبار توليد رد بسيط
test_response = brain_ai.ollama_manager.generate_response("مرحباً")
if test_response and not test_response.startswith(("خطأ", "انتهت")):
logging.info(f"✅ اختبار Ollama ناجح: {test_response[:50]}...")
else:
logging.warning(f"⚠️ اختبار Ollama: {test_response}")
else:
logging.warning("⚠️ Ollama غير متصل - تأكد من تشغيل Ollama على http://127.0.0.1:11434")
logging.info("ℹ️ سيستخدم النظام الردود الاحتياطية")
if KNOWLEDGE_SEARCH_AVAILABLE:
try:
test_response = quick_search("ما هو الذكاء الاصطناعي")
if test_response:
logging.info("✅ نظام المعرفة يعمل بشكل صحيح")
else:
logging.warning("⚠️ نظام المعرفة يعمل ولكن بدون نتائج")
except Exception as e:
logging.warning(f"⚠️ نظام المعرفة به مشكلة: {e}")
# اختبار نظام توليد الصور المحسن
if brain_ai.image_generator.is_connected:
logging.info("✅ نظام توليد الصور المحسن يعمل بشكل صحيح")
logging.info(f"📊 عدد المولدات الاحتياطية: {len(brain_ai.image_generator.fallback_generators) if hasattr(brain_ai.image_generator, 'fallback_generators') else 0}")
else:
logging.warning("⚠️ نظام توليد الصور غير متاح - تأكد من وجود ptb_bot.py")
# اختبار المولد البسيط
if SIMPLE_GENERATOR_AVAILABLE:
logging.info("✅ المولد البسيط متاح وجاهز للاستخدام")
else:
logging.warning("⚠️ المولد البسيط غير متاح - تأكد من وجود simple_image_generator.py")
if RESPONSES_SYSTEM_AVAILABLE:
logging.info("✅ نظام الردود المحسن يعمل بشكل صحيح")
else:
logging.warning("⚠️ نظام الردود المحسن غير متاح - تأكد من وجود responses.py")
# اختبار أنظمة التحكم الجديدة
if brain_ai.responses_controller.responses_system:
logging.info("✅ نظام التحكم في responses.py يعمل بشكل صحيح")
else:
logging.warning("⚠️ نظام التحكم في responses.py غير متاح")
if brain_ai.responses_manager.manager_bot:
logging.info("✅ نظام التحكم في responses_manager_bot.py يعمل بشكل صحيح")
else:
logging.warning("⚠️ نظام التحكم في responses_manager_bot.py غير متاح")
# اختبار نظام التعلم
learning_test = brain_ai.self_improvement.get_improvement_status()
if learning_test.get('success'):
logging.info("✅ نظام التعلم الذاتي يعمل بشكل صحيح")
else:
logging.warning(f"⚠️ نظام التعلم الذاتي: {learning_test.get('error', 'غير متاح')}")
systems_to_test = [
("نظام Ollama", ollama_connected),
("نظام الوسائط", MEDIA_ANALYZER_AVAILABLE),
("نظام البحث", brain_ai.search_integration.search_available),
("نظام التعلم", SMART_LEARNER_AVAILABLE),
("نظام تحليل الروابط", brain_ai.link_integration.link_inspector_available),
("نظام التحويل الصوتي", brain_ai.local_audio_transcriber.transcriber_available),
("نظام معالجة الفيديو", brain_ai.video_processor.video_processor_available),
("نظام تحليل الصور", brain_ai.image_analyzer.image_analyzer_available),
("نظام توليد الصور", brain_ai.image_generator.is_connected),
("المولدات الاحتياطية", len(brain_ai.image_generator.fallback_generators) if hasattr(brain_ai.image_generator, 'fallback_generators') else 0),
("المولد البسيط", SIMPLE_GENERATOR_AVAILABLE),
("نظام الردود المحسن", RESPONSES_SYSTEM_AVAILABLE),
("نظام التحكم في responses", brain_ai.responses_controller.responses_system),
("نظام التحكم في manager", brain_ai.responses_manager.manager_bot),
("نظام التعلم الذاتي", brain_ai.self_improvement.improvement_system is not None),
("نظام توزيع المهام", True),
("الذاكرة الذكية", brain_ai.smart_memory.memory_layer_available)
]
active_count = 0
for system_name, system_available in systems_to_test:
status = "✅" if system_available else "⚠️"
if system_available:
active_count += 1
logging.info(f"{status} {system_name}: {'مفعل' if system_available else 'غير مفعل'}")
logging.info(f"📊 إجمالي الأنظمة النشطة: {active_count}/{len(systems_to_test)}")
logging.info("🌐 بدء تشغيل خادم Flask...")
app.run(host=APP_HOST, port=APP_PORT, debug=False)
except Exception as e:
logging.error(f"❌ فشل تشغيل الخادم: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
#[file content end]