""" 작업 큐 시스템 에이전트 간 파일 접근 충돌을 방지하고 작업 순서를 자동으로 조율 """ import json from datetime import datetime, timedelta from typing import Dict, List, Optional from pathlib import Path from dataclasses import dataclass, asdict from enum import Enum from fnmatch import fnmatch class TaskStatus(Enum): """작업 상태""" PENDING = "pending" # 대기 중 QUEUED = "queued" # 큐에 등록됨 IN_PROGRESS = "in_progress" # 진행 중 COMPLETED = "completed" # 완료 FAILED = "failed" # 실패 PAUSED = "paused" # 일시 중지 @dataclass class QueuedTask: """큐에 등록된 작업""" task_id: str command: str agent: str status: TaskStatus required_files: List[str] locked_files: List[str] depends_on: Optional[str] created_at: str estimated_completion: Optional[str] progress: int = 0 def to_dict(self) -> Dict: return { **asdict(self), 'status': self.status.value } class WorkQueueManager: """작업 큐 관리자""" def __init__(self, queue_dir: str = None, config_path: str = None): import os if queue_dir is None: queue_dir = os.getenv("WORK_QUEUE_DIR", "/tmp/work-queue") if config_path is None: config_path = os.getenv("WORK_QUEUE_CONFIG", "/tmp/work-queue/config.json") self.queue_dir = Path(queue_dir) self.config_path = Path(config_path) self.queue_dir.mkdir(parents=True, exist_ok=True) self.config = self._load_config() self.tasks: Dict[str, QueuedTask] = {} self._load_tasks() def _load_config(self) -> Dict: """설정 로드""" if self.config_path.exists(): with open(self.config_path, 'r', encoding='utf-8') as f: return json.load(f) return {} def _load_tasks(self): """저장된 작업 로드""" # 여기서는 메모리에만 유지 (프로덕션에서는 DB 사용 권장) self.tasks = {} def enqueue_task( self, command: str, agent: str, required_files: List[str], depends_on: Optional[str] = None ) -> str: """작업을 큐에 추가""" task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{agent}" # 의존성 확인 if depends_on: if depends_on not in self.tasks: raise ValueError(f"Task {depends_on} not found") if self.tasks[depends_on].status != TaskStatus.COMPLETED: status = TaskStatus.QUEUED else: status = TaskStatus.PENDING else: status = TaskStatus.PENDING # 파일 잠금 확인 locked_files = self._check_file_conflicts(required_files, agent) task = QueuedTask( task_id=task_id, command=command, agent=agent, status=status, required_files=required_files, locked_files=locked_files, depends_on=depends_on, created_at=datetime.now().isoformat() + "Z", estimated_completion=None, progress=0 ) self.tasks[task_id] = task self._save_queue_status() return task_id def _check_file_conflicts(self, required_files: List[str], agent: str) -> List[str]: """파일 충돌 확인""" locked_files = [] for file in required_files: # 현재 쓰기 권한을 가진 다른 에이전트 확인 for task_id, task in self.tasks.items(): if task.status == TaskStatus.IN_PROGRESS and task.agent != agent: if self._files_conflict(file, task.required_files): locked_files.append(file) break return locked_files def _files_conflict(self, file1: str, files2: List[str]) -> bool: """두 파일 목록이 충돌하는지 확인""" for file2 in files2: if fnmatch(file1, file2) or fnmatch(file2, file1): return True return False def start_task(self, task_id: str) -> bool: """작업 시작""" if task_id not in self.tasks: return False task = self.tasks[task_id] # 의존성 확인 if task.depends_on: if self.tasks[task.depends_on].status != TaskStatus.COMPLETED: return False # 파일 잠금 다시 확인 if task.locked_files: return False task.status = TaskStatus.IN_PROGRESS task.estimated_completion = ( datetime.now() + timedelta(minutes=30) ).isoformat() + "Z" self._save_queue_status() return True def complete_task(self, task_id: str, progress: int = 100) -> bool: """작업 완료""" if task_id not in self.tasks: return False task = self.tasks[task_id] task.status = TaskStatus.COMPLETED task.progress = 100 # 다음 대기 중인 작업 시작 가능하도록 설정 self._trigger_dependent_tasks(task_id) self._save_queue_status() return True def fail_task(self, task_id: str, reason: str) -> bool: """작업 실패 처리""" if task_id not in self.tasks: return False task = self.tasks[task_id] task.status = TaskStatus.FAILED self._save_queue_status() return True def pause_task(self, task_id: str, reason: str) -> bool: """작업 일시 중지""" if task_id not in self.tasks: return False task = self.tasks[task_id] task.status = TaskStatus.PAUSED self._save_queue_status() return True def update_progress(self, task_id: str, progress: int) -> bool: """작업 진행률 업데이트""" if task_id not in self.tasks: return False task = self.tasks[task_id] task.progress = min(100, max(0, progress)) self._save_queue_status() return True def _trigger_dependent_tasks(self, completed_task_id: str): """완료된 작업에 의존하는 작업 트리거""" for task in self.tasks.values(): if task.depends_on == completed_task_id and task.status == TaskStatus.QUEUED: task.status = TaskStatus.PENDING # 파일 잠금 다시 확인 task.locked_files = self._check_file_conflicts(task.required_files, task.agent) def get_next_runnable_task(self) -> Optional[QueuedTask]: """실행 가능한 다음 작업 반환""" # 우선순위: 의존성 없는 작업 > 의존성 완료된 작업 pending_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.PENDING] if not pending_tasks: return None # 충돌 없는 작업 우선 for task in pending_tasks: if not task.locked_files: return task return pending_tasks[0] if pending_tasks else None def get_queue_status(self) -> Dict: """현재 큐 상태 조회""" return { "total_tasks": len(self.tasks), "in_progress": len([t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]), "queued": len([t for t in self.tasks.values() if t.status == TaskStatus.QUEUED]), "pending": len([t for t in self.tasks.values() if t.status == TaskStatus.PENDING]), "completed": len([t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]), "failed": len([t for t in self.tasks.values() if t.status == TaskStatus.FAILED]), "tasks": {task_id: task.to_dict() for task_id, task in self.tasks.items()} } def get_task(self, task_id: str) -> Optional[QueuedTask]: """작업 정보 조회""" return self.tasks.get(task_id) def list_tasks(self, status: Optional[TaskStatus] = None, agent: Optional[str] = None) -> List[QueuedTask]: """작업 목록 조회""" tasks = list(self.tasks.values()) if status: tasks = [t for t in tasks if t.status == status] if agent: tasks = [t for t in tasks if t.agent == agent] return sorted(tasks, key=lambda t: t.created_at) def _save_queue_status(self): """큐 상태를 work-queue.md로 저장""" queue_file = self.queue_dir / "work-queue.md" # 상태별로 작업 분류 in_progress_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS] queued_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.QUEUED] pending_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.PENDING] # Markdown 생성 md_content = f"""# 📋 활성 작업 큐 **생성일**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} **마지막 업데이트**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ## 🔴 진행 중 (현재 잠금) """ if in_progress_tasks: for idx, task in enumerate(in_progress_tasks, 1): md_content += f"""### [{idx}] {task.agent} - {task.command} (in_progress) - **작업**: {task.command} - **대상 파일**: {', '.join([f'`{f}`' for f in task.required_files])} - **시작**: {task.created_at} - **진행도**: {task.progress}% - **잠금 상태**: {', '.join([f'✅ {f}' for f in task.required_files])} --- """ else: md_content += "현재 진행 중인 작업 없음\n\n---\n\n" md_content += """## 🟠 대기 중 (예약됨) """ if queued_tasks: for idx, task in enumerate(queued_tasks, 1): md_content += f"""### [{idx}] {task.agent} - {task.command} (queued) - **작업**: {task.command} - **대상 파일**: {', '.join([f'`{f}`' for f in task.required_files])} - **선행 조건**: {task.depends_on or 'None'} - **잠금 대기 파일**: {', '.join(task.locked_files) if task.locked_files else 'None'} --- """ else: md_content += "대기 중인 작업 없음\n\n---\n\n" md_content += """## 🟡 준비 중 """ if pending_tasks: for idx, task in enumerate(pending_tasks, 1): md_content += f"""### [{idx}] {task.agent} - {task.command} (pending) - **작업**: {task.command} - **대상 파일**: {', '.join([f'`{f}`' for f in task.required_files])} - **선행 조건**: {task.depends_on or 'None'} --- """ else: md_content += "준비 중인 작업 없음\n\n" # 파일 저장 with open(queue_file, 'w', encoding='utf-8') as f: f.write(md_content) def cleanup_old_tasks(self, days: int = 7) -> int: """오래된 작업 정리""" from datetime import timezone # timezone-aware cutoff_date 사용 cutoff_date = datetime.now(timezone.utc) - timedelta(days=days) tasks_to_remove = [] for task_id, task in self.tasks.items(): created = datetime.fromisoformat(task.created_at.replace("Z", "+00:00")) if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] and created < cutoff_date: tasks_to_remove.append(task_id) for task_id in tasks_to_remove: del self.tasks[task_id] if tasks_to_remove: self._save_queue_status() return len(tasks_to_remove)