from __future__ import annotations
import copy
import json
import math
import os
import re
import sys
import time
import ffmpeg
from datetime import datetime
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable
from PIL import Image, ImageColor
from shared.utils.audio_video import extract_audio_tracks
from shared.utils.utils import get_video_frame, get_video_info
from shared.deepy.config import (
DEEPY_AUTO_CANCEL_QUEUE_TASKS_DEFAULT,
DEEPY_AUTO_CANCEL_QUEUE_TASKS_KEY,
DEEPY_CONTEXT_TOKENS_DEFAULT,
DEEPY_CONTEXT_TOKENS_KEY,
DEEPY_CUSTOM_SYSTEM_PROMPT_KEY,
DEEPY_VRAM_MODE_ALWAYS_LOADED,
DEEPY_VRAM_MODE_UNLOAD,
DEEPY_VRAM_MODE_UNLOAD_ON_REQUEST,
get_deepy_config_value,
normalize_deepy_auto_cancel_queue_tasks,
normalize_deepy_context_tokens,
normalize_deepy_custom_system_prompt,
normalize_deepy_vram_mode,
)
from shared.deepy import DEFAULT_SYSTEM_PROMPT as ASSISTANT_SYSTEM_PROMPT
from shared.deepy import (
    media_registry,
    tool_settings as deepy_tool_settings,
    ui_settings as deepy_ui_settings,
    video_tools as deepy_video_tools,
    vision as deepy_vision,
)
from shared.gradio import assistant_chat
from shared.prompt_enhancer import qwen35_text
from shared.prompt_enhancer.qwen35_assistant_runtime import (
Qwen35AssistantRuntime,
extract_tool_calls,
render_assistant_messages,
render_text_user_turn_suffix,
render_tool_turn_suffix,
strip_inline_tool_call_text,
strip_tool_blocks,
strip_trailing_stop_markup,
)
ASSISTANT_DEBUG = False
_TOOL_TYPE_MAP = {
"str": "string",
"int": "integer",
"float": "number",
"bool": "boolean",
}
_AI_GEN_NO = 0
_DOCS_DIR = Path(__file__).resolve().parents[2] / "docs"
_DEEPY_DOCS = {
"finetunes": {"title": "Finetunes", "path": _DOCS_DIR / "FINETUNES.md"},
"getting_started": {"title": "Getting Started", "path": _DOCS_DIR / "GETTING_STARTED.md"},
"loras": {"title": "Loras", "path": _DOCS_DIR / "LORAS.md"},
"overview": {"title": "Overview", "path": _DOCS_DIR / "OVERVIEW.md"},
"prompts": {"title": "Prompts", "path": _DOCS_DIR / "PROMPTS.md"},
"vace": {"title": "VACE", "path": _DOCS_DIR / "VACE.md"},
}
_DOC_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
_DOC_TOKEN_RE = re.compile(r"[a-z0-9]+")
_SELECTED_REFERENCE_RE = re.compile(r"\b(selected|current(?:ly)?\s+selected|current\s+(?:item|media))\b", flags=re.IGNORECASE)
# NOTE: the tag literals were lost when angle-bracket markup was stripped from
# this file; "<runtime_update>" is assumed here from the constant's name and the
# runtime-update blocks this module injects.
_RUNTIME_UPDATE_BLOCK_RE = re.compile(r"\s*<runtime_update>.*?</runtime_update>\s*", flags=re.DOTALL | re.IGNORECASE)
_POST_TRIM_WINDOW_FRACTION = 0.25
_INJECT_SELECTED_MEDIA_RUNTIME_UPDATES = False
_RUNTIME_STATUS_VISUAL_KEYS = (
"selected_visual_media_id",
"selected_visual_media_type",
"selected_visual_media_label",
"selected_visual_current_time_seconds",
"selected_visual_current_frame_no",
)
_RUNTIME_STATUS_AUDIO_KEYS = (
"selected_audio_media_id",
"selected_audio_media_type",
"selected_audio_media_label",
)
_RUNTIME_STATUS_ALL_KEYS = _RUNTIME_STATUS_VISUAL_KEYS + _RUNTIME_STATUS_AUDIO_KEYS
def set_assistant_debug(enabled: bool) -> None:
global ASSISTANT_DEBUG
ASSISTANT_DEBUG = bool(enabled)
def _json_type_from_annotation(annotation) -> str:
annotation_name = getattr(annotation, "__name__", str(annotation))
return _TOOL_TYPE_MAP.get(annotation_name, "string")
def assistant_tool(
name: str | None = None,
description: str = "",
parameters: dict[str, dict[str, Any]] | None = None,
display_name: str | None = None,
pause_runtime: bool = True,
pause_reason: str = "tool",
):
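    """Tag a function as an assistant-exposed tool.

    Attaches the name, display name, description, JSON-schema-style
    parameters, and runtime pause behavior as ``func._assistant_tool``
    metadata, which is read later when the tool schema is assembled.
    """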
def decorator(func):
func._assistant_tool = {
"name": str(name or func.__name__).strip(),
"display_name": str(display_name or name or func.__name__).strip(),
"description": str(description or "").strip(),
"parameters": dict(parameters or {}),
"pause_runtime": bool(pause_runtime),
"pause_reason": str(pause_reason or "tool").strip() or "tool",
}
return func
return decorator
def _doc_relative_path(doc_path: Path) -> str:
return str(doc_path.relative_to(_DOCS_DIR.parent)).replace("\\", "/")
def _normalize_doc_text(value: str) -> str:
return " ".join(_DOC_TOKEN_RE.findall(str(value or "").lower()))
def _tokenize_doc_query(value: str) -> list[str]:
return _DOC_TOKEN_RE.findall(str(value or "").lower())
def _extract_doc_sections(doc_id: str) -> tuple[dict[str, Any], list[dict[str, Any]]]:
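    """Parse a bundled markdown doc into ``(doc_info, sections)``.

    Headings inside fenced code blocks are skipped. Each section carries its
    full heading path (e.g. "Setup > Windows"), its markdown including the
    heading line, and its body text. H1 sections are only emitted when the
    doc has no deeper headings. Raises KeyError for an unknown doc id.
    """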
lookup_id = str(doc_id or "").strip().lower()
doc_entry = _DEEPY_DOCS.get(lookup_id, None)
if doc_entry is None:
raise KeyError(lookup_id)
doc_path = Path(doc_entry["path"])
content = doc_path.read_text(encoding="utf-8").replace("\r\n", "\n").replace("\r", "\n").strip()
lines = content.split("\n") if len(content) > 0 else []
headings = []
in_code_block = False
for index, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("```"):
in_code_block = not in_code_block
continue
if in_code_block:
continue
match = _DOC_HEADING_RE.match(line)
if match is None:
continue
headings.append((index, len(match.group(1)), match.group(2).strip()))
include_top_level = not any(level > 1 for _line_no, level, _title in headings)
sections = []
stack: list[tuple[int, str]] = []
for heading_index, (start_line, level, title) in enumerate(headings):
while stack and stack[-1][0] >= level:
stack.pop()
stack.append((level, title))
if not include_top_level and level == 1:
continue
end_line = len(lines)
for next_start_line, next_level, _next_title in headings[heading_index + 1 :]:
if next_level <= level:
end_line = next_start_line
break
section_parts = [item_title for item_level, item_title in stack if include_top_level or item_level > 1]
section_name = " > ".join(section_parts or [title])
markdown = "\n".join(lines[start_line:end_line]).strip()
body = "\n".join(lines[start_line + 1 : end_line]).strip()
sections.append(
{
"section": section_name,
"heading": title,
"heading_level": int(level),
"content": markdown,
"body": body,
}
)
if not sections and len(content) > 0:
sections.append(
{
"section": str(doc_entry["title"]).strip() or lookup_id,
"heading": str(doc_entry["title"]).strip() or lookup_id,
"heading_level": 1,
"content": content,
"body": content,
}
)
return {
"doc_id": lookup_id,
"title": str(doc_entry["title"]).strip() or lookup_id,
"path": _doc_relative_path(doc_path),
}, sections
def _build_doc_excerpt(section: dict[str, Any], query: str, query_tokens: list[str], limit: int = 260) -> str:
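    """Return the most query-relevant line of a section as a one-line excerpt, truncated to ``limit`` characters."""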
lines = [line.strip() for line in str(section.get("body", "") or "").splitlines() if len(line.strip()) > 0]
if not lines:
lines = [line.strip() for line in str(section.get("content", "") or "").splitlines() if len(line.strip()) > 0]
if not lines:
return ""
query_lower = str(query or "").strip().lower()
best_line = ""
if len(query_lower) > 0:
best_line = next((line for line in lines if query_lower in line.lower()), "")
if len(best_line) == 0 and query_tokens:
best_line = max(lines, key=lambda line: sum(token in line.lower() for token in query_tokens))
if len(best_line) == 0:
best_line = lines[0]
best_line = re.sub(r"\s+", " ", best_line).strip()
return best_line if len(best_line) <= limit else best_line[: limit - 3].rstrip() + "..."
def _score_doc_section(query: str, query_tokens: list[str], doc_title: str, section: dict[str, Any]) -> int:
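    """Score a section against a search query: whole-query matches in the heading path or body outweigh per-token matches."""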
query_lower = str(query or "").strip().lower()
path_text = f"{doc_title} {section.get('section', '')}".lower()
content_text = str(section.get("body", "") or section.get("content", "")).lower()
score = 0
if len(query_lower) > 0 and query_lower in path_text:
score += 100
if len(query_lower) > 0 and query_lower in content_text:
score += 40
for token in query_tokens:
if token in path_text:
score += 12
if token in content_text:
score += 3
return score
def _resolve_doc_section(doc_id: str, section_name: str) -> tuple[dict[str, Any], dict[str, Any], list[str]]:
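    """Resolve a doc section by name.

    Preference order: exact heading-path match, exact heading match, then a
    unique partial match. On ambiguity the section dict is empty and the
    third return value lists up to five candidate section names.
    """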
doc_info, sections = _extract_doc_sections(doc_id)
normalized_target = _normalize_doc_text(section_name)
if len(normalized_target) == 0:
return doc_info, {}, []
exact_path_matches = [section for section in sections if _normalize_doc_text(section["section"]) == normalized_target]
if len(exact_path_matches) == 1:
return doc_info, exact_path_matches[0], []
exact_heading_matches = [section for section in sections if _normalize_doc_text(section["heading"]) == normalized_target]
if len(exact_path_matches) == 0 and len(exact_heading_matches) == 1:
return doc_info, exact_heading_matches[0], []
partial_matches = [section for section in sections if normalized_target in _normalize_doc_text(section["section"])]
if len(exact_path_matches) == 0 and len(exact_heading_matches) == 0 and len(partial_matches) == 1:
return doc_info, partial_matches[0], []
candidate_matches = exact_path_matches or exact_heading_matches or partial_matches
candidate_names = [str(section["section"]) for section in candidate_matches[:5]]
return doc_info, {}, candidate_names
def _format_avg_tokens_per_second(value: float) -> str:
try:
speed = float(value or 0.0)
except Exception:
speed = 0.0
if not math.isfinite(speed) or speed < 0.0:
speed = 0.0
return f"{speed:.1f}"
def build_assistant_chat_stats(
session: AssistantSessionState,
*,
max_tokens: int,
active_sequence_token_count: int | None = None,
live_prefill_tokens: int = 0,
live_prefill_seconds: float = 0.0,
live_generated_tokens: int = 0,
live_generation_seconds: float = 0.0,
) -> dict[str, Any]:
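    """Build the chat stats payload (prefill/generation throughput and token usage).

    Consumed tokens come from, in order of preference: the explicit
    active-sequence count, the runtime snapshot's token ids, then the
    session's rendered token ids. Live counters from the in-flight turn are
    folded into the session totals before computing averages.
    """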
max_tokens = max(0, int(max_tokens or 0))
consumed_tokens = None if active_sequence_token_count is None else max(0, int(active_sequence_token_count))
if consumed_tokens is None:
snapshot_sequence = None if session.runtime_snapshot is None else session.runtime_snapshot.get("sequence", None)
if isinstance(snapshot_sequence, dict):
snapshot_token_ids = snapshot_sequence.get("token_ids", []) or []
if len(snapshot_token_ids) > 0:
consumed_tokens = len(snapshot_token_ids)
if consumed_tokens is None:
consumed_tokens = len(session.rendered_token_ids or [])
total_prefill_tokens = max(0, int(session.prefill_token_total or 0)) + max(0, int(live_prefill_tokens or 0))
total_prefill_seconds = max(0.0, float(session.prefill_seconds_total or 0.0)) + max(0.0, float(live_prefill_seconds or 0.0))
total_generated_tokens = max(0, int(session.generated_token_total or 0)) + max(0, int(live_generated_tokens or 0))
total_generation_seconds = max(0.0, float(session.generated_seconds_total or 0.0)) + max(0.0, float(live_generation_seconds or 0.0))
avg_prefill_tokens_per_second = (float(total_prefill_tokens) / float(total_prefill_seconds)) if total_prefill_seconds > 1e-9 else 0.0
avg_generated_tokens_per_second = (float(total_generated_tokens) / float(total_generation_seconds)) if total_generation_seconds > 1e-9 else 0.0
return {
"visible": True,
"text": f"prefill {_format_avg_tokens_per_second(avg_prefill_tokens_per_second)} tk/s | gen {_format_avg_tokens_per_second(avg_generated_tokens_per_second)} tk/s | {int(consumed_tokens):,} / {int(max_tokens):,} tk",
"avg_prefill_tokens_per_second": avg_prefill_tokens_per_second,
"avg_generated_tokens_per_second": avg_generated_tokens_per_second,
"consumed_tokens": int(consumed_tokens),
"max_tokens": int(max_tokens),
}
@dataclass(slots=True)
class AssistantSessionState:
messages: list[dict[str, Any]] = field(default_factory=list)
rendered_token_ids: list[int] = field(default_factory=list)
rendered_messages_len: int = 0
runtime_snapshot: dict[str, Any] | None = None
discard_runtime_snapshot_on_release: bool = False
media_registry: list[dict[str, Any]] = field(default_factory=list)
media_registry_counter: int = 0
chat_html: str = ""
chat_transcript: list[dict[str, Any]] = field(default_factory=list)
chat_transcript_counter: int = 0
interrupt_requested: bool = False
drop_state_requested: bool = False
worker_active: bool = False
control_queue: Any | None = None
queued_job_count: int = 0
chat_epoch: int = 0
release_vram_callback: Callable[[], None] | None = None
force_loading_status_once: bool = False
current_turn: dict[str, Any] | None = None
interruption_notice: str = ""
runtime_status_note: str = ""
runtime_status_signature: str = ""
rendered_system_prompt_signature: str = ""
rendered_context_window_tokens: int = 0
pending_replay_reason: str = ""
tool_ui_settings: dict[str, Any] = field(default_factory=dict)
prefill_token_total: int = 0
prefill_seconds_total: float = 0.0
generated_token_total: int = 0
generated_seconds_total: float = 0.0
runtime_max_model_len: int = 0
chat_stats_signature: str = ""
@dataclass(slots=True)
class AssistantRuntimeHooks:
acquire_gpu: Callable[[], None]
release_gpu: Callable[..., None]
register_gpu_resident: Callable[[Callable[[], None] | None, bool], None]
clear_gpu_resident: Callable[[], None]
ensure_loaded: Callable[[], tuple[Any, Any]]
unload_runtime: Callable[[], None]
unload_weights: Callable[[], None]
ensure_vision_loaded: Callable[[], tuple[Any, Any]] | None = None
def get_or_create_assistant_session(state) -> AssistantSessionState:
session = state.get("assistant_session", None)
if isinstance(session, AssistantSessionState):
return session
session = AssistantSessionState()
state["assistant_session"] = session
return session
def clear_assistant_session(session: AssistantSessionState) -> None:
session.messages.clear()
session.rendered_token_ids.clear()
session.rendered_messages_len = 0
session.runtime_snapshot = None
session.discard_runtime_snapshot_on_release = False
session.media_registry.clear()
session.media_registry_counter = 0
session.chat_html = ""
session.queued_job_count = 0
session.release_vram_callback = None
session.force_loading_status_once = False
session.current_turn = None
session.interruption_notice = ""
session.runtime_status_note = ""
session.runtime_status_signature = ""
session.rendered_system_prompt_signature = ""
session.rendered_context_window_tokens = 0
session.pending_replay_reason = ""
session.tool_ui_settings = {}
session.prefill_token_total = 0
session.prefill_seconds_total = 0.0
session.generated_token_total = 0
session.generated_seconds_total = 0.0
session.runtime_max_model_len = 0
session.chat_stats_signature = ""
assistant_chat.reset_session_chat(session)
def begin_assistant_turn(session: AssistantSessionState, user_message_id: str, user_text: str) -> None:
session.current_turn = {
"user_message_id": str(user_message_id or "").strip(),
"user_text": str(user_text or "").strip(),
"messages_len": len(session.messages),
"rendered_token_ids": list(session.rendered_token_ids),
"rendered_messages_len": int(session.rendered_messages_len or 0),
"runtime_snapshot": session.runtime_snapshot,
"rendered_system_prompt_signature": session.rendered_system_prompt_signature,
"rendered_context_window_tokens": session.rendered_context_window_tokens,
"assistant_message_id": "",
"interrupt_recorded": False,
"chat_transcript": copy.deepcopy(session.chat_transcript),
"chat_transcript_counter": int(session.chat_transcript_counter or 0),
}
def mark_assistant_turn_message(session: AssistantSessionState, message_id: str) -> None:
checkpoint = session.current_turn
if not isinstance(checkpoint, dict):
return
checkpoint["assistant_message_id"] = str(message_id or "").strip()
def checkpoint_assistant_turn(session: AssistantSessionState) -> bool:
checkpoint = session.current_turn
if not isinstance(checkpoint, dict):
return False
checkpoint["messages_len"] = len(session.messages)
checkpoint["rendered_token_ids"] = [int(token_id) for token_id in session.rendered_token_ids]
checkpoint["rendered_messages_len"] = int(session.rendered_messages_len or 0)
checkpoint["runtime_snapshot"] = None if session.runtime_snapshot is None else copy.deepcopy(session.runtime_snapshot)
checkpoint["rendered_system_prompt_signature"] = str(session.rendered_system_prompt_signature or "")
checkpoint["rendered_context_window_tokens"] = int(session.rendered_context_window_tokens or 0)
checkpoint["chat_transcript"] = copy.deepcopy(session.chat_transcript)
checkpoint["chat_transcript_counter"] = int(session.chat_transcript_counter or 0)
return True
def _build_interruption_notice(user_text: str) -> str:
collapsed = re.sub(r"\s+", " ", str(user_text or "").strip())
if len(collapsed) > 280:
collapsed = collapsed[:277].rstrip() + "..."
if len(collapsed) == 0:
return "The previous user request was interrupted by the user before completion. Do not continue that cancelled turn unless the user explicitly asks to resume it."
return f"The previous user request was interrupted by the user before completion. Do not continue that cancelled turn unless the user explicitly asks to resume it. Cancelled request: {collapsed}"
def _describe_prefix_mismatch(current_token_ids: list[int], target_tokens: list[int]) -> str:
current_len = len(current_token_ids)
target_len = len(target_tokens)
shared = min(current_len, target_len)
mismatch_index = next((idx for idx, (current_token, target_token) in enumerate(zip(current_token_ids, target_tokens)) if int(current_token) != int(target_token)), shared)
if mismatch_index >= shared:
if current_len == target_len:
return f"live sequence and canonicalized prompt had the same length ({current_len} tokens) but different token identity at the end"
if current_len < target_len:
return f"canonicalized prompt diverged right after the live prefix at token {mismatch_index} (live={current_len}, canonical={target_len})"
return f"live runtime contained {current_len - target_len} extra trailing tokens beyond the canonicalized prompt (live={current_len}, canonical={target_len})"
return f"live sequence diverged from canonicalized prompt at token {mismatch_index} (live={current_len}, canonical={target_len})"
def rollback_assistant_turn(session: AssistantSessionState, interrupted_badge: str = "Interrupted") -> bool:
checkpoint = session.current_turn
if not isinstance(checkpoint, dict):
return False
target_len = int(checkpoint.get("messages_len", len(session.messages)))
if len(session.messages) > target_len:
del session.messages[target_len:]
session.rendered_token_ids = [int(token_id) for token_id in checkpoint.get("rendered_token_ids", []) or []]
try:
session.rendered_messages_len = int(checkpoint.get("rendered_messages_len", 0) or 0)
except Exception:
session.rendered_messages_len = 0
session.runtime_snapshot = checkpoint.get("runtime_snapshot", None)
session.rendered_system_prompt_signature = str(checkpoint.get("rendered_system_prompt_signature", "") or "")
try:
session.rendered_context_window_tokens = int(checkpoint.get("rendered_context_window_tokens", 0) or 0)
except Exception:
session.rendered_context_window_tokens = 0
transcript_snapshot = checkpoint.get("chat_transcript", None)
if isinstance(transcript_snapshot, list):
session.chat_transcript = copy.deepcopy(transcript_snapshot)
try:
session.chat_transcript_counter = int(checkpoint.get("chat_transcript_counter", session.chat_transcript_counter) or 0)
except Exception:
pass
else:
assistant_message_id = str(checkpoint.get("assistant_message_id", "") or "").strip()
if len(assistant_message_id) > 0:
assistant_chat.remove_message(session, assistant_message_id)
user_message_id = str(checkpoint.get("user_message_id", "") or "").strip()
if len(user_message_id) > 0:
assistant_chat.set_message_badge(session, user_message_id, interrupted_badge)
if not bool(checkpoint.get("interrupt_recorded", False)):
session.interruption_notice = _build_interruption_notice(str(checkpoint.get("user_text", "") or ""))
if ASSISTANT_DEBUG:
print("[Assistant] Interruption notice computed:")
print(session.interruption_notice)
checkpoint["interrupt_recorded"] = True
return True
def finish_assistant_turn(session: AssistantSessionState) -> None:
session.current_turn = None
def request_assistant_interrupt(session: AssistantSessionState) -> None:
session.interrupt_requested = True
def request_assistant_reset(session: AssistantSessionState) -> None:
request_assistant_interrupt(session)
session.drop_state_requested = True
session.chat_epoch += 1
session.queued_job_count = 0
def set_assistant_tool_ui_settings(session: AssistantSessionState, **kwargs) -> dict[str, Any]:
normalized = deepy_ui_settings.normalize_assistant_tool_ui_settings(**kwargs)
session.tool_ui_settings = dict(normalized)
return session.tool_ui_settings
def _next_ai_client_id() -> str:
global _AI_GEN_NO
_AI_GEN_NO += 1
return f"ai_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{_AI_GEN_NO}"
def _json_dumps(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=True, sort_keys=True)
def _strip_partial_tool_markup(text: str) -> str:
stripped = strip_trailing_stop_markup(str(text or ""))
lowered = stripped.lower()
cut_points = []
for marker in ("", "= 0:
cut_points.append(idx)
if cut_points:
stripped = stripped[: min(cut_points)]
return stripped.rstrip()
class tools:
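    """Assistant tool implementations bridging the chat runtime to the WanGP generation queue, gallery, and media registry."""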
    def __init__(
        self,
        gen,
        get_processed_queue,
        send_cmd,
        session: AssistantSessionState | None = None,
        get_output_filepath: Callable[[str, bool, bool], str] | None = None,
        record_file_metadata: Callable[..., None] | None = None,
        get_server_config: Callable[[], dict[str, Any]] | None = None,
    ):
self.gen = gen
self.get_processed_queue = get_processed_queue
self.send_cmd = send_cmd
self.session = session
self.get_output_filepath = get_output_filepath
self.record_file_metadata = record_file_metadata
self.get_server_config = get_server_config
self._vision_query_callback: Callable[[dict[str, Any], str], dict[str, Any]] | None = None
self._tool_progress_callback: Callable[..., None] | None = None
def _log(self, message: str) -> None:
if ASSISTANT_DEBUG:
print(f"[AssistantTool] {message}")
def _is_interrupted(self) -> bool:
return self.session is not None and self.session.interrupt_requested
def _interrupted_result(self, client_id: str, task: dict[str, Any]) -> dict[str, Any]:
self._log(f"Generation interrupted for {client_id}")
cancel_result = {}
if self._auto_cancel_queue_tasks_enabled() and len(str(client_id or "").strip()) > 0:
queue = list((self.gen or {}).get("queue", []) or [])
if self._queue_contains_client_id(queue, client_id):
self.send_cmd("abort_client_id", str(client_id))
cancel_result = {"client_id": str(client_id), "mode": "abort_client_id"}
elif self._clear_inline_queue_client_id(client_id):
cancel_result = {"client_id": str(client_id), "mode": "inline_queue"}
result = {
"status": "interrupted",
"client_id": client_id,
"output_file": "",
"prompt": task["prompt"],
"resolution": task["resolution"],
"error": "Interrupted by user.",
}
if isinstance(cancel_result, dict) and len(cancel_result) > 0:
result["queue_cancel"] = cancel_result
self._update_tool_progress("error", "Interrupted", result)
return result
def _set_status(self, text: str | None, kind: str = "working") -> None:
self.send_cmd("chat_output", assistant_chat.build_status_event(text, kind=kind, visible=text is not None and len(str(text).strip()) > 0))
    def bind_runtime_tools(
        self,
        vision_query_callback: Callable[[dict[str, Any], str], dict[str, Any]] | None = None,
        tool_progress_callback: Callable[..., None] | None = None,
    ) -> None:
self._vision_query_callback = vision_query_callback
self._tool_progress_callback = tool_progress_callback
def _update_tool_progress(self, status: str | None = None, status_text: str | None = None, result: dict[str, Any] | None = None) -> None:
if callable(self._tool_progress_callback):
self._tool_progress_callback(status=status, status_text=status_text, result=result)
def _get_tool_ui_settings(self) -> dict[str, Any]:
if self.session is not None and isinstance(self.session.tool_ui_settings, dict) and len(self.session.tool_ui_settings) > 0:
return deepy_ui_settings.normalize_assistant_tool_ui_settings(**self.session.tool_ui_settings)
return deepy_ui_settings.normalize_assistant_tool_ui_settings()
def _auto_cancel_queue_tasks_enabled(self) -> bool:
return normalize_deepy_auto_cancel_queue_tasks(self._server_config().get(DEEPY_AUTO_CANCEL_QUEUE_TASKS_KEY, DEEPY_AUTO_CANCEL_QUEUE_TASKS_DEFAULT))
def _clear_inline_queue_client_id(self, client_id: str) -> bool:
client_id = str(client_id or "").strip()
if len(client_id) == 0 or not isinstance(self.gen, dict):
return False
def _matches(item):
if not isinstance(item, dict):
return False
if str(item.get("client_id", "") or "").strip() == client_id:
return True
params = item.get("params", None)
return isinstance(params, dict) and str(params.get("client_id", "") or "").strip() == client_id
inline_queue = self.gen.get("inline_queue", None)
if _matches(inline_queue):
self.gen.pop("inline_queue", None)
return True
if isinstance(inline_queue, list):
remaining_inline = [item for item in inline_queue if not _matches(item)]
if len(remaining_inline) != len(inline_queue):
if remaining_inline:
self.gen["inline_queue"] = remaining_inline
else:
self.gen.pop("inline_queue", None)
return True
return False
def _get_effective_tool_model_def(self, tool_name: str) -> dict[str, Any]:
variant = self.get_tool_variant(tool_name)
if len(variant) == 0:
return {}
try:
model_def = deepy_tool_settings.get_tool_variant_model_def(tool_name, variant)
except Exception:
return {}
return dict(model_def or {}) if isinstance(model_def, dict) else {}
def _get_deepy_tool_config(self, tool_name: str) -> dict[str, Any]:
deepy_tools = self._get_effective_tool_model_def(tool_name).get("deepy_tools", None)
if not isinstance(deepy_tools, dict):
return {}
tool_config = deepy_tools.get(str(tool_name or "").strip(), None)
return dict(tool_config or {}) if isinstance(tool_config, dict) else {}
def _get_image_start_target(self, tool_name: str) -> str:
target = str(self._get_deepy_tool_config(tool_name).get("image_start", "image_start") or "image_start").strip()
return "image_refs" if target == "image_refs" else "image_start"
def get_tool_variant(self, tool_name: str) -> str:
lookup_name = str(tool_name or "").strip()
setting_key = {
"gen_image": "image_generator_variant",
"edit_image": "image_editor_variant",
"gen_video": "video_generator_variant",
"gen_video_with_speech": "video_with_speech_variant",
"gen_speech_from_description": "speech_from_description_variant",
"gen_speech_from_sample": "speech_from_sample_variant",
}.get(lookup_name, "")
if len(setting_key) > 0:
return str(self._get_tool_ui_settings().get(setting_key, "") or "").strip()
return ""
def get_tool_template_filename(self, tool_name: str) -> str:
try:
variant = self.get_tool_variant(tool_name)
except Exception:
variant = ""
if len(variant) == 0:
return ""
template_name = Path(variant).name
if len(template_name) == 0:
return ""
if template_name.lower().endswith(".json"):
return template_name
return f"{template_name}.json"
def get_tool_transcript_label(self, tool_name: str) -> str:
label = self.get_tool_display_name(tool_name)
if str(tool_name or "").strip() not in {"gen_image", "edit_image", "gen_video", "gen_speech_from_description", "gen_speech_from_sample", "gen_video_with_speech"}:
return label
template_label = Path(self.get_tool_template_filename(tool_name)).stem.strip()
return label if len(template_label) == 0 else f"{label} [{template_label}]"
def _apply_generation_overrides(self, task: dict[str, Any], *, include_num_frames: bool) -> dict[str, Any]:
ui_settings = self._get_tool_ui_settings()
if ui_settings["use_template_properties"]:
return task
task["resolution"] = f"{ui_settings['width']}x{ui_settings['height']}"
task["seed"] = int(ui_settings["seed"])
if include_num_frames:
task["video_length"] = int(ui_settings["num_frames"])
return task
def _build_generation_task(self, tool_name: str, variant: str, *, prompt: str, client_id: str, **kwargs) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
try:
task = deepy_tool_settings.build_generation_task(tool_name, variant, prompt=prompt, client_id=client_id, **kwargs)
except ValueError as exc:
return None, {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": str(prompt or "").strip(),
"error": str(exc),
}
return task, None
def _sync_recent_media(self, max_items: int = 5) -> None:
if self.session is None:
return
file_list, file_settings_list, audio_file_list, audio_file_settings_list = self.get_processed_queue(self.gen)
media_registry.sync_recent_generated_media(self.session, file_list, file_settings_list, max_items=max_items)
media_registry.sync_recent_generated_media(self.session, audio_file_list, audio_file_settings_list, max_items=max_items)
def _queue_contains_client_id(self, queue: list[Any], client_id: str) -> bool:
lookup_client_id = str(client_id or "").strip()
if len(lookup_client_id) == 0:
return False
return any(isinstance(item, dict) and isinstance(item.get("params"), dict) and str(item["params"].get("client_id", "") or "").strip() == lookup_client_id for item in list(queue or []))
def _compact_media_payload(self, record: dict[str, Any], why: str = "") -> dict[str, Any]:
payload = {
"media_id": record.get("media_id", ""),
"media_type": record.get("media_type", ""),
"label": record.get("label", ""),
"source": record.get("source", ""),
"filename": record.get("filename", ""),
}
prompt_summary = str(record.get("prompt_summary", "") or "").strip()
if len(prompt_summary) > 0:
payload["prompt_summary"] = prompt_summary
if len(str(why or "").strip()) > 0:
payload["why"] = str(why).strip()
return payload
def _normalize_selected_media_type(self, media_type: str | None) -> str:
normalized = str(media_type or "").strip().lower()
if normalized in {"", "any", "all"}:
return "all"
if normalized in {"image", "video", "audio"}:
return normalized
return "all"
def _selected_media_payload(self, media_record: dict[str, Any], why: str = "") -> dict[str, Any]:
payload = self._compact_media_payload(media_record, why=why)
payload["path"] = str(media_record.get("path", "")).strip()
payload.update(self._get_selected_video_position(media_record))
return payload
def _get_selected_runtime_snapshot(self) -> dict[str, Any] | None:
snapshot = {}
visual_media_record, _error_result = self._get_selected_media_record_from_source("video", "all")
if visual_media_record is not None:
snapshot["selected_visual_media_id"] = str(visual_media_record.get("media_id", "") or "").strip()
snapshot["selected_visual_media_type"] = str(visual_media_record.get("media_type", "") or "").strip()
label = str(visual_media_record.get("label", "") or "").strip()
if len(label) > 0:
snapshot["selected_visual_media_label"] = label
if snapshot["selected_visual_media_type"] == "video":
video_position = self._get_selected_video_position(visual_media_record)
if "current_time_seconds" in video_position:
snapshot["selected_visual_current_time_seconds"] = video_position["current_time_seconds"]
if "current_frame_no" in video_position:
snapshot["selected_visual_current_frame_no"] = video_position["current_frame_no"]
audio_media_record, _error_result = self._get_selected_media_record_from_source("audio", "audio")
if audio_media_record is not None:
snapshot["selected_audio_media_id"] = str(audio_media_record.get("media_id", "") or "").strip()
snapshot["selected_audio_media_type"] = str(audio_media_record.get("media_type", "") or "").strip()
label = str(audio_media_record.get("label", "") or "").strip()
if len(label) > 0:
snapshot["selected_audio_media_label"] = label
return snapshot if len(snapshot) > 1 else None
def _is_selected_reference(self, reference: str) -> bool:
return _SELECTED_REFERENCE_RE.search(str(reference or "").strip()) is not None
def _get_selected_media_record_from_source(self, source: str, requested_media_type: str = "all") -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
requested_label = self._normalize_selected_media_type(requested_media_type)
if self.session is None:
return None, {"status": "error", "media_type": requested_label, "error": "Assistant session is not available."}
file_list, file_settings_list, audio_file_list, audio_file_settings_list = self.get_processed_queue(self.gen)
source = "audio" if str(source or "").strip().lower() == "audio" else "video"
if source == "audio":
raw_choice = (self.gen or {}).get("audio_selected", -1)
file_list, file_settings_list = list(audio_file_list or []), list(audio_file_settings_list or [])
else:
raw_choice = (self.gen or {}).get("selected", -1)
file_list, file_settings_list = list(file_list or []), list(file_settings_list or [])
try:
choice = int(raw_choice if raw_choice is not None else -1)
except Exception:
choice = -1
if choice < 0 or choice >= len(file_list):
gallery_label = "audio gallery" if source == "audio" else "image/video gallery"
return None, {"status": "error", "media_type": requested_label, "error": f"No media is currently selected in the WanGP {gallery_label}."}
selected_path = str(file_list[choice] or "").strip()
selected_settings = file_settings_list[choice] if choice < len(file_settings_list) and isinstance(file_settings_list[choice], dict) else None
selected_client_id = str((selected_settings or {}).get("client_id", "") or "").strip()
selected_gallery_media_type = "audio" if source == "audio" else "video"
if len(selected_client_id) > 0 and (source == "audio" or deepy_video_tools.has_video_extension(selected_path)):
latest_path, latest_settings = media_registry.find_last_gallery_media_by_client(file_list, file_settings_list, selected_client_id, media_type=selected_gallery_media_type)
if latest_path is not None:
selected_path = latest_path
selected_settings = latest_settings if isinstance(latest_settings, dict) else None
media_record = media_registry.register_media(
self.session,
selected_path,
settings=selected_settings,
source="deepy" if str((selected_settings or {}).get("client_id", "") or "").strip().startswith("ai_") else "wangp",
client_id=str((selected_settings or {}).get("client_id", "") or "").strip(),
)
if media_record is None:
return None, {"status": "error", "media_type": requested_label, "error": "The currently selected gallery item is not a supported media file."}
actual_media_type = str(media_record.get("media_type", "") or "").strip() or "unknown media type"
resolved_media_type = media_registry.normalize_media_type(requested_media_type)
if resolved_media_type != "any" and actual_media_type != resolved_media_type:
return None, {
"status": "error",
"media_type": resolved_media_type,
"selected_media_type": actual_media_type,
"actual_media_type": actual_media_type,
"error": f"The currently selected media is a {actual_media_type}, not a {resolved_media_type}.",
}
return media_record, None
def _get_all_selected_media_records(self) -> tuple[dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None]:
visual_media_record, _visual_error = self._get_selected_media_record_from_source("video", "all")
audio_media_record, _audio_error = self._get_selected_media_record_from_source("audio", "audio")
if visual_media_record is None and audio_media_record is None:
return None, None, {"status": "error", "media_type": "all", "error": "No media is currently selected in either WanGP gallery."}
return visual_media_record, audio_media_record, None
def _get_selected_media_record(self, requested_media_type: str = "all") -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
resolved_media_type = self._normalize_selected_media_type(requested_media_type)
if resolved_media_type == "audio":
return self._get_selected_media_record_from_source("audio", "audio")
if resolved_media_type in {"image", "video"}:
return self._get_selected_media_record_from_source("video", resolved_media_type)
visual_media_record, audio_media_record, error_result = self._get_all_selected_media_records()
if error_result is not None:
return None, error_result
if visual_media_record is None:
return audio_media_record, None
if audio_media_record is None:
return visual_media_record, None
return None, {
"status": "error",
"media_type": "all",
"error": "Both a visual selection and an audio selection exist. Request image, video, or audio explicitly, or use Get Selected Media with media_type='all'.",
}
def _get_selected_video_position(self, media_record: dict[str, Any]) -> dict[str, Any]:
if str(media_record.get("media_type", "") or "").strip() != "video":
return {}
try:
current_time = float((self.gen or {}).get("selected_video_time", 0.0) or 0.0)
except Exception:
current_time = 0.0
current_time = max(0.0, current_time)
try:
media_path = str(media_record.get("path", "")).strip()
_fps, _width, _height, _frame_count = get_video_info(media_path)
except Exception:
media_path = ""
try:
frame_no = deepy_video_tools.resolve_video_frame_no(media_path, time_seconds=current_time) if len(media_path) > 0 else 0
except Exception:
frame_no = 0
return {"current_time_seconds": round(current_time, 3), "current_frame_no": frame_no}
def _register_tool_media(self, path: str, settings: dict[str, Any], label: str | None = None) -> dict[str, Any] | None:
if self.session is None:
return None
return media_registry.register_media(
self.session,
path,
settings=settings,
source="deepy",
client_id=str(settings.get("client_id", "") or "").strip(),
label=label,
)
def _resolve_direct_output_path(self, file_path: str, is_image: bool, audio_only: bool) -> str:
file_path = str(file_path or "").strip()
if len(file_path) == 0:
raise RuntimeError("Output file path is empty.")
if callable(self.get_output_filepath):
resolved = str(self.get_output_filepath(file_path, is_image, audio_only) or "").strip()
if len(resolved) > 0:
return resolved
return os.path.abspath(os.path.normpath(file_path))
def _record_direct_media(self, output_path: str, settings: dict[str, Any], *, is_image: bool, audio_only: bool, label: str | None = None, persist_metadata: bool = True) -> dict[str, Any] | None:
if not os.path.isfile(output_path):
raise RuntimeError(f"Output file was not created: {output_path}")
if not callable(self.record_file_metadata):
raise RuntimeError("WanGP direct media recording is not available.")
self.record_file_metadata(output_path, settings if persist_metadata else None, is_image, audio_only, self.gen)
self.send_cmd("refresh_gallery", {"path": output_path})
return self._register_tool_media(output_path, settings, label=label)
def _server_config(self) -> dict[str, Any]:
if callable(self.get_server_config):
return dict(self.get_server_config() or {})
return {}
def _get_video_output_settings(self) -> tuple[str, str]:
server_config = self._server_config()
return str(server_config.get("video_output_codec", "libx264_8") or "libx264_8"), str(server_config.get("video_container", "mp4") or "mp4")
def _get_standalone_audio_output_codec(self) -> str:
server_config = self._server_config()
return str(server_config.get("audio_stand_alone_output_codec", "wav") or "wav")
def _get_video_audio_output_codec(self) -> str:
server_config = self._server_config()
return str(server_config.get("audio_output_codec", "aac_128") or "aac_128")
def _resolve_image_media(self, media_id: str, parameter_name: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
media_id = str(media_id or "").strip()
if len(media_id) == 0:
return None, None
if self.session is None:
return None, {"status": "error", parameter_name: media_id, "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return None, {"status": "error", parameter_name: media_id, "error": f"Unknown media id for {parameter_name}."}
if media_record.get("media_type") != "image":
actual_media_type = str(media_record.get("media_type", "") or "").strip() or "unknown media type"
return None, {
"status": "error",
parameter_name: media_record.get("media_id", ""),
"actual_media_type": actual_media_type,
"media_type": actual_media_type,
"error": f"{parameter_name} must reference an image, not a {actual_media_type}.",
}
return media_record, None
def _resolve_video_media(self, media_id: str, parameter_name: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
media_id = str(media_id or "").strip()
if len(media_id) == 0:
return None, {"status": "error", parameter_name: media_id, "error": f"{parameter_name} is required."}
if self.session is None:
return None, {"status": "error", parameter_name: media_id, "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return None, {"status": "error", parameter_name: media_id, "error": f"Unknown media id for {parameter_name}."}
if media_record.get("media_type") != "video":
actual_media_type = str(media_record.get("media_type", "") or "").strip() or "unknown media type"
return None, {
"status": "error",
parameter_name: media_record.get("media_id", ""),
"actual_media_type": actual_media_type,
"media_type": actual_media_type,
"error": f"{parameter_name} must reference a video, not a {actual_media_type}.",
}
return media_record, None
def _resolve_audio_media(self, media_id: str, parameter_name: str) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
media_id = str(media_id or "").strip()
if len(media_id) == 0:
return None, {"status": "error", parameter_name: media_id, "error": f"{parameter_name} is required."}
if self.session is None:
return None, {"status": "error", parameter_name: media_id, "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return None, {"status": "error", parameter_name: media_id, "error": f"Unknown media id for {parameter_name}."}
if media_record.get("media_type") != "audio":
actual_media_type = str(media_record.get("media_type", "") or "").strip() or "unknown media type"
return None, {
"status": "error",
parameter_name: media_record.get("media_id", ""),
"actual_media_type": actual_media_type,
"media_type": actual_media_type,
"error": f"{parameter_name} must reference an audio file, not a {actual_media_type}.",
}
return media_record, None
def _parse_time_value(self, value: Any, parameter_name: str, *, required: bool = False) -> tuple[float | None, dict[str, Any] | None]:
if value is None or str(value).strip() == "":
return (None, {"status": "error", "error": f"{parameter_name} is required."}) if required else (None, None)
try:
resolved = float(value)
except Exception:
return None, {"status": "error", "error": f"{parameter_name} must be a number."}
if resolved < 0:
return None, {"status": "error", "error": f"{parameter_name} must be >= 0."}
return resolved, None
def _build_direct_media_settings(self, source_media: dict[str, Any], comments: str, **updates: Any) -> dict[str, Any]:
settings = dict(source_media.get("settings", {}) or {})
settings["client_id"] = _next_ai_client_id()
settings["comments"] = str(comments or "").strip()
end_time = time.time()
settings["creation_date"] = datetime.fromtimestamp(end_time).isoformat(timespec="seconds")
settings["creation_timestamp"] = int(end_time)
for key, value in updates.items():
if value is not None:
settings[key] = value
return settings
def _build_direct_image_settings(self, comments: str, width: int, height: int, **updates: Any) -> dict[str, Any]:
end_time = time.time()
settings = {
"client_id": _next_ai_client_id(),
"comments": str(comments or "").strip(),
"creation_date": datetime.fromtimestamp(end_time).isoformat(timespec="seconds"),
"creation_timestamp": int(end_time),
"image_mode": 1,
"resolution": f"{int(width)}x{int(height)}",
}
for key, value in updates.items():
if value is not None:
settings[key] = value
return settings
def _update_video_metadata_fields(self, output_path: str, settings: dict[str, Any]) -> None:
try:
fps, width, height, frames_count = get_video_info(output_path)
settings["resolution"] = f"{width}x{height}"
settings["video_length"] = int(frames_count)
if fps > 0:
settings["duration_seconds"] = round(frames_count / fps, 3)
except Exception:
pass
def _update_audio_metadata_fields(self, output_path: str, settings: dict[str, Any]) -> None:
duration = deepy_video_tools.get_media_duration(output_path)
if duration is not None:
settings["duration_seconds"] = round(duration, 3)
def _get_output_duration_seconds(self, output_path: str, file_settings: dict[str, Any] | None = None) -> float | None:
duration = deepy_video_tools.get_media_duration(output_path)
return None if duration is None else round(duration, 3)
def _queue_generation_task(self, task: dict[str, Any], *, activity_label: str, output_label: str | None = None, gallery_media_type: str = "image") -> dict[str, Any]:
if not isinstance(self.gen, dict):
raise RuntimeError("WanGP generation queue is not available.")
client_id = str(task.get("client_id", "") or "").strip()
prompt = str(task.get("prompt", "") or "").strip()
resolution = str(task.get("resolution", "") or "").strip()
gen = self.gen
self.get_processed_queue(gen)
self._set_status(f"Queueing {activity_label}...", kind="tool")
self._update_tool_progress("running", "Queued", {"status": "queued", "client_id": client_id, "prompt": prompt, "resolution": resolution})
task["priority"] = True
gen["inline_queue"] = task
self.send_cmd("load_queue_trigger", {"client_id": client_id})
self._log(f"Queued {activity_label} for {client_id}")
queue_detect_deadline = time.time() + 5
while time.time() < queue_detect_deadline:
if self._is_interrupted():
return self._interrupted_result(client_id, task)
queue_errors = gen.get("queue_errors", None) or {}
if client_id in queue_errors:
error_text = str(queue_errors[client_id][0])
self._log(f"Queue error detected for {client_id}: {error_text}")
self._set_status(f"{activity_label.capitalize()} failed: {error_text}", kind="error")
result = {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": prompt,
"resolution": resolution,
"error": error_text,
}
self._update_tool_progress("error", "Error", result)
return result
queue = list(gen.get("queue", []) or [])
if self._queue_contains_client_id(queue, client_id):
self._set_status(f"{activity_label.capitalize()} started...", kind="tool")
self._update_tool_progress("running", "Running", {"status": "running", "client_id": client_id, "prompt": prompt, "resolution": resolution})
break
time.sleep(0.25)
while True:
if self._is_interrupted():
return self._interrupted_result(client_id, task)
queue_errors = gen.get("queue_errors", None) or {}
if client_id in queue_errors:
error_text = str(queue_errors[client_id][0])
self._log(f"Generation error detected for {client_id}: {error_text}")
self._set_status(f"{activity_label.capitalize()} failed: {error_text}", kind="error")
result = {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": prompt,
"resolution": resolution,
"error": error_text,
}
self._update_tool_progress("error", "Error", result)
return result
file_list, file_settings_list, audio_file_list, audio_file_settings_list = self.get_processed_queue(gen)
media_file_list = list(audio_file_list or []) if gallery_media_type == "audio" else list(file_list or [])
media_settings_list = list(audio_file_settings_list or []) if gallery_media_type == "audio" else list(file_settings_list or [])
queue = list(gen.get("queue", []) or [])
client_id_still_in_queue = self._queue_contains_client_id(queue, client_id)
if client_id_still_in_queue:
time.sleep(0.5)
continue
file_path, file_settings = media_registry.find_last_gallery_media_by_client(media_file_list, media_settings_list, client_id, media_type=gallery_media_type)
if file_path is not None and isinstance(file_settings, dict):
media_record = self._register_tool_media(str(file_path), file_settings, label=output_label)
result = {
"status": "done",
"client_id": client_id,
"output_file": str(file_path),
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"prompt": prompt,
"resolution": resolution,
"error": "",
}
if gallery_media_type in {"video", "audio"}:
result["output_duration"] = self._get_output_duration_seconds(str(file_path), file_settings)
self._log(f"{activity_label.capitalize()} completed for {client_id}: {file_path}")
self._set_status(f"{activity_label.capitalize()} finished.", kind="tool")
self.send_cmd("refresh_gallery", {"path": str(file_path)})
self._update_tool_progress("done", "Done", result)
return result
error_text = f"{activity_label.capitalize()} finished queue processing but no {gallery_media_type} output with client_id '{client_id}' was found in the gallery."
self._log(error_text)
self._set_status(error_text, kind="error")
result = {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": prompt,
"resolution": resolution,
"error": error_text,
}
self._update_tool_progress("error", "Error", result)
return result
@assistant_tool(
display_name="Generate Image",
description="Queue and generate an image from a text prompt inside WanGP, then wait until the output image is available.",
parameters={
"prompt": {
"type": "string",
"description": "The image generation prompt to send to WanGP.",
}
},
)
def gen_image(self, prompt: str) -> dict[str, Any]:
client_id = _next_ai_client_id()
generator_variant = self._get_tool_ui_settings()["image_generator_variant"]
template_file = self.get_tool_template_filename("gen_image")
task, error_result = self._build_generation_task("gen_image", generator_variant, prompt=prompt, client_id=client_id)
if error_result is not None:
error_result["generator_variant"] = generator_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
return error_result
task = self._apply_generation_overrides(task, include_num_frames=False)
if len(task["prompt"]) == 0:
self._set_status("Image generation failed: prompt is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": "",
"resolution": task["resolution"],
"error": "Prompt is empty.",
}
result = self._queue_generation_task(task, activity_label="image generation", output_label="Generated image")
result["generator_variant"] = generator_variant
if len(template_file) > 0:
result["template_file"] = template_file
return result
@assistant_tool(
display_name="Generate Video",
description="Queue and generate a video from a text prompt inside WanGP, optionally using a start image and an end image, then wait until the output video is available.",
parameters={
"prompt": {
"type": "string",
"description": "The video generation prompt to send to WanGP.",
},
"image_start": {
"type": "string",
"description": "Optional media id of the start image returned by Resolve Media.",
"required": False,
},
"image_end": {
"type": "string",
"description": "Optional media id of the end image returned by Resolve Media.",
"required": False,
},
},
)
def gen_video(self, prompt: str, image_start: str | None = None, image_end: str | None = None) -> dict[str, Any]:
self._sync_recent_media()
start_media, error_result = self._resolve_image_media(image_start or "", "image_start")
if error_result is not None:
error_result.update({"prompt": str(prompt or "").strip(), "output_file": ""})
return error_result
end_media, error_result = self._resolve_image_media(image_end or "", "image_end")
if error_result is not None:
error_result.update({"prompt": str(prompt or "").strip(), "output_file": ""})
return error_result
client_id = _next_ai_client_id()
generator_variant = self._get_tool_ui_settings()["video_generator_variant"]
template_file = self.get_tool_template_filename("gen_video")
task, error_result = self._build_generation_task(
"gen_video",
generator_variant,
prompt=prompt,
client_id=client_id,
image_start=None if start_media is None else str(start_media.get("path", "")).strip(),
image_end=None if end_media is None else str(end_media.get("path", "")).strip(),
)
if error_result is not None:
error_result["generator_variant"] = generator_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
if start_media is not None:
error_result["source_start_media_id"] = start_media.get("media_id", "")
if end_media is not None:
error_result["source_end_media_id"] = end_media.get("media_id", "")
return error_result
task = self._apply_generation_overrides(task, include_num_frames=True)
if len(task["prompt"]) == 0:
self._set_status("Video generation failed: prompt is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": "",
"resolution": task.get("resolution", ""),
"error": "Prompt is empty.",
}
result = self._queue_generation_task(task, activity_label="video generation", output_label="Generated video", gallery_media_type="video")
result["generator_variant"] = generator_variant
if len(template_file) > 0:
result["template_file"] = template_file
if start_media is not None:
result["source_start_media_id"] = start_media.get("media_id", "")
if end_media is not None:
result["source_end_media_id"] = end_media.get("media_id", "")
return result
@assistant_tool(
display_name="Generate Video With Speech",
description="Queue and generate a talking video from a text prompt, a start image, and a speech audio clip inside WanGP, then wait until the output video is available.",
parameters={
"prompt": {
"type": "string",
"description": "The video generation prompt to send to WanGP.",
},
"image_start": {
"type": "string",
"description": "The media id of the start image returned by Resolve Media.",
},
"audio_media_id": {
"type": "string",
"description": "The media id of the speech audio returned by Resolve Media.",
},
},
)
def gen_video_with_speech(self, prompt: str, image_start: str, audio_media_id: str) -> dict[str, Any]:
self._sync_recent_media()
start_media, error_result = self._resolve_image_media(image_start, "image_start")
if error_result is not None:
error_result.update({"prompt": str(prompt or "").strip(), "output_file": ""})
return error_result
audio_media, error_result = self._resolve_audio_media(audio_media_id, "audio_media_id")
if error_result is not None:
error_result.update({"prompt": str(prompt or "").strip(), "output_file": ""})
return error_result
client_id = _next_ai_client_id()
generator_variant = self.get_tool_variant("gen_video_with_speech")
template_file = self.get_tool_template_filename("gen_video_with_speech")
task, error_result = self._build_generation_task(
"gen_video_with_speech",
generator_variant,
prompt=prompt,
client_id=client_id,
audio_guide=str(audio_media.get("path", "")).strip(),
image_start_target=self._get_image_start_target("gen_video_with_speech"),
image_start=str(start_media.get("path", "")).strip(),
)
if error_result is not None:
error_result["generator_variant"] = generator_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
error_result["source_start_media_id"] = start_media.get("media_id", "")
error_result["source_audio_media_id"] = audio_media.get("media_id", "")
error_result["image_start_target"] = self._get_image_start_target("gen_video_with_speech")
return error_result
task = self._apply_generation_overrides(task, include_num_frames=True)
if len(task["prompt"]) == 0:
self._set_status("Video generation failed: prompt is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": "",
"resolution": task.get("resolution", ""),
"error": "Prompt is empty.",
}
if len(str(task.get("audio_guide", "") or "").strip()) == 0:
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": str(prompt or "").strip(),
"resolution": task.get("resolution", ""),
"error": "Speech audio path is empty.",
}
result = self._queue_generation_task(task, activity_label="video generation", output_label="Generated video", gallery_media_type="video")
result["generator_variant"] = generator_variant
if len(template_file) > 0:
result["template_file"] = template_file
result["source_start_media_id"] = start_media.get("media_id", "")
result["source_audio_media_id"] = audio_media.get("media_id", "")
result["image_start_target"] = self._get_image_start_target("gen_video_with_speech")
return result
@assistant_tool(
display_name="Generate Speech From Description",
description="Queue and generate a speech audio clip from text, using a voice description stored in alt_prompt, then wait until the output audio is available.",
parameters={
"prompt": {
"type": "string",
"description": "The speech content to synthesize.",
},
"voice_description": {
"type": "string",
"description": "A short description of the desired voice, tone, or speaking style.",
},
},
)
def gen_speech_from_description(self, prompt: str, voice_description: str) -> dict[str, Any]:
client_id = _next_ai_client_id()
generator_variant = self.get_tool_variant("gen_speech_from_description")
template_file = self.get_tool_template_filename("gen_speech_from_description")
task, error_result = self._build_generation_task("gen_speech_from_description", generator_variant, prompt=prompt, client_id=client_id, alt_prompt=voice_description)
if error_result is not None:
error_result["generator_variant"] = generator_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
return error_result
if len(task["prompt"]) == 0:
self._set_status("Speech generation failed: prompt is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": "",
"error": "Prompt is empty.",
}
if len(str(task.get("alt_prompt", "") or "").strip()) == 0:
self._set_status("Speech generation failed: voice description is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": str(prompt or "").strip(),
"error": "voice_description is required.",
}
result = self._queue_generation_task(task, activity_label="speech generation", output_label="Generated speech", gallery_media_type="audio")
result["generator_variant"] = generator_variant
if len(template_file) > 0:
result["template_file"] = template_file
result["voice_description"] = str(task.get("alt_prompt", "") or "").strip()
return result
@assistant_tool(
display_name="Generate Speech From Sample",
description="Queue and generate a speech audio clip from text, cloning the voice from a previously resolved audio sample, then wait until the output audio is available.",
parameters={
"prompt": {
"type": "string",
"description": "The speech content to synthesize.",
},
"media_id": {
"type": "string",
"description": "The media id of the audio sample returned by Resolve Media.",
},
},
)
def gen_speech_from_sample(self, prompt: str, media_id: str) -> dict[str, Any]:
self._sync_recent_media()
sample_media, error_result = self._resolve_audio_media(media_id, "media_id")
if error_result is not None:
error_result.update({"prompt": str(prompt or "").strip(), "output_file": ""})
return error_result
client_id = _next_ai_client_id()
generator_variant = self.get_tool_variant("gen_speech_from_sample")
template_file = self.get_tool_template_filename("gen_speech_from_sample")
task, error_result = self._build_generation_task(
"gen_speech_from_sample",
generator_variant,
prompt=prompt,
client_id=client_id,
audio_guide=str(sample_media.get("path", "")).strip(),
)
if error_result is not None:
error_result["generator_variant"] = generator_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
error_result["source_media_id"] = sample_media.get("media_id", "")
return error_result
if len(task["prompt"]) == 0:
self._set_status("Speech generation failed: prompt is empty.", kind="error")
return {
"status": "error",
"client_id": client_id,
"output_file": "",
"prompt": "",
"error": "Prompt is empty.",
}
if len(str(task.get("audio_guide", "") or "").strip()) == 0:
return {
"status": "error",
"client_id": client_id,
"media_id": sample_media.get("media_id", ""),
"output_file": "",
"prompt": str(prompt or "").strip(),
"error": "Audio sample path is empty.",
}
result = self._queue_generation_task(task, activity_label="speech generation", output_label="Generated speech", gallery_media_type="audio")
result["generator_variant"] = generator_variant
if len(template_file) > 0:
result["template_file"] = template_file
result["source_media_id"] = sample_media.get("media_id", "")
return result
@assistant_tool(
display_name="Edit Image",
description="Edit a previously resolved image using an instruction prompt inside WanGP and wait until the edited image is available.",
parameters={
"media_id": {
"type": "string",
"description": "The media id returned by Resolve Media.",
},
"prompt": {
"type": "string",
"description": "The instruction prompt describing how to modify the image.",
},
},
)
def edit_image(self, media_id: str, prompt: str) -> dict[str, Any]:
self._sync_recent_media()
if self.session is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "prompt": str(prompt or "").strip(), "output_file": "", "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "prompt": str(prompt or "").strip(), "output_file": "", "error": "Unknown media id."}
if media_record.get("media_type") != "image":
return {
"status": "error",
"media_id": media_record.get("media_id", ""),
"media_type": media_record.get("media_type", ""),
"prompt": str(prompt or "").strip(),
"output_file": "",
"error": "Edit Image currently supports images only.",
}
editor_variant = self._get_tool_ui_settings()["image_editor_variant"]
template_file = self.get_tool_template_filename("edit_image")
client_id = _next_ai_client_id()
task, error_result = self._build_generation_task(
"edit_image",
editor_variant,
prompt=prompt,
client_id=client_id,
image_refs=[str(media_record.get("path", "")).strip()],
)
if error_result is not None:
error_result["editor_variant"] = editor_variant
if len(template_file) > 0:
error_result["template_file"] = template_file
error_result["media_id"] = media_record.get("media_id", "")
return error_result
task = self._apply_generation_overrides(task, include_num_frames=False)
if len(task["prompt"]) == 0:
return {
"status": "error",
"media_id": media_record.get("media_id", ""),
"prompt": "",
"output_file": "",
"error": "Prompt is empty.",
}
result = self._queue_generation_task(task, activity_label="image editing", output_label="Edited image")
result["editor_variant"] = editor_variant
if len(template_file) > 0:
result["template_file"] = template_file
result["source_media_id"] = media_record.get("media_id", "")
return result
@assistant_tool(
display_name="Create Color Frame",
description="Create a solid-color image with the requested width and height, rounded to the nearest multiple of 16, and add it to WanGP galleries. Use this for blank frames, color cards, or transition plates.",
parameters={
"width": {
"type": "integer",
"description": "Output image width in pixels.",
},
"height": {
"type": "integer",
"description": "Output image height in pixels.",
},
"color": {
"type": "string",
"description": "Optional fill color. Accepts common names like black, white, red, or hex values like #000000.",
"required": False,
},
},
pause_runtime=False,
)
def create_color_frame(self, width: int, height: int, color: str = "black") -> dict[str, Any]:
try:
width = int(width)
height = int(height)
except Exception:
return {"status": "error", "width": width, "height": height, "color": str(color or "").strip() or "black", "output_file": "", "error": "width and height must be integers."}
if width <= 0 or height <= 0:
return {"status": "error", "width": width, "height": height, "color": str(color or "").strip() or "black", "output_file": "", "error": "width and height must be >= 1."}
width = max(16, int(round(width / 16.0) * 16))
height = max(16, int(round(height / 16.0) * 16))
resolved_color = str(color or "black").strip() or "black"
try:
rgb_color = ImageColor.getrgb(resolved_color)
except Exception:
return {"status": "error", "width": width, "height": height, "color": resolved_color, "output_file": "", "error": "color must be a valid color name or hex value."}
if len(rgb_color) == 4:
rgb_color = tuple(rgb_color[:3])
safe_color_name = re.sub(r"[^a-z0-9]+", "_", resolved_color.lower()).strip("_") or "color"
output_name = f"color_{safe_color_name}_{width}x{height}.png"
self._set_status("Creating color frame...", kind="tool")
self._update_tool_progress("running", "Creating", {"status": "running", "width": width, "height": height, "color": resolved_color})
output_path = self._resolve_direct_output_path(output_name, True, False)
try:
image = Image.new("RGB", (width, height), rgb_color)
image.save(output_path)
except Exception as exc:
result = {"status": "error", "width": width, "height": height, "color": resolved_color, "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Color frame creation failed: {exc}", kind="error")
return result
settings = self._build_direct_image_settings(f'Created solid {resolved_color} image at {width}x{height}', width, height, prompt=f"Solid {resolved_color} image", seed=-1)
media_record = self._record_direct_media(output_path, settings, is_image=True, audio_only=False, label="Color frame", persist_metadata=False)
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"width": width,
"height": height,
"resolution": f"{width}x{height}",
"color": resolved_color,
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Color frame created.", kind="tool")
return result
@assistant_tool(
display_name="Extract Image",
description="Extract one image from a previously resolved video at a specific frame number or exact playback time and add it to WanGP galleries.",
parameters={
"media_id": {
"type": "string",
"description": "The media id for the source video returned by Resolve Media.",
},
"frame_no": {
"type": "integer",
"description": "Optional frame number to extract from the source video.",
"required": False,
},
"time_seconds": {
"type": "number",
"description": "Optional exact playback time in seconds. Prefer this for the currently selected video frame because it matches the player position more accurately.",
"required": False,
},
},
pause_runtime=False,
)
def extract_image(self, media_id: str, frame_no: int | None = None, time_seconds: float | None = None) -> dict[str, Any]:
self._sync_recent_media()
source_media, error_result = self._resolve_video_media(media_id, "media_id")
if error_result is not None:
return error_result
try:
frame_no = None if frame_no is None or str(frame_no).strip() == "" else int(frame_no)
except Exception:
return {"status": "error", "media_id": str(media_id or "").strip(), "frame_no": frame_no, "output_file": "", "error": "frame_no must be an integer."}
time_seconds, error_result = self._parse_time_value(time_seconds, "time_seconds", required=False)
if error_result is not None:
return {"status": "error", "media_id": str(media_id or "").strip(), "frame_no": frame_no, "time_seconds": time_seconds, "output_file": "", "error": error_result["error"]}
if frame_no is None and time_seconds is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "frame_no": None, "time_seconds": None, "output_file": "", "error": "frame_no or time_seconds is required."}
self._set_status("Extracting image...", kind="tool")
self._update_tool_progress("running", "Extracting", {"status": "running", "media_id": source_media.get("media_id", ""), "frame_no": frame_no, "time_seconds": time_seconds})
source_path = str(source_media.get("path", "")).strip()
try:
resolved_frame_no = deepy_video_tools.resolve_video_frame_no(source_path, frame_no=frame_no, time_seconds=time_seconds)
except Exception as exc:
result = {"status": "error", "media_id": source_media.get("media_id", ""), "frame_no": frame_no, "time_seconds": time_seconds, "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Image extraction failed: {exc}", kind="error")
return result
source_name = os.path.splitext(os.path.basename(source_path))[0]
output_suffix = f"frame{resolved_frame_no}" if time_seconds is None else f"frame{resolved_frame_no}_t{int(round(float(time_seconds or 0.0) * 1000.0))}ms"
output_name = f"{source_name}_{output_suffix}.png"
output_path = self._resolve_direct_output_path(output_name, True, False)
try:
deepy_video_tools.extract_video_frame(source_path, output_path, frame_no=frame_no, time_seconds=time_seconds)
except Exception as exc:
result = {
"status": "error",
"media_id": source_media.get("media_id", ""),
"frame_no": resolved_frame_no,
"time_seconds": time_seconds,
"output_file": "",
"error": str(exc),
}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Image extraction failed: {exc}", kind="error")
return result
comments = f'Extracted frame {resolved_frame_no} from "{os.path.basename(source_path)}"' if time_seconds is None else f'Extracted frame {resolved_frame_no} at {time_seconds:.3f}s from "{os.path.basename(source_path)}"'
extracted_settings = self._build_direct_media_settings(source_media, comments)
media_record = self._record_direct_media(output_path, extracted_settings, is_image=True, audio_only=False, label="Extracted image")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_media_id": source_media.get("media_id", ""),
"frame_no": resolved_frame_no,
"time_seconds": time_seconds,
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Image extracted.", kind="tool")
return result
@assistant_tool(
display_name="Extract Video",
description="Extract a video segment from a previously resolved video using start_time and either end_time or duration.",
parameters={
"media_id": {
"type": "string",
"description": "The media id for the source video returned by Resolve Media.",
},
"start_time": {
"type": "number",
"description": "Start time in seconds.",
},
"end_time": {
"type": "number",
"description": "Optional end time in seconds.",
"required": False,
},
"duration": {
"type": "number",
"description": "Optional segment duration in seconds.",
"required": False,
},
},
pause_runtime=False,
)
def extract_video(self, media_id: str, start_time: float, end_time: float | None = None, duration: float | None = None) -> dict[str, Any]:
self._sync_recent_media()
source_media, error_result = self._resolve_video_media(media_id, "media_id")
if error_result is not None:
return error_result
start_time, error_result = self._parse_time_value(start_time, "start_time", required=True)
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
end_time, error_result = self._parse_time_value(end_time, "end_time")
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
duration, error_result = self._parse_time_value(duration, "duration")
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
if end_time is not None and duration is not None:
return {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": "Specify either end_time or duration, not both."}
self._set_status("Extracting video...", kind="tool")
self._update_tool_progress("running", "Extracting", {"status": "running", "media_id": source_media.get("media_id", ""), "start_time": start_time, "end_time": end_time, "duration": duration})
source_path = str(source_media.get("path", "")).strip()
video_codec, video_container = self._get_video_output_settings()
base_name = os.path.splitext(os.path.basename(source_path))[0]
output_path = self._resolve_direct_output_path(f"{base_name}_clip{deepy_video_tools.get_video_container_extension(video_container)}", False, False)
try:
output_path = deepy_video_tools.extract_video(source_path, output_path, start_time=start_time, end_time=end_time, duration=duration, video_codec=video_codec, video_container=video_container, audio_codec=self._get_video_audio_output_codec())
except Exception as exc:
result = {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Video extraction failed: {exc}", kind="error")
return result
comments = f'Extracted video segment from "{os.path.basename(source_path)}" starting at {start_time}s'
if end_time is not None:
comments += f" ending at {end_time}s"
elif duration is not None:
comments += f" with duration {duration}s"
extracted_settings = self._build_direct_media_settings(source_media, comments)
self._update_video_metadata_fields(output_path, extracted_settings)
media_record = self._record_direct_media(output_path, extracted_settings, is_image=False, audio_only=False, label="Extracted video")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_media_id": source_media.get("media_id", ""),
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Video extracted.", kind="tool")
return result
@assistant_tool(
display_name="Extract Audio",
description="Extract audio from a previously resolved video or audio file using optional start_time, end_time, duration, and audio_track_no.",
parameters={
"media_id": {
"type": "string",
"description": "The media id for the source video or audio returned by Resolve Media.",
},
"start_time": {
"type": "number",
"description": "Optional start time in seconds. Defaults to the beginning.",
"required": False,
},
"end_time": {
"type": "number",
"description": "Optional end time in seconds.",
"required": False,
},
"duration": {
"type": "number",
"description": "Optional segment duration in seconds.",
"required": False,
},
"audio_track_no": {
"type": "integer",
"description": "Optional 1-based audio track number to extract. Defaults to 1.",
"required": False,
},
},
pause_runtime=False,
)
def extract_audio(self, media_id: str, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, audio_track_no: int | None = None) -> dict[str, Any]:
self._sync_recent_media()
if self.session is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "output_file": "", "error": "Assistant session is not available."}
source_media = media_registry.get_media_record(self.session, media_id)
if source_media is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "output_file": "", "error": "Unknown media id."}
if source_media.get("media_type") not in {"audio", "video"}:
actual_media_type = str(source_media.get("media_type", "") or "").strip() or "unknown media type"
return {"status": "error", "media_id": source_media.get("media_id", ""), "actual_media_type": actual_media_type, "media_type": actual_media_type, "output_file": "", "error": f"media_id must reference audio or video, not a {actual_media_type}."}
start_time, error_result = self._parse_time_value(start_time, "start_time")
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
end_time, error_result = self._parse_time_value(end_time, "end_time")
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
duration, error_result = self._parse_time_value(duration, "duration")
if error_result is not None:
error_result.update({"media_id": source_media.get("media_id", ""), "output_file": ""})
return error_result
try:
audio_track_no = None if audio_track_no is None or str(audio_track_no).strip() == "" else int(audio_track_no)
except Exception:
return {"status": "error", "media_id": source_media.get("media_id", ""), "audio_track_no": audio_track_no, "output_file": "", "error": "audio_track_no must be an integer."}
if audio_track_no is not None and audio_track_no <= 0:
return {"status": "error", "media_id": source_media.get("media_id", ""), "audio_track_no": audio_track_no, "output_file": "", "error": "audio_track_no must be >= 1."}
if end_time is not None and duration is not None:
return {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": "Specify either end_time or duration, not both."}
self._set_status("Extracting audio...", kind="tool")
self._update_tool_progress("running", "Extracting", {"status": "running", "media_id": source_media.get("media_id", ""), "start_time": start_time, "end_time": end_time, "duration": duration, "audio_track_no": audio_track_no})
source_path = str(source_media.get("path", "")).strip()
base_name = os.path.splitext(os.path.basename(source_path))[0]
audio_codec = self._get_standalone_audio_output_codec()
output_path = self._resolve_direct_output_path(f"{base_name}_audio{deepy_video_tools.get_audio_standalone_extension(audio_codec)}", False, True)
try:
output_path = deepy_video_tools.extract_audio(source_path, output_path, start_time=start_time, end_time=end_time, duration=duration, audio_track_no=audio_track_no, audio_codec=audio_codec)
except Exception as exc:
result = {"status": "error", "media_id": source_media.get("media_id", ""), "audio_track_no": audio_track_no, "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Audio extraction failed: {exc}", kind="error")
return result
comments = f'Extracted audio from "{os.path.basename(source_path)}"'
if audio_track_no is not None:
comments += f" using audio track {audio_track_no}"
if start_time is not None:
comments += f" starting at {start_time}s"
if end_time is not None:
comments += f" ending at {end_time}s"
elif duration is not None:
comments += f" with duration {duration}s"
extracted_settings = self._build_direct_media_settings(source_media, comments)
self._update_audio_metadata_fields(output_path, extracted_settings)
media_record = self._record_direct_media(output_path, extracted_settings, is_image=False, audio_only=True, label="Extracted audio")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_media_id": source_media.get("media_id", ""),
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"audio_track_no": 1 if audio_track_no is None else audio_track_no,
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Audio extracted.", kind="tool")
return result
@assistant_tool(
display_name="Mute Video",
description="Create a copy of a previously resolved video with all audio removed.",
parameters={
"media_id": {
"type": "string",
"description": "The media id for the source video returned by Resolve Media.",
},
},
pause_runtime=False,
)
def mute_video(self, media_id: str) -> dict[str, Any]:
self._sync_recent_media()
source_media, error_result = self._resolve_video_media(media_id, "media_id")
if error_result is not None:
return error_result
self._set_status("Muting video...", kind="tool")
self._update_tool_progress("running", "Muting", {"status": "running", "media_id": source_media.get("media_id", "")})
source_path = str(source_media.get("path", "")).strip()
_video_codec, video_container = self._get_video_output_settings()
base_name = os.path.splitext(os.path.basename(source_path))[0]
output_path = self._resolve_direct_output_path(f"{base_name}_muted{deepy_video_tools.get_video_container_extension(video_container)}", False, False)
try:
output_path = deepy_video_tools.mute_video(source_path, output_path)
except Exception as exc:
result = {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Video muting failed: {exc}", kind="error")
return result
muted_settings = self._build_direct_media_settings(source_media, f'Removed audio from "{os.path.basename(source_path)}"')
self._update_video_metadata_fields(output_path, muted_settings)
media_record = self._record_direct_media(output_path, muted_settings, is_image=False, audio_only=False, label="Muted video")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_media_id": source_media.get("media_id", ""),
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Video muted.", kind="tool")
return result
@assistant_tool(
display_name="Replace Audio",
description="Replace the soundtrack of a previously resolved video with a previously resolved audio file.",
parameters={
"video_id": {
"type": "string",
"description": "The media id for the source video returned by Resolve Media.",
},
"audio_id": {
"type": "string",
"description": "The media id for the replacement audio returned by Resolve Media.",
},
},
pause_runtime=False,
)
def replace_audio(self, video_id: str, audio_id: str) -> dict[str, Any]:
self._sync_recent_media()
video_media, error_result = self._resolve_video_media(video_id, "video_id")
if error_result is not None:
return error_result
audio_media, error_result = self._resolve_audio_media(audio_id, "audio_id")
if error_result is not None:
return error_result
self._set_status("Replacing video audio...", kind="tool")
self._update_tool_progress("running", "Replacing", {"status": "running", "video_id": video_media.get("media_id", ""), "audio_id": audio_media.get("media_id", "")})
video_path = str(video_media.get("path", "")).strip()
audio_path = str(audio_media.get("path", "")).strip()
_video_codec, video_container = self._get_video_output_settings()
base_name = os.path.splitext(os.path.basename(video_path))[0]
output_path = self._resolve_direct_output_path(f"{base_name}_audio_replaced{deepy_video_tools.get_video_container_extension(video_container)}", False, False)
try:
output_path = deepy_video_tools.replace_audio(video_path, audio_path, output_path, audio_codec=self._get_video_audio_output_codec())
except Exception as exc:
result = {"status": "error", "video_id": video_media.get("media_id", ""), "audio_id": audio_media.get("media_id", ""), "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Audio replacement failed: {exc}", kind="error")
return result
replaced_settings = self._build_direct_media_settings(video_media, f'Replaced audio of "{os.path.basename(video_path)}" with "{os.path.basename(audio_path)}"')
self._update_video_metadata_fields(output_path, replaced_settings)
media_record = self._record_direct_media(output_path, replaced_settings, is_image=False, audio_only=False, label="Video with replaced audio")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_video_id": video_media.get("media_id", ""),
"source_audio_id": audio_media.get("media_id", ""),
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Video audio replaced.", kind="tool")
return result
@assistant_tool(
display_name="Resize Crop",
description="Resize and crop a previously resolved image or video in one step. Crop values can be expressed in pixels or percent.",
parameters={
"media_id": {
"type": "string",
"description": "The media id for the source image or video returned by Resolve Media.",
},
"width": {
"type": "integer",
"description": "Optional output width in pixels after cropping.",
"required": False,
},
"height": {
"type": "integer",
"description": "Optional output height in pixels after cropping.",
"required": False,
},
"crop_left": {
"type": "number",
"description": "Optional amount to crop from the left side.",
"required": False,
},
"crop_top": {
"type": "number",
"description": "Optional amount to crop from the top side.",
"required": False,
},
"crop_right": {
"type": "number",
"description": "Optional amount to crop from the right side.",
"required": False,
},
"crop_bottom": {
"type": "number",
"description": "Optional amount to crop from the bottom side.",
"required": False,
},
"crop_unit": {
"type": "string",
"description": "Crop unit: pixels or percent.",
"required": False,
},
},
pause_runtime=False,
)
def resize_crop(self, media_id: str, width: int | None = None, height: int | None = None, crop_left: float | None = None, crop_top: float | None = None, crop_right: float | None = None, crop_bottom: float | None = None, crop_unit: str | None = None) -> dict[str, Any]:
self._sync_recent_media()
if self.session is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "output_file": "", "error": "Assistant session is not available."}
source_media = media_registry.get_media_record(self.session, media_id)
if source_media is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "output_file": "", "error": "Unknown media id."}
if source_media.get("media_type") not in {"image", "video"}:
actual_media_type = str(source_media.get("media_type", "") or "").strip() or "unknown media type"
return {"status": "error", "media_id": source_media.get("media_id", ""), "actual_media_type": actual_media_type, "media_type": actual_media_type, "output_file": "", "error": f"media_id must reference an image or video, not a {actual_media_type}."}
try:
width = None if width is None or str(width).strip() == "" else int(width)
height = None if height is None or str(height).strip() == "" else int(height)
crop_left = 0 if crop_left is None or str(crop_left).strip() == "" else float(crop_left)
crop_top = 0 if crop_top is None or str(crop_top).strip() == "" else float(crop_top)
crop_right = 0 if crop_right is None or str(crop_right).strip() == "" else float(crop_right)
crop_bottom = 0 if crop_bottom is None or str(crop_bottom).strip() == "" else float(crop_bottom)
except Exception:
return {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": "width and height must be integers, crop values must be numbers."}
crop_unit = str(crop_unit or "pixels").strip().lower() or "pixels"
if crop_unit not in {"pixels", "percent"}:
return {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": "crop_unit must be 'pixels' or 'percent'."}
source_media_type = str(source_media.get("media_type", "") or "").strip() or "media"
self._set_status(f"Resizing and cropping {source_media_type}...", kind="tool")
self._update_tool_progress("running", "Processing", {"status": "running", "media_id": source_media.get("media_id", ""), "width": width, "height": height, "crop_left": crop_left, "crop_top": crop_top, "crop_right": crop_right, "crop_bottom": crop_bottom, "crop_unit": crop_unit})
source_path = str(source_media.get("path", "")).strip()
base_name = os.path.splitext(os.path.basename(source_path))[0]
try:
if source_media_type == "video":
video_codec, video_container = self._get_video_output_settings()
output_path = self._resolve_direct_output_path(f"{base_name}_resized{deepy_video_tools.get_video_container_extension(video_container)}", False, False)
output_path = deepy_video_tools.resize_crop_video(source_path, output_path, width=width, height=height, crop_left=crop_left, crop_top=crop_top, crop_right=crop_right, crop_bottom=crop_bottom, crop_unit=crop_unit, video_codec=video_codec, video_container=video_container, audio_codec=self._get_video_audio_output_codec())
else:
image_ext = os.path.splitext(source_path)[1].lower()
if image_ext not in {".png", ".jpg", ".jpeg", ".webp"}:
image_ext = ".png"
output_path = self._resolve_direct_output_path(f"{base_name}_resized{image_ext}", True, False)
output_path = deepy_video_tools.resize_crop_image(source_path, output_path, width=width, height=height, crop_left=crop_left, crop_top=crop_top, crop_right=crop_right, crop_bottom=crop_bottom, crop_unit=crop_unit)
except Exception as exc:
result = {"status": "error", "media_id": source_media.get("media_id", ""), "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Resize/crop failed: {exc}", kind="error")
return result
comments = f'Resized/cropped "{os.path.basename(source_path)}"'
if width is not None or height is not None:
comments += f" to {width if width is not None else 'auto'}x{height if height is not None else 'auto'}"
if any(value > 0 for value in (crop_left, crop_top, crop_right, crop_bottom)):
comments += f" with crop {crop_left}/{crop_top}/{crop_right}/{crop_bottom} {crop_unit}"
resized_settings = self._build_direct_media_settings(source_media, comments)
if source_media_type == "video":
self._update_video_metadata_fields(output_path, resized_settings)
media_record = self._record_direct_media(output_path, resized_settings, is_image=source_media_type == "image", audio_only=False, label=f"Resized/cropped {source_media_type}")
result = {
"status": "done",
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"source_media_id": source_media.get("media_id", ""),
"output_file": output_path,
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status(f"{source_media_type.capitalize()} resize/crop finished.", kind="tool")
return result
@assistant_tool(
display_name="Merge Videos",
description="Merge two previously resolved videos into one clip, resizing the second video when needed so it matches the first video dimensions.",
parameters={
"video_first": {
"type": "string",
"description": "The media id for the first video returned by Resolve Media.",
},
"video_second": {
"type": "string",
"description": "The media id for the second video returned by Resolve Media.",
},
},
pause_runtime=False,
)
def merge_videos(self, video_first: str, video_second: str) -> dict[str, Any]:
self._sync_recent_media()
first_media, error_result = self._resolve_video_media(video_first, "video_first")
if error_result is not None:
return error_result
second_media, error_result = self._resolve_video_media(video_second, "video_second")
if error_result is not None:
return error_result
self._set_status("Merging videos...", kind="tool")
self._update_tool_progress("running", "Merging", {"status": "running", "video_first": first_media.get("media_id", ""), "video_second": second_media.get("media_id", "")})
first_path = str(first_media.get("path", "")).strip()
second_path = str(second_media.get("path", "")).strip()
first_name = os.path.basename(first_path)
second_name = os.path.basename(second_path)
video_codec, video_container = self._get_video_output_settings()
output_name = f"merged_{first_media.get('media_id', 'video')}_{second_media.get('media_id', 'video')}{deepy_video_tools.get_video_container_extension(video_container)}"
output_path = self._resolve_direct_output_path(output_name, False, False)
try:
output_path = deepy_video_tools.merge_videos(first_path, second_path, output_path=output_path, video_codec=video_codec, video_container=video_container, audio_codec=self._get_video_audio_output_codec())
except Exception as exc:
result = {"status": "error", "video_first": first_media.get("media_id", ""), "video_second": second_media.get("media_id", ""), "output_file": "", "error": str(exc)}
self._update_tool_progress("error", "Error", result)
self._set_status(f"Video merge failed: {exc}", kind="error")
return result
merged_settings = dict(second_media.get("settings", {}) or {})
merged_settings["client_id"] = _next_ai_client_id()
merged_settings["comments"] = f'Merged from "{first_name} & {second_name}"'
completed_at = time.time()
merged_settings["creation_date"] = datetime.fromtimestamp(completed_at).isoformat(timespec="seconds")
merged_settings["creation_timestamp"] = int(completed_at)
try:
fps, width, height, frames_count = get_video_info(output_path)
merged_settings["resolution"] = f"{width}x{height}"
merged_settings["video_length"] = int(frames_count)
if fps > 0:
merged_settings["duration_seconds"] = round(frames_count / fps, 3)
except Exception:
pass
media_record = self._record_direct_media(output_path, merged_settings, is_image=False, audio_only=False, label="Merged video")
result = {
"status": "done",
"output_file": output_path,
"media_id": "" if media_record is None else media_record.get("media_id", ""),
"video_first": first_media.get("media_id", ""),
"video_second": second_media.get("media_id", ""),
"error": "",
}
self._update_tool_progress("done", "Done", result)
self._set_status("Video merge finished.", kind="tool")
return result
@assistant_tool(
display_name="Search Doc",
description="Search WanGP documentation by keywords and return the best matching sections.",
parameters={
"query": {
"type": "string",
"description": "Keywords or a short natural-language question to search for in WanGP docs.",
},
"doc_id": {
"type": "string",
"description": "Optional documentation id to limit the search to: finetunes, getting_started, loras, overview, prompts, or vace.",
"required": False,
},
},
pause_runtime=False,
)
def search_doc(self, query: str, doc_id: str = "") -> dict[str, Any]:
query = str(query or "").strip()
lookup_id = str(doc_id or "").strip().lower()
if len(query) == 0:
return {"status": "error", "query": "", "doc_id": lookup_id, "matches": [], "error": "query is empty."}
if len(lookup_id) > 0 and lookup_id not in _DEEPY_DOCS:
return {
"status": "error",
"query": query,
"doc_id": lookup_id,
"matches": [],
"available_doc_ids": sorted(_DEEPY_DOCS.keys()),
"error": "Unknown documentation id.",
}
target_doc_ids = [lookup_id] if len(lookup_id) > 0 else sorted(_DEEPY_DOCS.keys())
query_tokens = _tokenize_doc_query(query)
self._set_status("Searching documentation...", kind="tool")
self._update_tool_progress("running", "Searching", {"status": "running", "query": query, "doc_id": lookup_id})
try:
matches = []
for current_doc_id in target_doc_ids:
doc_info, sections = _extract_doc_sections(current_doc_id)
for section in sections:
score = _score_doc_section(query, query_tokens, doc_info["title"], section)
if score <= 0:
continue
matches.append(
{
"doc_id": doc_info["doc_id"],
"title": doc_info["title"],
"path": doc_info["path"],
"section": section["section"],
"heading": section["heading"],
"heading_level": section["heading_level"],
"excerpt": _build_doc_excerpt(section, query, query_tokens),
"score": int(score),
}
)
except Exception as exc:
result = {"status": "error", "query": query, "doc_id": lookup_id, "matches": [], "error": str(exc)}
self._update_tool_progress("error", "Error", result)
return result
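# Order results by descending score, then doc id, with shorter section paths breaking remaining ties.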
matches.sort(key=lambda item: (-int(item["score"]), str(item["doc_id"]), len(str(item["section"]))))
result = {
"status": "done",
"query": query,
"doc_id": lookup_id,
"searched_doc_ids": target_doc_ids,
"matches": matches[:5],
"error": "",
}
self._update_tool_progress("done", "Done", {"status": "done", "query": query, "doc_id": lookup_id, "match_count": len(result["matches"]), "error": ""})
self._set_status("Documentation search finished.", kind="tool")
return result
@assistant_tool(
display_name="Load Doc Section",
description="Load one specific WanGP documentation section using the doc id and section path returned by Search Doc.",
parameters={
"doc_id": {
"type": "string",
"description": "Documentation id: finetunes, getting_started, loras, overview, prompts, or vace.",
},
"section": {
"type": "string",
"description": "The section path returned by Search Doc, for example `Prompt Enhancer > Automatic Versus On-Demand`.",
},
},
pause_runtime=False,
)
def load_doc_section(self, doc_id: str, section: str) -> dict[str, Any]:
lookup_id = str(doc_id or "").strip().lower()
section = str(section or "").strip()
if lookup_id not in _DEEPY_DOCS:
return {
"status": "error",
"doc_id": lookup_id,
"section": section,
"available_doc_ids": sorted(_DEEPY_DOCS.keys()),
"error": "Unknown documentation id.",
}
if len(section) == 0:
return {"status": "error", "doc_id": lookup_id, "section": "", "error": "section is empty."}
self._set_status("Loading documentation section...", kind="tool")
self._update_tool_progress("running", "Loading", {"status": "running", "doc_id": lookup_id, "section": section})
try:
doc_info, resolved_section, candidate_sections = _resolve_doc_section(lookup_id, section)
except Exception as exc:
result = {"status": "error", "doc_id": lookup_id, "section": section, "error": str(exc)}
self._update_tool_progress("error", "Error", result)
return result
if len(resolved_section) == 0:
result = {
"status": "error",
"doc_id": lookup_id,
"section": section,
"matching_sections": candidate_sections,
"error": "Section not found or ambiguous. Use the exact section path returned by Search Doc.",
}
self._update_tool_progress("error", "Error", result)
return result
result = {
"status": "done",
"doc_id": doc_info["doc_id"],
"title": doc_info["title"],
"path": doc_info["path"],
"section": resolved_section["section"],
"heading": resolved_section["heading"],
"heading_level": resolved_section["heading_level"],
"content": resolved_section["content"],
"error": "",
}
self._update_tool_progress("done", "Loaded", {"status": "done", "doc_id": doc_info["doc_id"], "section": resolved_section["section"], "path": doc_info["path"], "error": ""})
self._set_status("Documentation section loaded.", kind="tool")
return result
@assistant_tool(
display_name="Get Selected Media",
description="Return the current selected WanGP gallery media. With media_type=all, return both the selected visual media and the selected audio media. If the selected visual item is a video, also report the current player time and frame number.",
parameters={
"media_type": {
"type": "string",
"description": "Optional desired media type: image, video, audio, or all. all returns both gallery selections.",
"required": False,
},
},
pause_runtime=False,
)
def get_selected_media(self, media_type: str = "all") -> dict[str, Any]:
self._sync_recent_media()
resolved_media_type = self._normalize_selected_media_type(media_type)
if resolved_media_type == "all":
visual_media_record, audio_media_record, error_result = self._get_all_selected_media_records()
if error_result is not None:
return error_result
return {
"status": "done",
"media_type": "all",
"selected_visual_media": None if visual_media_record is None else self._selected_media_payload(visual_media_record),
"selected_audio_media": None if audio_media_record is None else self._selected_media_payload(audio_media_record),
"error": "",
}
media_record, error_result = self._get_selected_media_record(resolved_media_type)
if error_result is not None:
return error_result
return {"status": "done", **self._selected_media_payload(media_record), "error": ""}
@assistant_tool(
display_name="Get Media Details",
description="Return detailed local metadata for a previously resolved image, video, or audio.",
parameters={
"media_id": {
"type": "string",
"description": "The media id returned by Resolve Media.",
},
},
pause_runtime=False,
)
def get_media_details(self, media_id: str) -> dict[str, Any]:
self._sync_recent_media()
if self.session is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "error": "Unknown media id."}
media_path = str(media_record.get("path", "")).strip()
media_type = str(media_record.get("media_type", "")).strip().lower()
if media_type not in {"image", "video", "audio"}:
return {
"status": "error",
"media_id": media_record.get("media_id", ""),
"media_type": media_type,
"error": "Detailed media info currently supports images, videos, and audio.",
}
self._set_status("Reading media details...", kind="tool")
self._update_tool_progress("running", "Reading", {"status": "running", "media_id": media_record.get("media_id", ""), "media_type": media_type})
try:
if media_type == "image":
with Image.open(media_path) as image_handle:
width, height = image_handle.size
result = {
"status": "done",
"media_id": media_record.get("media_id", ""),
"label": media_record.get("label", ""),
"media_type": "image",
"path": media_path,
"filename": os.path.basename(media_path),
"width": int(width),
"height": int(height),
"resolution": f"{int(width)}x{int(height)}",
"frame_count": 1,
"fps": None,
"duration_seconds": None,
"has_audio": False,
"audio_track_count": 0,
"sample_rate": None,
"channels": None,
"error": "",
}
elif media_type == "video":
fps, width, height, frame_count = get_video_info(media_path)
audio_track_count = int(extract_audio_tracks(media_path, query_only=True))
result = {
"status": "done",
"media_id": media_record.get("media_id", ""),
"label": media_record.get("label", ""),
"media_type": "video",
"path": media_path,
"filename": os.path.basename(media_path),
"width": int(width),
"height": int(height),
"resolution": f"{int(width)}x{int(height)}",
"frame_count": int(frame_count),
"fps": int(fps),
"duration_seconds": (float(frame_count) / float(fps)) if fps > 0 else None,
"has_audio": audio_track_count > 0,
"audio_track_count": audio_track_count,
"sample_rate": None,
"channels": None,
"error": "",
}
else:
probe = ffmpeg.probe(media_path)
audio_streams = [stream for stream in probe.get("streams", []) if str(stream.get("codec_type", "")).strip().lower() == "audio"]
primary_stream = audio_streams[0] if audio_streams else {}
sample_rate = primary_stream.get("sample_rate", None)
channels = primary_stream.get("channels", None)
duration_seconds = probe.get("format", {}).get("duration", None)
try:
duration_seconds = None if duration_seconds in {None, "", "N/A"} else float(duration_seconds)
except Exception:
duration_seconds = None
try:
sample_rate = None if sample_rate in {None, "", "N/A"} else int(sample_rate)
except Exception:
sample_rate = None
try:
channels = None if channels in {None, "", "N/A"} else int(channels)
except Exception:
channels = None
result = {
"status": "done",
"media_id": media_record.get("media_id", ""),
"label": media_record.get("label", ""),
"media_type": "audio",
"path": media_path,
"filename": os.path.basename(media_path),
"width": None,
"height": None,
"resolution": None,
"frame_count": None,
"fps": None,
"duration_seconds": duration_seconds,
"has_audio": len(audio_streams) > 0,
"audio_track_count": int(len(audio_streams)),
"sample_rate": sample_rate,
"channels": channels,
"error": "",
}
except Exception as exc:
result = {
"status": "error",
"media_id": media_record.get("media_id", ""),
"media_type": media_type,
"path": media_path,
"error": str(exc),
}
self._update_tool_progress("error", "Error", result)
return result
self._update_tool_progress("done", "Done", result)
self._set_status("Media details loaded.", kind="tool")
return result
@assistant_tool(
display_name="Resolve Media",
description="Look up previously generated WanGP media by natural reference such as last image, previous image, or a short description.",
parameters={
"reference": {
"type": "string",
"description": "The user's natural-language reference to previously generated media, such as 'last image' or 'robot on the moon'.",
},
"media_type": {
"type": "string",
"description": "The desired media type: image, video, audio, or all.",
},
},
pause_runtime=False,
)
def resolve_media_reference(self, reference: str, media_type: str) -> dict[str, Any]:
self._sync_recent_media()
if self.session is None:
return {"status": "error", "reference": str(reference or "").strip(), "media_type": str(media_type or "all").strip() or "all", "matches": [], "error": "Assistant session is not available."}
if self._is_selected_reference(reference):
resolved_media_type = self._normalize_selected_media_type(media_type)
if resolved_media_type == "all":
matches = []
visual_media_record, audio_media_record, error_result = self._get_all_selected_media_records()
if error_result is not None:
error_result.setdefault("reference", str(reference or "").strip())
return error_result
if visual_media_record is not None:
matches.append(self._selected_media_payload(visual_media_record, why="matched selected visual media"))
if audio_media_record is not None:
matches.append(self._selected_media_payload(audio_media_record, why="matched selected audio media"))
if len(matches) == 1:
return {"status": "resolved", "media_type": "all", "reference": str(reference or "").strip(), "media": matches[0], "error": ""}
return {"status": "candidates", "media_type": "all", "reference": str(reference or "").strip(), "matches": matches, "error": ""}
media_record, error_result = self._get_selected_media_record(resolved_media_type)
if error_result is not None:
error_result.setdefault("reference", str(reference or "").strip())
return error_result
return {"status": "resolved", "media_type": resolved_media_type, "reference": str(reference or "").strip(), "media": self._selected_media_payload(media_record, why="matched selected media"), "error": ""}
result = media_registry.resolve_media_reference(self.session, reference, media_type)
result.setdefault("error", "")
return result
@assistant_tool(
display_name="Inspect Media",
description="Ask Deepy to inspect a previously resolved image or a frame from a previously resolved video and answer a visual question about it.",
parameters={
"media_id": {
"type": "string",
"description": "The media id returned by Resolve Media.",
},
"question": {
"type": "string",
"description": "The visual question to answer about that media item.",
},
"frame_no": {
"type": "integer",
"description": "Optional frame number to inspect when media_id refers to a video. If omitted, the first frame is used.",
"required": False,
},
},
pause_runtime=True,
pause_reason="vision",
)
def inspect_media(self, media_id: str, question: str, frame_no: int | None = None) -> dict[str, Any]:
self._sync_recent_media()
try:
frame_no = None if frame_no is None or str(frame_no).strip() == "" else int(frame_no)
except Exception:
return {"status": "error", "media_id": str(media_id or "").strip(), "question": str(question or "").strip(), "answer": "", "error": "frame_no must be an integer."}
self._update_tool_progress("running", "Inspecting", {"status": "running", "media_id": str(media_id or "").strip(), "question": str(question or "").strip(), "frame_no": frame_no})
if self.session is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "question": str(question or "").strip(), "answer": "", "error": "Assistant session is not available."}
media_record = media_registry.get_media_record(self.session, media_id)
if media_record is None:
return {"status": "error", "media_id": str(media_id or "").strip(), "question": str(question or "").strip(), "answer": "", "error": "Unknown media id."}
if media_record.get("media_type") not in {"image", "video"}:
return {
"status": "error",
"media_id": media_record.get("media_id", ""),
"media_type": media_record.get("media_type", ""),
"question": str(question or "").strip(),
"answer": "",
"error": "Visual inspection currently supports images and videos.",
}
if self._vision_query_callback is None:
return {
"status": "error",
"media_id": media_record.get("media_id", ""),
"media_type": media_record.get("media_type", ""),
"question": str(question or "").strip(),
"answer": "",
"error": "Deepy vision inspection is not available.",
}
return self._vision_query_callback(media_record, question, frame_no)
def get_tool_schemas(self) -> list[dict[str, Any]]:
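# Builds one OpenAI-style function-calling schema per registered tool, e.g. for mute_video:
# {"type": "function", "function": {"name": "mute_video", "description": "...",
#  "parameters": {"type": "object", "properties": {"media_id": {"type": "string", "description": "..."}}, "required": ["media_id"]}}}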
schemas = []
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
method = getattr(self, attr_name)
metadata = getattr(method, "_assistant_tool", None)
if metadata is None:
continue
properties = {}
required = []
annotations = getattr(method, "__annotations__", {})
for param_name, param_meta in metadata["parameters"].items():
properties[param_name] = {
"type": param_meta.get("type") or _json_type_from_annotation(annotations.get(param_name, str)),
"description": str(param_meta.get("description", "")).strip(),
}
if bool(param_meta.get("required", True)):
required.append(param_name)
schemas.append(
{
"type": "function",
"function": {
"name": metadata["name"],
"description": metadata["description"],
"parameters": {
"type": "object",
"properties": properties,
"required": required,
},
},
}
)
return schemas
def get_tool_display_name(self, tool_name: str) -> str:
lookup_name = str(tool_name or "").strip()
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
method = getattr(self, attr_name)
metadata = getattr(method, "_assistant_tool", None)
if metadata is None or metadata["name"] != lookup_name:
continue
return str(metadata.get("display_name", lookup_name)).strip() or lookup_name
return lookup_name.replace("_", " ").replace("-", " ").strip().title() or "Tool"
def get_tool_policy(self, tool_name: str) -> dict[str, Any]:
lookup_name = str(tool_name or "").strip()
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
method = getattr(self, attr_name)
metadata = getattr(method, "_assistant_tool", None)
if metadata is None or metadata["name"] != lookup_name:
continue
return {
"pause_runtime": bool(metadata.get("pause_runtime", True)),
"pause_reason": str(metadata.get("pause_reason", "tool") or "tool"),
}
return {"pause_runtime": True, "pause_reason": "tool"}
def validate_tool_call(self, tool_name: str, arguments: dict[str, Any]) -> str:
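# Returns an empty string when the call is valid (or the tool is unknown), e.g.
#   validate_tool_call("edit_image", {}) -> "media_id is required."
#   validate_tool_call("edit_image", {"media_id": "", "prompt": "x"}) -> "media_id is empty."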
lookup_name = str(tool_name or "").strip()
call_args = dict(arguments or {})
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
method = getattr(self, attr_name)
metadata = getattr(method, "_assistant_tool", None)
if metadata is None or metadata["name"] != lookup_name:
continue
for param_name, param_meta in metadata["parameters"].items():
if not bool(param_meta.get("required", True)):
continue
value = call_args.get(param_name, None)
if value is None:
return f"{param_name} is required."
if str(param_meta.get("type", "")).strip().lower() == "string" and len(str(value or "").strip()) == 0:
return f"{param_name} is empty."
return ""
return ""
def infer_tool_calls(self, raw_text: str) -> list[dict[str, Any]]:
candidate_texts = []
thinking_text, answer_text = qwen35_text._split_generated_text(raw_text)
for candidate in (raw_text, answer_text, thinking_text):
candidate = str(candidate or "").strip()
if len(candidate) > 0:
candidate_texts.append(candidate)
by_name = {}
sole_tool_name = None
sole_tool_params = set()
for schema in self.get_tool_schemas():
function_spec = schema.get("function", {})
tool_name = str(function_spec.get("name", "")).strip()
if len(tool_name) == 0:
continue
by_name[tool_name] = set(function_spec.get("parameters", {}).get("properties", {}).keys())
if len(by_name) == 1:
sole_tool_name = next(iter(by_name))
sole_tool_params = by_name[sole_tool_name]
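# When exactly one tool is registered, a bare JSON object whose keys are a subset of
# that tool's parameters is treated as an implicit call to it (handled below).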
for candidate in candidate_texts:
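# e.g. 'Tool call: edit_image(media_id="m12", prompt="make it night")' parses to
# {"name": "edit_image", "arguments": {"media_id": "m12", "prompt": "make it night"}};
# only quoted argument values are captured by this pseudo-call regex.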
pseudo_match = re.search(r"Tool call:\s*([A-Za-z_][A-Za-z0-9_]*)\((.*)\)", candidate, flags=re.DOTALL)
if pseudo_match is not None:
tool_name = pseudo_match.group(1).strip()
raw_args = pseudo_match.group(2).strip()
arguments = {}
for arg_name, quoted_value in re.findall(r'([A-Za-z_][A-Za-z0-9_]*)\s*=\s*"([^"]*)"', raw_args):
arguments[arg_name] = quoted_value
for arg_name, quoted_value in re.findall(r"([A-Za-z_][A-Za-z0-9_]*)\s*=\s*'([^']*)'", raw_args):
arguments[arg_name] = quoted_value
if tool_name in by_name:
return [{"name": tool_name, "arguments": arguments}]
fenced_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", candidate, flags=re.DOTALL | re.IGNORECASE)
json_candidate = fenced_match.group(1).strip() if fenced_match is not None else strip_trailing_stop_markup(candidate).strip()
try:
parsed = json.loads(json_candidate)
except Exception:
continue
if not isinstance(parsed, dict):
continue
if "name" in parsed and "arguments" in parsed:
tool_name = str(parsed.get("name", "")).strip()
arguments = parsed.get("arguments", {})
if isinstance(arguments, dict) and tool_name in by_name:
return [{"name": tool_name, "arguments": arguments}]
if sole_tool_name is not None and set(parsed.keys()).issubset(sole_tool_params):
return [{"name": sole_tool_name, "arguments": parsed}]
return []
def call(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
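# Dispatches by registered tool name, e.g. call("extract_image", {"media_id": "m3", "time_seconds": 1.5})
# invokes extract_image(media_id="m3", time_seconds=1.5).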
for attr_name in dir(self):
if attr_name.startswith("_"):
continue
method = getattr(self, attr_name)
metadata = getattr(method, "_assistant_tool", None)
if metadata is None:
continue
if metadata["name"] != tool_name:
continue
return method(**dict(arguments or {}))
raise KeyError(f"Unknown assistant tool: {tool_name}")
class AssistantEngine:
def __init__(self, session: AssistantSessionState, runtime_hooks: AssistantRuntimeHooks, tool_box: tools, send_cmd, debug_enabled: bool | None = None, thinking_enabled: bool = True, vram_mode: str = DEEPY_VRAM_MODE_UNLOAD):
self.session = session
self.runtime_hooks = runtime_hooks
self.tool_box = tool_box
self.send_cmd = send_cmd
self.debug_enabled = ASSISTANT_DEBUG if debug_enabled is None else bool(debug_enabled)
self.thinking_enabled = bool(thinking_enabled)
self.vram_mode = normalize_deepy_vram_mode(vram_mode)
self.runtime: Qwen35AssistantRuntime | None = None
self._gpu_acquired = False
self._skip_pause_snapshot = False
self._active_turn_id = ""
self._active_tool_context: tuple[str, str] | None = None
self._stream_answer_text = ""
self._stream_reasoning_text = ""
self._stream_reasoning_block_id = ""
self._stream_thinking_unknown = False
self._stream_thinking_open = False
self._prefill_started_at: float | None = None
self._live_prefill_tokens = 0
self._segment_started_at: float | None = None
self._segment_generated_tokens = 0
bind_runtime_tools = getattr(self.tool_box, "bind_runtime_tools", None)
if callable(bind_runtime_tools):
bind_runtime_tools(vision_query_callback=self._run_visual_query, tool_progress_callback=self._handle_tool_progress)
def _log(self, message: str) -> None:
if self.debug_enabled:
print(f"[Assistant] {message}")
def _emit_chat_event(self, payload: str | None) -> None:
if payload is None or len(str(payload).strip()) == 0:
return
self.send_cmd("chat_output", payload)
def _set_status(self, text: str | None, kind: str = "thinking") -> None:
self._emit_chat_event(assistant_chat.build_status_event(text, kind=kind, visible=text is not None and len(str(text).strip()) > 0))
self._emit_stats()
def _hide_status(self) -> None:
self._emit_chat_event(assistant_chat.build_status_event(None, visible=False))
self._emit_stats(force=True)
def _get_context_window_tokens(self) -> int:
return normalize_deepy_context_tokens(get_deepy_config_value(DEEPY_CONTEXT_TOKENS_KEY, DEEPY_CONTEXT_TOKENS_DEFAULT))
def _active_sequence_token_count(self) -> int | None:
if self.runtime is None:
return None
try:
current_seq = self.runtime._get_active_sequence()
except Exception:
return None
if current_seq is None:
return None
try:
return len(current_seq.token_ids or [])
except Exception:
return None
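# Resolution order for the context limit: live runtime value (cached on the session),
# then the session's cached value, then the configured context-window setting.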
def _resolved_chat_max_tokens(self) -> int:
max_tokens = 0
if self.runtime is not None:
try:
max_tokens = int(self.runtime.get_max_model_len() or 0)
except Exception:
max_tokens = 0
if max_tokens > 0:
self.session.runtime_max_model_len = max_tokens
return max_tokens
try:
max_tokens = int(self.session.runtime_max_model_len or 0)
except Exception:
max_tokens = 0
return max_tokens if max_tokens > 0 else self._get_context_window_tokens()
def _chat_stats_payload(self) -> dict[str, Any]:
live_prefill_seconds = 0.0 if self._prefill_started_at is None else max(0.0, time.perf_counter() - self._prefill_started_at)
live_generation_seconds = 0.0 if self._segment_started_at is None else max(0.0, time.perf_counter() - self._segment_started_at)
return build_assistant_chat_stats(
self.session,
max_tokens=self._resolved_chat_max_tokens(),
active_sequence_token_count=self._active_sequence_token_count(),
live_prefill_tokens=self._live_prefill_tokens,
live_prefill_seconds=live_prefill_seconds,
live_generated_tokens=self._segment_generated_tokens,
live_generation_seconds=live_generation_seconds,
)
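    # Stats events are deduplicated by JSON signature so the UI only receives
    # an update when a value actually changed; force=True bypasses the check
    # (used when live counters reset at the end of a prefill/generation pass).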
def _emit_stats(self, *, force: bool = False) -> None:
stats = self._chat_stats_payload()
signature = _json_dumps(stats)
if not force and signature == str(self.session.chat_stats_signature or ""):
return
self.session.chat_stats_signature = signature
self._emit_chat_event(assistant_chat.build_stats_event(stats))
def _record_prefill_metrics(self, token_count: int, elapsed_seconds: float) -> None:
tokens = max(0, int(token_count or 0))
elapsed = max(0.0, float(elapsed_seconds or 0.0))
if tokens <= 0 or elapsed <= 0.0:
return
self.session.prefill_token_total += tokens
self.session.prefill_seconds_total += elapsed
def _record_generation_metrics(self, token_count: int, elapsed_seconds: float) -> None:
tokens = max(0, int(token_count or 0))
elapsed = max(0.0, float(elapsed_seconds or 0.0))
if tokens <= 0 or elapsed <= 0.0:
return
self.session.generated_token_total += tokens
self.session.generated_seconds_total += elapsed
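    # Timing wrapper for prefill-style runtime calls: while the callback runs,
    # _prefill_started_at and _live_prefill_tokens feed the live stats; once it
    # completes, the elapsed time is folded into the session totals. record_if
    # may be a bool or a predicate over the result, letting callers skip the
    # accounting for cheap paths (e.g. a pure suffix append).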
def _run_prefill_call(self, token_count: int, callback: Callable[[], Any], *, record_if: bool | Callable[[Any], bool] = True) -> Any:
tokens = max(0, int(token_count or 0))
started_at = time.perf_counter()
self._prefill_started_at = started_at if tokens > 0 else None
self._live_prefill_tokens = tokens
completed = False
result = None
try:
result = callback()
completed = True
return result
        finally:
            elapsed_seconds = max(0.0, time.perf_counter() - started_at)
            self._prefill_started_at = None
            self._live_prefill_tokens = 0
            if completed:
                # Only consult record_if after the callback succeeded; invoking
                # it with a None result inside finally could raise and mask the
                # original exception.
                should_record = record_if(result) if callable(record_if) else bool(record_if)
                if should_record:
                    self._record_prefill_metrics(tokens, elapsed_seconds)
            self._emit_stats(force=True)
def _finish_stream_pass(self, token_count: int | None = None) -> None:
elapsed_seconds = 0.0 if self._segment_started_at is None else max(0.0, time.perf_counter() - self._segment_started_at)
recorded_tokens = max(max(0, int(token_count or 0)), max(0, int(self._segment_generated_tokens or 0)))
self._record_generation_metrics(recorded_tokens, elapsed_seconds)
self._segment_started_at = None
self._segment_generated_tokens = 0
self._emit_stats(force=True)
def _get_custom_system_prompt(self) -> str:
return normalize_deepy_custom_system_prompt(get_deepy_config_value(DEEPY_CUSTOM_SYSTEM_PROMPT_KEY, ""))
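    # The effective system prompt is layered: base assistant prompt, then the
    # user's custom prompt from config, then any pending interruption notice.
    # The resulting string doubles as the render-state signature, so changing
    # any layer invalidates the suffix-append fast paths below.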
def _build_system_prompt(self, *, log_injections: bool = False) -> str:
system_prompt = ASSISTANT_SYSTEM_PROMPT.rstrip()
custom_system_prompt = self._get_custom_system_prompt()
if len(custom_system_prompt) > 0:
system_prompt = f"{system_prompt}\n\n{custom_system_prompt}"
if len(self.session.interruption_notice.strip()) > 0:
if log_injections:
self._log(f"Injecting interruption notice into system prompt: {self.session.interruption_notice.strip()}")
system_prompt = f"{system_prompt.rstrip()}\n\n{self.session.interruption_notice.strip()}"
return system_prompt
def _current_system_prompt_signature(self) -> str:
return self._build_system_prompt()
def _remember_render_state(self) -> None:
self.session.rendered_system_prompt_signature = self._current_system_prompt_signature()
self.session.rendered_context_window_tokens = self._get_context_window_tokens()
self.session.rendered_messages_len = len(self.session.messages)
def _message_render_content(self, message: dict[str, Any]) -> str:
model_content = message.get("model_content", None)
if isinstance(model_content, str) and len(model_content) > 0:
if _INJECT_SELECTED_MEDIA_RUNTIME_UPDATES:
return model_content
return _RUNTIME_UPDATE_BLOCK_RE.sub("\n", model_content).strip()
return str(message.get("content", "") or "")
def _get_pending_render_messages(self) -> list[dict[str, Any]]:
try:
start_idx = int(self.session.rendered_messages_len or 0)
except Exception:
start_idx = 0
start_idx = max(0, min(start_idx, len(self.session.messages)))
return list(self.session.messages[start_idx:])
def _can_append_pending_user_suffix(self) -> bool:
if self.session.rendered_system_prompt_signature != self._current_system_prompt_signature():
return False
if int(self.session.rendered_context_window_tokens or 0) != self._get_context_window_tokens():
return False
pending_messages = self._get_pending_render_messages()
return len(pending_messages) == 1 and str(pending_messages[0].get("role", "")).strip().lower() == "user"
def _pending_user_render_content(self) -> str:
pending_messages = self._get_pending_render_messages()
if len(pending_messages) != 1:
return ""
if str(pending_messages[0].get("role", "")).strip().lower() != "user":
return ""
return self._message_render_content(pending_messages[0]).strip()
def _can_append_pending_tool_suffix(self) -> bool:
if self.session.rendered_system_prompt_signature != self._current_system_prompt_signature():
return False
if int(self.session.rendered_context_window_tokens or 0) != self._get_context_window_tokens():
return False
pending_messages = self._get_pending_render_messages()
return len(pending_messages) > 0 and all(str(message.get("role", "")).strip().lower() == "tool" for message in pending_messages)
def _pending_tool_render_contents(self) -> list[str]:
return [self._message_render_content(message).strip() for message in self._get_pending_render_messages() if str(message.get("role", "")).strip().lower() == "tool" and len(self._message_render_content(message).strip()) > 0]
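    # Build a hidden runtime-status note describing the currently selected
    # visual/audio media. The snapshot is diffed against the previous one via
    # a JSON signature, and only the visual and/or audio key groups that
    # changed are emitted; the whole feature is gated behind
    # _INJECT_SELECTED_MEDIA_RUNTIME_UPDATES.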
def _refresh_runtime_status_note(self) -> None:
if not _INJECT_SELECTED_MEDIA_RUNTIME_UPDATES:
self.session.runtime_status_note = ""
self.session.runtime_status_signature = ""
return
snapshot = self.tool_box._get_selected_runtime_snapshot()
previous_snapshot = {}
previous_signature = str(self.session.runtime_status_signature or "").strip()
if len(previous_signature) > 0:
try:
previous_snapshot = dict(json.loads(previous_signature) or {})
except Exception:
previous_snapshot = {}
if snapshot is None:
if len(previous_signature) == 0:
self.session.runtime_status_note = ""
return
normalized_snapshot = {key: None for key in _RUNTIME_STATUS_ALL_KEYS}
else:
normalized_snapshot = {key: None for key in _RUNTIME_STATUS_ALL_KEYS}
for key in ("selected_visual_media_id", "selected_visual_media_type", "selected_visual_media_label", "selected_audio_media_id", "selected_audio_media_type", "selected_audio_media_label"):
normalized_snapshot[key] = str(snapshot.get(key, "") or "").strip() or None
for key in ("selected_visual_current_time_seconds", "selected_visual_current_frame_no"):
normalized_snapshot[key] = snapshot.get(key, None)
signature = _json_dumps(normalized_snapshot)
if signature == self.session.runtime_status_signature:
self.session.runtime_status_note = ""
return
changed_keys = [key for key in _RUNTIME_STATUS_ALL_KEYS if previous_snapshot.get(key, None) != normalized_snapshot.get(key, None)]
if len(previous_snapshot) == 0:
emitted_keys = list(_RUNTIME_STATUS_ALL_KEYS)
else:
emitted_keys = []
if any(key in changed_keys for key in _RUNTIME_STATUS_VISUAL_KEYS):
emitted_keys.extend(_RUNTIME_STATUS_VISUAL_KEYS)
if any(key in changed_keys for key in _RUNTIME_STATUS_AUDIO_KEYS):
emitted_keys.extend(_RUNTIME_STATUS_AUDIO_KEYS)
if len(emitted_keys) == 0:
self.session.runtime_status_note = ""
self.session.runtime_status_signature = signature
return
        # The sentinel tags must match _RUNTIME_UPDATE_BLOCK_RE so superseded
        # updates can be stripped back out of rendered history.
        lines = [
            "<runtime_update>",
            "Hidden WanGP runtime state. This is environment metadata, not a user message.",
            "Use it as factual UI context only. Omitted keys keep their previous runtime-update values.",
        ]
for key in emitted_keys:
value = normalized_snapshot.get(key, None)
if isinstance(value, str):
rendered_value = value if len(value) > 0 else "none"
else:
rendered_value = "none" if value is None else value
lines.append(f"{key}: {rendered_value}")
lines.append("")
self.session.runtime_status_note = "\n".join(lines)
self.session.runtime_status_signature = signature
if self.debug_enabled:
self._log(f"Prepared runtime status update: {signature}")
def _build_pending_user_message(self, user_text: str) -> dict[str, Any]:
message = {"role": "user", "content": str(user_text or "").strip()}
runtime_status_note = str(self.session.runtime_status_note or "").strip()
if len(runtime_status_note) == 0:
return message
message["model_content"] = f"{runtime_status_note}\n\n{message['content']}".strip()
self.session.runtime_status_note = ""
if self.debug_enabled:
self._log(f"Queued runtime status update inside hidden user content: {runtime_status_note}")
return message
def _record_live_context(self, log_message: str) -> str:
if self.runtime is None:
raise RuntimeError("Assistant runtime is not available for live-context recording.")
current_seq = self.runtime._get_active_sequence()
if current_seq is None or len(current_seq.token_ids) == 0:
return self._canonicalize_context(sync_runtime="record_only")
self.session.rendered_token_ids = [int(token_id) for token_id in current_seq.token_ids]
self.session.runtime_snapshot = None
self.session.pending_replay_reason = ""
self._skip_pause_snapshot = False
self._remember_render_state()
self._log(log_message)
self._emit_stats(force=True)
return "recorded"
def _send_chat(self, text: str) -> None:
text = str(text or "").strip()
if len(text) == 0:
return
self._emit_chat_event(assistant_chat.set_assistant_content(self.session, self._ensure_active_turn(), text))
def _ensure_active_turn(self) -> str:
if len(self._active_turn_id) == 0:
self._active_turn_id = assistant_chat.create_assistant_turn(self.session)
mark_assistant_turn_message(self.session, self._active_turn_id)
return self._active_turn_id
def _split_for_display(self, raw_text: str) -> tuple[str, str]:
thinking_text, answer_text = qwen35_text._split_generated_text(raw_text)
if self.debug_enabled and len(thinking_text) > 0:
print("[Assistant][Thinking]")
try:
print(thinking_text)
except UnicodeEncodeError:
encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
safe_text = thinking_text.encode(encoding, errors="replace").decode(encoding, errors="replace")
sys.stdout.write(safe_text + "\n")
sys.stdout.flush()
return thinking_text, answer_text
def _start_stream_pass(self) -> None:
self._ensure_active_turn()
self._stream_answer_text = ""
self._stream_reasoning_text = ""
self._stream_reasoning_block_id = ""
self._stream_thinking_unknown = self.runtime is not None and qwen35_text._prompt_enhancer_thinking_enabled(self.runtime.model, thinking_enabled=self.thinking_enabled)
self._stream_thinking_open = False
self._segment_started_at = time.perf_counter()
self._segment_generated_tokens = 0
def _current_stream_content(self) -> str:
return self._stream_answer_text
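    # Streaming splitter for partially generated text. Three thinking states
    # are tracked: "unknown" (thinking may still start, so nothing is surfaced
    # until a tag appears or the final pass resolves it), "open" (everything
    # streams as reasoning until a close tag arrives), and resolved (normal
    # thinking/answer split). Partial tool markup is stripped from the answer
    # so half-emitted tool calls are not shown in the chat.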
def _split_streaming_text(self, raw_text: str, is_final: bool = False) -> tuple[str, str]:
text = strip_trailing_stop_markup(str(raw_text or "")).replace("\r\n", "\n").replace("\r", "\n")
lowered = text.lower()
        open_idx = lowered.find("<think>")
        close_idx = lowered.find("</think>")
        if open_idx >= 0 and (close_idx < 0 or open_idx < close_idx):
            self._stream_thinking_unknown = False
            if close_idx < 0:
                self._stream_thinking_open = True
                return qwen35_text._normalize_generated_text(text[open_idx + len("<think>") :]), ""
self._stream_thinking_open = False
thinking_text, answer_text = qwen35_text._split_generated_text(text)
return thinking_text, qwen35_text._clean_answer_text(_strip_partial_tool_markup(answer_text))
if self._stream_thinking_open and close_idx < 0:
            return qwen35_text._normalize_generated_text(text.replace("<think>", "\n")), ""
if close_idx >= 0:
self._stream_thinking_unknown = False
self._stream_thinking_open = False
thinking_text, answer_text = qwen35_text._split_generated_text(text)
return thinking_text, qwen35_text._clean_answer_text(_strip_partial_tool_markup(answer_text))
if self._stream_thinking_unknown and not is_final:
return "", ""
self._stream_thinking_unknown = False
thinking_text, answer_text = qwen35_text._split_generated_text(text)
return thinking_text, qwen35_text._clean_answer_text(_strip_partial_tool_markup(answer_text))
def _stream_generation_update(self, *, raw_text: str, token_count: int, stop_reason: str | None, is_final: bool) -> None:
turn_id = self._ensure_active_turn()
self._segment_generated_tokens = max(int(self._segment_generated_tokens or 0), max(0, int(token_count or 0)))
thinking_text, answer_text = self._split_streaming_text(raw_text, is_final=is_final)
if not is_final and len(thinking_text) < len(self._stream_reasoning_text):
thinking_text = self._stream_reasoning_text
if not is_final and len(answer_text) < len(self._stream_answer_text):
answer_text = self._stream_answer_text
if thinking_text != self._stream_reasoning_text and len(thinking_text) > 0:
self._stream_reasoning_block_id, reasoning_event = assistant_chat.upsert_reasoning_block(self.session, turn_id, self._stream_reasoning_block_id, thinking_text)
self._stream_reasoning_text = thinking_text
self._emit_chat_event(reasoning_event)
if answer_text != self._stream_answer_text and len(answer_text) > 0:
self._stream_answer_text = answer_text
self._emit_chat_event(assistant_chat.set_assistant_content(self.session, turn_id, self._stream_answer_text))
self._emit_stats()
def _handle_tool_progress(self, status: str | None = None, status_text: str | None = None, result: dict[str, Any] | None = None) -> None:
if self._active_tool_context is None:
return
message_id, tool_id = self._active_tool_context
self._emit_chat_event(assistant_chat.update_tool_call(self.session, message_id, tool_id, status=status, status_text=status_text, result=result))
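    # Acquire the GPU (evicting any resident holder first) and rebuild the
    # runtime wrapper if the underlying model instance changed. If loading
    # fails, the GPU is released only when it was acquired by this call.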
def _acquire_runtime(self) -> Qwen35AssistantRuntime:
acquired_here = False
if not self._gpu_acquired:
self.runtime_hooks.clear_gpu_resident()
self.session.release_vram_callback = None
self.runtime_hooks.acquire_gpu()
self._gpu_acquired = True
acquired_here = True
try:
model, _tokenizer = self.runtime_hooks.ensure_loaded()
model._prompt_enhancer_min_model_len_hint = self._get_context_window_tokens()
if self.runtime is None or self.runtime.model is not model:
self.runtime = Qwen35AssistantRuntime(model, debug_enabled=self.debug_enabled)
return self.runtime
except Exception:
if acquired_here:
self._gpu_acquired = False
self.runtime_hooks.release_gpu()
raise
def _ensure_vision_loaded(self) -> tuple[Any, Any]:
ensure_vision_loaded = self.runtime_hooks.ensure_vision_loaded
if not callable(ensure_vision_loaded):
raise RuntimeError("Deepy vision runtime is not available.")
caption_model, caption_processor = ensure_vision_loaded()
if caption_model is None or caption_processor is None:
raise RuntimeError("Deepy vision runtime is not available.")
return caption_model, caption_processor
def _run_visual_query(self, media_record: dict[str, Any], question: str, frame_no: int | None = None) -> dict[str, Any]:
if not self._gpu_acquired:
self.runtime_hooks.clear_gpu_resident()
self.session.release_vram_callback = None
self.runtime_hooks.acquire_gpu()
self._gpu_acquired = True
media_path = str(media_record.get("path", "")).strip()
if len(media_path) == 0 or not os.path.isfile(media_path):
raise FileNotFoundError(f"Media file not found: {media_path}")
caption_model, caption_processor = self._ensure_vision_loaded()
media_type = str(media_record.get("media_type", "")).strip().lower()
if media_type == "video":
image = get_video_frame(media_path, 0 if frame_no is None else int(frame_no), return_last_if_missing=True, return_PIL=True).convert("RGB")
else:
with Image.open(media_path) as image_handle:
image = image_handle.convert("RGB")
prompt_token_ids, prompt_embeds, prompt_position_ids, position_offset = deepy_vision.build_image_question_prompt(
caption_model,
caption_processor,
image,
question,
)
runtime = self._acquire_runtime()
answer = runtime.generate_embedded_answer(
prompt_token_ids,
prompt_embeds,
prompt_position_ids,
position_offset,
max_new_tokens=192,
seed=0,
do_sample=False,
temperature=None,
top_p=None,
top_k=None,
)
return {
"status": "done",
"media_id": media_record.get("media_id", ""),
"media_type": media_type,
"label": media_record.get("label", ""),
"frame_no": None if media_type != "video" else (0 if frame_no is None else int(frame_no)),
"question": str(question or "").strip(),
"answer": answer,
"error": "",
}
def _force_release_vram(self) -> None:
self.runtime_hooks.clear_gpu_resident()
discard_runtime_snapshot = bool(self.session.discard_runtime_snapshot_on_release)
try:
if discard_runtime_snapshot:
self.session.runtime_snapshot = None
if len(self.session.rendered_token_ids) > 0:
self.session.pending_replay_reason = "Deepy RAM unload discarded the cached runtime snapshot"
elif self.runtime is not None and self.session.runtime_snapshot is None and len(self.session.rendered_token_ids) > 0:
self.session.runtime_snapshot = self.runtime.snapshot_context()
except Exception as exc:
self._log(f"Resident snapshot before VRAM release failed: {exc}")
try:
self.runtime_hooks.unload_runtime()
finally:
self.runtime_hooks.unload_weights()
self.runtime = None
self.session.release_vram_callback = None
self.session.discard_runtime_snapshot_on_release = False
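    # Pause policy: "always loaded" keeps weights resident for every pause
    # reason; "unload on request" keeps them resident for idle pauses but
    # registers a release callback so another consumer can force-evict them;
    # vision pauses, and tool pauses outside always-loaded mode, always unload
    # so the tool or vision model can claim the VRAM.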
def _pause_runtime(self, pause_reason: str = "idle") -> None:
keep_loaded = self.vram_mode in (DEEPY_VRAM_MODE_ALWAYS_LOADED, DEEPY_VRAM_MODE_UNLOAD_ON_REQUEST)
if pause_reason == "vision":
keep_loaded = False
if pause_reason == "tool" and self.vram_mode != DEEPY_VRAM_MODE_ALWAYS_LOADED:
keep_loaded = False
allow_force_release = keep_loaded and self.vram_mode == DEEPY_VRAM_MODE_UNLOAD_ON_REQUEST and pause_reason != "tool"
release_callback = self._force_release_vram if keep_loaded else None
if keep_loaded:
self.session.release_vram_callback = release_callback
else:
self.session.release_vram_callback = None
if not self._gpu_acquired:
if self.session.drop_state_requested:
if callable(self.session.release_vram_callback):
self.session.release_vram_callback()
clear_assistant_session(self.session)
self.session.drop_state_requested = False
return
try:
if self.runtime is not None and not self.session.drop_state_requested and not self._skip_pause_snapshot:
self.session.runtime_snapshot = self.runtime.snapshot_context()
else:
self.session.runtime_snapshot = None
finally:
try:
if not keep_loaded:
self.runtime_hooks.unload_runtime()
finally:
try:
if not keep_loaded:
self.runtime_hooks.unload_weights()
self.runtime = None
finally:
self.runtime_hooks.release_gpu(
keep_resident=allow_force_release,
release_vram_callback=release_callback,
force_release_on_acquire=allow_force_release,
)
self._gpu_acquired = False
self._skip_pause_snapshot = False
if self.session.drop_state_requested:
if keep_loaded and callable(self.session.release_vram_callback):
self.session.release_vram_callback()
clear_assistant_session(self.session)
self.session.drop_state_requested = False
def _render_messages(self, add_generation_prompt: bool) -> list[int]:
if self.runtime is None:
raise RuntimeError("Assistant runtime is not available for prompt rendering.")
messages = [{"role": "system", "content": self._build_system_prompt(log_injections=True)}]
for message in self.session.messages:
role = str(message.get("role", "")).strip().lower()
if role == "assistant":
model_message = {"role": "assistant"}
model_content = message.get("model_content", None)
if isinstance(model_content, str) and len(model_content) > 0:
model_message["content"] = model_content
elif "content" in message:
model_message["content"] = message["content"]
if "tool_calls" in message:
model_message["tool_calls"] = message["tool_calls"]
messages.append(model_message)
continue
model_message = {"role": role}
model_message["content"] = self._message_render_content(message)
messages.append(model_message)
thinking_enabled = qwen35_text._prompt_enhancer_thinking_enabled(self.runtime.model, thinking_enabled=self.thinking_enabled)
return render_assistant_messages(
self.runtime.tokenizer,
messages,
self.tool_box.get_tool_schemas(),
add_generation_prompt=add_generation_prompt,
thinking_enabled=thinking_enabled,
)
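    # Restore order of preference: (1) the live runtime sequence already
    # matches the recorded tokens, so nothing is redone; (2) a cached KV
    # snapshot is restored; (3) the recorded token ids are replayed through a
    # fresh prefill. Only the replay path counts as prefill work in the stats.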
def _restore_or_replay_session(self) -> str:
if self.runtime is None:
raise RuntimeError("Assistant runtime is not available for restore.")
runtime = self.runtime
fallback_tokens = self.session.rendered_token_ids
if len(fallback_tokens) == 0:
return "empty"
try:
live_seq = runtime._get_active_sequence()
except Exception:
live_seq = None
if live_seq is not None:
live_token_ids = [int(token_id) for token_id in live_seq.token_ids]
snapshot_seq = None if self.session.runtime_snapshot is None else self.session.runtime_snapshot.get("sequence", {})
snapshot_token_ids = [] if not isinstance(snapshot_seq, dict) else [int(token_id) for token_id in snapshot_seq.get("token_ids", []) or []]
if len(snapshot_token_ids) > 0 and snapshot_token_ids == live_token_ids:
self._log("Session context reused live runtime. [no prefill redone]")
self.session.runtime_snapshot = None
self.session.pending_replay_reason = ""
return "reused"
if fallback_tokens[: len(live_token_ids)] == live_token_ids:
self._log("Session context reused live runtime. [no prefill redone]")
self.session.runtime_snapshot = None
self.session.pending_replay_reason = ""
return "reused"
mode, runtime_replay_reason = self._run_prefill_call(
len(fallback_tokens),
lambda: runtime.restore_or_replay(self.session.runtime_snapshot, fallback_tokens),
record_if=lambda result: isinstance(result, tuple) and len(result) > 0 and result[0] == "replayed",
)
pending_replay_reason = str(self.session.pending_replay_reason or "").strip()
runtime_replay_reason = str(runtime_replay_reason or "").strip()
if len(pending_replay_reason) > 0 and runtime_replay_reason == "no exact runtime snapshot was available":
replay_reason = pending_replay_reason
elif len(pending_replay_reason) > 0 and len(runtime_replay_reason) > 0:
replay_reason = f"{pending_replay_reason}; {runtime_replay_reason}"
else:
replay_reason = pending_replay_reason or runtime_replay_reason
if mode == "replayed":
if len(replay_reason) > 0:
self._log(f"Session context replayed. Reason: {replay_reason} [prefill redone]")
else:
self._log("Session context replayed. [prefill redone]")
elif mode == "restored":
if len(replay_reason) > 0:
self._log(f"Session context restored. Reason: {replay_reason} [no prefill redone]")
else:
self._log("Session context restored. [no prefill redone]")
else:
self._log(f"Session context {mode}.")
self.session.runtime_snapshot = None
self.session.pending_replay_reason = ""
return mode
def _discard_oldest_completed_turn(self) -> str:
messages = self.session.messages
user_indexes = [idx for idx, message in enumerate(messages) if str(message.get("role", "")).strip().lower() == "user"]
if len(user_indexes) > 1:
cut = user_indexes[1]
del messages[:cut]
return f"dropped oldest turn ({cut} messages)"
return ""
def _fit_rendered_messages_to_window(self, *, add_generation_prompt: bool, reserve_tokens: int = 0) -> list[int]:
if self.runtime is None:
raise RuntimeError("Assistant runtime is not available for context fitting.")
max_model_len = self._get_context_window_tokens()
hard_budget = max(1, max_model_len - max(0, int(reserve_tokens)))
post_trim_budget = min(hard_budget, max(1, int(max_model_len * _POST_TRIM_WINDOW_FRACTION)))
target_tokens = self._render_messages(add_generation_prompt=add_generation_prompt)
if len(target_tokens) <= hard_budget:
return target_tokens
while len(target_tokens) > post_trim_budget:
trim_reason = self._discard_oldest_completed_turn()
if len(trim_reason) == 0:
if len(target_tokens) > hard_budget:
raise RuntimeError(f"Current assistant turn alone exceeds the model window ({len(target_tokens)} > {hard_budget}) and will not be cut mid-turn.")
break
self._log(f"Trimming assistant context: {trim_reason}.")
target_tokens = self._render_messages(add_generation_prompt=add_generation_prompt)
if len(target_tokens) > hard_budget:
raise RuntimeError(f"Assistant context exceeds the model window ({len(target_tokens)} > {hard_budget}) and cannot be trimmed further without cutting the current turn.")
return target_tokens
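    # Fast paths first: when the restored context still matches the rendered
    # state and the only pending messages are tool results (or a single user
    # turn), just the rendered suffix is appended to the live sequence.
    # Otherwise the full transcript is re-rendered, trimmed to the window,
    # and prefilled or extended.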
def _sync_generation_context(self) -> None:
runtime = self._acquire_runtime()
if len(self.session.rendered_token_ids) > 0:
restore_mode = self._restore_or_replay_session()
if restore_mode in ("reused", "restored") and self._can_append_pending_tool_suffix():
thinking_enabled = qwen35_text._prompt_enhancer_thinking_enabled(self.runtime.model, thinking_enabled=self.thinking_enabled)
suffix_tokens = render_tool_turn_suffix(runtime.tokenizer, self._pending_tool_render_contents(), thinking_enabled=thinking_enabled)
if len(suffix_tokens) > 0:
prefix_tokens = self._active_sequence_token_count()
prefix_tokens = len(self.session.rendered_token_ids) if prefix_tokens is None else prefix_tokens
mode = self._run_prefill_call(prefix_tokens + len(suffix_tokens), lambda: runtime.append_suffix(suffix_tokens), record_if=lambda result: result == "prefilled")
self._record_live_context("Generation context extended from live runtime. [suffix append only]" if mode == "extended" else "Generation context prefilled from live runtime. [prefill redone]" if mode == "prefilled" else f"Generation context {mode} from live runtime.")
return
if restore_mode in ("reused", "restored") and self._can_append_pending_user_suffix():
thinking_enabled = qwen35_text._prompt_enhancer_thinking_enabled(self.runtime.model, thinking_enabled=self.thinking_enabled)
suffix_tokens = render_text_user_turn_suffix(runtime.tokenizer, self._pending_user_render_content(), thinking_enabled=thinking_enabled)
if len(suffix_tokens) > 0:
prefix_tokens = self._active_sequence_token_count()
prefix_tokens = len(self.session.rendered_token_ids) if prefix_tokens is None else prefix_tokens
mode = self._run_prefill_call(prefix_tokens + len(suffix_tokens), lambda: runtime.append_suffix(suffix_tokens), record_if=lambda result: result == "prefilled")
self._record_live_context("Generation context extended from live runtime. [suffix append only]" if mode == "extended" else "Generation context prefilled from live runtime. [prefill redone]" if mode == "prefilled" else f"Generation context {mode} from live runtime.")
return
target_tokens = self._fit_rendered_messages_to_window(add_generation_prompt=True, reserve_tokens=128)
if len(self.session.rendered_token_ids) > 0:
mode = self._run_prefill_call(len(target_tokens), lambda: runtime.extend_context(target_tokens), record_if=lambda result: result in ("prefilled", "replayed"))
self._remember_render_state()
if mode == "prefilled":
self._log("Generation context prefilled. [prefill redone]")
elif mode == "extended":
self._log("Generation context extended. [suffix append only]")
elif mode == "replayed":
self._log("Generation context replayed. [prefill redone]")
else:
self._log(f"Generation context {mode}.")
return
self._run_prefill_call(len(target_tokens), lambda: runtime.prime_context(target_tokens))
self._remember_render_state()
self._log("Generation context primed. [prefill redone]")
def _canonicalize_context(self, sync_runtime: bool | str = True) -> str:
if self.runtime is None:
raise RuntimeError("Assistant runtime is not available for canonicalization.")
target_tokens = self._fit_rendered_messages_to_window(add_generation_prompt=False)
if not sync_runtime or sync_runtime == "record_only":
self.session.rendered_token_ids = list(target_tokens)
self.session.runtime_snapshot = None
self.session.pending_replay_reason = "context canonicalization was recorded without syncing runtime"
self._remember_render_state()
self._skip_pause_snapshot = True
self._log("Canonical context recorded without runtime sync.")
return "recorded"
if sync_runtime == "record_preserve_live":
self.session.rendered_token_ids = list(target_tokens)
self.session.runtime_snapshot = None
self.session.pending_replay_reason = ""
self._remember_render_state()
self._skip_pause_snapshot = False
self._log("Canonical context recorded while preserving live runtime.")
return "recorded"
current_seq = self.runtime._get_active_sequence()
if sync_runtime == "if_cheap":
if current_seq is None or len(current_seq.token_ids) == 0:
self.session.rendered_token_ids = list(target_tokens)
self.session.runtime_snapshot = None
self.session.pending_replay_reason = "no active runtime sequence was available during canonicalization"
self._remember_render_state()
self._skip_pause_snapshot = True
self._log("Canonical context recorded without runtime sync because no active sequence was available.")
return "recorded"
current_token_ids = [int(token_id) for token_id in current_seq.token_ids]
if target_tokens[: len(current_token_ids)] != current_token_ids:
self.session.rendered_token_ids = list(target_tokens)
self.session.runtime_snapshot = None
self.session.pending_replay_reason = _describe_prefix_mismatch(current_token_ids, target_tokens)
self._remember_render_state()
self._skip_pause_snapshot = True
self._log("Canonical context recorded without runtime sync because it would require replay.")
return "recorded"
self._skip_pause_snapshot = False
self.session.pending_replay_reason = ""
if current_seq is None or len(current_seq.token_ids) == 0:
self.runtime.prime_context(target_tokens)
self._log("Canonical context rebuilt from scratch.")
mode = "replayed"
else:
mode = self.runtime.extend_context(target_tokens)
self._log(f"Canonical context {mode}.")
self.session.rendered_token_ids = list(target_tokens)
self._remember_render_state()
return mode
def _build_tool_error(self, tool_name: str, arguments: dict[str, Any], error_text: str) -> dict[str, Any]:
return {
"status": "error",
"tool": tool_name,
"arguments": dict(arguments or {}),
"error": str(error_text),
}
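    # Tool execution lifecycle: surface the call in the transcript, validate
    # arguments, optionally pause the runtime (per the tool's pause policy) so
    # the tool can claim VRAM, run it with progress forwarding enabled, then
    # complete the transcript entry with the result or a structured error.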
def _execute_tool(self, tool_call: dict[str, Any]) -> dict[str, Any]:
tool_name = str(tool_call.get("name", "")).strip()
tool_label = self.tool_box.get_tool_display_name(tool_name)
tool_transcript_label = self.tool_box.get_tool_transcript_label(tool_name)
tool_template = self.tool_box.get_tool_template_filename(tool_name)
tool_policy = self.tool_box.get_tool_policy(tool_name)
arguments = dict(tool_call.get("arguments", {}) or {})
self._log(f"Tool call: {tool_name} {arguments}")
message_id = self._ensure_active_turn()
tool_id, tool_event = assistant_chat.add_tool_call(self.session, message_id, tool_name, arguments, tool_label=tool_transcript_label)
self._emit_chat_event(tool_event)
validation_error = self.tool_box.validate_tool_call(tool_name, arguments)
if len(validation_error) > 0:
result = self._build_tool_error(tool_name, arguments, validation_error)
self._log(f"Tool validation error: {validation_error}")
self._set_status(f"{tool_label} failed: {validation_error}", kind="error")
self._emit_chat_event(assistant_chat.complete_tool_call(self.session, message_id, tool_id, result))
self._emit_chat_event(assistant_chat.build_sync_event(self.session, stats=self._chat_stats_payload()))
return result
if len(tool_template) > 0:
self._set_status(f"Using {tool_label} ({Path(tool_template).stem})...", kind="tool")
else:
self._set_status(f"Using {tool_label}...", kind="tool")
if tool_policy.get("pause_runtime", True):
self._pause_runtime(pause_reason=tool_policy.get("pause_reason", "tool"))
try:
self._active_tool_context = (message_id, tool_id)
result = self.tool_box.call(tool_name, arguments)
except Exception as exc:
result = self._build_tool_error(tool_name, arguments, str(exc))
self._log(f"Tool error: {exc}")
finally:
self._active_tool_context = None
self._log(f"Tool result: {_json_dumps(result)}")
self._emit_chat_event(assistant_chat.complete_tool_call(self.session, message_id, tool_id, result))
# Queue-backed tools can finish and immediately trigger another model pass; emit a full
# transcript sync here so the UI materializes the final tool state and attachment first.
self._emit_chat_event(assistant_chat.build_sync_event(self.session, stats=self._chat_stats_payload()))
return result
def _append_assistant_message(self, raw_text: str, tool_calls: list[dict[str, Any]] | None = None) -> list[dict[str, Any]]:
cleaned_text = strip_tool_blocks(raw_text)
if tool_calls:
cleaned_text = strip_inline_tool_call_text(cleaned_text)
message = {"role": "assistant"}
stripped_text = strip_trailing_stop_markup(cleaned_text)
stripped_raw_text = strip_trailing_stop_markup(raw_text)
thinking_text, answer_text = qwen35_text._split_generated_text(stripped_text)
thinking_enabled = self.runtime is not None and qwen35_text._prompt_enhancer_thinking_enabled(self.runtime.model, thinking_enabled=self.thinking_enabled)
        if thinking_enabled and ("<think>" in stripped_text.lower() or "</think>" in stripped_text.lower() or len(thinking_text) > 0):
            content = "<think>\n"
            if len(thinking_text) > 0:
                content += f"{thinking_text}\n"
            content += "</think>"
if len(answer_text) > 0:
content += f"\n\n{answer_text}"
else:
content = answer_text if len(answer_text) > 0 else stripped_text
if len(content) > 0:
message["content"] = content
if len(stripped_raw_text) > 0 and not tool_calls:
message["model_content"] = stripped_raw_text
if tool_calls:
message["tool_calls"] = [
{
"id": f"call_{int(time.time() * 1000)}_{idx}",
"type": "function",
"function": {
"name": tool_call["name"],
"arguments": dict(tool_call["arguments"]),
},
}
for idx, tool_call in enumerate(tool_calls)
]
self.session.messages.append(message)
return message.get("tool_calls", [])
def _append_tool_message(self, payload: dict[str, Any], tool_call_id: str | None = None) -> None:
message = {"role": "tool", "content": _json_dumps(payload)}
if tool_call_id:
message["tool_call_id"] = str(tool_call_id)
self.session.messages.append(message)
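    # Main turn loop: append the user message, then alternate generation
    # passes and tool executions until the model answers without tool calls,
    # the user interrupts, or a degenerate loop is detected (the same thought
    # three times in a row, or two thoughts alternating). Each pass re-syncs
    # the runtime context so tool results from the previous pass are prefilled.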
def run_turn(self, user_text: str, max_new_tokens: int = 1024, seed: int | None = 0, do_sample: bool = True, temperature: float | None = 0.6, top_p: float | None = 0.9, top_k: int | None = None) -> None:
user_text = str(user_text or "").strip()
if len(user_text) == 0:
self._send_chat("Please enter a request.")
return
if self.debug_enabled:
print("[User]")
print(user_text)
self._active_turn_id = ""
self._refresh_runtime_status_note()
self.session.messages.append(self._build_pending_user_message(user_text))
checkpoint_assistant_turn(self.session)
recent_thoughts: list[str] = []
model_passes = 0
        final_answer_text = ""
turn_completed = False
try:
while True:
if self.session.interrupt_requested:
break
show_loading_status = model_passes == 0 and (
self.session.force_loading_status_once
or (len(self.session.rendered_token_ids) == 0 and self.session.runtime_snapshot is None)
)
self._set_status("Loading Deepy..." if show_loading_status else "Thinking...", kind="loading" if show_loading_status else "thinking")
self._sync_generation_context()
self._emit_stats(force=True)
if self.session.interrupt_requested:
break
if show_loading_status:
self.session.force_loading_status_once = False
self._set_status("Thinking...", kind="thinking")
self._start_stream_pass()
result = None
try:
result = self.runtime.generate_segment(
max_new_tokens=max_new_tokens,
seed=seed,
do_sample=do_sample,
temperature=temperature,
top_p=top_p,
top_k=top_k,
thinking_enabled=self.thinking_enabled,
stop_requested=lambda: bool(self.session.interrupt_requested),
stream_callback=self._stream_generation_update,
stream_interval_seconds=1.0,
)
finally:
self._finish_stream_pass(None if result is None else result.token_count)
model_passes += 1
if self.session.interrupt_requested or result.stop_reason == "interrupted":
break
raw_text = result.raw_text
thinking_text, answer_text = self._split_for_display(raw_text)
normalized_thinking = re.sub(r"\s+", " ", str(thinking_text or "")).strip()
if len(normalized_thinking) == 0:
recent_thoughts.clear()
else:
recent_thoughts.append(normalized_thinking)
if len(recent_thoughts) > 4:
recent_thoughts = recent_thoughts[-4:]
if len(recent_thoughts) >= 3 and recent_thoughts[-1] == recent_thoughts[-2] == recent_thoughts[-3]:
self._send_chat("Assistant stopped because the same thought repeated 3 times in a row.")
turn_completed = True
break
if (
len(recent_thoughts) >= 4
and recent_thoughts[-1] == recent_thoughts[-3]
and recent_thoughts[-2] == recent_thoughts[-4]
and recent_thoughts[-1] != recent_thoughts[-2]
):
self._send_chat("Assistant stopped because the same two thoughts started alternating in a loop.")
turn_completed = True
break
tool_calls = extract_tool_calls(raw_text)
if len(tool_calls) == 0:
tool_calls = self.tool_box.infer_tool_calls(raw_text)
if self.debug_enabled:
self._log(f"Model stop reason: {result.stop_reason}")
print("[Assistant][Raw]")
print(raw_text)
if tool_calls:
stored_tool_calls = self._append_assistant_message(raw_text, tool_calls=tool_calls)
self._record_live_context("Assistant tool-call context recorded from live runtime.")
for tool_call, stored_tool_call in zip(tool_calls, stored_tool_calls):
if self.session.interrupt_requested:
break
tool_result = self._execute_tool(tool_call)
self._append_tool_message(tool_result, stored_tool_call.get("id"))
checkpoint_assistant_turn(self.session)
if self.session.interrupt_requested:
break
continue
self._append_assistant_message(raw_text)
self._record_live_context("Assistant context recorded from live runtime.")
final_user_text = "" if len(self._stream_answer_text.strip()) > 0 else (answer_text or qwen35_text._clean_generated_text(raw_text))
turn_completed = True
break
finally:
self._hide_status()
try:
self._pause_runtime(pause_reason="idle")
except Exception as exc:
self._log(f"Pause-after-turn failed: {exc}")
if self.session.interrupt_requested:
rollback_assistant_turn(self.session)
finish_assistant_turn(self.session)
self.session.runtime_status_note = ""
self._prefill_started_at = None
self._live_prefill_tokens = 0
self._segment_started_at = None
self._segment_generated_tokens = 0
self._emit_stats(force=True)
            if not self.session.interrupt_requested and len(final_answer_text.strip()) > 0:
                self._send_chat(final_answer_text)
if turn_completed and not self.session.interrupt_requested and len(self.session.interruption_notice.strip()) > 0:
if self.debug_enabled:
self._log("Clearing interruption notice after a successful follow-up turn.")
self.session.interruption_notice = ""