samchun-gemini / work_queue.py
JHyeok5's picture
Upload folder using huggingface_hub
766d29f verified
"""
μž‘μ—… 큐 μ‹œμŠ€ν…œ
μ—μ΄μ „νŠΈ κ°„ 파일 μ ‘κ·Ό μΆ©λŒμ„ λ°©μ§€ν•˜κ³  μž‘μ—… μˆœμ„œλ₯Ό μžλ™μœΌλ‘œ 쑰율
"""
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from pathlib import Path
from dataclasses import dataclass, asdict
from enum import Enum
from fnmatch import fnmatch
class TaskStatus(Enum):
    """Lifecycle states a queued task can be in."""

    # Ready to be picked up: no dependency, or the dependency has completed.
    PENDING = "pending"
    # Registered in the queue but still waiting for its dependency.
    QUEUED = "queued"
    # Currently running.
    IN_PROGRESS = "in_progress"
    # Finished successfully.
    COMPLETED = "completed"
    # Finished with an error.
    FAILED = "failed"
    # Temporarily suspended.
    PAUSED = "paused"
@dataclass
class QueuedTask:
    """A single task registered in the work queue."""

    # Unique identifier of the task.
    task_id: str
    # Command / description of the work to perform.
    command: str
    # Name of the agent that owns the task.
    agent: str
    # Current lifecycle state.
    status: TaskStatus
    # File names or glob patterns the task needs to work on.
    required_files: List[str]
    # Subset of required_files found to conflict with another task.
    locked_files: List[str]
    # Id of the task that must complete first, if any.
    depends_on: Optional[str]
    # Creation timestamp as a string.
    created_at: str
    # Expected completion timestamp as a string, if known.
    estimated_completion: Optional[str]
    # Completion percentage.
    progress: int = 0

    def to_dict(self) -> Dict:
        """Return the task as a plain dict with ``status`` as its string value."""
        payload = asdict(self)
        # asdict() keeps the Enum member; expose its plain string value instead.
        payload['status'] = self.status.value
        return payload
