File size: 5,332 Bytes
1a252b6 fc2f017 1a252b6 fc2f017 1a252b6 75d74eb 1a252b6 fc2f017 1a252b6 fc2f017 bf8297e dcf66e9 1a252b6 fc2f017 1a252b6 fc2f017 bf8297e fc2f017 bf8297e fc2f017 15a97e1 bf8297e 15a97e1 bf8297e 15a97e1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | import asyncio
import json
import shutil
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud
# Handle to the asyncio task running worker_loop(); assigned by start_worker().
worker_task = None
# Set to True by start_worker(); worker_loop() keeps polling while it is truthy.
worker_running = False
def is_worker_running() -> bool:
    """Report whether the background worker is flagged as running.

    Returns the module-level ``worker_running`` flag unchanged; the state of
    the asyncio task itself is not inspected.
    """
    return worker_running
async def start_worker():
    """Start the background worker loop unless one is already active.

    Schedules ``worker_loop()`` as an asyncio task on the running event loop,
    stores it in the module-level ``worker_task`` and sets ``worker_running``.
    Calling this while a worker is active is a no-op apart from logging.

    If the flag is still set but the recorded task has already finished
    (for example because it crashed or was cancelled), a fresh task is
    started instead of reporting a worker that no longer exists.
    """
    global worker_task, worker_running
    logger.info(f"start_worker called: worker_running={worker_running}")

    if worker_running:
        # Only trust the flag while the task behind it is still alive;
        # otherwise a dead loop would block every future restart.
        if worker_task is None or not worker_task.done():
            logger.info("Worker already running")
            return
        logger.warning("Worker flag was set but its task has exited; restarting")

    worker_running = True
    worker_task = asyncio.create_task(worker_loop())
    logger.info("Worker task started")
async def _warm_up_qwen():
    """Import the Qwen runner and load the model with a one-token request.

    Returns:
        The ``initiate`` callable from ``ttt.runner`` if the import succeeded
        (even when the warm-up call itself failed), otherwise ``None``.
    """
    initiate = None
    try:
        from ttt.runner import initiate
        # Warm up: load the engine in a worker thread so the event loop
        # is not blocked while the model loads.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: initiate({'text': 'Hi', 'model': 'qwen', 'max_new_tokens': 1}),
        )
        logger.info("✅ Qwen model ready. Monitoring for new tasks...")
    except Exception as e:
        logger.warning(f"⚠️ Qwen model not available (opencode-only tasks will still work): {e}")
    return initiate


async def _process_task(row, initiate):
    """Run a single queued task and record its outcome through ``crud``.

    Args:
        row: Mapping-like row supporting ``row['key']`` and ``row.keys()``;
            ``id``, ``input_text`` and ``system_prompt`` are read, and
            ``model`` is read when present (defaulting to ``'qwen'``).
        initiate: The Qwen entry point returned by ``_warm_up_qwen``, or
            ``None`` when the runner could not be imported.

    Any exception raised while running the task is logged and stored on the
    task as a ``'failed'`` status rather than propagated.
    """
    task_id = row['id']
    input_text = row['input_text']
    system_prompt = row['system_prompt'] or "You are a helpful assistant."
    model = row['model'] if 'model' in row.keys() else 'qwen'

    # Guard the preview slice: a missing input must not raise before the task
    # leaves the "not started" state, or the same row would be re-fetched
    # forever. The real failure then surfaces below and is recorded as 'failed'.
    preview = (input_text or '')[:100]
    logger.info(f"\n{'='*60}\nProcessing task: {task_id} (model: {model})\n📌 Input: {preview}...\n{'='*60}")
    await crud.update_status(task_id, 'processing')

    loop = asyncio.get_running_loop()

    def progress_cb(percent, text):
        # Invoked from the executor thread, so the coroutine has to be
        # handed back to the event loop thread-safely.
        asyncio.run_coroutine_threadsafe(
            crud.update_progress(task_id, percent, text),
            loop
        )

    try:
        await crud.update_progress(task_id, 5, "Starting...")
        if model == 'opencode':
            await crud.update_progress(task_id, 10, "Running opencode...")
            result = await _run_opencode(input_text)
            logger.success(f"Successfully processed (opencode): {task_id}")
            await crud.update_progress(task_id, 100, "Completed")
            await crud.update_status(task_id, 'completed', result=json.dumps({"response": result}))
        else:
            if initiate is None:
                # Fail with a clear message instead of a NameError when the
                # runner import failed during warm-up.
                raise RuntimeError("Qwen model is not available (ttt.runner could not be imported)")
            result = await loop.run_in_executor(None, lambda: initiate(
                {
                    'text': input_text,
                    'system_prompt': system_prompt,
                    'model': 'qwen',
                },
                progress_callback=progress_cb
            ))
            if not result:
                raise RuntimeError("initiate() returned empty result")
            logger.success(f"Successfully processed: {task_id}")
            await crud.update_status(task_id, 'completed', result=json.dumps(result))
    except Exception as e:
        logger.error(f"Failed to process {task_id}: {str(e)}")
        await crud.update_status(task_id, 'failed', error=str(e))


