"""Background TTT worker: polls the task store and runs each pending task.

Tasks are fetched through ``app.db.crud`` and handled either by the Qwen
engine (``ttt.runner.initiate``) or by the external ``opencode`` CLI,
depending on the task's ``model`` field.
"""
import asyncio
import json
import shutil

from app.core.config import settings
from custom_logger import logger_config as logger
from app.db import crud

# asyncio.Task running worker_loop(); None until start_worker() creates it.
worker_task = None
# Flag checked by worker_loop() on every iteration; the loop runs while True.
worker_running = False


def is_worker_running():
    """Return the current value of the module-level ``worker_running`` flag."""
    return worker_running


async def start_worker():
    """Start the background worker task unless the running flag is already set.

    Must be called from a running event loop, because it schedules
    ``worker_loop()`` with ``asyncio.create_task``.
    """
    global worker_task, worker_running
    logger.info(f"start_worker called: worker_running={worker_running}")
    if not worker_running:
        worker_running = True
        # Keep a module-level reference so the task is not garbage collected.
        worker_task = asyncio.create_task(worker_loop())
        logger.info("Worker task started")
    else:
        logger.info("Worker already running")


async def _process_task(row, initiate, loop):
    """Run a single task row and record its outcome through ``crud``.

    Args:
        row: Task record returned by ``crud.get_next_not_started()``. It is
            read by key (``id``, ``input_text``, ``system_prompt`` and,
            when present, ``model``) and must provide ``keys()``.
        initiate: The ``ttt.runner.initiate`` callable, or ``None`` when it
            could not be imported.
        loop: The running event loop, used to schedule progress updates from
            the executor thread.

    Errors raised while reading the row or marking it ``processing``
    propagate to the caller. Errors raised while running the task are logged
    and stored on the task, which is marked ``failed``.
    """
    task_id = row['id']
    input_text = row['input_text']
    system_prompt = row['system_prompt'] or "You are a helpful assistant."
    model = row['model'] if 'model' in row.keys() else 'qwen'

    banner = '=' * 60
    logger.info(
        f"\n{banner}\nProcessing task: {task_id} (model: {model})\n"
        f"📌 Input: {input_text[:100]}...\n{banner}"
    )

    await crud.update_status(task_id, 'processing')

    def progress_cb(percent, text):
        # Called from the executor thread, so the coroutine has to be handed
        # back to the event loop instead of being awaited directly.
        asyncio.run_coroutine_threadsafe(
            crud.update_progress(task_id, percent, text),
            loop,
        )

    def run_qwen():
        return initiate(
            {
                'text': input_text,
                'system_prompt': system_prompt,
                'model': 'qwen',
            },
            progress_callback=progress_cb,
        )

    try:
        await crud.update_progress(task_id, 5, "Starting...")

        if model == 'opencode':
            await crud.update_progress(task_id, 10, "Running opencode...")
            result = await _run_opencode(input_text)
            logger.success(f"Successfully processed (opencode): {task_id}")
            await crud.update_progress(task_id, 100, "Completed")
            await crud.update_status(
                task_id, 'completed', result=json.dumps({"response": result})
            )
        else:
            if initiate is None:
                # The import failed during warm-up; fail the task with a clear
                # message instead of an unbound-name error.
                raise RuntimeError(
                    "Qwen model is not available (ttt.runner could not be imported)"
                )
            # initiate() is a blocking call, so it runs in the default executor.
            result = await loop.run_in_executor(None, run_qwen)
            if result:
                logger.success(f"Successfully processed: {task_id}")
                await crud.update_status(
                    task_id, 'completed', result=json.dumps(result)
                )
            else:
                raise Exception("initiate() returned empty result")
    except Exception as e:
        logger.error(f"Failed to process {task_id}: {str(e)}")
        await crud.update_status(task_id, 'failed', error=str(e))


async def worker_loop():
    """Poll for not-started tasks and process them while ``worker_running`` is True.

    On start-up it tries to import and warm up the Qwen engine; a failure is
    only logged, so ``opencode`` tasks can still be served. Each iteration
    cleans up old entries, fetches the next pending task and processes it, or
    sleeps ``settings.POLL_INTERVAL`` when there is nothing to do or when the
    iteration itself fails.
    """
    logger.info("TTT Worker started. Monitoring for new tasks...")
    loop = asyncio.get_running_loop()

    # Stays None when the import below fails, so later Qwen tasks can report
    # a meaningful error.
    initiate = None
    try:
        from ttt.runner import initiate
        # Warm up: load the engine with a minimal one-token request.
        await loop.run_in_executor(
            None,
            lambda: initiate({'text': 'Hi', 'model': 'qwen', 'max_new_tokens': 1}),
        )
        logger.info("✅ Qwen model ready. Monitoring for new tasks...")
    except Exception as e:
        logger.warning(
            f"⚠️ Qwen model not available (opencode-only tasks will still work): {e}"
        )

    while worker_running:
        logger.debug("Worker loop iteration, checking for files...")
        try:
            # Inside the guard on purpose: a cleanup failure must not end the
            # worker task while worker_running is still True.
            await crud.cleanup_old_entries()
            row = await crud.get_next_not_started()
            if row:
                await _process_task(row, initiate, loop)
            else:
                await asyncio.sleep(settings.POLL_INTERVAL)
        except Exception as e:
            logger.error(f"Worker error: {str(e)}")
            await asyncio.sleep(settings.POLL_INTERVAL)


async def _run_opencode(text: str, timeout: float = 120) -> str:
    """Run the ``opencode`` CLI on ``text`` and return its standard output.

    Args:
        text: Prompt passed to ``opencode run`` as a single argument.
        timeout: Maximum number of seconds to wait for both output streams
            to reach end-of-file.

    Returns:
        The captured stdout lines joined with newlines.

    Raises:
        FileNotFoundError: If the ``opencode`` executable is not on PATH.
        TimeoutError: If the output streams are still open after ``timeout``
            seconds; the process is killed first.
        RuntimeError: If the process exits with a non-zero return code.
    """
    if not shutil.which('opencode'):
        raise FileNotFoundError(
            "opencode CLI not found. Install it from https://opencode.ai"
        )

    proc = await asyncio.create_subprocess_exec(
        'opencode', 'run', '--print-logs', '--model', 'opencode/big-pickle', text,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout_lines = []
    stderr_lines = []

    async def _read_stream(stream, lines, label):
        # Collect the stream line by line and echo each line to the log.
        while True:
            line = await stream.readline()
            if not line:
                break
            decoded = line.decode(errors='replace').rstrip()
            lines.append(decoded)
            logger.info(f"opencode {label}: {decoded}")

    try:
        # Both pipes are drained concurrently so neither can fill up and
        # block the child process.
        await asyncio.wait_for(
            asyncio.gather(
                _read_stream(proc.stdout, stdout_lines, "stdout"),
                _read_stream(proc.stderr, stderr_lines, "stderr"),
            ),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited on its own between the timeout and the kill.
            pass
        # Reap the child so it does not linger as a zombie.
        await proc.wait()
        raise TimeoutError(f"opencode timed out after {timeout}s") from None

    await proc.wait()
    stdout = '\n'.join(stdout_lines)
    stderr = '\n'.join(stderr_lines)

    if proc.returncode != 0:
        raise RuntimeError(
            f"opencode failed ({proc.returncode}): {stderr or 'unknown error'}"
        )

    return stdout