class WorkQueueManager:
    """In-memory work queue that keeps agents from working on the same files.

    Tasks are kept in ``self.tasks`` (task id -> ``QueuedTask``) for the
    lifetime of the manager; nothing is reloaded from disk. After every state
    change a human-readable snapshot is written to ``<queue_dir>/work-queue.md``.

    A task's ``required_files`` entries may be plain names or glob patterns.
    Two tasks conflict when any entry of one matches any entry of the other
    (``fnmatch`` in either direction), the tasks belong to different agents,
    and the other task is currently ``IN_PROGRESS``.
    """

    def __init__(self, queue_dir: Optional[str] = None, config_path: Optional[str] = None):
        """Create the manager and make sure the queue directory exists.

        Args:
            queue_dir: Directory for the status file. Defaults to the
                ``WORK_QUEUE_DIR`` environment variable, then
                ``/tmp/work-queue``.
            config_path: JSON configuration file. Defaults to the
                ``WORK_QUEUE_CONFIG`` environment variable, then
                ``/tmp/work-queue/config.json``.
        """
        import os

        if queue_dir is None:
            queue_dir = os.getenv("WORK_QUEUE_DIR", "/tmp/work-queue")
        if config_path is None:
            config_path = os.getenv("WORK_QUEUE_CONFIG", "/tmp/work-queue/config.json")
        self.queue_dir = Path(queue_dir)
        self.config_path = Path(config_path)
        self.queue_dir.mkdir(parents=True, exist_ok=True)
        self.config = self._load_config()
        self.tasks: Dict[str, QueuedTask] = {}
        self._load_tasks()

    @staticmethod
    def _utc_timestamp(offset: Optional[timedelta] = None) -> str:
        """Return the current UTC time (plus ``offset``) as ISO-8601 ending in ``Z``."""
        from datetime import timezone

        moment = datetime.now(timezone.utc)
        if offset is not None:
            moment += offset
        # An aware UTC datetime renders with "+00:00"; keep the file's "Z" form.
        return moment.isoformat().replace("+00:00", "Z")

    def _load_config(self) -> Dict:
        """Load the JSON configuration, or return ``{}`` when the file is absent.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        if not self.config_path.exists():
            return {}
        with open(self.config_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_tasks(self):
        """Reset the task table.

        Tasks are held in memory only; a database is recommended for
        production use.
        """
        self.tasks = {}

    def enqueue_task(
        self,
        command: str,
        agent: str,
        required_files: List[str],
        depends_on: Optional[str] = None
    ) -> str:
        """Register a new task and return its id.

        The task starts as ``QUEUED`` when ``depends_on`` names a task that has
        not completed yet, and as ``PENDING`` otherwise.

        Args:
            command: Command / description of the work.
            agent: Name of the agent that owns the task.
            required_files: File names or glob patterns the task needs.
            depends_on: Id of a task that must complete first, if any.

        Returns:
            The id of the newly registered task.

        Raises:
            ValueError: If ``depends_on`` is given but unknown.
        """
        base_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{agent}"
        task_id = base_id
        # The timestamp only has one-second resolution, so the same agent can
        # produce the same id twice; add a numeric suffix instead of silently
        # overwriting the earlier task.
        attempt = 1
        while task_id in self.tasks:
            attempt += 1
            task_id = f"{base_id}_{attempt}"

        status = TaskStatus.PENDING
        if depends_on:
            if depends_on not in self.tasks:
                raise ValueError(f"Task {depends_on} not found")
            if self.tasks[depends_on].status != TaskStatus.COMPLETED:
                status = TaskStatus.QUEUED

        task = QueuedTask(
            task_id=task_id,
            command=command,
            agent=agent,
            status=status,
            required_files=required_files,
            locked_files=self._check_file_conflicts(required_files, agent),
            depends_on=depends_on,
            created_at=self._utc_timestamp(),
            estimated_completion=None,
            progress=0
        )
        self.tasks[task_id] = task
        self._save_queue_status()
        return task_id

    def _check_file_conflicts(self, required_files: List[str], agent: str) -> List[str]:
        """Return the entries of ``required_files`` held by other agents' running tasks."""
        busy_tasks = [
            other for other in self.tasks.values()
            if other.status == TaskStatus.IN_PROGRESS and other.agent != agent
        ]
        return [
            wanted for wanted in required_files
            if any(self._files_conflict(wanted, other.required_files) for other in busy_tasks)
        ]

    def _files_conflict(self, file1: str, files2: List[str]) -> bool:
        """Return True when ``file1`` matches any entry of ``files2`` in either direction."""
        return any(fnmatch(file1, file2) or fnmatch(file2, file1) for file2 in files2)

    def _refresh_waiting_locks(self) -> None:
        """Recompute ``locked_files`` for every task that has not started yet."""
        for task in self.tasks.values():
            if task.status in (TaskStatus.PENDING, TaskStatus.QUEUED):
                task.locked_files = self._check_file_conflicts(task.required_files, task.agent)

    def start_task(self, task_id: str) -> bool:
        """Move a task to ``IN_PROGRESS`` if its dependency and files allow it.

        Returns:
            True when the task was started; False when the task is unknown, its
            dependency has not completed, or a required file is currently held
            by another agent's running task.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False

        if task.depends_on:
            dependency = self.tasks.get(task.depends_on)
            if dependency is None:
                # The dependency record was removed (cleanup_old_tasks only
                # removes COMPLETED/FAILED tasks). A task still QUEUED was
                # never released by a completion, so it stays blocked.
                if task.status == TaskStatus.QUEUED:
                    return False
            elif dependency.status != TaskStatus.COMPLETED:
                return False

        # Re-evaluate locks now: the list stored at enqueue time may be stale.
        task.locked_files = self._check_file_conflicts(task.required_files, task.agent)
        if task.locked_files:
            return False

        task.status = TaskStatus.IN_PROGRESS
        task.estimated_completion = self._utc_timestamp(timedelta(minutes=30))
        # Files taken by this task may now block other waiting tasks.
        self._refresh_waiting_locks()
        self._save_queue_status()
        return True

    def complete_task(self, task_id: str, progress: int = 100) -> bool:
        """Mark a task ``COMPLETED`` and release the tasks that depend on it.

        Args:
            task_id: Id of the task to complete.
            progress: Final progress value, clamped to 0-100 (default 100).

        Returns:
            False when the task is unknown, True otherwise.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False
        task.status = TaskStatus.COMPLETED
        task.progress = min(100, max(0, progress))
        # Let tasks waiting on this one become eligible to run.
        self._trigger_dependent_tasks(task_id)
        # The files this task held are free again.
        self._refresh_waiting_locks()
        self._save_queue_status()
        return True

    def fail_task(self, task_id: str, reason: str) -> bool:
        """Mark a task ``FAILED``.

        ``reason`` is accepted for API compatibility but is not recorded.
        Tasks depending on the failed task are left ``QUEUED``.

        Returns:
            False when the task is unknown, True otherwise.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False
        task.status = TaskStatus.FAILED
        self._refresh_waiting_locks()
        self._save_queue_status()
        return True

    def pause_task(self, task_id: str, reason: str) -> bool:
        """Mark a task ``PAUSED``.

        ``reason`` is accepted for API compatibility but is not recorded.

        Returns:
            False when the task is unknown, True otherwise.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False
        task.status = TaskStatus.PAUSED
        self._refresh_waiting_locks()
        self._save_queue_status()
        return True

    def update_progress(self, task_id: str, progress: int) -> bool:
        """Set a task's progress, clamped to 0-100.

        Returns:
            False when the task is unknown, True otherwise.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False
        task.progress = min(100, max(0, progress))
        self._save_queue_status()
        return True

    def _trigger_dependent_tasks(self, completed_task_id: str):
        """Promote ``QUEUED`` tasks waiting on ``completed_task_id`` to ``PENDING``."""
        for task in self.tasks.values():
            if task.depends_on == completed_task_id and task.status == TaskStatus.QUEUED:
                task.status = TaskStatus.PENDING
                # Re-check file locks for the newly released task.
                task.locked_files = self._check_file_conflicts(task.required_files, task.agent)

    def get_next_runnable_task(self) -> Optional[QueuedTask]:
        """Return the first ``PENDING`` task with no file conflict, else None.

        Conflicts are evaluated against the tasks currently ``IN_PROGRESS``, so
        a returned task can be started. Tasks are considered in insertion
        order.
        """
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue
            if not self._check_file_conflicts(task.required_files, task.agent):
                return task
        return None

    def get_queue_status(self) -> Dict:
        """Return per-status task counts plus every task serialized as a dict."""
        tally = {status: 0 for status in TaskStatus}
        for task in self.tasks.values():
            tally[task.status] += 1
        return {
            "total_tasks": len(self.tasks),
            "in_progress": tally[TaskStatus.IN_PROGRESS],
            "queued": tally[TaskStatus.QUEUED],
            "pending": tally[TaskStatus.PENDING],
            "completed": tally[TaskStatus.COMPLETED],
            "failed": tally[TaskStatus.FAILED],
            "tasks": {task_id: task.to_dict() for task_id, task in self.tasks.items()}
        }

    def get_task(self, task_id: str) -> Optional[QueuedTask]:
        """Return the task with the given id, or None when it is unknown."""
        return self.tasks.get(task_id)

    def list_tasks(self, status: Optional[TaskStatus] = None, agent: Optional[str] = None) -> List[QueuedTask]:
        """Return tasks sorted by creation time, optionally filtered.

        Args:
            status: Keep only tasks in this state when given.
            agent: Keep only tasks owned by this agent when given (an empty
                string means no filter).
        """
        selected = [
            task for task in self.tasks.values()
            if (not status or task.status == status)
            and (not agent or task.agent == agent)
        ]
        return sorted(selected, key=lambda t: t.created_at)

    def _save_queue_status(self):
        """Write the current queue state to ``<queue_dir>/work-queue.md``."""
        queue_file = self.queue_dir / "work-queue.md"
        # Use one timestamp for both header lines so they always agree.
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Group tasks by the three states shown in the report.
        in_progress_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.IN_PROGRESS]
        queued_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.QUEUED]
        pending_tasks = [t for t in self.tasks.values() if t.status == TaskStatus.PENDING]

        # Collect the markdown fragments and join once at the end.
        parts = [f"""# πŸ“‹ ν™œμ„± μž‘μ—… 큐
