Spaces:
Sleeping
Sleeping
File size: 4,291 Bytes
1b86fa5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | from __future__ import annotations
import asyncio
import os
import time
import traceback
from job_models import JobState, JobPhase
from agent import run_agent_job
from email_sender import send_result_email
class WorkerManager:
    """Poll the job store and run queued jobs as asyncio tasks.

    At most ``max_concurrent`` jobs run at the same time. Every running job
    is tracked in ``active_jobs`` (job id -> task) until a later tick notices
    that its task is done and removes it.
    """

    def __init__(self, max_concurrent: int = 2, poll_interval: float = 2.0):
        """Create an idle manager.

        Args:
            max_concurrent: Upper bound on simultaneously running job tasks.
            poll_interval: Seconds to sleep between two polls in ``run``.
        """
        self.max_concurrent = max_concurrent
        self.poll_interval = poll_interval
        self.active_jobs: dict[str, asyncio.Task] = {}

    async def run(self):
        """Main worker loop. Polls for queued jobs every ``poll_interval`` seconds."""
        print("[worker] Started")
        while True:
            try:
                await self._tick()
            except Exception:
                # Top-level boundary: report the failure and keep polling.
                traceback.print_exc()
            await asyncio.sleep(self.poll_interval)

    async def _tick(self):
        """Run one scheduling pass.

        Removes finished tasks from ``active_jobs``, puts jobs whose recorded
        worker process no longer exists back into the queue, and starts
        queued jobs (oldest ``created_at`` first) while capacity remains.
        """
        # Clean up finished tasks
        finished = [jid for jid, task in self.active_jobs.items() if task.done()]
        for jid in finished:
            task = self.active_jobs.pop(jid)
            # Task.exception() raises on a cancelled task, so check that first.
            exc = None if task.cancelled() else task.exception()
            if exc:
                print(f"[worker] Job {jid} failed: {exc}")

        if len(self.active_jobs) >= self.max_concurrent:
            return

        # Find queued jobs + recover stale ones
        queued = []
        for j in JobState.list_all():
            phase = j.get_phase()
            if phase == JobPhase.QUEUED:
                queued.append(j)
                continue
            if phase in (JobPhase.COMPLETED, JobPhase.FAILED):
                continue
            # Running job — check if worker is alive.
            # NOTE(review): if a restarted worker receives the same PID as the
            # previous one, its own PID looks alive and such jobs are not
            # recovered here — confirm whether that can happen in deployment.
            if j.worker_pid and not _pid_alive(j.worker_pid):
                j.set_phase(JobPhase.QUEUED)
                j.worker_pid = 0
                j.add_status("Resumed after worker restart")
                j.save()
                queued.append(j)

        queued.sort(key=lambda j: j.created_at)
        for job in queued:
            if len(self.active_jobs) >= self.max_concurrent:
                break
            if job.job_id not in self.active_jobs:
                task = asyncio.create_task(self._process_job(job.job_id))
                self.active_jobs[job.job_id] = task
                print(f"[worker] Started job {job.job_id}: {job.question[:60]}")

    async def _process_job(self, job_id: str):
        """Run a single job to completion and send the result email.

        Marks the job with this process's PID, awaits ``run_agent_job``,
        records a failure on the job if that raises, clears the PID, and then
        emails the result when the job has an email address.

        Args:
            job_id: Identifier passed to ``JobState.load``.
        """
        job = JobState.load(job_id)
        if not job:
            return
        job.worker_pid = os.getpid()
        job.started_at = job.started_at or time.time()
        job.save()
        try:
            await run_agent_job(job)
        except Exception as e:
            job.set_phase(JobPhase.FAILED)
            job.error = f"{type(e).__name__}: {e}"
            job.finished_at = time.time()
            job.add_status(f"Error: {job.error}")
            job.save()
            traceback.print_exc()

        # Reload to get latest state (run_agent_job saves throughout)
        job = JobState.load(job_id)
        if not job:
            return
        job.worker_pid = 0
        job.save()

        # Send email
        if job.email:
            try:
                # _send_email is synchronous; run it in the default executor
                # so the event loop (and the other job tasks) keep running.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, _send_email, job)
            except Exception:
                # A delivery problem must not be reported as a job failure,
                # and must not skip the final log line below.
                traceback.print_exc()
                print(f"[worker] Email failed for job {job_id}")
            else:
                print(f"[worker] Email sent for job {job_id}")

        phase = job.get_phase()
        print(f"[worker] Job {job_id} finished: {phase.value}, cost=${job.total_cost:.4f}")

    def status(self) -> dict:
        """Return the ids of the tracked jobs and the concurrency limit."""
        return {
            "active_jobs": list(self.active_jobs.keys()),
            "max_concurrent": self.max_concurrent,
        }
def _pid_alive(pid: int) -> bool:
    """Return True if a process with id *pid* currently exists.

    The check sends signal 0 via ``os.kill``, which performs the existence
    and permission checks without delivering a signal (POSIX semantics).

    Args:
        pid: Process id to probe. Values <= 0 are never reported as alive.

    Returns:
        True when the process exists (even if owned by another user),
        False otherwise.
    """
    if pid <= 0:
        # kill() treats 0 and negative ids as process-group / broadcast
        # targets, so they can never identify one specific worker.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # EPERM: the process exists, we just may not signal it.
        return True
    except OSError:
        return False
    return True
def _send_email(job: JobState):
    """Build the result summary for *job* and pass it to ``send_result_email``.

    The elapsed time runs from ``started_at`` (or ``created_at`` when the job
    has no start time) to ``finished_at`` (or the current time when unset).
    """
    end_ts = job.finished_at or time.time()
    start_ts = job.started_at or job.created_at
    duration = end_ts - start_ts

    summary = {
        "elapsed_seconds": round(duration, 1),
        "total_input_tokens": job.total_input_tokens,
        "total_output_tokens": job.total_output_tokens,
        "total_cost_usd": round(job.total_cost, 4),
        "tool_counts": job.tool_counts,
    }

    # Reported as verified only when the best code is both verified and sorry-free.
    is_verified = job.best_code_verified and job.best_code_sorry_free

    bullet_lines = [f"- {entry}" for entry in job.status_log]

    send_result_email(
        to_email=job.email,
        question=job.question,
        status_log="\n".join(bullet_lines),
        full_log="\n\n".join(job.full_log),
        answer=job.answer,
        lean_code=job.best_lean_code,
        verified=is_verified,
        stats=summary,
    )
|