File size: 5,332 Bytes
1a252b6
 
fc2f017
1a252b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc2f017
1a252b6
 
 
 
 
 
 
 
 
 
 
 
75d74eb
1a252b6
fc2f017
1a252b6
 
 
 
 
 
 
 
 
 
 
 
 
 
fc2f017
 
bf8297e
dcf66e9
 
 
1a252b6
fc2f017
 
 
 
 
 
 
 
 
 
 
 
 
 
1a252b6
 
 
 
 
 
 
 
 
 
 
fc2f017
 
bf8297e
fc2f017
 
 
 
bf8297e
 
 
 
 
fc2f017
15a97e1
bf8297e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15a97e1
bf8297e
 
15a97e1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import json
import shutil
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud

# Handle of the asyncio task created by start_worker(); None until a worker
# has been started.
worker_task = None
# Run flag: set to True by start_worker() and re-checked by worker_loop() at
# the top of every polling iteration.
worker_running = False

def is_worker_running():
    """Return the current value of the module-level ``worker_running`` flag."""
    running = worker_running
    return running

async def start_worker():
    """Start the background worker loop unless one is already active.

    Sets ``worker_running`` and schedules ``worker_loop()`` as an asyncio task
    stored in ``worker_task``. Because it calls ``asyncio.create_task``, it
    must be awaited from within a running event loop.

    If the flag is still set but the previously created task has already
    finished (for example it was cancelled or ended with an unhandled error),
    the stale flag is cleared and a fresh worker is started instead of
    reporting "already running" forever.
    """
    global worker_task, worker_running

    logger.info(f"start_worker called: worker_running={worker_running}")

    # worker_loop() does not clear the flag itself, so a finished task with the
    # flag still set means no worker is actually polling any more.
    if worker_running and worker_task is not None and worker_task.done():
        logger.warning("Previous worker task has finished; restarting worker")
        worker_running = False

    if worker_running:
        logger.info("Worker already running")
        return

    worker_running = True
    worker_task = asyncio.create_task(worker_loop())
    logger.info("Worker task started")

async def worker_loop():
    """Poll for queued tasks and process them until ``worker_running`` is cleared.

    On start-up the function tries to import ``ttt.runner.initiate`` and runs a
    one-token warm-up call in the default executor. A failure there is only
    logged as a warning, so tasks whose model is ``'opencode'`` can still run.

    Each iteration then:

    1. calls ``crud.cleanup_old_entries()``;
    2. fetches the next row via ``crud.get_next_not_started()`` and sleeps for
       ``settings.POLL_INTERVAL`` when there is none;
    3. marks the task ``'processing'``, runs it either through
       ``_run_opencode`` (model ``'opencode'``) or through ``initiate`` in the
       default executor (any other model), and finally records
       ``'completed'`` with a JSON result or ``'failed'`` with the error text.

    Any exception raised outside the per-task handler (including the cleanup
    and fetch calls) is logged, followed by a ``settings.POLL_INTERVAL`` sleep,
    so a transient failure does not terminate the worker task.
    """
    global worker_running
    logger.info("TTT Worker started. Monitoring for new tasks...")

    loop = asyncio.get_running_loop()

    # Stays None when the import below fails, so qwen tasks can be rejected
    # with a clear message instead of an UnboundLocalError.
    initiate = None
    try:
        from ttt.runner import initiate
        # Warm up: load the engine
        await loop.run_in_executor(None, lambda: initiate({'text': 'Hi', 'model': 'qwen', 'max_new_tokens': 1}))
        logger.info("✅ Qwen model ready. Monitoring for new tasks...")
    except Exception as e:
        logger.warning(f"⚠️ Qwen model not available (opencode-only tasks will still work): {e}")

    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")

        try:
            # Kept inside the try: a failing cleanup is logged and retried on
            # the next iteration rather than killing the worker task.
            await crud.cleanup_old_entries()

            row = await crud.get_next_not_started()
            if not row:
                await asyncio.sleep(settings.POLL_INTERVAL)
                continue

            task_id = row['id']
            input_text = row['input_text']
            system_prompt = row['system_prompt'] or "You are a helpful assistant."
            # Rows without a 'model' key are treated as qwen tasks.
            model = row['model'] if 'model' in row.keys() else 'qwen'

            logger.info(f"\n{'='*60}\nProcessing task: {task_id} (model: {model})\n📌 Input: {input_text[:100]}...\n{'='*60}")

            await crud.update_status(task_id, 'processing')

            def progress_cb(percent, text):
                # Invoked from the executor thread, so the coroutine has to be
                # handed back to the event loop thread-safely.
                asyncio.run_coroutine_threadsafe(
                    crud.update_progress(task_id, percent, text),
                    loop
                )

            try:
                await crud.update_progress(task_id, 5, "Starting...")

                if model == 'opencode':
                    await crud.update_progress(task_id, 10, "Running opencode...")
                    result = await _run_opencode(input_text)
                    logger.success(f"Successfully processed (opencode): {task_id}")
                    await crud.update_progress(task_id, 100, "Completed")
                    await crud.update_status(task_id, 'completed', result=json.dumps({"response": result}))
                else:
                    if initiate is None:
                        raise RuntimeError(
                            "Qwen model is not available: ttt.runner.initiate could not be imported"
                        )

                    # initiate() is a blocking call; run it in the default
                    # executor so the event loop stays responsive.
                    result = await loop.run_in_executor(None, lambda: initiate(
                        {
                            'text': input_text,
                            'system_prompt': system_prompt,
                            'model': 'qwen',
                        },
                        progress_callback=progress_cb
                    ))

                    if not result:
                        raise Exception("initiate() returned empty result")

                    logger.success(f"Successfully processed: {task_id}")
                    await crud.update_status(task_id, 'completed', result=json.dumps(result))

            except Exception as e:
                logger.error(f"Failed to process {task_id}: {str(e)}")
                await crud.update_status(task_id, 'failed', error=str(e))

        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)


async def _run_opencode(text: str) -> str:
    if not shutil.which('opencode'):
        raise FileNotFoundError(
            "opencode CLI not found. Install it from https://opencode.ai"
        )

    proc = await asyncio.create_subprocess_exec(
        'opencode', 'run', '--print-logs', '--model', 'opencode/big-pickle', text,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout_lines = []
    stderr_lines = []

    async def _read_stream(stream, lines, label):
        while True:
            line = await stream.readline()
            if not line:
                break
            decoded = line.decode(errors='replace').rstrip()
            lines.append(decoded)
            logger.info(f"opencode {label}: {decoded}")

    try:
        await asyncio.wait_for(
            asyncio.gather(
                _read_stream(proc.stdout, stdout_lines, "stdout"),
                _read_stream(proc.stderr, stderr_lines, "stderr"),
            ),
            timeout=120
        )
    except asyncio.TimeoutError:
        proc.kill()
        raise TimeoutError("opencode timed out after 120s")

    await proc.wait()

    stdout = '\n'.join(stdout_lines)
    stderr = '\n'.join(stderr_lines)

    if proc.returncode != 0:
        raise RuntimeError(f"opencode failed ({proc.returncode}): {stderr or 'unknown error'}")
    return stdout