**생성일**: {stamp}
**λ§ˆμ§€λ§‰ μ—…λ°μ΄νŠΈ**: {stamp}
## πŸ”΄ μ§„ν–‰ 쀑 (ν˜„μž¬ 잠금)
"""]

        if in_progress_tasks:
            for idx, task in enumerate(in_progress_tasks, 1):
                targets = ', '.join([f'`{f}`' for f in task.required_files])
                held = ', '.join([f'βœ… {f}' for f in task.required_files])
                parts.append(f"""### [{idx}] {task.agent} - {task.command} (in_progress)
- **μž‘μ—…**: {task.command}
- **λŒ€μƒ 파일**: {targets}
- **μ‹œμž‘**: {task.created_at}
- **진행도**: {task.progress}%
- **잠금 μƒνƒœ**: {held}
---
""")
        else:
            parts.append("ν˜„μž¬ μ§„ν–‰ 쀑인 μž‘μ—… μ—†μŒ\n\n---\n\n")

        parts.append("""## 🟠 λŒ€κΈ° 쀑 (μ˜ˆμ•½λ¨)
""")
        if queued_tasks:
            for idx, task in enumerate(queued_tasks, 1):
                targets = ', '.join([f'`{f}`' for f in task.required_files])
                waiting_on = ', '.join(task.locked_files) if task.locked_files else 'None'
                parts.append(f"""### [{idx}] {task.agent} - {task.command} (queued)
