Axiovora-X / backend /orchestrator.py
ZAIDX11's picture
Add files using upload-large-folder tool
effde1c verified
"""Orchestration & compute manager: local scheduler + placeholders for Ray/K8s."""
import logging
import queue
import threading
import time
from typing import Callable, Any
class LocalScheduler:
    """Run submitted callables on a small pool of daemon threads.

    Tasks are ``(fn, args, kwargs)`` tuples pulled from a shared FIFO queue.
    Return values of tasks are discarded; exceptions raised by a task are
    logged and do not terminate the worker that ran it.

    A scheduler that has been stopped cannot be restarted: the stop flag is
    never cleared, so workers created after ``stop()`` exit immediately.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, worker_count: int = 2):
        """Create an idle scheduler; no threads run until ``start()``.

        Args:
            worker_count: Number of worker threads ``start()`` should keep.
        """
        self.q = queue.Queue()
        self.workers = []
        self.worker_count = worker_count
        self._stop = threading.Event()

    def start(self):
        """Spawn worker threads until ``worker_count`` of them are alive.

        Calling this more than once is safe: only the shortfall is started,
        so repeated calls do not oversubscribe the pool.
        """
        # Forget threads that have already exited so they are not counted.
        self.workers = [t for t in self.workers if t.is_alive()]
        shortfall = self.worker_count - len(self.workers)
        for _ in range(shortfall):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self.workers.append(t)

    def stop(self):
        """Signal all workers to exit; does not wait for them to finish.

        Workers exit after their current task. Tasks still queued at that
        point are not executed.
        """
        self._stop.set()
        # One sentinel per worker wakes any thread blocked in ``q.get()``.
        for _ in self.workers:
            self.q.put(None)

    def submit(self, fn: Callable[..., Any], *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution by a worker."""
        self.q.put((fn, args, kwargs))

    def _worker_loop(self):
        """Worker thread body: run queued tasks until stopped."""
        while not self._stop.is_set():
            item = self.q.get()
            try:
                if item is None:
                    # Sentinel pushed by ``stop()``.
                    break
                fn, args, kwargs = item
                try:
                    fn(*args, **kwargs)
                except Exception:
                    # Keep the worker alive, but leave a trace of the failure
                    # instead of dropping it silently.
                    self._log.exception("Scheduled task %r failed", fn)
            finally:
                # Balance every ``get()`` so ``self.q.join()`` can return.
                self.q.task_done()
            time.sleep(0.01)
class RayStub:
    """Stand-in for a remote executor that runs everything in-process."""

    def submit(self, fn, *args, **kwargs):
        """Call ``fn`` synchronously with the given arguments.

        Returns whatever ``fn`` returns; any exception it raises propagates
        to the caller. No remote dispatch happens here.
        """
        outcome = fn(*args, **kwargs)
        return outcome