File size: 4,291 Bytes
1b86fa5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from __future__ import annotations

import asyncio
import os
import time
import traceback
from job_models import JobState, JobPhase
from agent import run_agent_job
from email_sender import send_result_email


class WorkerManager:
    """Poll the job store and run queued jobs as asyncio tasks in this process.

    At most ``max_concurrent`` jobs run at once. Each started job gets a task
    (running ``_process_job``) stored in ``active_jobs`` keyed by job id; the
    entry is removed on a later tick once the task is done.
    """

    def __init__(self, max_concurrent: int = 2):
        # Upper bound on simultaneously running job tasks in this process.
        self.max_concurrent = max_concurrent
        # job_id -> task running ``_process_job`` for that job.
        self.active_jobs: dict[str, asyncio.Task] = {}

    async def run(self):
        """Main worker loop. Polls for queued jobs every 2 seconds.

        Never returns on its own; an exception escaping a tick is printed and
        the loop keeps going.
        """
        print("[worker] Started")
        while True:
            try:
                await self._tick()
            except Exception:
                traceback.print_exc()
            await asyncio.sleep(2)

    async def _tick(self):
        """Run one scheduling pass.

        1. Drop finished tasks from ``active_jobs``, printing any exception.
        2. If there is spare capacity, gather QUEUED jobs plus orphaned
           in-progress jobs (reset to QUEUED), then start the oldest first.
        """
        # Clean up finished tasks
        finished = [jid for jid, task in self.active_jobs.items() if task.done()]
        for jid in finished:
            task = self.active_jobs.pop(jid)
            exc = task.exception() if not task.cancelled() else None
            if exc:
                print(f"[worker] Job {jid} failed: {exc}")

        if len(self.active_jobs) >= self.max_concurrent:
            return

        my_pid = os.getpid()

        # Find queued jobs + recover stale ones
        queued = []
        for j in JobState.list_all():
            phase = j.get_phase()
            if phase == JobPhase.QUEUED:
                queued.append(j)
                continue
            if phase in (JobPhase.COMPLETED, JobPhase.FAILED):
                continue
            if j.job_id in self.active_jobs:
                # Being processed by this process right now.
                continue
            # In-progress job that this process is not running. Its owner is
            # gone if the recorded PID is dead -- or if the recorded PID is
            # *ours*: a restarted worker can be handed its predecessor's PID
            # (common in containers), and the job would otherwise look alive
            # forever while nobody works on it.
            owner = j.worker_pid
            if owner and (owner == my_pid or not _pid_alive(owner)):
                j.set_phase(JobPhase.QUEUED)
                j.worker_pid = 0
                j.add_status("Resumed after worker restart")
                j.save()
                queued.append(j)

        # Oldest jobs first.
        queued.sort(key=lambda j: j.created_at)

        for job in queued:
            if len(self.active_jobs) >= self.max_concurrent:
                break
            if job.job_id not in self.active_jobs:
                task = asyncio.create_task(self._process_job(job.job_id))
                self.active_jobs[job.job_id] = task
                print(f"[worker] Started job {job.job_id}: {job.question[:60]}")

    async def _process_job(self, job_id: str):
        """Run one job, then clear its owner PID and email the result.

        An ``Exception`` raised by ``run_agent_job`` is recorded on the job
        (phase FAILED, ``error``, ``finished_at``, a status entry) rather than
        propagated. Returns early if the job cannot be loaded.
        """
        job = JobState.load(job_id)
        if not job:
            return

        # Claim the job: _tick compares worker_pid against live processes to
        # detect jobs orphaned by a dead worker.
        job.worker_pid = os.getpid()
        job.started_at = job.started_at or time.time()
        job.save()

        try:
            await run_agent_job(job)
        except Exception as e:
            job.set_phase(JobPhase.FAILED)
            job.error = f"{type(e).__name__}: {e}"
            job.finished_at = time.time()
            job.add_status(f"Error: {job.error}")
            job.save()
            traceback.print_exc()

        # Reload to get latest state (run_agent_job saves throughout)
        job = JobState.load(job_id)
        if not job:
            return

        job.worker_pid = 0
        job.save()

        # Send email. A delivery error must not escape as a task exception:
        # _tick would then report "Job ... failed" for a job that finished.
        # NOTE(review): _send_email runs synchronously on the event-loop
        # thread; if send_result_email blocks on network I/O it stalls the
        # other running jobs -- consider loop.run_in_executor if so.
        if job.email:
            try:
                _send_email(job)
            except Exception:
                print(f"[worker] Email failed for job {job_id}")
                traceback.print_exc()
            else:
                print(f"[worker] Email sent for job {job_id}")

        phase = job.get_phase()
        print(f"[worker] Job {job_id} finished: {phase.value}, cost=${job.total_cost:.4f}")

    def status(self) -> dict:
        """Return the ids of jobs with a tracked task and the concurrency cap.

        Finished tasks stay listed until the next tick removes them.
        """
        return {
            "active_jobs": list(self.active_jobs.keys()),
            "max_concurrent": self.max_concurrent,
        }


def _pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except (OSError, ProcessLookupError):
        return False


def _send_email(job: JobState):
    """Email the outcome of *job* to ``job.email`` via ``send_result_email``.

    The message carries the question, the bulleted status log, the full log,
    the answer, the best Lean code, a verified flag (verified and sorry-free),
    and usage stats. Elapsed time is measured from ``started_at`` (or
    ``created_at`` if unset) to ``finished_at`` (or the current time if unset).
    """
    end = job.finished_at or time.time()
    begin = job.started_at or job.created_at
    usage = dict(
        elapsed_seconds=round(end - begin, 1),
        total_input_tokens=job.total_input_tokens,
        total_output_tokens=job.total_output_tokens,
        total_cost_usd=round(job.total_cost, 4),
        tool_counts=job.tool_counts,
    )
    # Plain `and`: yields the first falsy operand, else best_code_sorry_free.
    is_verified = job.best_code_verified and job.best_code_sorry_free
    bullet_log = "\n".join([f"- {entry}" for entry in job.status_log])
    transcript = "\n\n".join(job.full_log)
    send_result_email(
        to_email=job.email,
        question=job.question,
        status_log=bullet_log,
        full_log=transcript,
        answer=job.answer,
        lean_code=job.best_lean_code,
        verified=is_verified,
        stats=usage,
    )