- **μž‘μ—…**: {task.command}
- **λŒ€μƒ 파일**: {targets}
- **μ„ ν–‰ 쑰건**: {task.depends_on or 'None'}
- **잠금 λŒ€κΈ° 파일**: {waiting_on}
---
""")
        else:
            parts.append("λŒ€κΈ° 쀑인 μž‘μ—… μ—†μŒ\n\n---\n\n")

        parts.append("""## 🟑 μ€€λΉ„ 쀑
""")
        if pending_tasks:
            for idx, task in enumerate(pending_tasks, 1):
                targets = ', '.join([f'`{f}`' for f in task.required_files])
                parts.append(f"""### [{idx}] {task.agent} - {task.command} (pending)
- **μž‘μ—…**: {task.command}
- **λŒ€μƒ 파일**: {targets}
- **μ„ ν–‰ 쑰건**: {task.depends_on or 'None'}
---
""")
        else:
            parts.append("μ€€λΉ„ 쀑인 μž‘μ—… μ—†μŒ\n\n")

        # Persist the report.
        with open(queue_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

    def cleanup_old_tasks(self, days: int = 7) -> int:
        """Remove ``COMPLETED``/``FAILED`` tasks created more than ``days`` days ago.

        Args:
            days: Minimum age, in days, for a finished task to be removed.

        Returns:
            The number of tasks removed.
        """
        from datetime import timezone

        # Compare against a timezone-aware cutoff.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        finished_states = (TaskStatus.COMPLETED, TaskStatus.FAILED)

        expired_ids = []
        for task_id, task in self.tasks.items():
            if task.status not in finished_states:
                continue
            created = datetime.fromisoformat(task.created_at.replace("Z", "+00:00"))
            if created.tzinfo is None:
                # A timestamp without an offset cannot be compared with an
                # aware datetime; treat it as UTC.
                created = created.replace(tzinfo=timezone.utc)
            if created < cutoff_date:
                expired_ids.append(task_id)

        for task_id in expired_ids:
            del self.tasks[task_id]
        if expired_ids:
            self._save_queue_status()
        return len(expired_ids)