async def worker_loop():
    """Poll for queued tasks and process them until ``worker_running`` is falsy.

    After an initial Qwen warm-up, each iteration calls
    ``crud.cleanup_old_entries()``, fetches the next not-started row and
    either processes it or sleeps for ``settings.POLL_INTERVAL``. Errors in
    any step are logged and the loop keeps running.
    """
    logger.info("TTT Worker started. Monitoring for new tasks...")
    initiate = await _warm_up_qwen()

    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")

        # Housekeeping must never take the worker down: an unhandled error
        # here would end the task while worker_running stays True.
        try:
            await crud.cleanup_old_entries()
        except Exception as e:
            logger.error(f"Worker error: {str(e)}")

        try:
            row = await crud.get_next_not_started()
            if row:
                await _process_task(row, initiate)
            else:
                await asyncio.sleep(settings.POLL_INTERVAL)
        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)
async def _run_opencode(text: str, *, timeout: float = 120,
                        model: str = 'opencode/big-pickle') -> str:
    """Run the ``opencode`` CLI on ``text`` and return its standard output.

    Both output streams are read line by line (and logged) while the process
    runs, so neither pipe can fill up and stall the child.

    Args:
        text: Prompt passed as the final positional argument to ``opencode run``.
        timeout: Maximum number of seconds to wait for the process to finish.
        model: Value passed to the CLI's ``--model`` option.

    Returns:
        The captured stdout lines joined with newlines.

    Raises:
        FileNotFoundError: The ``opencode`` executable is not on ``PATH``.
        TimeoutError: The process did not finish within ``timeout`` seconds.
        RuntimeError: The process exited with a non-zero return code.
    """
    if not shutil.which('opencode'):
        raise FileNotFoundError(
            "opencode CLI not found. Install it from https://opencode.ai"
        )

    proc = await asyncio.create_subprocess_exec(
        'opencode', 'run', '--print-logs', '--model', model, text,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout_lines = []
    stderr_lines = []

    async def _read_stream(stream, lines, label):
        while True:
            line = await stream.readline()
            if not line:
                break
            decoded = line.decode(errors='replace').rstrip()
            lines.append(decoded)
            logger.info(f"opencode {label}: {decoded}")

    async def _drain_and_wait():
        await asyncio.gather(
            _read_stream(proc.stdout, stdout_lines, "stdout"),
            _read_stream(proc.stderr, stderr_lines, "stderr"),
        )
        # Waiting for exit inside the timeout also covers a child that
        # closes its pipes but keeps running.
        await proc.wait()

    try:
        await asyncio.wait_for(_drain_and_wait(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise TimeoutError(f"opencode timed out after {timeout:g}s") from exc
    finally:
        # Reached on timeout, cancellation, or a read error: make sure the
        # child is terminated and reaped so it does not linger as a zombie.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # process exited between the check and the kill
            await proc.wait()

    stdout = '\n'.join(stdout_lines)
    stderr = '\n'.join(stderr_lines)
    if proc.returncode != 0:
        raise RuntimeError(f"opencode failed ({proc.returncode}): {stderr or 'unknown error'}")
    return stdout
|