# 3d_model/ylff/utils/job_manager.py
# Azan
# Clean deployment build (Squashed)
# 7a87926
"""
Job management utilities for background task execution.
"""
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Thread pool for running long-running CLI commands.
# max_workers=2 means at most two submitted tasks execute at the same time;
# further submissions wait in the executor's queue until a worker is free.
executor = ThreadPoolExecutor(max_workers=2)
# Job storage (in production, use Redis or a database).
# This is a plain in-process dict, so its contents do not survive a restart.
# Presumably keyed by job id with a per-job record as the value -- the record
# schema is not visible in this part of the file; verify against the code
# that populates it.
jobs: Dict[str, Dict[str, Any]] = {}
class LiveCapture:
    """File-like tee that records output and reports progress lines.

    Intended to stand in for ``sys.stdout`` / ``sys.stderr``. Text written by
    the thread that created the instance is:

    1. forwarded to ``original_stream`` (when one was given), so it still
       shows up wherever it used to go;
    2. appended to an in-memory buffer, readable through ``getvalue()``;
    3. scanned for completed lines when ``on_output`` is set. ``\\n`` and
       ``\\r`` both end a line, so carriage-return progress bars count.

    Writes coming from any other thread are passed straight through to
    ``original_stream`` and are neither buffered nor reported.

    Attributes that this class does not define itself (``encoding``,
    ``fileno``, ``closed``, ...) are looked up on ``original_stream`` so that
    code probing the replaced stream does not fail with ``AttributeError``.
    Note that ``buffer`` is this object's own text buffer, not a binary layer.
    """

    def __init__(
        self,
        original_stream,
        on_output: Optional[Callable[[str], None]] = None,
    ):
        """Wrap ``original_stream`` and bind capturing to the current thread.

        Args:
            original_stream: Stream to forward writes to; may be ``None``.
            on_output: Optional callback invoked with the stripped text of
                the most recent non-empty completed line.
        """
        from io import StringIO

        self.original_stream = original_stream
        self.on_output = on_output
        # Only writes issued by this thread are captured (see ``write``).
        self.target_thread_id = threading.get_ident()
        self.buffer = StringIO()
        # Holds the trailing, not yet terminated part of the output.
        self.line_buffer = ""

    def write(self, s: str) -> int:
        """Write ``s``; capture it only when called from the owning thread.

        Returns:
            ``len(s)``, except for foreign-thread writes with an original
            stream, where the original stream's own return value is passed on.
        """
        # If this write is from a different thread (e.g. Uvicorn logging),
        # just pass it through to the original stream and skip capturing.
        if threading.get_ident() != self.target_thread_id:
            if self.original_stream:
                return self.original_stream.write(s)
            return len(s)

        # Write to original stream so it still shows in terminal.
        if self.original_stream:
            self.original_stream.write(s)
        self.buffer.write(s)

        if self.on_output and s:
            self.line_buffer += s
            if "\n" in self.line_buffer or "\r" in self.line_buffer:
                # Treat "\r" like "\n"; the final piece is the unfinished
                # tail and stays buffered for the next write.
                normalized = self.line_buffer.replace("\r", "\n")
                *completed, remainder = normalized.split("\n")
                # Only the newest non-blank completed line is reported, so a
                # burst of lines in one write yields a single callback.
                latest = ""
                for segment in reversed(completed):
                    stripped = segment.strip()
                    if stripped:
                        latest = stripped
                        break
                if latest:
                    self.on_output(latest)
                self.line_buffer = remainder
        return len(s)

    def flush(self) -> None:
        """Flush the original stream (if any) and the capture buffer."""
        if self.original_stream:
            self.original_stream.flush()
        self.buffer.flush()

    def getvalue(self) -> str:
        """Return everything captured so far from the owning thread."""
        return self.buffer.getvalue()

    def isatty(self) -> bool:
        """Report that this stream is not a terminal.

        Output is being captured, so callers that adapt to a TTY (colours,
        cursor movement) should use their plain-text path.
        """
        return False

    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not defined here to ``original_stream``.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: For dunder names, when there is no original
                stream, or when the original stream lacks the attribute.
        """
        # Read through __dict__ so a partially initialised instance cannot
        # recurse back into __getattr__.
        original = self.__dict__.get("original_stream")
        if original is None or name.startswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(original, name)
def run_cli_command(
    command_func: Callable[..., None],
    *args: Any,
    on_output: Optional[Callable[[str], None]] = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """
    Invoke ``command_func(*args, **kwargs)`` while capturing stdout/stderr.

    ``sys.stdout`` and ``sys.stderr`` are replaced by ``LiveCapture`` wrappers
    for the duration of the call and always restored afterwards. The outcome
    is reported as a dict instead of being raised.

    Args:
        command_func: Callable to execute.
        *args: Positional arguments forwarded to ``command_func``.
        on_output: Optional callback handed to both capture wrappers.
        **kwargs: Keyword arguments forwarded to ``command_func``.

    Returns:
        A dict that always contains ``success``, ``stdout``, ``stderr``,
        ``error`` and ``duration``. ``exit_code`` is added when the command
        raised ``typer.Exit``; ``error_type`` and ``traceback`` are added
        when it raised any other ``Exception``.
    """
    import sys
    import traceback
    import typer

    command_name = getattr(command_func, "__name__", "unknown_command")
    start_time = time.time()
    logger.info(f"Starting CLI command: {command_name}")

    # Swap the process-wide streams for capturing wrappers.
    # NOTE(review): sys.stdout/sys.stderr are global to the process, so calls
    # overlapping on different threads can restore each other's wrappers out
    # of order -- confirm whether callers serialise invocations.
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    out_capture = LiveCapture(saved_stdout, on_output)
    sys.stdout = out_capture
    err_capture = LiveCapture(saved_stderr, on_output)
    sys.stderr = err_capture

    def snapshot():
        """Return (elapsed seconds, captured stdout, captured stderr)."""
        elapsed = time.time() - start_time
        return elapsed, out_capture.getvalue(), err_capture.getvalue()

    try:
        command_func(*args, **kwargs)

        duration, captured_out, captured_err = snapshot()
        log_fields = {
            "command": command_name,
            "duration": duration,
            "stdout_length": len(captured_out),
            "stderr_length": len(captured_err),
        }
        logger.info(
            f"CLI command completed successfully: {command_name}",
            extra=log_fields,
        )
        return {
            "success": True,
            "stdout": captured_out,
            "stderr": captured_err,
            "error": None,
            "duration": duration,
        }

    except typer.Exit as exit_signal:
        # Typer signals a requested exit with this exception; only a zero
        # exit code counts as success.
        duration, captured_out, captured_err = snapshot()
        exit_code = exit_signal.exit_code
        succeeded = exit_code == 0

        log_fields = {
            "command": command_name,
            "duration": duration,
            "exit_code": exit_code,
        }
        if succeeded:
            logger.info(
                f"CLI command exited successfully: {command_name}",
                extra=log_fields,
            )
        else:
            # Attach only the first 500 characters of each stream to the log.
            log_fields["stdout"] = captured_out[:500]
            log_fields["stderr"] = captured_err[:500]
            logger.warning(
                f"CLI command exited with error: {command_name}",
                extra=log_fields,
            )

        if succeeded:
            error_text = None
        else:
            error_text = f"Command exited with code {exit_code}"
        return {
            "success": succeeded,
            "exit_code": exit_code,
            "stdout": captured_out,
            "stderr": captured_err,
            "error": error_text,
            "duration": duration,
        }

    except KeyboardInterrupt:
        duration, captured_out, captured_err = snapshot()
        logger.error(
            f"CLI command interrupted: {command_name}",
            extra={"command": command_name, "duration": duration},
        )
        return {
            "success": False,
            "error": "Command interrupted by user",
            "stdout": captured_out,
            "stderr": captured_err,
            "duration": duration,
        }

    except Exception as exc:
        duration, captured_out, captured_err = snapshot()
        # Must be rendered while the exception is still being handled.
        formatted_traceback = traceback.format_exc()
        error_kind = type(exc).__name__
        error_text = str(exc)
        logger.error(
            f"CLI command failed with exception: {command_name}",
            extra={
                "command": command_name,
                "duration": duration,
                "error_type": error_kind,
                "error": error_text,
                "stdout": captured_out[:500],
                "stderr": captured_err[:500],
            },
            exc_info=True,
        )
        return {
            "success": False,
            "error": error_text,
            "error_type": error_kind,
            "traceback": formatted_traceback,
            "stdout": captured_out,
            "stderr": captured_err,
            "duration": duration,
        }

    finally:
        # Put the previous streams back regardless of how the command ended.
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr