Spaces:
Runtime error
Runtime error
| import threading | |
| import time | |
| import uuid | |
| class TaskManager: | |
| def handle_completion (self, task_id): | |
| print(f"Task {task_id} completed") | |
| self.tasks[task_id]['status'] = 'completed' | |
| on_task_complete = handle_completion | |
| def __init__(self): | |
| self.tasks = {} | |
| self.lock = threading.Lock() | |
| def create_task(self, target, *args): | |
| task_id = str(uuid.uuid4()) | |
| thread = threading.Thread(target=self._run_task, args=(task_id, target, *args)) | |
| with self.lock: | |
| self.tasks[task_id] = {'progress': "0", 'status': 'running'} | |
| thread.start() | |
| return task_id | |
| def _run_task(self, task_id, target, *args): | |
| try: | |
| target(task_id,*args, progress_callback=lambda progress: self._update_progress(task_id, progress)) | |
| with self.lock: | |
| self.tasks[task_id]['status'] = 'completed' | |
| except Exception as e: | |
| with self.lock: | |
| self.tasks[task_id]['status'] = 'failed' | |
| self.tasks[task_id]['error'] = str(e) | |
| def _update_progress(self, task_id, progress): | |
| with self.lock: | |
| if task_id in self.tasks: | |
| self.tasks[task_id]['progress'] = progress | |
| def get_progress(self, task_id): | |
| with self.lock: | |
| return self.tasks.get(task_id, {'status': 'not found'}) | |