Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """ | |
| ram_manager.py – Distributed RAM Offload Agent (fixed) | |
| ===================================================== | |
| ❖ الغرض | |
| -------- | |
| يوفِّر هذا الملف إضافة مستقلة إلى مشروع **AmalOffload** من أجل مشاركة الذاكرة (RAM) | |
| بين جميع العُقد التي تشغِّل المشروع. عندما ينخفض مقدار الذاكرة الحرة على إحدى | |
| العُقد إلى أقل من حدّ معيّن، تُنقل كتل بيانات إلى عُقد أخرى تملك ذاكرة حرّة. | |
| ❖ المزايا | |
| --------- | |
| * مراقبة استهلاك الذاكرة محليًّا. | |
| * إعلان واكتشاف الأقران (Peers) إن توفّرت وحدات المشروع. | |
| * واجهة HTTP بسيطة عبر Flask: | |
| - GET /ram_status → حالة الذاكرة. | |
| - POST /ram_store → استلام كتلة بيانات (Base64) وتخزينها في الذاكرة. | |
| - GET /ram_fetch/<id> → إرجاع كتلة بيانات محفوظة. | |
| - GET /ram_info → معلومات عامة عن التخزين. | |
| - GET /health → فحص الصحة. | |
| طريقة التشغيل (أمثلة): | |
| --------------------- | |
| python ram_manager.py --ram-limit 2048 --chunk 64 --interval 5 --port 8765 | |
| أو باستخدام متغيّرات البيئة: | |
| export RAM_THRESHOLD_MB=2048 | |
| export RAM_CHUNK_MB=64 | |
| export RAM_CHECK_INTERVAL=10 | |
| export RAM_PORT=8765 | |
| python ram_manager.py | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import psutil | |
| import time | |
| import threading | |
| import socket | |
| import base64 | |
| import uuid | |
| import argparse | |
| import logging | |
| from typing import Dict, List, Optional | |
| try: | |
| from flask import Flask, request, jsonify | |
| except ImportError as exc: | |
| raise RuntimeError("Flask غير مُثبّت. نفِّذ: pip install flask") from exc | |
| # إعداد السجلات | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # استيراد نظام الاكتشاف من المشروع (اختياري) | |
| DISCOVERY_AVAILABLE = False | |
| try: | |
| from peer_discovery import PEERS, get_local_ip, get_peers # type: ignore | |
| DISCOVERY_AVAILABLE = True | |
| logger.info("✅ تم تحميل نظام الاكتشاف الموزع (peer_discovery)") | |
| except Exception: | |
| logger.warning("⚠️ نظام peer_discovery غير متوفر، سيتم استخدام بدائل بسيطة") | |
| # استيراد peer_registry (Zeroconf) إن وجد | |
| PEER_REGISTRY_AVAILABLE = False | |
| try: | |
| from peer_registry import discover_peers as discover_peers_zeroconf # type: ignore | |
| PEER_REGISTRY_AVAILABLE = True | |
| logger.info("✅ تم تحميل نظام تسجيل الأقران (peer_registry)") | |
| except Exception: | |
| logger.warning("⚠️ نظام peer_registry غير متوفر") | |
| # الإعدادات الافتراضية (يمكن تعديلها عبر متغيّرات البيئة أو وسيطات سطر الأوامر) | |
| RAM_LIMIT_MB = int(os.getenv("RAM_THRESHOLD_MB", "2048")) # حدّ الذاكرة الحرة قبل التفريغ | |
| CHUNK_MB = int(os.getenv("RAM_CHUNK_MB", "64")) # حجم الكتلة المُرسلة | |
| CHECK_INTERVAL = int(os.getenv("RAM_CHECK_INTERVAL", "10")) # فترة الفحص بالثواني | |
| RAM_PORT = int(os.getenv("RAM_PORT", "8765")) # منفذ واجهة خدمة الذاكرة | |
| app = Flask(__name__) | |
| # مخازن الذاكرة | |
| remote_chunks: Dict[str, bytes] = {} # كتل وصلت من أقران أخرى | |
| local_chunks: Dict[str, bytes] = {} # تخزين محلي إن أردت استخدامه لاحقًا | |
| # ─────────────────── وظائف مساعدة للذاكرة ─────────────────── | |
| def get_free_ram_mb() -> int: | |
| """إرجاع الذاكرة الحرّة بالميغابايت.""" | |
| return int(psutil.virtual_memory().available // (1024 * 1024)) | |
| def get_ram_usage_percent() -> float: | |
| """إرجاع نسبة استخدام الذاكرة.""" | |
| return float(psutil.virtual_memory().percent) | |
| def get_system_info() -> Dict[str, int | float]: | |
| """الحصول على معلومات النظام الحالية الخاصة بالذاكرة.""" | |
| memory = psutil.virtual_memory() | |
| return { | |
| "total_mb": int(memory.total // (1024 * 1024)), | |
| "available_mb": int(memory.available // (1024 * 1024)), | |
| "used_mb": int(memory.used // (1024 * 1024)), | |
| "usage_percent": float(memory.percent), | |
| "threshold_mb": int(RAM_LIMIT_MB), | |
| } | |
| # ─────────────────── واجهة HTTP ─────────────────── | |
| def ram_status(): | |
| """إرجاع كميّة الذاكرة الحرّة ومعلومات تفصيلية.""" | |
| return jsonify(get_system_info()) | |
| def ram_store(): | |
| """تلقّي كتلة بيانات وتخزينها في الذاكرة (remote_chunks).""" | |
| try: | |
| payload = request.get_json(force=True, silent=False) | |
| if not payload or not isinstance(payload, dict): | |
| return jsonify({"error": "بيانات غير صالحة"}), 400 | |
| cid: Optional[str] = payload.get("id") | |
| blob_b64: Optional[str] = payload.get("data") | |
| if not cid or not blob_b64: | |
| return jsonify({"error": "المعرّف أو البيانات مفقودة"}), 400 | |
| # فحص المساحة المتاحة قبل التخزين (شرط بسيط احترازي) | |
| if get_free_ram_mb() < max(64, RAM_LIMIT_MB // 2): | |
| return jsonify({"error": "مساحة ذاكرة غير كافية"}), 507 | |
| remote_chunks[cid] = base64.b64decode(blob_b64.encode()) | |
| logger.info("✅ تم تخزين كتلة بيانات: %s (%d بايت)", cid, len(remote_chunks[cid])) | |
| return jsonify({"status": "stored", "id": cid, "size_bytes": len(remote_chunks[cid])}) | |
| except Exception as e: | |
| logger.exception("❌ خطأ في التخزين") | |
| return jsonify({"error": str(e)}), 500 | |
| def ram_fetch(cid: str): | |
| """جلب كتلة بيانات مخزنة عبر معرّفها.""" | |
| blob = remote_chunks.get(cid) | |
| if blob is None: | |
| return jsonify({"error": "الكتلة غير موجودة"}), 404 | |
| return jsonify({ | |
| "id": cid, | |
| "data": base64.b64encode(blob).decode(), | |
| "size_bytes": len(blob), | |
| }) | |
| def ram_info(): | |
| """معلومات عامة عن الكتل المخزنة محليًا وبعيدًا.""" | |
| return jsonify({ | |
| "local_chunks": len(local_chunks), | |
| "remote_chunks": len(remote_chunks), | |
| "local_size_mb": sum(len(b) for b in local_chunks.values()) // (1024 * 1024), | |
| "remote_size_mb": sum(len(b) for b in remote_chunks.values()) // (1024 * 1024), | |
| }) | |
| def health(): | |
| """فحص صحة الخدمة.""" | |
| return jsonify({ | |
| "status": "healthy", | |
| "service": "ram_manager", | |
| "port": RAM_PORT, | |
| "free_ram_mb": get_free_ram_mb(), | |
| }) | |
| # ─────────────────── وظائف داخليّة ─────────────────── | |
| def start_api(): | |
| """تشغيل خادم Flask في خيط منفصل باستخدام Werkzeug server.""" | |
| try: | |
| from werkzeug.serving import make_server | |
| host = "0.0.0.0" | |
| server = make_server(host, RAM_PORT, app) | |
| logger.info("🚀 بدء خادم الذاكرة الموزعة على %s:%d", host, RAM_PORT) | |
| server.serve_forever() | |
| except Exception as e: | |
| logger.error("❌ فشل في بدء الخادم: %s", e) | |
| def check_peer_availability(ip: str) -> bool: | |
| """فحص توفر قرين عبر endpoint /health.""" | |
| try: | |
| import requests # محليًا لتفادي كونه تبعية صلبة إن لم تُستخدم | |
| r = requests.get(f"http://{ip}:{RAM_PORT}/health", timeout=3) | |
| return r.status_code == 200 | |
| except Exception: | |
| return False | |
| def _discover_peers_basic_scan(limit: int = 20) -> List[str]: | |
| """اكتشاف بسيط بمسح الشبكة إن لم تتوفر آليات المشروع.""" | |
| try: | |
| if DISCOVERY_AVAILABLE: | |
| local_ip = get_local_ip() | |
| else: | |
| local_ip = socket.gethostbyname(socket.gethostname()) | |
| base_ip = ".".join(local_ip.split(".")[:-1]) | |
| local_last = int(local_ip.split(".")[-1]) | |
| except Exception: | |
| return [] | |
| peers_ips: List[str] = [] | |
| for i in range(1, limit + 1): | |
| if i == local_last: | |
| continue | |
| ip = f"{base_ip}.{i}" | |
| if check_peer_availability(ip): | |
| peers_ips.append(ip) | |
| logger.info("🔍 اكتشف مسح شبكة: %s", ip) | |
| return peers_ips | |
| def discover_peers() -> List[str]: | |
| """الحصول على قائمة IPs للأقران المتاحين، بدون تكرار.""" | |
| peers_ips: List[str] = [] | |
| # 1) Zeroconf عبر peer_registry إن توفر | |
| if PEER_REGISTRY_AVAILABLE: | |
| try: | |
| peers_data = discover_peers_zeroconf(timeout=2) # type: ignore[arg-type] | |
| for peer in peers_data: | |
| if isinstance(peer, dict) and 'ip' in peer: | |
| ip = str(peer['ip']) | |
| if ip and ip not in peers_ips and check_peer_availability(ip): | |
| peers_ips.append(ip) | |
| logger.info("🔍 اكتشف zeroconf: %s", ip) | |
| except Exception as e: | |
| logger.warning("⚠️ خطأ في zeroconf: %s", e) | |
| # 2) نظام الاكتشاف الداخلي للمشروع إن توفر | |
| if DISCOVERY_AVAILABLE: | |
| try: | |
| plist = get_peers() if callable(get_peers) else [] # type: ignore[misc] | |
| for p in plist: | |
| if isinstance(p, str) and p.startswith("http://"): | |
| # استخراج IP من URL | |
| try: | |
| ip = p.split("//", 1)[1].split(":", 1)[0] | |
| except Exception: | |
| continue | |
| if ip and ip not in peers_ips and check_peer_availability(ip): | |
| peers_ips.append(ip) | |
| logger.info("🔍 اكتشف peer_discovery: %s", ip) | |
| except Exception as e: | |
| logger.warning("⚠️ خطأ في peer_discovery: %s", e) | |
| # 3) بديل: مسح بسيط للشبكة | |
| if not peers_ips: | |
| peers_ips = _discover_peers_basic_scan(limit=20) | |
| logger.info("📊 إجمالي الأقران المكتشفين: %d", len(peers_ips)) | |
| return peers_ips | |
| def offload_chunk(blob: bytes, peer_ip: str) -> bool: | |
| """إرسال كتلة بيانات إلى قرين محدّد عبر /ram_store.""" | |
| try: | |
| import requests | |
| chunk_id = str(uuid.uuid4()) | |
| r = requests.post( | |
| f"http://{peer_ip}:{RAM_PORT}/ram_store", | |
| json={"id": chunk_id, "data": base64.b64encode(blob).decode()}, | |
| timeout=10 | |
| ) | |
| if r.status_code == 200: | |
| logger.info("✅ تم إرسال كتلة %dMB إلى %s", len(blob) // (1024 * 1024), peer_ip) | |
| return True | |
| logger.warning("⚠️ رفض التخزين من %s: %s", peer_ip, r.status_code) | |
| return False | |
| except Exception as e: | |
| logger.warning("🔌 فشل الإرسال إلى %s: %s", peer_ip, e) | |
| return False | |
| def create_sample_data(size_mb: int) -> bytes: | |
| """إنشاء بيانات عشوائية للاختبار بعدد ميغابايتات محدّد.""" | |
| return os.urandom(size_mb * 1024 * 1024) | |
| def register_with_peer_registry(): | |
| """تسجيل هذه العقدة في نظام Zeroconf (اختياري).""" | |
| if not PEER_REGISTRY_AVAILABLE: | |
| return | |
| try: | |
| from peer_registry import register_service # type: ignore | |
| local_ip = get_local_ip() if DISCOVERY_AVAILABLE else socket.gethostbyname(socket.gethostname()) | |
| register_service(local_ip, RAM_PORT, load=0.0) | |
| logger.info("✅ تم تسجيل العقدة في Zeroconf: %s:%d", local_ip, RAM_PORT) | |
| except Exception as e: | |
| logger.warning("⚠️ فشل التسجيل في Zeroconf: %s", e) | |
| def monitor_loop(): | |
| """مراقبة الذاكرة واستدعاء offload عند الحاجة.""" | |
| logger.info("🔍 بدء مراقبة الذاكرة...") | |
| # تسجيل العقدة إن أمكن | |
| register_with_peer_registry() | |
| while True: | |
| try: | |
| free_mb = get_free_ram_mb() | |
| usage_percent = get_ram_usage_percent() | |
| # تسجيل الحالة كل ~30 ثانية (تقريبية بدون مؤقّت إضافي) | |
| if int(time.time()) % 30 == 0: | |
| logger.info("📊 حالة الذاكرة: %dMB حرّة (%.1f%% مستخدمة)", free_mb, usage_percent) | |
| # إن كانت الذاكرة الحرة أقل من الحدّ، حاول التفريغ | |
| if free_mb < RAM_LIMIT_MB: | |
| logger.warning("🚨 الذاكرة منخفضة: %dMB (الحد: %dMB)", free_mb, RAM_LIMIT_MB) | |
| peers = discover_peers() | |
| if not peers: | |
| logger.info("👥 لا يوجد أقران متاحون حاليًا.") | |
| else: | |
| # ⚠️ في التطبيق الفعلي، ينبغي تفريغ بيانات حقيقية من التطبيق | |
| blob = create_sample_data(CHUNK_MB) | |
| for ip in peers: | |
| if offload_chunk(blob, ip): | |
| logger.info("📤 تم تفريغ %dMB إلى %s", CHUNK_MB, ip) | |
| break | |
| else: | |
| logger.warning("❌ جميع الأقران رفضوا التخزين") | |
| time.sleep(CHECK_INTERVAL) | |
| except Exception as e: | |
| logger.error("💥 خطأ في مراقبة الذاكرة: %s", e) | |
| time.sleep(CHECK_INTERVAL) | |
| def main(): | |
| """الدالة الرئيسية لتشغيل الخدمة.""" | |
| global RAM_LIMIT_MB, CHUNK_MB, CHECK_INTERVAL, RAM_PORT | |
| parser = argparse.ArgumentParser(description="مدير الذاكرة الموزعة") | |
| parser.add_argument("--ram-limit", type=int, default=RAM_LIMIT_MB, | |
| help="حد الذاكرة الحرّة بالميجابايت") | |
| parser.add_argument("--chunk", type=int, default=CHUNK_MB, | |
| help="حجم الكتلة بالميجابايت") | |
| parser.add_argument("--interval", type=int, default=CHECK_INTERVAL, | |
| help="فترة المراقبة بالثواني") | |
| parser.add_argument("--port", type=int, default=RAM_PORT, | |
| help="منفذ الخدمة") | |
| args = parser.parse_args() | |
| # تحديث الإعدادات من الوسائط | |
| RAM_LIMIT_MB = int(args.ram_limit) | |
| CHUNK_MB = int(args.chunk) | |
| CHECK_INTERVAL = int(args.interval) | |
| RAM_PORT = int(args.port) | |
| logger.info("🚀 بدء تشغيل مدير الذاكرة الموزعة") | |
| logger.info("⚙️ الإعدادات: الحد %dMB, الكتلة %dMB, الفاصل %ds, المنفذ %d", | |
| RAM_LIMIT_MB, CHUNK_MB, CHECK_INTERVAL, RAM_PORT) | |
| # بدء الخادم في خيط منفصل | |
| server_thread = threading.Thread(target=start_api, daemon=True) | |
| server_thread.start() | |
| # بدء حلقة المراقبة | |
| monitor_loop() | |
| if __name__ == "__main__": | |
| main() | |