| """Orchestration & compute manager: local scheduler + placeholders for Ray/K8s.""" | |
| import threading | |
| import queue | |
| import time | |
| from typing import Callable, Any | |
class LocalScheduler:
    """Run submitted callables on a small pool of daemon worker threads.

    Work is queued with :meth:`submit` and consumed by the threads created in
    :meth:`start`. Task return values are discarded. A task that raises is
    logged (with traceback) and does not terminate the worker that ran it.
    Every dequeued item is marked done, so ``self.q.join()`` can be used to
    wait until all submitted work has been processed.
    """

    def __init__(self, worker_count: int = 2):
        """Create an idle scheduler; no threads run until :meth:`start`.

        Args:
            worker_count: Number of worker threads each :meth:`start` call
                spawns.
        """
        # Holds ``(fn, args, kwargs)`` tuples, or ``None`` shutdown sentinels.
        self.q = queue.Queue()
        self.workers = []
        self.worker_count = worker_count
        self._stop = threading.Event()

    def start(self):
        """Spawn ``worker_count`` daemon threads that consume the queue."""
        for _ in range(self.worker_count):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self.workers.append(t)

    def stop(self):
        """Ask all workers to exit; returns without waiting for them.

        Sets the stop flag, which each worker checks before fetching its
        next item, so work still sitting in the queue is generally left
        unexecuted once the flag is seen.
        """
        self._stop.set()
        # One None sentinel per worker wakes any thread blocked in q.get().
        for _ in self.workers:
            self.q.put(None)

    def submit(self, fn: Callable[..., Any], *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution by a worker thread."""
        self.q.put((fn, args, kwargs))

    def _worker_loop(self):
        """Worker thread body: run queued tasks until stopped."""
        # Function-scope import keeps this class self-contained without
        # touching the file's top-level import block.
        import logging

        log = logging.getLogger(__name__)
        while not self._stop.is_set():
            # Blocks while the queue is empty, so no extra sleep is needed.
            item = self.q.get()
            try:
                if item is None:
                    break
                fn, args, kwargs = item
                try:
                    fn(*args, **kwargs)
                except Exception:
                    # Keep the worker alive, but never hide the failure.
                    log.exception("LocalScheduler task %r raised", fn)
            finally:
                # Runs for tasks and sentinels alike, so q.join() can return.
                self.q.task_done()
class RayStub:
    """Placeholder executor: runs each task synchronously in the caller."""

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` right away and return its result.

        Nothing is dispatched remotely; any exception raised by ``fn``
        propagates directly to the caller.
        """
        outcome = fn(*args, **kwargs)
        return outcome