Spaces:
Running
Running
| """ | |
| Learning scheduler with safeguards for laptop deployment. | |
| Schedules learning tasks with CPU, battery, and system idle checks. | |
| """ | |
| import asyncio | |
| import logging | |
| import psutil | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Callable, Optional | |
| logger = logging.getLogger(__name__) | |
| class LearningScheduler: | |
| """Schedules learning tasks with system safeguards.""" | |
| def __init__( | |
| self, | |
| max_cpu_percent: float = 50.0, | |
| min_battery_percent: float = 30.0, | |
| check_interval_seconds: int = 60, | |
| ): | |
| self.max_cpu_percent = max_cpu_percent | |
| self.min_battery_percent = min_battery_percent | |
| self.check_interval_seconds = check_interval_seconds | |
| self.scheduled_tasks = {} | |
| self.running = False | |
| self.last_run = {} | |
| def schedule_task( | |
| self, | |
| task_name: str, | |
| task_fn: Callable, | |
| interval_hours: int, | |
| run_immediately: bool = False, | |
| ): | |
| """ | |
| Schedule a learning task. | |
| Args: | |
| task_name: Task name | |
| task_fn: Async function to run | |
| interval_hours: Interval in hours | |
| run_immediately: Whether to run immediately on first check | |
| """ | |
| self.scheduled_tasks[task_name] = { | |
| "fn": task_fn, | |
| "interval": timedelta(hours=interval_hours), | |
| "last_run": None if run_immediately else datetime.utcnow(), | |
| } | |
| logger.info(f"Scheduled task: {task_name} (interval={interval_hours}h)") | |
| async def start(self): | |
| """Start the scheduler.""" | |
| if self.running: | |
| logger.warning("Scheduler already running") | |
| return | |
| self.running = True | |
| logger.info("Learning scheduler started") | |
| while self.running: | |
| try: | |
| await self._check_and_run_tasks() | |
| await asyncio.sleep(self.check_interval_seconds) | |
| except Exception as e: | |
| logger.error(f"Scheduler error: {e}") | |
| await asyncio.sleep(self.check_interval_seconds) | |
| def stop(self): | |
| """Stop the scheduler.""" | |
| self.running = False | |
| logger.info("Learning scheduler stopped") | |
| async def run_once(self, task_name: str) -> Dict[str, Any]: | |
| """ | |
| Run a task once manually. | |
| Args: | |
| task_name: Task name | |
| Returns: | |
| Task result | |
| """ | |
| if task_name not in self.scheduled_tasks: | |
| raise ValueError(f"Task not found: {task_name}") | |
| task = self.scheduled_tasks[task_name] | |
| logger.info(f"Running task manually: {task_name}") | |
| try: | |
| result = await task["fn"]() | |
| task["last_run"] = datetime.utcnow() | |
| self.last_run[task_name] = task["last_run"].isoformat() | |
| return { | |
| "task_name": task_name, | |
| "status": "success", | |
| "result": result, | |
| "timestamp": task["last_run"].isoformat(), | |
| } | |
| except Exception as e: | |
| logger.error(f"Task failed: {task_name}: {e}") | |
| return { | |
| "task_name": task_name, | |
| "status": "error", | |
| "error": str(e), | |
| "timestamp": datetime.utcnow().isoformat(), | |
| } | |
| def is_system_idle(self) -> bool: | |
| """ | |
| Check if system is idle (low CPU usage). | |
| Returns: | |
| True if system is idle | |
| """ | |
| try: | |
| cpu_percent = psutil.cpu_percent(interval=1) | |
| is_idle = cpu_percent < self.max_cpu_percent | |
| if not is_idle: | |
| logger.debug(f"System not idle: CPU={cpu_percent:.1f}% (max={self.max_cpu_percent}%)") | |
| return is_idle | |
| except Exception as e: | |
| logger.error(f"Failed to check CPU usage: {e}") | |
| return False # Assume not idle on error | |
| def is_battery_ok(self) -> bool: | |
| """ | |
| Check if battery level is sufficient. | |
| Returns: | |
| True if battery is OK or plugged in | |
| """ | |
| try: | |
| battery = psutil.sensors_battery() | |
| # If no battery (desktop) or plugged in, always OK | |
| if battery is None or battery.power_plugged: | |
| return True | |
| # Check battery percentage | |
| is_ok = battery.percent >= self.min_battery_percent | |
| if not is_ok: | |
| logger.debug(f"Battery too low: {battery.percent:.1f}% (min={self.min_battery_percent}%)") | |
| return is_ok | |
| except Exception as e: | |
| logger.error(f"Failed to check battery: {e}") | |
| return True # Assume OK on error (might be desktop) | |
| async def _check_and_run_tasks(self): | |
| """Check and run scheduled tasks if conditions are met.""" | |
| # Check system conditions | |
| if not self.is_system_idle(): | |
| logger.debug("Skipping scheduled tasks: system not idle") | |
| return | |
| if not self.is_battery_ok(): | |
| logger.debug("Skipping scheduled tasks: battery too low") | |
| return | |
| # Check each task | |
| now = datetime.utcnow() | |
| for task_name, task in self.scheduled_tasks.items(): | |
| # Check if task is due | |
| if task["last_run"] is not None: | |
| time_since_last_run = now - task["last_run"] | |
| if time_since_last_run < task["interval"]: | |
| continue | |
| # Run task | |
| logger.info(f"Running scheduled task: {task_name}") | |
| try: | |
| result = await task["fn"]() | |
| task["last_run"] = now | |
| self.last_run[task_name] = now.isoformat() | |
| logger.info(f"Task completed: {task_name}") | |
| except Exception as e: | |
| logger.error(f"Task failed: {task_name}: {e}") | |
| # Still update last_run to avoid retry loop | |
| task["last_run"] = now | |
| self.last_run[task_name] = now.isoformat() | |
| def get_status(self) -> Dict[str, Any]: | |
| """Get scheduler status.""" | |
| tasks_status = [] | |
| for task_name, task in self.scheduled_tasks.items(): | |
| last_run = task["last_run"] | |
| next_run = None | |
| if last_run is not None: | |
| next_run = (last_run + task["interval"]).isoformat() | |
| tasks_status.append({ | |
| "name": task_name, | |
| "interval_hours": task["interval"].total_seconds() / 3600, | |
| "last_run": last_run.isoformat() if last_run else None, | |
| "next_run": next_run, | |
| }) | |
| return { | |
| "running": self.running, | |
| "system_idle": self.is_system_idle(), | |
| "battery_ok": self.is_battery_ok(), | |
| "tasks": tasks_status, | |
| } | |