from flask import Flask, request, jsonify, render_template_string import os import threading import time import logging import json # تهيئة Flask app app = Flask(__name__) # تهيئة logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # حالة البوت bot_status = { "running": False, "start_time": None, "telegram_listener": False, "error": None } # المتغيرات العالمية userbot_instance = None telegram_listener_instance = None main_thread = None # استيراد الوحدات مع معالجة الأخطاء modules_info = { "telethon": {"available": False, "error": None}, "userbot": {"available": False, "error": None}, "telegram_listener": {"available": False, "error": None}, "moviepy": {"available": False, "error": None} } try: from telethon import TelegramClient modules_info["telethon"]["available"] = True logger.info("✅ Telethon module imported successfully") except ImportError as e: modules_info["telethon"]["error"] = str(e) logger.warning(f"⚠️ Telethon not available: {e}") try: from userbot import UserBot, start_bot modules_info["userbot"]["available"] = True logger.info("✅ UserBot module imported successfully") except ImportError as e: modules_info["userbot"]["error"] = str(e) logger.warning(f"⚠️ UserBot not available: {e}") try: from telegram_listener import TelegramListener modules_info["telegram_listener"]["available"] = True logger.info("✅ TelegramListener module imported successfully") except ImportError as e: modules_info["telegram_listener"]["error"] = str(e) logger.warning(f"⚠️ TelegramListener not available: {e}") try: import moviepy modules_info["moviepy"]["available"] = True logger.info("✅ MoviePy module imported successfully") except ImportError as e: modules_info["moviepy"]["error"] = str(e) logger.warning(f"⚠️ MoviePy not available: {e}") # تحديد إذا كانت الوحدات الأساسية متاحة essential_modules_available = ( modules_info["telethon"]["available"] and modules_info["userbot"]["available"] ) # حالة واجهة الدردشة chat_interface_state = { "assistants": [], "platforms": [], "messages": [], "settings": { "userName": "مستخدم", "theme": "default", "autoConnect": True } } def start_bot_wrapper(): """تشغيل البوت في thread منفصل""" global bot_status, userbot_instance, telegram_listener_instance try: if not essential_modules_available: missing_modules = [] for name, info in modules_info.items(): if not info["available"] and name in ["telethon", "userbot"]: missing_modules.append(name) error_msg = f"Essential modules not available: {', '.join(missing_modules)}" bot_status["error"] = error_msg logger.error(f"❌ {error_msg}") return # تشغيل UserBot إذا كان متاحًا if modules_info["userbot"]["available"]: userbot_instance = start_bot() logger.info("✅ UserBot started successfully") # تشغيل TelegramListener إذا كان متاحًا if modules_info["telegram_listener"]["available"]: telegram_listener_instance = TelegramListener() telegram_listener_instance.start() logger.info("✅ TelegramListener started successfully") bot_status["running"] = True bot_status["start_time"] = time.time() bot_status["telegram_listener"] = modules_info["telegram_listener"]["available"] bot_status["error"] = None logger.info("🎉 Bot started successfully with all available modules") except Exception as e: error_msg = f"Error starting bot: {str(e)}" logger.error(f"❌ {error_msg}") bot_status["error"] = error_msg bot_status["running"] = False def stop_bot_wrapper(): """إيقاف البوت""" global bot_status, userbot_instance, telegram_listener_instance try: if modules_info["userbot"]["available"] and userbot_instance: from userbot import stop_bot stop_bot(userbot_instance) logger.info("✅ UserBot stopped") if modules_info["telegram_listener"]["available"] and telegram_listener_instance: telegram_listener_instance.stop() logger.info("✅ TelegramListener stopped") bot_status["running"] = False bot_status["telegram_listener"] = False bot_status["error"] = None logger.info("🛑 Bot stopped successfully") except Exception as e: error_msg = f"Error stopping bot: {str(e)}" logger.error(f"❌ {error_msg}") bot_status["error"] = error_msg def get_system_info(): """الحصول على معلومات النظام""" return { "python_version": os.environ.get('PYTHON_VERSION', '3.10'), "environment": "Hugging Face Spaces", "start_time": time.strftime('%Y-%m-%d %H:%M:%S'), "modules": {name: info["available"] for name, info in modules_info.items()} } # API endpoints لواجهة الدردشة @app.route('/api/chat', methods=['POST']) def api_chat(): """معالجة رسائل الدردشة""" try: data = request.get_json() message = data.get('message', '') assistant_id = data.get('assistant_id', 'nora') # محاكاة رد نورا responses = { 'nora': f"أهلاً بك! أنا نورا. لقد استلمت رسالتك: '{message}'. كيف يمكنني مساعدتك اليوم؟", 'assistant1': f"مساعد 1: شكراً على رسالتك '{message}'. أنا هنا لمساعدتك.", 'assistant2': f"مساعد 2: مرحباً! لقد فهمت رسالتك: '{message}'." } response = responses.get(assistant_id, f"مساعد غير معروف: {message}") return jsonify({ "success": True, "response": response, "assistant_id": assistant_id }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/api/platforms/status') def api_platforms_status(): """حالة المنصات المتصلة""" return jsonify({ "success": True, "active_connections": chat_interface_state["platforms"], "total_connections": len(chat_interface_state["platforms"]), "assistants_count": len(chat_interface_state["assistants"]) }) @app.route('/api/assistants/real', methods=['POST']) def api_add_assistant(): """إضافة مساعد حقيقي""" try: data = request.get_json() name = data.get('name', '') url = data.get('url', '') assistant = { "id": f"assistant_{len(chat_interface_state['assistants']) + 1}", "name": name, "url": url, "status": "connected", "created_at": time.time() } chat_interface_state["assistants"].append(assistant) return jsonify({ "success": True, "message": f"تم إضافة المساعد {name} بنجاح", "assistant": assistant }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/api/platforms/send', methods=['POST']) def api_send_to_platform(): """إرسال رسالة لمنصة محددة""" try: data = request.get_json() connection_id = data.get('connection_id', '') message = data.get('message', '') # محاكاة الرد من المنصة response = f"تم استلام رسالتك '{message}' في المنصة {connection_id}. هذا رد محاكاة." return jsonify({ "success": True, "response": response, "connection_id": connection_id }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/api/broadcast/real', methods=['POST']) def api_broadcast_message(): """بث رسالة لجميع المساعدين""" try: data = request.get_json() message = data.get('message', '') responses = [] # رد نورا المحلي responses.append({ "type": "local", "assistant": "نورا", "response": f"نورا: لقد استلمت رسالة البث '{message}'. شكراً للمشاركة!", "success": True }) # ردود المساعدين الخارجيين for assistant in chat_interface_state["assistants"]: responses.append({ "type": "external", "assistant": assistant["name"], "response": f"{assistant['name']}: تم استلام البث '{message}'. رد محاكاة.", "success": True }) return jsonify({ "success": True, "message": f"تم البث إلى {len(responses)} مساعد", "responses": responses }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/api/platforms/disconnect', methods=['POST']) def api_disconnect_platform(): """قطع اتصال منصة""" try: data = request.get_json() connection_id = data.get('connection_id', '') # إزالة المساعد من القائمة chat_interface_state["assistants"] = [ a for a in chat_interface_state["assistants"] if a["id"] != connection_id ] return jsonify({ "success": True, "message": f"تم قطع الاتصال بالمساعد {connection_id}" }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 # المسارات الرئيسية @app.route('/') def home(): """الصفحة الرئيسية""" return render_template_string(""" نورا - لوحة التحكم

