VeriDeepResearch / worker.py
Vilin97's picture
Refactor to FastAPI + background job architecture
1b86fa5
from __future__ import annotations
import asyncio
import os
import time
import traceback
from job_models import JobState, JobPhase
from agent import run_agent_job
from email_sender import send_result_email
class WorkerManager:
def __init__(self, max_concurrent: int = 2):
self.max_concurrent = max_concurrent
self.active_jobs: dict[str, asyncio.Task] = {}
async def run(self):
"""Main worker loop. Polls for queued jobs every 2 seconds."""
print("[worker] Started")
while True:
try:
await self._tick()
except Exception:
traceback.print_exc()
await asyncio.sleep(2)
async def _tick(self):
# Clean up finished tasks
finished = [jid for jid, task in self.active_jobs.items() if task.done()]
for jid in finished:
task = self.active_jobs.pop(jid)
exc = task.exception() if not task.cancelled() else None
if exc:
print(f"[worker] Job {jid} failed: {exc}")
if len(self.active_jobs) >= self.max_concurrent:
return
# Find queued jobs + recover stale ones
all_jobs = JobState.list_all()
queued = []
for j in all_jobs:
phase = j.get_phase()
if phase == JobPhase.QUEUED:
queued.append(j)
elif phase not in (JobPhase.COMPLETED, JobPhase.FAILED):
# Running job — check if worker is alive
if j.worker_pid and not _pid_alive(j.worker_pid):
j.set_phase(JobPhase.QUEUED)
j.worker_pid = 0
j.add_status("Resumed after worker restart")
j.save()
queued.append(j)
queued.sort(key=lambda j: j.created_at)
for job in queued:
if len(self.active_jobs) >= self.max_concurrent:
break
if job.job_id not in self.active_jobs:
task = asyncio.create_task(self._process_job(job.job_id))
self.active_jobs[job.job_id] = task
print(f"[worker] Started job {job.job_id}: {job.question[:60]}")
async def _process_job(self, job_id: str):
job = JobState.load(job_id)
if not job:
return
job.worker_pid = os.getpid()
job.started_at = job.started_at or time.time()
job.save()
try:
await run_agent_job(job)
except Exception as e:
job.set_phase(JobPhase.FAILED)
job.error = f"{type(e).__name__}: {e}"
job.finished_at = time.time()
job.add_status(f"Error: {job.error}")
job.save()
traceback.print_exc()
# Reload to get latest state (run_agent_job saves throughout)
job = JobState.load(job_id)
if not job:
return
job.worker_pid = 0
job.save()
# Send email
if job.email:
_send_email(job)
print(f"[worker] Email sent for job {job_id}")
phase = job.get_phase()
print(f"[worker] Job {job_id} finished: {phase.value}, cost=${job.total_cost:.4f}")
def status(self) -> dict:
return {
"active_jobs": list(self.active_jobs.keys()),
"max_concurrent": self.max_concurrent,
}
def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False
def _send_email(job: JobState):
elapsed = (job.finished_at or time.time()) - (job.started_at or job.created_at)
stats = {
"elapsed_seconds": round(elapsed, 1),
"total_input_tokens": job.total_input_tokens,
"total_output_tokens": job.total_output_tokens,
"total_cost_usd": round(job.total_cost, 4),
"tool_counts": job.tool_counts,
}
verified = job.best_code_verified and job.best_code_sorry_free
send_result_email(
to_email=job.email,
question=job.question,
status_log="\n".join(f"- {s}" for s in job.status_log),
full_log="\n\n".join(job.full_log),
answer=job.answer,
lean_code=job.best_lean_code,
verified=verified,
stats=stats,
)