|
|
"""
|
|
|
BOINC Client Integration
|
|
|
Handles distributed computing task submission and monitoring
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import time
|
|
|
import requests
|
|
|
from typing import Dict, List, Optional
|
|
|
from pathlib import Path
|
|
|
from dataclasses import dataclass, asdict
|
|
|
from datetime import datetime
|
|
|
import yaml
|
|
|
|
|
|
@dataclass
class WorkUnit:
    """Represents a BOINC work unit"""
    # Unique task identifier of the form "wu_<millisecond-timestamp>"
    # (generated by BOINCClient.submit_task).
    id: str
    # Human-readable name; submit_task defaults it to "<type>_<id>".
    name: str
    # Kind of analysis, e.g. variant_calling, blast_search, alignment, annotation.
    workunit_type: str
    # Path to the input data file staged for this work unit.
    input_file: str
    # Lifecycle state: "pending", "running", "completed", "failed", or "cancelled".
    status: str
    # ISO-8601 submission timestamp (naive local time — TODO confirm timezone policy).
    created_at: str
    # ISO-8601 completion timestamp; None until the task completes.
    completed_at: Optional[str] = None
    # Path to the produced result file, when one exists.
    result_file: Optional[str] = None
    # Error description for failed tasks, if any.
    error: Optional[str] = None
|
|
|
|
|
|
class BOINCClient:
    """BOINC client for distributed computing integration.

    Work-unit metadata is persisted as JSON under the configured work
    directory so task state survives restarts.  Submission is currently
    simulated on the local filesystem (see ``_simulate_submission``); in
    production that hook should be replaced with real BOINC API calls.
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load the ``boinc`` section of the YAML config and prepare the work dir.

        Args:
            config_path: Path to a YAML file containing a top-level
                ``boinc`` mapping with ``project_url`` and ``work_dir`` keys.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            KeyError: If the config lacks the ``boinc`` section or its keys.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['boinc']

        self.project_url = self.config['project_url']
        self.work_dir = Path(self.config['work_dir'])
        self.work_dir.mkdir(parents=True, exist_ok=True)

        # Task registry persisted across runs.
        self.tasks_file = self.work_dir / "tasks.json"
        self.tasks = self._load_tasks()

    def _load_tasks(self) -> Dict[str, "WorkUnit"]:
        """Load existing tasks from disk; empty dict when none were saved yet."""
        if self.tasks_file.exists():
            with open(self.tasks_file, 'r') as f:
                data = json.load(f)
            return {k: WorkUnit(**v) for k, v in data.items()}
        return {}

    def _save_tasks(self):
        """Persist the in-memory task registry to ``tasks.json``."""
        with open(self.tasks_file, 'w') as f:
            data = {k: asdict(v) for k, v in self.tasks.items()}
            json.dump(data, f, indent=2)

    def submit_task(
        self,
        workunit_type: str,
        input_file: str,
        name: Optional[str] = None
    ) -> str:
        """
        Submit a new work unit to BOINC

        Args:
            workunit_type: Type of analysis (variant_calling, blast_search, etc.)
            input_file: Path to input data file
            name: Optional custom name for the work unit

        Returns:
            Work unit ID
        """
        # BUG FIX: a bare millisecond timestamp collides when tasks are
        # submitted in a tight loop (e.g. batch_submit), silently
        # overwriting an existing entry in self.tasks.  Probe until the
        # candidate ID is unused so every submission gets a unique key.
        stamp = int(time.time() * 1000)
        task_id = f"wu_{stamp}"
        while task_id in self.tasks:
            stamp += 1
            task_id = f"wu_{stamp}"

        if name is None:
            name = f"{workunit_type}_{task_id}"

        work_unit = WorkUnit(
            id=task_id,
            name=name,
            workunit_type=workunit_type,
            input_file=input_file,
            status="pending",
            created_at=datetime.now().isoformat()
        )

        # Placeholder for a real BOINC submission call.
        self._simulate_submission(work_unit)

        self.tasks[task_id] = work_unit
        self._save_tasks()

        return task_id

    def _simulate_submission(self, work_unit: "WorkUnit"):
        """
        Simulate BOINC submission (for development/demo purposes)
        In production, replace with actual BOINC API calls
        """
        task_dir = self.work_dir / work_unit.id
        task_dir.mkdir(exist_ok=True)

        # Best-effort staging: a missing input file is tolerated so a demo
        # submission never fails outright.
        input_path = Path(work_unit.input_file)
        if input_path.exists():
            import shutil
            shutil.copy(input_path, task_dir / input_path.name)

        metadata = {
            "task_id": work_unit.id,
            "type": work_unit.workunit_type,
            "input": work_unit.input_file,
            "submitted": work_unit.created_at
        }

        with open(task_dir / "metadata.json", 'w') as f:
            json.dump(metadata, f, indent=2)

    def get_task_status(self, task_id: str) -> Optional["WorkUnit"]:
        """Get status of a specific task (None when the ID is unknown)."""
        return self.tasks.get(task_id)

    def list_tasks(
        self,
        status: Optional[str] = None,
        workunit_type: Optional[str] = None
    ) -> List["WorkUnit"]:
        """
        List all tasks with optional filtering, newest first.

        Args:
            status: Filter by status (pending, running, completed, failed)
            workunit_type: Filter by work unit type
        """
        tasks = list(self.tasks.values())

        if status:
            tasks = [t for t in tasks if t.status == status]

        if workunit_type:
            tasks = [t for t in tasks if t.workunit_type == workunit_type]

        # created_at is ISO-8601, so lexicographic order is chronological.
        return sorted(tasks, key=lambda t: t.created_at, reverse=True)

    def update_task_status(self, task_id: str, status: str, **kwargs):
        """Update task status and additional fields.

        Unknown ``task_id`` values are ignored (no-op).  Extra keyword
        arguments are applied only when they name existing WorkUnit
        fields; a ``completed`` status stamps ``completed_at``.
        """
        if task_id in self.tasks:
            task = self.tasks[task_id]
            task.status = status

            for key, value in kwargs.items():
                if hasattr(task, key):
                    setattr(task, key, value)

            if status == "completed":
                task.completed_at = datetime.now().isoformat()

            self._save_tasks()

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a pending or running task.

        Returns:
            True when the task existed and was cancellable, else False.
        """
        if task_id in self.tasks:
            task = self.tasks[task_id]
            if task.status in ["pending", "running"]:
                task.status = "cancelled"
                self._save_tasks()
                return True
        return False

    def get_results(self, task_id: str) -> Optional[Path]:
        """Get results file for a completed task.

        Returns:
            Path to the result file when the task is completed and the
            file exists on disk; otherwise None.
        """
        if task_id in self.tasks:
            task = self.tasks[task_id]
            if task.status == "completed" and task.result_file:
                result_path = Path(task.result_file)
                if result_path.exists():
                    return result_path
        return None

    def get_statistics(self) -> Dict:
        """Get overall statistics about BOINC tasks.

        Returns:
            Dict with total counts, per-status and per-type breakdowns,
            and the mean wall-clock completion time in seconds (0 when
            nothing has completed yet).
        """
        total = len(self.tasks)
        by_status: Dict[str, int] = {}
        by_type: Dict[str, int] = {}

        for task in self.tasks.values():
            by_status[task.status] = by_status.get(task.status, 0) + 1
            by_type[task.workunit_type] = by_type.get(task.workunit_type, 0) + 1

        completed = [t for t in self.tasks.values() if t.completed_at]

        if completed:
            # Mean of (completed_at - created_at) across completed tasks.
            avg_time = sum(
                (datetime.fromisoformat(t.completed_at) -
                 datetime.fromisoformat(t.created_at)).total_seconds()
                for t in completed
            ) / len(completed)
        else:
            avg_time = 0

        return {
            "total_tasks": total,
            "by_status": by_status,
            "by_type": by_type,
            "completed_tasks": len(completed),
            "average_completion_time_seconds": avg_time
        }
|
|
|
|
|
|
|
|
|
class BOINCTaskManager:
    """High-level task manager for common workflows.

    Thin convenience facade over :class:`BOINCClient`: each ``submit_*``
    method fills in the work-unit type and derives a readable name from
    the input file's stem.
    """

    def __init__(self):
        # All submissions are delegated to a single underlying client.
        self.client = BOINCClient()

    def submit_variant_calling(self, fastq_file: str) -> str:
        """Submit variant calling task"""
        stem = Path(fastq_file).stem
        return self.client.submit_task(
            workunit_type="variant_calling",
            input_file=fastq_file,
            name=f"variant_calling_{stem}",
        )

    def submit_blast_search(self, sequence_file: str) -> str:
        """Submit BLAST search task"""
        stem = Path(sequence_file).stem
        return self.client.submit_task(
            workunit_type="blast_search",
            input_file=sequence_file,
            name=f"blast_{stem}",
        )

    def submit_alignment(self, fastq_file: str) -> str:
        """Submit sequence alignment task"""
        stem = Path(fastq_file).stem
        return self.client.submit_task(
            workunit_type="alignment",
            input_file=fastq_file,
            name=f"alignment_{stem}",
        )

    def submit_annotation(self, vcf_file: str) -> str:
        """Submit variant annotation task"""
        stem = Path(vcf_file).stem
        return self.client.submit_task(
            workunit_type="annotation",
            input_file=vcf_file,
            name=f"annotation_{stem}",
        )

    def batch_submit(
        self,
        workunit_type: str,
        input_files: List[str]
    ) -> List[str]:
        """Submit multiple tasks at once, returning their IDs in order."""
        # Names are left to submit_task's default ("<type>_<id>").
        return [
            self.client.submit_task(workunit_type, path)
            for path in input_files
        ]
|
|
|
|