🌟 نورا - نظام الذكاء الاصطناعي المتكامل

لوحة التحكم الرئيسية - اختر الواجهة المناسبة

🎮 لوحة تحكم البوت

🟢 النظام يعمل بشكل طبيعي

📊 إحصائيات النظام

المساعدون النشطون: 1

وقت التشغيل: 0 ثانية

الرسائل المعالجة: 0

💭 غرفة الدردشة المتعددة

🚀 واجهة الدردشة المتقدمة

استخدم واجهة الدردشة المتعددة للتواصل مع نورا والمساعدين الآخرين

📡 حالة النظام

جاري تحميل حالة النظام...

""") @app.route('/chat') def chat_interface(): """واجهة الدردشة المتعددة""" # هنا نعيد استخدام كود HTML الذي قدمته with open('chat_interface.html', 'r', encoding='utf-8') as f: chat_html = f.read() return chat_html # بقية ال endpoints تبقى كما هي @app.route('/status') def status(): """الحصول على حالة البوت""" uptime = 0 if bot_status["running"] and bot_status["start_time"]: uptime = int(time.time() - bot_status["start_time"]) return jsonify({ "bot_running": bot_status["running"], "telegram_listener_active": bot_status["telegram_listener"], "uptime_seconds": uptime, "essential_modules_available": essential_modules_available, "modules": {name: info["available"] for name, info in modules_info.items()}, "module_errors": {name: info["error"] for name, info in modules_info.items() if info["error"]}, "error": bot_status["error"], "timestamp": time.time(), "status": "success" }) @app.route('/start', methods=['POST']) def start(): """تشغيل البوت""" global main_thread if bot_status["running"]: return jsonify({"message": "Bot is already running", "status": "running"}) if not essential_modules_available: missing = [name for name, info in modules_info.items() if not info["available"] and name in ["telethon", "userbot"]] return jsonify({ "message": f"Cannot start bot: Essential modules not available ({', '.join(missing)})", "status": "error", "missing_modules": missing }), 400 try: main_thread = threading.Thread(target=start_bot_wrapper, daemon=True) main_thread.start() # انتظر قليلاً للتأكد من بدء التشغيل time.sleep(2) if bot_status["running"]: return jsonify({ "message": "Bot started successfully", "status": "started" }) else: return jsonify({ "message": f"Bot failed to start: {bot_status.get('error', 'Unknown error')}", "status": "error" }), 500 except Exception as e: return jsonify({ "message": f"Error starting bot: {str(e)}", "status": "error" }), 500 @app.route('/stop', methods=['POST']) def stop(): """إيقاف البوت""" if not bot_status["running"]: return jsonify({"message": "Bot is not running", "status": "stopped"}) try: stop_bot_wrapper() return jsonify({"message": "Bot stopped successfully", "status": "stopped"}) except Exception as e: return jsonify({ "message": f"Error stopping bot: {str(e)}", "status": "error" }), 500 @app.route('/health') def health_check(): """فحص صحة النظام""" return jsonify({ "status": "healthy", "bot_running": bot_status["running"], "essential_modules_available": essential_modules_available, "timestamp": time.time() }) @app.route('/system') def system_info(): """معلومات النظام""" return jsonify(get_system_info()) @app.route('/logs') def get_logs(): """الحصول على آخر ال logs""" log_info = { "bot_status": bot_status, "system_info": get_system_info(), "modules": {name: info for name, info in modules_info.items()}, "essential_modules_available": essential_modules_available, "last_update": time.strftime('%Y-%m-%d %H:%M:%S'), "status": "success" } return jsonify(log_info) @app.errorhandler(404) def not_found(error): """معالجة الصفحات غير الموجودة""" return jsonify({ "error": "Page not found", "status": "error" }), 404 @app.errorhandler(500) def internal_error(error): """معالجة الأخطاء الداخلية""" return jsonify({ "error": "Internal server error", "status": "error" }), 500 import os import sys import importlib.util import threading import time import logging from pathlib import Path logger = logging.getLogger(__name__) def run_all_python_files(): """ تشغيل جميع ملفات بايثون في المجلد الحالي """ current_dir = Path(__file__).parent python_files = [] # البحث عن جميع ملفات .py في المجلد الحالي for file_path in current_dir.glob("*.py"): if file_path.name != "__init__.py" and file_path.name != Path(__file__).name: python_files.append(file_path) logger.info(f"📁 وجد {len(python_files)} ملف بايثون في المجلد الحالي") # تشغيل الملفات for file_path in python_files: try: run_python_file(file_path) except Exception as e: logger.error(f"❌ خطأ في تشغيل {file_path.name}: {e}") def run_python_file(file_path): """ تشغيل ملف بايثون معين """ try: file_name = file_path.name logger.info(f"🚀 جاري تشغيل: {file_name}") # تحميل المواصفات والوحدة spec = importlib.util.spec_from_file_location(file_path.stem, file_path) module = importlib.util.module_from_spec(spec) # تشغيل الملف spec.loader.exec_module(module) logger.info(f"✅ تم تشغيل {file_name} بنجاح") return module except Exception as e: logger.error(f"❌ فشل تشغيل {file_path.name}: {e}") return None def run_python_files_in_threads(): """ تشغيل جميع ملفات بايثون في خيوط منفصلة """ current_dir = Path(__file__).parent python_files = [] # البحث عن جميع ملفات .py في المجلد الحالي for file_path in current_dir.glob("*.py"): if file_path.name != "__init__.py" and file_path.name != Path(__file__).name: python_files.append(file_path) logger.info(f"🧵 تشغيل {len(python_files)} ملف في خيوط منفصلة") threads = [] for file_path in python_files: thread = threading.Thread( target=run_python_file, args=(file_path,), name=f"Thread-{file_path.stem}", daemon=True ) thread.start() threads.append(thread) logger.info(f"🔧 بدأ تشغيل: {file_path.name}") return threads def run_python_files_with_main(): """ تشغيل فقط الملفات التي تحتوي على دالة main() """ current_dir = Path(__file__).parent files_with_main = [] for file_path in current_dir.glob("*.py"): if file_path.name != "__init__.py" and file_path.name != Path(__file__).name: try: # فحص الملف لمعرفة إذا كان يحتوي على دالة main with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if 'def main(' in content or 'if __name__ == "__main__"' in content: files_with_main.append(file_path) except Exception as e: logger.warning(f"⚠️ لا يمكن قراءة {file_path.name}: {e}") logger.info(f"🎯 وجد {len(files_with_main)} ملف يحتوي على دالة main") for file_path in files_with_main: try: module = run_python_file(file_path) # إذا كان الملف يحتوي على دالة main، قم باستدعائها if hasattr(module, 'main'): logger.info(f"🔧 استدعاء main() في {file_path.name}") module.main() except Exception as e: logger.error(f"❌ خطأ في استدعاء main() لـ {file_path.name}: {e}") def run_specific_files(file_patterns): """ تشغيل ملفات محددة بناء على أنماط """ current_dir = Path(__file__).parent matched_files = [] for pattern in file_patterns: for file_path in current_dir.glob(pattern): if file_path.is_file() and file_path.suffix == '.py': matched_files.append(file_path) logger.info(f"🎯 تشغيل {len(matched_files)} ملف مطابق للأنماط") for file_path in matched_files: run_python_file(file_path) return matched_files def monitor_and_restart_files(): """ مراقبة الملفات وإعادة تشغيلها عند التعديل """ current_dir = Path(__file__).parent file_timestamps = {} while True: try: for file_path in current_dir.glob("*.py"): if file_path.name != "__init__.py" and file_path.name != Path(__file__).name: current_mtime = file_path.stat().st_mtime if file_path not in file_timestamps: # أول مرة نرى الملف، قم بتشغيله file_timestamps[file_path] = current_mtime threading.Thread( target=run_python_file, args=(file_path,), daemon=True ).start() elif current_mtime > file_timestamps[file_path]: # الملف تم تعديله، أعد تشغيله logger.info(f"🔄 إعادة تشغيل {file_path.name} (تم التعديل)") file_timestamps[file_path] = current_mtime threading.Thread( target=run_python_file, args=(file_path,), daemon=True ).start() time.sleep(5) # فحص كل 5 ثواني except Exception as e: logger.error(f"❌ خطأ في المراقبة: {e}") time.sleep(10) import os import subprocess def run_all_python_files(): # الحصول على مسار المجلد الحالي current_directory = os.getcwd() # البحث عن جميع ملفات بايثون في المجلد for filename in os.listdir(current_directory): if filename.endswith('.py'): # بناء المسار الكامل للملف file_path = os.path.join(current_directory, filename) print(f'تشغيل: {file_path}') # تشغيل الملف subprocess.run(['python', file_path]) # استدعاء الدالة run_all_python_files() if __name__ == '__main__': logger.info("🚀 Starting Flask application...") logger.info(f"📦 Essential Modules Available: {essential_modules_available}") for name, info in modules_info.items(): status = "🟢" if info["available"] else "🔴" logger.info(f"📦 {name}: {status} {info['error'] or ''}") app.run(debug=False, host='0.0.0.0', port=7860)