# TTT / app/services/worker.py
# Auto-deployed from GitHub by github-actions[bot]
# Commit: b9fb517f214504859c7a4a90ba3431ca627e9cde (bf8297e)
import asyncio
import json
import shutil
from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud
# Handle to the background asyncio.Task running worker_loop(); None until started.
worker_task = None
# Loop flag: worker_loop() keeps polling while this is True; clear it to stop.
worker_running = False
def is_worker_running():
    """Report whether the background worker loop is currently active."""
    return worker_running
async def start_worker():
    """Spawn worker_loop() as a background task, unless one is already live.

    Idempotent: a second call while the worker is running only logs and
    returns without starting another task.
    """
    global worker_task, worker_running
    logger.info(f"start_worker called: worker_running={worker_running}")
    # Guard clause: nothing to do when a worker task already exists.
    if worker_running:
        logger.info("Worker already running")
        return
    worker_running = True
    worker_task = asyncio.create_task(worker_loop())
    logger.info("Worker task started")
async def worker_loop():
    """Poll the task queue and process tasks until worker_running is cleared.

    Runs as a background asyncio task (see start_worker). Each iteration
    prunes old rows, fetches the oldest not-started task, and dispatches it
    to either the opencode CLI or the Qwen model. Blocking model inference
    runs in the default executor so the event loop stays responsive.
    Per-task failures are recorded on the task row ('failed' + error text);
    unexpected errors only log and back off, never kill the loop.
    """
    global worker_running
    logger.info("TTT Worker started. Monitoring for new tasks...")
    # Pre-bind so a failed import leaves a well-defined None instead of an
    # unbound name (previously qwen tasks died with a confusing NameError).
    initiate = None
    try:
        from ttt.runner import initiate
        # Warm up: load the engine so the first real task isn't slow.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, lambda: initiate({'text': 'Hi', 'model': 'qwen', 'max_new_tokens': 1}))
        logger.info("✅ Qwen model ready. Monitoring for new tasks...")
    except Exception as e:
        logger.warning(f"⚠️ Qwen model not available (opencode-only tasks will still work): {e}")
    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")
        await crud.cleanup_old_entries()
        try:
            row = await crud.get_next_not_started()
            if row:
                task_id = row['id']
                input_text = row['input_text']
                system_prompt = row['system_prompt'] or "You are a helpful assistant."
                # Row appears to be sqlite3.Row-like (no .get()); default the
                # model column to 'qwen' when it is absent.
                model = row['model'] if 'model' in row.keys() else 'qwen'
                logger.info(f"\n{'='*60}\nProcessing task: {task_id} (model: {model})\n📌 Input: {input_text[:100]}...\n{'='*60}")
                await crud.update_status(task_id, 'processing')
                loop = asyncio.get_running_loop()

                def progress_cb(percent, text):
                    # Invoked from the executor thread; hop back onto the loop.
                    asyncio.run_coroutine_threadsafe(
                        crud.update_progress(task_id, percent, text),
                        loop
                    )
                try:
                    await crud.update_progress(task_id, 5, "Starting...")
                    if model == 'opencode':
                        await crud.update_progress(task_id, 10, "Running opencode...")
                        result = await _run_opencode(input_text)
                        logger.success(f"Successfully processed (opencode): {task_id}")
                        await crud.update_progress(task_id, 100, "Completed")
                        await crud.update_status(task_id, 'completed', result=json.dumps({"response": result}))
                    elif initiate is None:
                        # Qwen engine never loaded; fail this task with a clear
                        # message instead of a NameError.
                        raise RuntimeError("Qwen model is not available on this worker")
                    else:
                        result = await loop.run_in_executor(None, lambda: initiate(
                            {
                                'text': input_text,
                                'system_prompt': system_prompt,
                                'model': 'qwen',
                            },
                            progress_callback=progress_cb
                        ))
                        if result:
                            logger.success(f"Successfully processed: {task_id}")
                            await crud.update_status(task_id, 'completed', result=json.dumps(result))
                        else:
                            raise Exception("initiate() returned empty result")
                except Exception as e:
                    logger.error(f"Failed to process {task_id}: {str(e)}")
                    await crud.update_status(task_id, 'failed', error=str(e))
            else:
                await asyncio.sleep(settings.POLL_INTERVAL)
        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)
async def _run_opencode(text: str) -> str:
if not shutil.which('opencode'):
raise FileNotFoundError(
"opencode CLI not found. Install it from https://opencode.ai"
)
proc = await asyncio.create_subprocess_exec(
'opencode', 'run', '--print-logs', '--model', 'opencode/big-pickle', text,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_lines = []
stderr_lines = []
async def _read_stream(stream, lines, label):
while True:
line = await stream.readline()
if not line:
break
decoded = line.decode(errors='replace').rstrip()
lines.append(decoded)
logger.info(f"opencode {label}: {decoded}")
try:
await asyncio.wait_for(
asyncio.gather(
_read_stream(proc.stdout, stdout_lines, "stdout"),
_read_stream(proc.stderr, stderr_lines, "stderr"),
),
timeout=120
)
except asyncio.TimeoutError:
proc.kill()
raise TimeoutError("opencode timed out after 120s")
await proc.wait()
stdout = '\n'.join(stdout_lines)
stderr = '\n'.join(stderr_lines)
if proc.returncode != 0:
raise RuntimeError(f"opencode failed ({proc.returncode}): {stderr or 'unknown error'}")
return stdout