amalCoreFlow / main.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
main.py โ€” ู†ุธุงู… ุชูˆุฒูŠุน ุงู„ู…ู‡ุงู… ุงู„ุฐูƒูŠ
"""
import os
import sys
import time
import threading
import subprocess
import logging
import argparse
import socket
import random
import requests
import importlib.util
from pathlib import Path
from typing import Any
from flask import Flask, request, jsonify
from flask_cors import CORS
from peer_discovery import CENTRAL_REGISTRY_SERVERS
from peer_discovery import PORT
import external_server
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฅุนุฏุงุฏุงุช ุงู„ู…ุณุงุฑุงุช โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
FILE = Path(__file__).resolve()
BASE_DIR = FILE.parent
sys.path.insert(0, str(BASE_DIR))
def main():
print("๐Ÿ”ง CoreFlow ูŠุนู…ู„ ุงู„ุขู†...")
# ุดุบู‘ู„ ุงู„ุฎุงุฏู… ุฃูˆ ุฃูŠ ู…ู†ุทู‚ ุฃุณุงุณูŠ ู‡ู†ุง
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฅุนุฏุงุฏ ุงู„ุณุฌู„ุงุช โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/main.log", mode="a", encoding="utf-8")
]
)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุชุญู…ูŠู„ ู…ุชุบูŠุฑุงุช ุงู„ุจูŠุฆุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
try:
from dotenv import load_dotenv
load_dotenv()
logging.info("ุชู… ุชุญู…ูŠู„ ู…ุชุบูŠุฑุงุช ุงู„ุจูŠุฆุฉ ู…ู† .env")
except ImportError:
logging.warning("python-dotenv ุบูŠุฑ ู…ุซุจู‘ูŽุชุ› ุชูŽุฎุทู‘ูŠ .env")
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุซูˆุงุจุช ุงู„ุชู‡ูŠุฆุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
CPU_PORT = int(os.getenv("CPU_PORT", "5297"))
SHARED_SECRET = os.getenv("SHARED_SECRET", "my_shared_secret_123")
PYTHON_EXE = sys.executable
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฎูŠุงุฑุงุช ุณุทุฑ ุงู„ุฃูˆุงู…ุฑ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
parser = argparse.ArgumentParser(description="ู†ุธุงู… ุชูˆุฒูŠุน ุงู„ู…ู‡ุงู… ุงู„ุฐูƒูŠ")
parser.add_argument(
"--stats-interval", "-s",
type=int,
default=0,
help="ุซูˆุงู†ูŠ ุจูŠู† ูƒู„ ุทุจุงุนุฉ ู„ุฅุญุตุงุฆูŠุฉ ุงู„ุฃู‚ุฑุงู† (0 = ู…ุฑุฉ ูˆุงุญุฏุฉ ูู‚ุท)"
)
parser.add_argument(
"--no-cli",
action="store_true",
help="ุชุนุทูŠู„ ุงู„ู‚ุงุฆู…ุฉ ุงู„ุชูุงุนู„ูŠุฉ ุญุชู‰ ุนู†ุฏ ูˆุฌูˆุฏ TTY"
)
args = parser.parse_args()
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ู…ุชุบูŠุฑุงุช ุงู„ู†ุธุงู… โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
PEERS = set() # ู…ุฌู…ูˆุนุฉ ุนู†ุงูˆูŠู† ุงู„ุฃู‚ุฑุงู† ูƒุณู„ุงุณู„ ู†ุตูŠุฉ
PEERS_INFO = {} # ู‚ุงู…ูˆุณ ู„ุญูุธ ู…ุนู„ูˆู…ุงุช ุงู„ุฃู‚ุฑุงู† ุงู„ูƒุงู…ู„ุฉ
current_server_index = 0
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฏูˆุงู„ ุงูƒุชุดุงู ุงู„ุฃู‚ุฑุงู† โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def register_service_lan():
"""ุชุณุฌูŠู„ ุงู„ุฎุฏู…ุฉ ุนู„ู‰ ุงู„ุดุจูƒุฉ ุงู„ู…ุญู„ูŠุฉ"""
while True:
try:
logging.info("ุฌุงุฑู ุชุณุฌูŠู„ ุงู„ุฎุฏู…ุฉ ุนู„ู‰ ุงู„ุดุจูƒุฉ ุงู„ู…ุญู„ูŠุฉ...")
time.sleep(10)
except Exception as e:
logging.error(f"ุฎุทุฃ ููŠ ุชุณุฌูŠู„ ุงู„ุฎุฏู…ุฉ: {e}")
def discover_lan_loop():
"""ุงูƒุชุดุงู ุงู„ุฃู‚ุฑุงู† ุนู„ู‰ ุงู„ุดุจูƒุฉ ุงู„ู…ุญู„ูŠุฉ"""
while True:
try:
logging.info("ุฌุงุฑู ู…ุณุญ ุงู„ุดุจูƒุฉ ุงู„ู…ุญู„ูŠุฉ...")
time.sleep(15)
except Exception as e:
logging.error(f"ุฎุทุฃ ููŠ ุงูƒุชุดุงู ุงู„ุฃู‚ุฑุงู†: {e}")
def fetch_central_loop():
"""ุฌู„ุจ ุชุญุฏูŠุซุงุช ู…ู† ุงู„ุณูŠุฑูุฑ ุงู„ู…ุฑูƒุฒูŠ"""
while True:
try:
logging.info("ุฌุงุฑู ุชุญุฏูŠุซ ู‚ุงุฆู…ุฉ ุงู„ุฃู‚ุฑุงู†...")
time.sleep(30)
except Exception as e:
logging.error(f"ุฎุทุฃ ููŠ ุฌู„ุจ ุงู„ุชุญุฏูŠุซุงุช: {e}")
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฏูˆุงู„ ู…ุณุงุนุฏุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def get_local_ip():
"""ุงู„ุญุตูˆู„ ุนู„ู‰ ุนู†ูˆุงู† IP ุงู„ู…ุญู„ูŠ"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def add_peer(peer_data):
"""ุฅุถุงูุฉ ู‚ุฑูŠู† ุฌุฏูŠุฏ ุฅู„ู‰ ุงู„ู†ุธุงู…"""
peer_url = f"http://{peer_data['ip']}:{peer_data['port']}/run"
if peer_url not in PEERS:
PEERS.add(peer_url)
PEERS_INFO[peer_url] = peer_data
logging.info(f"ุชู…ุช ุฅุถุงูุฉ ู‚ุฑูŠู† ุฌุฏูŠุฏ: {peer_url}")
return peer_url
def benchmark(fn, *args):
"""ู‚ูŠุงุณ ุฒู…ู† ุชู†ููŠุฐ ุงู„ุฏุงู„ุฉ"""
t0 = time.time()
res = fn(*args)
return time.time() - t0, res
def load_and_run_peer_discovery():
"""ุชุญู…ูŠู„ ูˆุชุดุบูŠู„ ู…ู„ู peer_discovery.py"""
try:
peer_discovery_path = Path(__file__).parent / "peer_discovery.py"
if not peer_discovery_path.exists():
raise FileNotFoundError("ู…ู„ู peer_discovery.py ุบูŠุฑ ู…ูˆุฌูˆุฏ")
spec = importlib.util.spec_from_file_location("peer_discovery_module", peer_discovery_path)
peer_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(peer_module)
logging.info("ุชู… ุชุญู…ูŠู„ peer_discovery.py ุจู†ุฌุงุญ")
return peer_module
except Exception as e:
logging.error(f"ุฎุทุฃ ููŠ ุชุญู…ูŠู„ peer_discovery.py: {str(e)}")
return None
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฏูˆุงู„ ุงู„ู…ู‡ุงู… โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def example_task(x: int) -> int:
"""ุฏุงู„ุฉ ู…ุซุงู„ ุจุฏูŠู„ุฉ ุฅุฐุง ู„ู… ุชูƒู† ู…ูˆุฌูˆุฏุฉ ููŠ your_tasks.py"""
return x * x
def matrix_multiply(size: int) -> list:
"""ุถุฑุจ ุงู„ู…ุตููˆูุงุช (ุจุฏูŠู„ ู…ุคู‚ุช)"""
return [[i*j for j in range(size)] for i in range(size)]
def prime_calculation(limit: int) -> list:
"""ุญุณุงุจ ุงู„ุฃุนุฏุงุฏ ุงู„ุฃูˆู„ูŠุฉ (ุจุฏูŠู„ ู…ุคู‚ุช)"""
primes = []
for num in range(2, limit):
if all(num % i != 0 for i in range(2, int(num**0.5) + 1)):
primes.append(num)
return primes
def data_processing(size: int) -> dict:
"""ู…ุนุงู„ุฌุฉ ุงู„ุจูŠุงู†ุงุช (ุจุฏูŠู„ ู…ุคู‚ุช)"""
return {i: i**2 for i in range(size)}
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฎุงุฏู… Flask โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
flask_app = Flask(__name__)
CORS(flask_app, resources={r"/*": {"origins": "*"}})
@flask_app.route("/run_task", methods=["POST"])
def run_task():
try:
data = request.get_json() if request.is_json else request.form
task_id = data.get("task_id")
if not task_id:
return jsonify(error="ูŠุฌุจ ุชุญุฏูŠุฏ task_id"), 400
if task_id == "1":
result = matrix_multiply(500)
elif task_id == "2":
result = prime_calculation(100_000)
elif task_id == "3":
result = data_processing(10_000)
else:
return jsonify(error="ู…ุนุฑู ุงู„ู…ู‡ู…ุฉ ุบูŠุฑ ุตุญูŠุญ"), 400
return jsonify(result=result)
except Exception as e:
logging.error(f"ุฎุทุฃ ููŠ ู…ุนุงู„ุฌุฉ ุงู„ู…ู‡ู…ุฉ: {str(e)}", exc_info=True)
return jsonify(error="ุญุฏุซ ุฎุทุฃ ุฏุงุฎู„ูŠ ููŠ ุงู„ุฎุงุฏู…"), 500
def start_flask_server():
ip_public = os.getenv("PUBLIC_IP", "127.0.0.1")
logging.info(f"Flask ู…ุชูˆูุฑ ุนู„ู‰: http://{ip_public}:{CPU_PORT}/run_task")
flask_app.run(host="0.0.0.0", port=CPU_PORT, debug=False)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ ุฏูˆุงู„ ุงู„ู†ุธุงู… ุงู„ุฃุณุงุณูŠุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def connect_until_success():
global CPU_PORT, current_server_index
peer_module = load_and_run_peer_discovery()
if peer_module is None:
logging.warning("ุณูŠุณุชู…ุฑ ุงู„ุชุดุบูŠู„ ุจุฏูˆู† peer_discovery.py")
return None, []
CENTRAL_REGISTRY_SERVERS = getattr(peer_module, 'CENTRAL_REGISTRY_SERVERS', [])
if not CENTRAL_REGISTRY_SERVERS:
logging.error("ู‚ุงุฆู…ุฉ ุงู„ุณูŠุฑูุฑุงุช ุงู„ู…ุฑูƒุฒูŠุฉ ูุงุฑุบุฉ")
return None, []
while True:
for port in [CPU_PORT, 5298, 5299]:
for idx, server in enumerate(CENTRAL_REGISTRY_SERVERS):
info = {
"node_id": os.getenv("NODE_ID", socket.gethostname()),
"ip": get_local_ip(),
"port": port
}
try:
resp = requests.post(f"{server}/register", json=info, timeout=5)
resp.raise_for_status()
CPU_PORT = port
current_server_index = idx
logging.info(f"ุชู… ุงู„ุงุชุตุงู„ ุจุงู„ุณูŠุฑูุฑ: {server} ุนู„ู‰ ุงู„ู…ู†ูุฐ {CPU_PORT}")
# ู…ุนุงู„ุฌุฉ ู‚ุงุฆู…ุฉ ุงู„ุฃู‚ุฑุงู† ุงู„ู…ุณุชู„ู…ุฉ
peers_list = resp.json()
peer_urls = []
for p in peers_list:
peer_url = add_peer(p)
peer_urls.append(peer_url)
return server, peer_urls
except Exception as e:
logging.warning(f"ูุดู„ ุงู„ุงุชุตุงู„ ุจู€ {server}: {str(e)}")
time.sleep(5)
def main():
"""ุงู„ุฏุงู„ุฉ ุงู„ุฑุฆูŠุณูŠุฉ ู„ุชุดุบูŠู„ ุงู„ู†ุธุงู…"""
# ุชุดุบูŠู„ ุงู„ุฎุฏู…ุงุช ุงู„ุฃุณุงุณูŠุฉ
try:
subprocess.Popen([PYTHON_EXE, "peer_server.py", "--port", str(CPU_PORT)])
subprocess.Popen([PYTHON_EXE, "load_balancer.py"])
logging.info("ุชู… ุชุดุบูŠู„ ุงู„ุฎุฏู…ุงุช ุงู„ุฎู„ููŠู‘ุฉ")
except Exception as exc:
logging.error(f"ุฎุทุฃ ุจุชุดุบูŠู„ ุงู„ุฎุฏู…ุงุช ุงู„ุฎู„ููŠุฉ: {exc}")
# ุงู„ุงุชุตุงู„ ุจุงู„ุณูŠุฑูุฑ ุงู„ู…ุฑูƒุฒูŠ
server, initial_peers = connect_until_success()
# ุชุดุบูŠู„ ุฎุงุฏู… Flask
threading.Thread(target=start_flask_server, daemon=True).start()
# ุงู„ุจู‚ุงุก ููŠ ุญู„ู‚ุฉ ุฑุฆูŠุณูŠุฉ
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("ุชู… ุฅู†ู‡ุงุก ุงู„ุจุฑู†ุงู…ุฌ.")
if __name__ == "__main__":
# ุฅุถุงูุฉ ุงู„ู‚ุฑูŠู† ุงู„ู…ุญู„ูŠ
add_peer({"ip": "127.0.0.1", "port": CPU_PORT})
# ุชุดุบูŠู„ ุฎุฏู…ุงุช ุงูƒุชุดุงู ุงู„ุฃู‚ุฑุงู†
threading.Thread(target=register_service_lan, daemon=True).start()
threading.Thread(target=discover_lan_loop, daemon=True).start()
threading.Thread(target=fetch_central_loop, daemon=True).start()
# ุจุฏุก ุงู„ู†ุธุงู… ุงู„ุฑุฆูŠุณูŠ
main()