amalCoreFlow / external_server.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
#!/usr/bin/env python3
"""
external_server.py โ€” ุณูŠุฑูุฑ ู…ุฑูƒุฒูŠ ู„ุชูˆุฒูŠุน ุงู„ู…ู‡ุงู… + Dashboard ุชูุงุนู„ูŠ
"""
import logging
import requests
import socket
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from flask_socketio import SocketIO, emit
from peer_discovery import PEERS
from peer_discovery import PORT
logging.basicConfig(level=logging.INFO)
app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})
socketio = SocketIO(app, cors_allowed_origins="*")
connected_peers = {} # {node_id: {"cpu":%, "ram":%, "gpu":%}}
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุงู„ุชุญู‚ู‚ ู…ู† ุชูˆูุฑ ุงู„ู…ู†ูุฐ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def is_port_available(port):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('0.0.0.0', port))
return True
except OSError:
return False
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุงุฎุชูŠุงุฑ ุฃูุถู„ Peer โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def select_best_peer():
if not PEERS:
logging.warning("โš ๏ธ ู„ุง ุชูˆุฌุฏ ุฃุฌู‡ุฒุฉ ู…ุณุฌู„ุฉ ุญุงู„ูŠุงู‹.")
return None
try:
peer_loads = []
for peer_url in PEERS:
try:
# ุจู†ุงุก URL ู„ูุญุต ุงู„ุญุงู„ุฉ
status_url = peer_url.replace('/run_task', '/health')
if '/run_task' not in peer_url:
status_url = f"{peer_url}/health"
resp = requests.get(status_url, timeout=2)
if resp.ok:
data = resp.json()
# ุงูุชุฑุงุถ ุฃู† ุงู„ุฎุงุฏู… ูŠุนูŠุฏ cpu_load ุฃูˆ ุงุณุชุฎุฏุงู… ู‚ูŠู…ุฉ ุงูุชุฑุงุถูŠุฉ
cpu_load = data.get("cpu_load", 50)
peer_loads.append((peer_url, cpu_load))
logging.info(f"โœ… {peer_url} - ุงู„ุญู…ู„: {cpu_load}%")
except Exception as e:
logging.warning(f"โŒ ู„ุง ูŠู…ูƒู† ุงู„ูˆุตูˆู„ ุฅู„ู‰ {peer_url}: {e}")
continue
if not peer_loads:
return None
# ุงุฎุชูŠุงุฑ ุงู„ุฃู‚ู„ ุญู…ู„ู‹ุง
best_peer = min(peer_loads, key=lambda x: x[1])
logging.info(f"๐ŸŽฏ ุฃูุถู„ ุฌู‡ุงุฒ: {best_peer[0]} ู…ุน ุญู…ู„ {best_peer[1]}%")
return best_peer[0]
except Exception as e:
logging.error(f"โŒ ุฎุทุฃ ููŠ ุงุฎุชูŠุงุฑ ุงู„ู€ Peer: {e}")
return None
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ API ุชูˆุฒูŠุน ุงู„ู…ู‡ุงู… โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@app.route("/submit_task", methods=["POST"])
def submit_task():
data = request.get_json()
if not data or "func" not in data:
return jsonify({"error": "ูŠุฌุจ ุชุญุฏูŠุฏ ุงุณู… ุงู„ุฏุงู„ุฉ (func)"}), 400
peer = select_best_peer()
if not peer:
return jsonify({"error": "ู„ุง ุชูˆุฌุฏ ุฃุฌู‡ุฒุฉ ู…ุชุงุญุฉ ุญุงู„ูŠุงู‹"}), 503
try:
# ุชุฃูƒุฏ ู…ู† ุฃู† ุนู†ูˆุงู† ุงู„ู€ Peer ุตุญูŠุญ
if not peer.startswith('http'):
peer = f"http://{peer}"
logging.info(f"๐Ÿ“ค ุฅุฑุณุงู„ ุงู„ู…ู‡ู…ุฉ ุฅู„ู‰: {peer}")
resp = requests.post(peer, json=data, timeout=30)
if resp.ok:
result = resp.json()
logging.info(f"โœ… ุชู… ุชู†ููŠุฐ ุงู„ู…ู‡ู…ุฉ ุจู†ุฌุงุญ ุนู„ู‰ {peer}")
return jsonify({"status": "success", "result": result, "executed_on": peer})
else:
logging.error(f"โŒ ูุดู„ ุชู†ููŠุฐ ุงู„ู…ู‡ู…ุฉ ุนู„ู‰ {peer}: {resp.status_code}")
return jsonify({"error": f"ูุดู„ ุฅุฑุณุงู„ ุงู„ู…ู‡ู…ุฉ: {resp.text}"}), 500
except requests.exceptions.Timeout:
logging.error(f"โฐ ุงู†ุชู‡ุช ุงู„ู…ู‡ู„ุฉ ุฃุซู†ุงุก ุงู„ุงุชุตุงู„ ุจู€ {peer}")
return jsonify({"error": "ุงู†ุชู‡ุช ู…ู‡ู„ุฉ ุงู„ุงุชุตุงู„"}), 408
except Exception as e:
logging.error(f"โŒ ุฎุทุฃ ููŠ ุฅุฑุณุงู„ ุงู„ู…ู‡ู…ุฉ ุฅู„ู‰ {peer}: {e}")
return jsonify({"error": str(e)}), 500
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ API ุชุญุฏูŠุซ ุญุงู„ุฉ ุงู„ุฃุฌู‡ุฒุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@app.route("/update_status", methods=["POST"])
def update_status():
data = request.json
node_id = data.get("node_id")
if not node_id:
return jsonify({"error": "node_id ู…ุทู„ูˆุจ"}), 400
connected_peers[node_id] = {
"cpu": data.get("cpu", 0),
"ram": data.get("ram", 0),
"gpu": data.get("gpu", 0),
"last_update": "now" # ูŠู…ูƒู† ุฅุถุงูุฉ timestamp
}
socketio.emit("update_peers", connected_peers, broadcast=True)
logging.info(f"๐Ÿ“Š ุชู… ุชุญุฏูŠุซ ุญุงู„ุฉ {node_id}")
return jsonify({"status": "ok"})
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุตูุญุฉ ุงู„ุญุงู„ุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@app.route("/status")
def status():
return jsonify({
"connected_peers": connected_peers,
"available_peers": list(PEERS),
"total_peers": len(PEERS)
})
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุตูุญุฉ Dashboard โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@app.route("/")
def index():
return render_template("dashboard.html", peers=connected_peers)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุงุณุชู‚ุจุงู„ ุชุญุฏูŠุซุงุช ุงู„ุญุงู„ุฉ ุนุจุฑ WebSocket โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@socketio.on("status_update")
def handle_status_update(data):
connected_peers.update(data)
emit("update_peers", connected_peers, broadcast=True)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฏุฑุฏุดุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
@socketio.on("send_message")
def handle_message(data):
emit("receive_message", data, broadcast=True)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุชุดุบูŠู„ ุงู„ุณูŠุฑูุฑ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if __name__ == "__main__":
# ู…ุญุงูˆู„ุฉ ู…ู†ุงูุฐ ู…ุฎุชู„ูุฉ
ports_to_try = [5005, 5006, 5007, 5008, 5009]
selected_port = None
for port in ports_to_try:
if is_port_available(port):
selected_port = port
break
if selected_port is None:
logging.error("โŒ ู„ุง ุชูˆุฌุฏ ู…ู†ุงูุฐ ู…ุชุงุญุฉ. ุญุงูˆู„ ุฅุบู„ุงู‚ ุงู„ุชุทุจูŠู‚ุงุช ุงู„ุฃุฎุฑู‰.")
exit(1)
logging.info(f"๐Ÿš€ ุจุฏุก ุงู„ุณูŠุฑูุฑ ุงู„ู…ุฑูƒุฒูŠ ู…ุน Dashboard ุนู„ู‰ ุงู„ู…ู†ูุฐ {selected_port}")
try:
socketio.run(app, host="0.0.0.0", port=selected_port, debug=False)
except OSError as e:
logging.error(f"โŒ ูุดู„ ุชุดุบูŠู„ ุงู„ุณูŠุฑูุฑ ุนู„ู‰ ุงู„ู…ู†ูุฐ {selected_port}: {e}")
except KeyboardInterrupt:
logging.info("โน๏ธ ุฅูŠู‚ุงู ุงู„ุณูŠุฑูุฑ...")
except Exception as e:
logging.error(f"โŒ ุฎุทุฃ ุบูŠุฑ ู…ุชูˆู‚ุน: {e}")