""" Stream chat service implemented with Agno SDK (Agent + tools + DB). """ from __future__ import annotations import ast import asyncio import json import os import re import time from collections.abc import AsyncGenerator from datetime import datetime from pathlib import Path from typing import Any from zoneinfo import ZoneInfo from agno.agent import Agent, RunEvent from agno.models.message import Message from agno.run.agent import RunOutput, ToolCallCompletedEvent, ToolCallStartedEvent from agno.run.team import TeamRunEvent from agno.utils.log import logger from ..models.stream_chat import ( AgentStatusEvent, DoneEvent, ErrorEvent, FormRequestEvent, # New: HITL form request event SourceEvent, StreamChatRequest, TextEvent, ThoughtEvent, ToolCallEvent, ToolResultEvent, ) from .agent_registry import get_agent_for_provider, build_team, resolve_agent_config from .hitl_storage import get_hitl_storage from .summary_service import update_session_summary from .tool_registry import resolve_tool_name MEMORY_OPTIMIZE_THRESHOLD = 50 MEMORY_OPTIMIZE_INTERVAL_SECONDS = 60 * 60 * 12 THINK_TAG_REGEX = re.compile(r"", re.IGNORECASE) PROTOCOL_TAG_REGEX = re.compile( r"(?:<\s*[||]\s*(?P[a-zA-Z0-9_]+)\s*[||]\s*>)" r"|(?:<\s*(?P/?)\s*[||]\s*DSML\s*[||]\s*(?P[^>]+)>)", re.IGNORECASE, ) TOOL_TRACE_BEGIN_TAGS = { "tool_calls_begin", "tool_calls_section_begin", "tool_call_begin", "tool_argument_begin", "tool_call_argument_begin", } TOOL_TRACE_END_TAGS = { "tool_argument_end", "tool_call_argument_end", "tool_call_end", "tool_calls_end", "tool_calls_section_end", } def _strip_internal_tool_trace(text: str) -> str: """Remove explicit protocol marker tokens without truncating normal text.""" if not text: return "" cleaned = str(text) cleaned = re.sub(r"", "", cleaned, flags=re.IGNORECASE) # Remove protocol markers like <|tool_calls_begin|> and spaced variants. 
cleaned = re.sub(r"<\s*[||]\s*[^|>||]*\s*[||]\s*>", "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"]*>", "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"", "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"\[SYSTEM INJECTED CONTEXT\]", "", cleaned, flags=re.IGNORECASE) # Remove inline tool-call-like snippets leaked by some models. cleaned = re.sub( r"([::]\s*)?[a-zA-Z_][a-zA-Z0-9_]{1,80}\s*\{[^{}\n]{0,1200}\}", "", cleaned, flags=re.IGNORECASE, ) return cleaned def _split_content_by_think_tags(text: str, in_think: bool) -> tuple[list[tuple[str, str]], bool]: """Split a content chunk into ordered thought/text segments by / tags.""" if not text: return [], in_think segments: list[tuple[str, str]] = [] cursor = 0 current_in_think = in_think for match in THINK_TAG_REGEX.finditer(text): start, end = match.span() if start > cursor: piece = text[cursor:start] if piece: segments.append(("thought" if current_in_think else "text", piece)) tag = match.group(0).lower() current_in_think = not tag.startswith(" tuple[str, int, str, bool]: """ Strip inline tool-protocol payloads from content chunks safely across chunk boundaries. Returns: cleaned_text, next_tool_trace_depth, next_protocol_tail, had_protocol_tokens """ combined = f"{protocol_tail}{text or ''}" if not combined: return "", tool_trace_depth, "", False # Keep trailing incomplete protocol marker for next chunk. tail = "" tail_start = -1 for m in re.finditer(r"<\s*/?\s*[||]", combined): tail_start = m.start() if tail_start != -1 and ">" not in combined[tail_start:]: tail = combined[tail_start:] combined = combined[:tail_start] if not combined: return "", tool_trace_depth, tail, bool(tail) # Non-destructive stripping: remove marker tokens only. 
matches = list(PROTOCOL_TAG_REGEX.finditer(combined)) had_protocol = bool(matches) if not had_protocol: return combined, 0, tail, bool(tail) cleaned = PROTOCOL_TAG_REGEX.sub("", combined) return cleaned, 0, tail, True def _squash_whitespace(text: Any) -> str: return re.sub(r"\s+", "", str(text or "")) def _extract_agent_info_from_event( run_event: Any, leader_id: str | None = None, leader_name: str | None = None, leader_emoji: str | None = None, agent_metadata: dict[str, Any] | None = None, ) -> dict[str, Any]: """ Extract agent identification from a run event. For Team mode, member events have agent_id and agent_name directly on the event. If these are set, it's a member event; otherwise it's from the leader. Returns: dict with 'agent_id', 'agent_name', 'agent_role', 'agent_emoji', 'model', 'provider' keys """ agent_id = getattr(run_event, "agent_id", None) agent_name = getattr(run_event, "agent_name", None) agent_emoji = getattr(run_event, "agent_emoji", None) # Check if this matches the leader. # Important: some providers might use slightly different names, but if IDs match it's definitely leader. is_leader = False if leader_id and agent_id == leader_id: is_leader = True elif not agent_id and not agent_name: # Default to leader if no info is present is_leader = True elif not leader_id and agent_name == leader_name: is_leader = True # If the ID starts with 'qurio-' (default Agno IDs often follow this pattern) # and we are in team mode, and it's not explicitly a member ID in our metadata, # it's highly likely the leader's initialization event. 
elif str(agent_id or "").startswith("qurio-") and agent_metadata: is_leader = agent_id not in agent_metadata if is_leader: res = { "agent_id": leader_id, "agent_name": leader_name, "agent_role": "leader", "agent_emoji": leader_emoji, "model": None, "provider": None, } if agent_metadata and leader_id in agent_metadata: res.update(agent_metadata[leader_id]) return res # Otherwise treat as member if agent_id or agent_name: # Log at DEBUG level to reduce main stream noise, as switch logs will provide context. logger.debug(f"[TEAM] Member event detected: agent_id={agent_id}, agent_name={agent_name}") res = { "agent_id": agent_id, "agent_name": agent_name, "agent_role": "member", "agent_emoji": agent_emoji, "model": None, "provider": None, } # Enrich from metadata if possible if agent_metadata: if agent_id in agent_metadata: res.update(agent_metadata[agent_id]) elif agent_name in agent_metadata: res.update(agent_metadata[agent_name]) elif not agent_id and agent_name: # Fallback lookup by name if ID missing on event for meta_id, meta in agent_metadata.items(): if meta.get("name") == agent_name: res.update(meta) res["agent_id"] = meta_id break return res # Fallback to leader if totally ambiguous res = { "agent_id": leader_id, "agent_name": leader_name, "agent_role": "leader", "agent_emoji": leader_emoji, "model": None, "provider": None, } if agent_metadata and leader_id in agent_metadata: res.update(agent_metadata[leader_id]) return res def _is_reasoning_duplicate_of_content(reasoning: str, content: str) -> bool: """ Detect provider chunks where answer text is mirrored in reasoning_content. This prevents answer paragraphs from being rendered as a second thought block. 
""" reasoning_flat = _squash_whitespace(reasoning) content_flat = _squash_whitespace(content) if not reasoning_flat or not content_flat: return False if reasoning_flat == content_flat: return True shorter, longer = ( (reasoning_flat, content_flat) if len(reasoning_flat) <= len(content_flat) else (content_flat, reasoning_flat) ) if len(shorter) < 12: return False if shorter in longer and len(shorter) >= int(len(longer) * 0.75): return True return False def _is_stream_trace_enabled() -> bool: value = str(os.getenv("QURIO_STREAM_TRACE", "")).strip().lower() return value in {"1", "true", "yes", "on", "debug"} def _is_verbose_logs_enabled() -> bool: value = str(os.getenv("QURIO_VERBOSE_LOGS", "0")).strip().lower() return value in {"1", "true", "yes", "on", "debug"} def _log_verbose_info(message: str) -> None: if _is_verbose_logs_enabled(): logger.info(message) else: logger.debug(message) def _preview(text: Any, limit: int = 140) -> str: raw = str(text or "").replace("\n", "\\n") return raw[:limit] + ("..." if len(raw) > limit else "") def _extract_message_from_payload(payload: Any) -> str | None: if payload is None: return None if hasattr(payload, "model_dump"): try: payload = payload.model_dump() except Exception: payload = str(payload) if isinstance(payload, dict): for key in ("message", "error", "detail", "msg"): value = payload.get(key) if isinstance(value, str) and value.strip(): return value.strip() nested = _extract_message_from_payload(value) if nested: return nested for value in payload.values(): nested = _extract_message_from_payload(value) if nested: return nested return None if isinstance(payload, (list, tuple)): for item in payload: nested = _extract_message_from_payload(item) if nested: return nested return None text = str(payload).strip() if not text: return None # Agno RunErrorEvent repr: RunErrorEvent(..., content='Unknown model error', ...) 
run_error_content_match = re.search( r"""content\s*=\s*(['"])(.*?)\1""", text, re.IGNORECASE | re.DOTALL, ) if run_error_content_match and run_error_content_match.group(2).strip(): return run_error_content_match.group(2).strip() for parser in (json.loads, ast.literal_eval): try: parsed = parser(text) nested = _extract_message_from_payload(parsed) if nested: return nested except Exception: pass json_like = re.search(r"(\{[\s\S]*\})", text) if json_like: snippet = json_like.group(1) for parser in (json.loads, ast.literal_eval): try: parsed = parser(snippet) nested = _extract_message_from_payload(parsed) if nested: return nested except Exception: pass message_match = re.search(r"""['"]message['"]\s*:\s*['"](.+?)['"]""", text, re.IGNORECASE) if message_match and message_match.group(1).strip(): return message_match.group(1).strip() return text def _extract_best_error_message(exc: Exception | Any) -> str: """Extract the most actionable provider message from nested exceptions.""" generic_markers = ("unknown model error", "unknown error", "model provider error") def _is_generic(text: str) -> bool: lowered = text.strip().lower() return any(marker in lowered for marker in generic_markers) candidates: list[str] = [] queue: list[Any] = [exc] seen: set[int] = set() while queue: current = queue.pop(0) if current is None: continue marker = id(current) if marker in seen: continue seen.add(marker) extracted = _extract_message_from_payload(current) if extracted and extracted.strip(): candidates.append(extracted.strip()) if isinstance(current, BaseException): queue.append(getattr(current, "__cause__", None)) queue.append(getattr(current, "__context__", None)) args = getattr(current, "args", None) if isinstance(args, tuple): queue.extend(args) for attr in ("content", "error", "message", "detail", "model_provider_data"): if hasattr(current, attr): queue.append(getattr(current, attr, None)) for msg in candidates: if not _is_generic(msg): return msg if candidates: # Avoid dumping full 
event repr like "RunErrorEvent(...)" to UI. filtered = [msg for msg in candidates if not msg.strip().lower().startswith("runerrorevent(")] if filtered: return min(filtered, key=len) return min(candidates, key=len) return str(exc or "Unknown error") def _extract_text_chunk(run_event: Any) -> str: """Extract assistant text only from explicit content fields. Shared between stream_chat() and _continue_hitl_run() to avoid duplication. """ provider_data = getattr(run_event, "model_provider_data", None) if isinstance(provider_data, dict): choices = provider_data.get("choices") or [] if choices and isinstance(choices[0], dict): delta = choices[0].get("delta") or {} raw_content = delta.get("content") if isinstance(raw_content, str) and raw_content: return raw_content if isinstance(raw_content, list): parts: list[str] = [] for item in raw_content: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: parts.append(str(text_part)) elif isinstance(item, str) and item: parts.append(item) if parts: return "".join(parts) raw_reasoning = delta.get("reasoning_content") if isinstance(raw_reasoning, str) and raw_reasoning: return "" if isinstance(raw_reasoning, list): reasoning_parts: list[str] = [] for item in raw_reasoning: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: reasoning_parts.append(str(text_part)) elif isinstance(item, str) and item: reasoning_parts.append(item) if reasoning_parts: return "" content = getattr(run_event, "content", None) if isinstance(content, str) and content: return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if isinstance(text_part, str) and text_part: parts.append(text_part) elif isinstance(item, str) and item: parts.append(item) if parts: return "".join(parts) if isinstance(provider_data, dict): choices = provider_data.get("choices") or [] if choices and 
isinstance(choices[0], dict): delta = choices[0].get("delta") or {} raw_content = delta.get("content") if isinstance(raw_content, str) and raw_content: return raw_content if isinstance(raw_content, list): parts: list[str] = [] for item in raw_content: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: parts.append(str(text_part)) elif isinstance(item, str) and item: parts.append(item) if parts: return "".join(parts) return "" def _extract_reasoning_chunk( run_event: Any, trace_fn: Any = None, ) -> str: """Extract reasoning/thought content from a stream event. Shared between stream_chat() and _continue_hitl_run() to avoid duplication. ``trace_fn`` is an optional callable(stage, **kwargs) for trace logging. """ def _trace(stage: str, **kwargs: Any) -> None: if trace_fn: trace_fn(stage, **kwargs) provider_data = getattr(run_event, "model_provider_data", None) if isinstance(provider_data, dict): choices = provider_data.get("choices") or [] if choices and isinstance(choices[0], dict): delta = choices[0].get("delta") or {} raw_reasoning = delta.get("reasoning_content") if isinstance(raw_reasoning, str) and raw_reasoning: _trace("reasoning_source", source="provider_data.delta.reasoning_content") return raw_reasoning if isinstance(raw_reasoning, list): parts: list[str] = [] for item in raw_reasoning: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: parts.append(str(text_part)) elif isinstance(item, str) and item: parts.append(item) if parts: _trace("reasoning_source", source="provider_data.delta.reasoning_content[]") return "".join(parts) reasoning = getattr(run_event, "reasoning_content", None) if isinstance(reasoning, str) and reasoning: _trace("reasoning_source", source="run_event.reasoning_content") return reasoning if isinstance(reasoning, list): parts: list[str] = [] for item in reasoning: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: 
parts.append(str(text_part)) elif isinstance(item, str) and item: parts.append(item) if parts: _trace("reasoning_source", source="run_event.reasoning_content[]") return "".join(parts) if isinstance(provider_data, dict): choices = provider_data.get("choices") or [] if choices and isinstance(choices[0], dict): delta = choices[0].get("delta") or {} raw_reasoning = delta.get("reasoning_content") if isinstance(raw_reasoning, str) and raw_reasoning: _trace("reasoning_source", source="provider_data.delta.reasoning_content") return raw_reasoning if isinstance(raw_reasoning, list): parts: list[str] = [] for item in raw_reasoning: if isinstance(item, dict): text_part = item.get("text") or item.get("content") if text_part: parts.append(str(text_part)) if parts: _trace("reasoning_source", source="provider_data.delta.reasoning_content[]") return "".join(parts) # NOTE: # Some providers (e.g. DeepSeek-compatible streams) may place # assistant answer tokens under `delta.reasoning`. # Treating that field as reasoning can misclassify answer text as thought. # Keep reasoning extraction strict to `reasoning_content` only. 
return "" def _is_raw_events_log_enabled() -> bool: value = str(os.getenv("QURIO_RAW_EVENTS_LOG", "0")).strip().lower() return value not in {"0", "false", "off", "no"} def _raw_events_log_path() -> Path: configured = str(os.getenv("QURIO_RAW_EVENTS_LOG_PATH", "")).strip() if configured: return Path(configured) logs_dir = Path(__file__).resolve().parents[2] / "logs" logs_dir.mkdir(parents=True, exist_ok=True) date_tag = datetime.utcnow().strftime("%Y%m%d") return logs_dir / f"agno_raw_events_{date_tag}.jsonl" def _append_raw_event_log( *, phase: str, request: StreamChatRequest, run_id: str | None, run_event: Any, ) -> None: if not _is_raw_events_log_enabled(): return try: event_name = str(getattr(run_event, "event", "") or "") content_chunk = _extract_text_chunk(run_event) reasoning_chunk = _extract_reasoning_chunk(run_event) payload = { "timestamp": datetime.utcnow().isoformat() + "Z", "phase": phase, "provider": request.provider, "model": request.model, "conversation_id": request.conversation_id, "run_id": run_id or getattr(run_event, "run_id", None), "event_name": event_name, "raw_event_type": type(run_event).__name__, "has_content": bool(str(content_chunk or "").strip()), "has_reasoning_content": bool(str(reasoning_chunk or "").strip()), "content_preview": _preview(content_chunk), "reasoning_preview": _preview(reasoning_chunk), "raw_event": repr(run_event), } log_path = _raw_events_log_path() with log_path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload, ensure_ascii=False) + "\n") except Exception: # Never break stream flow due to diagnostics logging failures. return def _extract_completed_content_and_output( run_event: Any, streamed_content: str = "", ) -> tuple[str, Any]: """ Extract final assistant content/output from RunCompleted-style events. Shared between normal stream_chat() and HITL continuation to keep behavior aligned. 
""" agn_content = getattr(run_event, "content", None) run_response = getattr(run_event, "run_response", None) if not agn_content and run_response is not None: agn_content = getattr(run_response, "content", None) final_content = streamed_content or "" output = None # Structured output should override streamed text to preserve canonical payload. if agn_content and hasattr(agn_content, "model_dump"): output = agn_content final_content = json.dumps(agn_content.model_dump(), ensure_ascii=False) elif isinstance(agn_content, (dict, list)): output = agn_content final_content = json.dumps(agn_content, ensure_ascii=False) elif isinstance(agn_content, str) and agn_content.strip() and not final_content: final_content = agn_content # Fallback for providers that only keep final assistant text in run_response.messages. if not final_content and run_response is not None: try: rr_messages = getattr(run_response, "messages", None) or [] for rr_msg in reversed(rr_messages): rr_role = getattr(rr_msg, "role", None) rr_content = getattr(rr_msg, "content", None) if rr_role != "assistant": continue extracted = _extract_text_from_message_content(rr_content).strip() if extracted: final_content = extracted break except Exception: pass return final_content, output def _extract_text_from_message_content(content: Any) -> str: """ Best-effort text extraction for provider-specific assistant message payloads. 
""" if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, str): parts.append(item) continue if isinstance(item, dict): text_part = item.get("text") if isinstance(text_part, str): parts.append(text_part) continue content_part = item.get("content") if isinstance(content_part, str): parts.append(content_part) continue if isinstance(content_part, (list, dict)): nested = _extract_text_from_message_content(content_part) if nested: parts.append(nested) parts_part = item.get("parts") if isinstance(parts_part, (list, dict)): nested = _extract_text_from_message_content(parts_part) if nested: parts.append(nested) return "".join(parts) if isinstance(content, dict): text_part = content.get("text") if isinstance(text_part, str): return text_part content_part = content.get("content") if isinstance(content_part, str): return content_part if isinstance(content_part, (list, dict)): nested = _extract_text_from_message_content(content_part) if nested: return nested parts_part = content.get("parts") if isinstance(parts_part, (list, dict)): nested = _extract_text_from_message_content(parts_part) if nested: return nested return "" def _coerce_tool_result_payload(output: Any) -> Any: """ Normalize tool output payload into JSON-friendly objects when possible. """ normalized = output if normalized and isinstance(normalized, str): try: normalized = json.loads(normalized) except json.JSONDecodeError: pass if isinstance(normalized, str): try: parsed = ast.literal_eval(normalized) if isinstance(parsed, dict): normalized = parsed except (ValueError, SyntaxError): pass return normalized def _build_tool_result_event( tool: Any, duration_ms: int | None, normalize_tool_output_fn: Any, agent_info: dict[str, str | None] | None = None, ) -> tuple[dict[str, Any], Any]: """ Build frontend ToolResultEvent payload and return parsed tool output. 
""" output = _coerce_tool_result_payload(normalize_tool_output_fn(getattr(tool, "result", None))) event = ToolResultEvent( id=getattr(tool, "tool_call_id", None), name=getattr(tool, "tool_name", "") or "", status="done" if not getattr(tool, "tool_call_error", None) else "error", output=output, durationMs=duration_ms, agent_id=agent_info.get("agent_id") if agent_info else None, agent_name=agent_info.get("agent_name") if agent_info else None, agent_role=agent_info.get("agent_role") if agent_info else None, agent_emoji=agent_info.get("agent_emoji") if agent_info else None, ).model_dump(by_alias=True, exclude_none=True) return event, output def _build_tool_call_event( tool: Any, text_index: int, include_none: bool = False, agent_info: dict[str, str | None] | None = None, ) -> dict[str, Any]: payload = ToolCallEvent( id=getattr(tool, "tool_call_id", None), name=getattr(tool, "tool_name", "") or "", arguments=json.dumps(getattr(tool, "tool_args", None) or {}), text_index=text_index, agent_id=agent_info.get("agent_id") if agent_info else None, agent_name=agent_info.get("agent_name") if agent_info else None, agent_role=agent_info.get("agent_role") if agent_info else None, agent_emoji=agent_info.get("agent_emoji") if agent_info else None, ) if include_none: return payload.model_dump(by_alias=True, exclude_none=False) return payload.model_dump(by_alias=True, exclude_none=True) def _normalize_interactive_form_fields(raw_fields: Any) -> list[dict[str, Any]]: """ Normalize interactive_form fields to a strict list[dict]. Some providers/tool runtimes return fields as a JSON string; this helper parses and sanitizes that shape so FormRequestEvent validation won't fail. 
""" parsed = raw_fields if isinstance(parsed, str): text = parsed.strip() if not text: return [] try: parsed = json.loads(text) except Exception: try: parsed = ast.literal_eval(text) except Exception: logger.warning("interactive_form fields is invalid string, fallback to empty list") return [] if isinstance(parsed, dict): maybe_fields = parsed.get("fields") if isinstance(maybe_fields, list): parsed = maybe_fields else: return [] if not isinstance(parsed, list): return [] normalized: list[dict[str, Any]] = [] used_names: set[str] = set() def _slugify_name(value: Any, fallback_index: int) -> str: base = re.sub(r"[^a-zA-Z0-9_]+", "_", str(value or "").strip().lower()).strip("_") if not base: base = f"field_{fallback_index}" candidate = base suffix = 2 while candidate in used_names: candidate = f"{base}_{suffix}" suffix += 1 used_names.add(candidate) return candidate def _normalize_field_type(value: Any) -> str: candidate = str(value or "").strip().lower() if candidate in {"text", "number", "select", "checkbox", "range"}: return candidate return "text" for idx, item in enumerate(parsed, start=1): if isinstance(item, str): label = item.strip() if not label: continue normalized.append( { "name": _slugify_name(label, idx), "label": label, "type": "text", "required": False, } ) continue if not isinstance(item, dict): logger.warning("interactive_form field item is not dict, skipped: %s", type(item).__name__) continue raw_name = item.get("name") raw_label = item.get("label") label = str(raw_label or raw_name or f"Field {idx}").strip() or f"Field {idx}" field_name = _slugify_name(raw_name or label, idx) field_type = _normalize_field_type(item.get("type")) normalized_item = dict(item) normalized_item["name"] = field_name normalized_item["label"] = label normalized_item["type"] = field_type normalized_item["required"] = bool(item.get("required", False)) normalized.append(normalized_item) return normalized def _extract_interactive_form_payload(req: Any, default_title: str) -> 
tuple[str | None, str, list[dict[str, Any]]]: """Extract interactive form payload from requirement in a validation-safe way.""" tool_args = req.tool_execution.tool_args if getattr(req, "tool_execution", None) else {} if not isinstance(tool_args, dict): tool_args = {} form_id = tool_args.get("id") title = str(tool_args.get("title") or default_title) fields = _normalize_interactive_form_fields(tool_args.get("fields", [])) return form_id, title, fields def _is_interactive_form_requirement(req: Any) -> bool: tool_exec = getattr(req, "tool_execution", None) tool_name = getattr(tool_exec, "tool_name", None) if tool_exec else None return tool_name == "interactive_form" class StreamChatService: """Stream chat service implemented using Agno Agent streaming events.""" def __init__(self) -> None: self._last_memory_optimization: dict[str, float] = {} async def stream_chat( self, request: StreamChatRequest, ) -> AsyncGenerator[dict[str, Any], None]: """ Stream chat completion with HITL support. If request.run_id is present, this is a resumption request after form submission. Otherwise, this is a normal chat request. 
""" # ================================================================ # HITL: Check if this is a resumption request # ================================================================ if request.run_id and request.field_values: _log_verbose_info(f"Detected HITL resumption request (run_id: {request.run_id})") async for event in self._continue_hitl_run(request): yield event return # Debug: Log request fields for expert mode if request.expert_mode: logger.info(f"[DEBUG] Expert mode request - leader_agent_id: {getattr(request, 'leader_agent_id', 'NOT_FOUND')}, team_agent_ids: {getattr(request, 'team_agent_ids', [])}") # ================================================================ # Normal chat flow # ================================================================ try: if not request.provider: raise ValueError("Missing required field: provider") if not request.messages: raise ValueError("Missing required field: messages") # Enable skills for the definitive user-facing chat agent request.enable_skills = True # Build standard agent or Team agent_metadata: dict[str, Any] = {} if request.expert_mode and getattr(request, "team_agent_ids", []): # Log leader configuration for debugging logger.info(f"[TEAM] Building team - leader_agent_id: {getattr(request, 'leader_agent_id', None)}, team_agent_ids: {request.team_agent_ids}") # 1. Resolve Leader Configuration if ID is provided if getattr(request, "leader_agent_id", None): request = resolve_agent_config(request.leader_agent_id, request) logger.info(f"[TEAM] Leader resolved - agent_id: {getattr(request, 'agent_id', None)}, agent_name: {getattr(request, 'agent_name', None)}") if request.agent_id: agent_metadata[request.agent_id] = { "model": request.model, "provider": request.provider, } # 2. 
Resolve Member Agents members = [] for a_id in request.team_agent_ids: import copy sub_req = copy.deepcopy(request) sub_req.expert_mode = False # Fetch actual member config from DB member_req = resolve_agent_config(a_id, sub_req) members.append(get_agent_for_provider(member_req)) # Capture member metadata if member_req.agent_id: agent_metadata[member_req.agent_id] = { "model": member_req.model, "provider": member_req.provider, } if member_req.agent_name: agent_metadata[member_req.agent_name] = { "model": member_req.model, "provider": member_req.provider, } # 3. Build the Team with resolved leader (request) and members agent = build_team(request, members) is_team_mode = True else: agent = get_agent_for_provider(request) is_team_mode = False sources_map: dict[str, Any] = {} full_content = "" full_thought = "" tool_start_times: dict[str, float] = {} should_break_next_thought = False in_reasoning_phase = False reasoning_closed_for_current_cycle = False in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" stream_trace = _is_stream_trace_enabled() # Current agent info for Team mode (updated per event) current_agent_info: dict[str, Any] = {"agent_id": None, "agent_name": None} last_active_agent_id = None def trace_stream(stage: str, **kwargs: Any) -> None: if not stream_trace: return payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()]) logger.info(f"[STREAM_TRACE][main] {stage} | {payload}") def emit_thought_part(part: str): nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle text = _strip_internal_tool_trace(str(part or "")) if not text or not text.strip(): return should_break_next_thought = False in_reasoning_phase = True full_thought += text trace_stream("emit_reasoning", reasoning_preview=_preview(text)) current_text_index = len(full_content) yield ThoughtEvent( content=text, text_index=current_text_index, agent_id=current_agent_info.get("agent_id"), 
agent_name=current_agent_info.get("agent_name"), agent_status=current_agent_info.get("status"), ).model_dump(by_alias=True, exclude_none=True) def process_text(text: str): nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle clean_text = _strip_internal_tool_trace(text) if clean_text: has_visible_text = bool(clean_text.strip()) if has_visible_text: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = True full_content += clean_text yield TextEvent( content=clean_text, agent_id=current_agent_info.get("agent_id"), agent_name=current_agent_info.get("agent_name"), agent_status=current_agent_info.get("status"), ).model_dump(by_alias=True, exclude_none=True) # Agent status tracking for Team mode agent_statuses: dict[str, str] = {} def set_agent_status(agent_id: str | None, status: str): if not agent_id: return if agent_statuses.get(agent_id) == status: return agent_statuses[agent_id] = status return AgentStatusEvent(agentId=agent_id, status=status).model_dump(by_alias=True) async def update_status_and_yield(agent_id: str | None, status: str): event = set_agent_status(agent_id, status) if event: yield event # Context management now handled by Agno's num_history_runs parameter messages = request.messages pre_events: list[dict[str, Any]] = [] messages = self._inject_local_time_context(messages, request, pre_events) enabled_tool_names = self._collect_enabled_tool_names(request) messages = self._inject_tool_guidance(messages, enabled_tool_names, request) for event in pre_events: yield event # ================================================================ # MANUAL CONTEXT MANAGEMENT (Rolling Summary + Fixed Window) # ================================================================ # 1. 
Fetch Session Summary from DB session_summary_text = None old_summary_json = None if request.conversation_id: try: from ..models.db import DbFilter, DbQueryRequest from .db_service import execute_db_async, get_db_adapter adapter = get_db_adapter(request.database_provider) if adapter: req = DbQueryRequest( providerId=adapter.config.id, action="select", table="conversations", columns=["session_summary"], filters=[DbFilter(op="eq", column="id", value=request.conversation_id)], maybeSingle=True, ) result = await execute_db_async(adapter, req) if result.data and isinstance(result.data, dict): row = result.data raw_summary = row.get("session_summary") if raw_summary: # Parsing handled by adapter often, but double check if isinstance(raw_summary, str): try: old_summary_json = json.loads(raw_summary) except (ValueError, json.JSONDecodeError): pass elif isinstance(raw_summary, dict): old_summary_json = raw_summary if old_summary_json: session_summary_text = old_summary_json.get("summary") except Exception as e: logger.warning(f"Failed to fetch session summary: {e}") logger.error(f"Failed to fetch session summary: {e}") # 2. Slice History (Turn-Based Window) # Strategy: Keep all System messages + Last N User turns (User + AI + Tools) # N comes from frontend context setting: contextTurns. 
raw_turn_limit = request.context_turn_limit turn_limit = ( max(1, min(50, int(raw_turn_limit))) if isinstance(raw_turn_limit, int) and raw_turn_limit > 0 else 2 ) # Separate System and Non-System system_messages = [m for m in messages if m.get("role") == "system"] chat_messages = [m for m in messages if m.get("role") != "system"] # Find the indices of User messages to determine run boundaries user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"] user_turn_count = len(user_indices) if user_turn_count > turn_limit: cutoff_index = user_indices[-turn_limit] recent_history = chat_messages[cutoff_index:] else: recent_history = chat_messages # For single-turn requests (common during first-turn regenerate), # using persisted summary can re-introduce stale assistant text. # In this case, use fresh request messages only and rebuild summary from this turn. is_single_user_turn = user_turn_count <= 1 # Force rebuild if it's the first turn OR if the user is editing/regenerating should_rebuild_summary = bool(is_single_user_turn or request.is_editing) # Inject summary only when history exceeds turn window and request is not rebuild flow. should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit) and (not should_rebuild_summary) if not should_inject_summary and session_summary_text: _log_verbose_info( "Skipping session summary injection (within turn window or single-turn rebuild context)." ) session_summary_text = None old_summary_json = None # 3. Inject Summary into System Prompt if session_summary_text: summary_prompt = ( "\n\nSession memory summary:\n" "Here is a summary of the conversation so far. 
Use this to understand long-term context, " "but prioritize the details in the recent messages below.\n" f"{session_summary_text}\n" ) # Inject into the LAST system message, or create a new one if none exist if system_messages: last_sys = system_messages[-1] # Avoid appending if already present (defensive) if "Session memory summary:" not in str(last_sys.get("content", "")): new_content = str(last_sys.get("content", "")) + summary_prompt # Update the dict (need to be careful not to mutate original request list in place if reused, but here it's fine) last_sys["content"] = new_content else: system_messages.append({"role": "system", "content": summary_prompt}) # Final Agent Input agent_input = system_messages + recent_history stream = agent.arun( input=agent_input, stream=True, stream_events=True, user_id=request.user_id, session_id=request.conversation_id, # Only pass explicit structured-output schema. # Do not fallback to response_format, otherwise {"type":"json_object"} # may be treated as grammar and trigger provider-side grammar cache errors. 
output_schema=request.output_schema, ) # ================================================================ # Stream processing with HITL support # ================================================================ async for run_event in stream: _append_raw_event_log( phase="main", request=request, run_id=getattr(run_event, "run_id", None), run_event=run_event, ) # ============================================================ # HITL: Check if agent paused for user input # ============================================================ if hasattr(run_event, 'is_paused') and run_event.is_paused: logger.info(f"Agent paused for HITL (run_id: {run_event.run_id})") # Extract requirements requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None) if requirements: form_requirements = [req for req in requirements if _is_interactive_form_requirement(req)] if not form_requirements: logger.info("Agent paused without interactive_form; skipping HITL form handling") yield DoneEvent( content=full_content or "", thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() return # Save to Supabase try: paused_tools = getattr(run_event, "tools", None) or [] serialized_tools = [ tool.to_dict() if hasattr(tool, "to_dict") else tool for tool in paused_tools if tool is not None ] serialized_requirements = [ req.to_dict() if hasattr(req, "to_dict") else req for req in form_requirements ] paused_run_output = { "run_id": getattr(run_event, "run_id", None), "session_id": getattr(run_event, "session_id", None) or request.conversation_id, "user_id": request.user_id, "messages": messages or [], "tools": serialized_tools, "requirements": serialized_requirements, "status": "PAUSED", } logger.info( f"[HITL] Saving pending run_id={run_event.run_id} " f"with database_provider={request.database_provider}" ) hitl_storage = get_hitl_storage(request.database_provider) saved = await hitl_storage.save_pending_run( 
run_id=run_event.run_id, requirements=form_requirements, conversation_id=request.conversation_id, user_id=request.user_id, agent_model=request.model, messages=messages, run_output=paused_run_output, ) if not saved: raise RuntimeError("Failed to persist HITL pending run") # Extract form fields for frontend for req in form_requirements: # Handle external execution (e.g., interactive_form with external_execution=True) if (hasattr(req, 'needs_external_execution') and req.needs_external_execution) or \ (req.tool_execution and req.tool_execution.tool_name == "interactive_form"): form_id, title, fields = _extract_interactive_form_payload( req, default_title="Please provide the following information", ) # Send form_request event to frontend yield FormRequestEvent( run_id=run_event.run_id, form_id=form_id, title=title, fields=fields ).model_dump() # Fallback handle traditional user input (e.g., get_user_input) elif req.needs_user_input and req.user_input_schema: # Convert from user_input_schema form_id = None title = "Please provide the following information" fields = [ { "name": field.name, "type": self._map_field_type_to_frontend(field.field_type), "label": field.description or field.name, "required": True, "value": field.value } for field in req.user_input_schema ] # Send form_request event to frontend yield FormRequestEvent( run_id=run_event.run_id, form_id=form_id, title=title, fields=fields ).model_dump() # Send done event to indicate pause yield DoneEvent( content=full_content or "", thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() _log_verbose_info(f"HITL pause successful, waiting for user submission (run_id: {run_event.run_id})") return # Exit stream, wait for user to submit form except Exception as e: logger.error(f"Failed to save HITL state: {e}") yield ErrorEvent(error=f"Failed to pause for form: {str(e)}").model_dump() return else: logger.warning("Agent paused but no requirements found") yield DoneEvent( 
content=full_content or "", thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() return # ============================================================ # Normal streaming events (use stream_events for details) # ============================================================ # Check if this is a detailed event (from stream_events=True) if hasattr(run_event, 'event'): # Extract agent info for Team mode (member vs leader identification) current_agent_info = _extract_agent_info_from_event( run_event, leader_id=request.agent_id, leader_name=request.agent_name, leader_emoji=request.agent_emoji, agent_metadata=agent_metadata, ) # Log active agent switch in Team mode if is_team_mode: current_id = current_agent_info.get("agent_id") if current_id != last_active_agent_id: last_active_agent_id = current_id active_name = current_agent_info.get("agent_name") active_role = current_agent_info.get("agent_role") active_model = current_agent_info.get("model") active_provider = current_agent_info.get("provider") logger.info( f"[TEAM] >>> Active Agent Switch: {active_name} ({active_role}) " f"| Model: {active_model} | Provider: {active_provider}" ) # Apply current tracked status to info for text/thought events current_agent_info["status"] = agent_statuses.get(current_id, "active") if current_agent_info.get("agent_role") == "member": trace_stream( "member_event", agent_id=current_agent_info.get("agent_id"), agent_name=current_agent_info.get("agent_name"), ) match run_event.event: case RunEvent.run_started.value | TeamRunEvent.run_started: if is_team_mode: active_id = current_agent_info.get("agent_id") active_name = current_agent_info.get("agent_name") active_role = current_agent_info.get("agent_role") active_model = current_agent_info.get("model") active_provider = current_agent_info.get("provider") logger.info( f"[TEAM] >>> run_started: {active_name} ({active_role}) " f"| Model: {active_model} | Provider: {active_provider}" ) # Member starts -> 
Leader waits, Member active if active_role == "member": # Ensure leader is set to waiting when member starts async for e in update_status_and_yield(request.agent_id, "waiting"): yield e async for e in update_status_and_yield(active_id, "active"): yield e else: # Leader starts -> Leader active async for e in update_status_and_yield(active_id, "active"): yield e continue case TeamRunEvent.run_completed: if is_team_mode: active_id = current_agent_info.get("agent_id") active_role = current_agent_info.get("agent_role") if active_role == "member": # Member finished -> Leader still waiting (until it resumes), Member ready async for e in update_status_and_yield(active_id, "ready"): yield e continue # Handle both Agent RunEvent and Team TeamRunEvent for content streaming case RunEvent.run_content.value | TeamRunEvent.run_content: raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) if had_protocol: trace_stream( "strip_tool_protocol", depth=inline_tool_trace_depth, tail_len=len(inline_protocol_tail), cleaned_preview=_preview(raw_content_chunk), ) raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) reasoning = raw_reasoning if reasoning and content_chunk and _is_reasoning_duplicate_of_content( str(reasoning), str(content_chunk), ): trace_stream( "suppress_reasoning_overlap", event="run_content", reasoning_preview=_preview(reasoning), content_preview=_preview(content_chunk), ) reasoning = "" has_content_chunk = bool(content_chunk) has_inline_thought = 
bool(inline_thought_chunk) trace_stream( "run_content", has_content=has_content_chunk, has_reasoning=bool(reasoning) or has_inline_thought, reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), ) has_any_thought = bool(reasoning) or has_inline_thought if has_any_thought and reasoning_closed_for_current_cycle: # Re-open reasoning phase (e.g. after tool call or interleaved model output). reasoning_closed_for_current_cycle = False in_reasoning_phase = False should_break_next_thought = True if reasoning: for event in emit_thought_part(str(reasoning)): yield event if has_inline_thought: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e case RunEvent.reasoning_content_delta.value | TeamRunEvent.reasoning_content_delta: raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) if had_protocol: trace_stream( "strip_tool_protocol", depth=inline_tool_trace_depth, tail_len=len(inline_protocol_tail), cleaned_preview=_preview(raw_content_chunk), ) raw_reasoning = _extract_reasoning_chunk(run_event) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) reasoning = raw_reasoning if reasoning and content_chunk and _is_reasoning_duplicate_of_content( str(reasoning), str(content_chunk), ): trace_stream( "suppress_reasoning_overlap", event="reasoning_content_delta", reasoning_preview=_preview(reasoning), content_preview=_preview(content_chunk), ) 
reasoning = "" has_content_chunk = bool(content_chunk) has_inline_thought = bool(inline_thought_chunk) trace_stream( "reasoning_delta", has_content=has_content_chunk, has_reasoning=bool(reasoning) or has_inline_thought, reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), ) has_any_thought = bool(reasoning) or has_inline_thought if has_any_thought and reasoning_closed_for_current_cycle: reasoning_closed_for_current_cycle = False in_reasoning_phase = False should_break_next_thought = True if reasoning: for event in emit_thought_part(str(reasoning)): yield event if has_inline_thought: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e case RunEvent.tool_call_started.value | TeamRunEvent.tool_call_started: tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment] tool = tool_event.tool if tool: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = False in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" if getattr(tool, "tool_call_id", None): tool_start_times[tool.tool_call_id] = time.time() current_id = current_agent_info.get("agent_id") # If leader calls a tool, it's either an internal tool (code, etc) or delegation. # During the tool call itself, the agent is "active". 
async for e in update_status_and_yield(current_id, "active"): yield e trace_stream( "tool_call_started", tool_name=getattr(tool, "tool_name", ""), tool_call_id=getattr(tool, "tool_call_id", None), ) current_text_index = len(full_content) yield _build_tool_call_event( tool, current_text_index, agent_info=current_agent_info ) case RunEvent.tool_call_completed.value | TeamRunEvent.tool_call_completed: tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment] tool = tool_event.tool if tool: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = False in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" duration_ms = None if tool.tool_call_id and tool.tool_call_id in tool_start_times: duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000) trace_stream( "tool_call_completed", tool_name=tool.tool_name or "", tool_call_id=tool.tool_call_id, is_error=bool(tool.tool_call_error), ) tool_result_event, output = _build_tool_result_event( tool, duration_ms, self._normalize_tool_output, agent_info=current_agent_info, ) yield tool_result_event self._collect_search_sources(output, sources_map) case RunEvent.run_completed.value | TeamRunEvent.run_completed: # Extract agent info to check if this is a member or leader event_agent_info = _extract_agent_info_from_event( run_event, leader_id=request.agent_id, leader_name=request.agent_name, leader_emoji=request.agent_emoji, agent_metadata=agent_metadata, ) is_member_completion = event_agent_info.get("agent_role") == "member" # In Team Mode, only terminate when the LEADER (no agent_id on event) completes. # Member completions should just let the main loop continue. 
if is_team_mode and is_member_completion: active_id = event_agent_info.get("agent_id") active_name = event_agent_info.get("agent_name") active_model = event_agent_info.get("model") active_provider = event_agent_info.get("provider") logger.info( f"[TEAM] Member {active_name} completed " f"(Model: {active_model} | Provider: {active_provider}). " "Continuing stream..." ) # Ensure member is marked as ready if not already handled by TeamRunEvent.run_completed async for e in update_status_and_yield(active_id, "ready"): yield e continue # Leader completed if is_team_mode: async for e in update_status_and_yield(request.agent_id, "idle"): yield e final_content, output = _extract_completed_content_and_output( run_event, full_content, ) yield DoneEvent( content=final_content or "", output=output, thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() if request: asyncio.create_task(self._maybe_optimize_memories(agent, request)) # 4. Trigger Async Session Summary Update # Only if conversation_id exists (Main Chat Flow) AND summary is enabled if request.conversation_id and request.enable_session_summary: # Prepare summary lines: # - Normal flow: incremental update with last user + new assistant # - Regenerate/Edit (or single-turn rebuild): rebuild from current request context + new assistant if should_rebuild_summary: new_lines = [ m for m in messages if m.get("role") in ("user", "assistant") ] new_lines.append({"role": "assistant", "content": final_content}) else: new_lines = [] last_user = next((m for m in reversed(messages) if m.get("role") == "user"), None) if last_user: new_lines.append(last_user) new_lines.append({"role": "assistant", "content": final_content}) _log_verbose_info(f"Triggering async summary update for {request.conversation_id} with {len(new_lines)} messages (rebuild: {should_rebuild_summary}, is_editing: {request.is_editing})") asyncio.create_task(update_session_summary( conversation_id=request.conversation_id, 
old_summary=old_summary_json, new_messages=new_lines, database_provider=request.database_provider, memory_provider=request.memory_provider, memory_model=request.memory_model, memory_api_key=request.memory_api_key, memory_base_url=request.memory_base_url, summary_provider=request.summary_provider, summary_model=request.summary_model, summary_api_key=request.summary_api_key, summary_base_url=request.summary_base_url, rebuild_from_scratch=should_rebuild_summary, )) return case RunEvent.run_error.value | TeamRunEvent.run_error: error_msg = _extract_best_error_message(run_event) active_id = current_agent_info.get("agent_id") if active_id: async for e in update_status_and_yield(active_id, "error"): yield e yield ErrorEvent(error=error_msg).model_dump() return else: # Simple event Fallback (no detailed event type), just check for content raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) raw_reasoning = _extract_reasoning_chunk(run_event) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) if raw_reasoning: for event in emit_thought_part(str(raw_reasoning)): yield event if inline_thought_chunk: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e except Exception as exc: logger.error(f"Stream chat error: {exc}") yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump() async def _continue_hitl_run( self, request: StreamChatRequest, ) -> AsyncGenerator[dict[str, Any], None]: """ Continue a paused HITL run after user submits form. 
This method: 1. Retrieves requirements from storage 2. Rebuilds continuation messages with submitted form values 3. Runs agent.arun() and streams completion 4. Cleans up storage record """ try: run_id = request.run_id field_values = request.field_values or {} _log_verbose_info( f"[HITL] Continuing run_id={run_id!r} " f"with database_provider={request.database_provider} " f"and field_values={list(field_values.keys())}" ) # 1. Fetch Session Summary from DB session_summary_text = None old_summary_json = None if request.conversation_id: try: from ..models.db import DbFilter, DbQueryRequest from .db_service import execute_db_async, get_db_adapter adapter = get_db_adapter(request.database_provider) if adapter: req = DbQueryRequest( providerId=adapter.config.id, action="select", table="conversations", columns=["session_summary"], filters=[DbFilter(op="eq", column="id", value=request.conversation_id)], maybeSingle=True, ) result = await execute_db_async(adapter, req) if result.data and isinstance(result.data, dict): row = result.data raw_summary = row.get("session_summary") if raw_summary: if isinstance(raw_summary, str): try: old_summary_json = json.loads(raw_summary) except (ValueError, json.JSONDecodeError): pass elif isinstance(raw_summary, dict): old_summary_json = raw_summary if old_summary_json: session_summary_text = old_summary_json.get("summary") except Exception as e: logger.warning(f"Failed to fetch session summary in HITL flow: {e}") # Retrieve requirements from Supabase hitl_storage = get_hitl_storage(request.database_provider) pending = await hitl_storage.get_pending_run(run_id) requirements = None saved_messages = None saved_run_output = None if isinstance(pending, dict): requirements = pending.get("requirements") saved_messages = pending.get("messages") saved_run_output = pending.get("run_output") else: requirements = pending _log_verbose_info( "[HITL] loaded pending payload: " f"has_requirements={bool(requirements)}, " 
f"messages_type={type(saved_messages).__name__ if saved_messages is not None else 'None'}, " f"has_run_output={saved_run_output is not None}, " f"run_output_type={type(saved_run_output).__name__ if saved_run_output is not None else 'None'}" ) if not requirements: logger.error( f"[HITL] No pending run found for run_id={run_id!r} " f"(database_provider={request.database_provider})" ) yield ErrorEvent(error="Form session expired or not found").model_dump() return # Enable skills for the definitive user-facing chat agent request.enable_skills = True # Get agent (same provider as original request) agent = get_agent_for_provider(request) _log_verbose_info(f"[HITL Continue] Agent instructions: {getattr(agent, 'instructions', None)}") full_content = "" full_thought = "" sources_map: dict[str, Any] = {} tool_start_times: dict[str, float] = {} should_break_next_thought = False in_reasoning_phase = False reasoning_closed_for_current_cycle = False in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" stream_trace = _is_stream_trace_enabled() paused_again = False # Flag to prevent cleanup when multi-form chaining occurs stream_had_error = False completed_content_fallback = "" saw_terminal_completion = False continuation_event_count = 0 last_event_name: str | None = None last_event_type: str | None = None last_event_run_id: str | None = None # Current agent info for Team mode (updated per event) current_agent_info: dict[str, Any] = {"agent_id": request.agent_id, "agent_name": request.agent_name} def trace_stream(stage: str, **kwargs: Any) -> None: if not stream_trace: return payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()]) logger.info(f"[STREAM_TRACE][hitl] {stage} | {payload}") def emit_thought_part(part: str): nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle text = _strip_internal_tool_trace(str(part or "")) if not text or not text.strip(): return 
should_break_next_thought = False in_reasoning_phase = True full_thought += text trace_stream("emit_reasoning", reasoning_preview=_preview(text)) current_text_index = len(full_content) yield ThoughtEvent( content=text, text_index=current_text_index, agent_id=current_agent_info.get("agent_id"), agent_name=current_agent_info.get("agent_name"), ).model_dump(by_alias=True, exclude_none=True) def process_text(text: str): nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle clean_text = _strip_internal_tool_trace(text) if clean_text: has_visible_text = bool(clean_text.strip()) if has_visible_text: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = True full_content += clean_text yield TextEvent( content=clean_text, agent_id=current_agent_info.get("agent_id"), agent_name=current_agent_info.get("agent_name"), ).model_dump(by_alias=True, exclude_none=True) async def _iterate_run_stream(stream: Any): """ Normalize both async and sync Agno run streams into an async iterator. 
""" if hasattr(stream, "__aiter__"): async for item in stream: yield item return iterator = iter(stream) sentinel = object() while True: item = await asyncio.to_thread(lambda: next(iterator, sentinel)) if item is sentinel: break yield item async def _stream_events(stream): nonlocal full_content, full_thought, sources_map, tool_start_times, paused_again, stream_had_error nonlocal in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle nonlocal in_content_think_block, inline_tool_trace_depth, inline_protocol_tail nonlocal completed_content_fallback, saw_terminal_completion nonlocal continuation_event_count, last_event_name, last_event_type, last_event_run_id async for run_event in _iterate_run_stream(stream): _append_raw_event_log( phase="hitl_continuation", request=request, run_id=run_id, run_event=run_event, ) continuation_event_count += 1 last_event_type = type(run_event).__name__ last_event_name = str(getattr(run_event, "event", None) or last_event_type) last_event_run_id = str(raw_event_run_id) if raw_event_run_id else None # Extract agent info for Team mode (though Team HITL is currently disabled) current_agent_info = _extract_agent_info_from_event( run_event, leader_id=request.agent_id, leader_name=request.agent_name, leader_emoji=request.agent_emoji, ) # When yield_run_output=True, acontinue_run may yield the final RunOutput object. # Capture its canonical content as a robust fallback for providers that emit sparse events. # IMPORTANT: Do NOT re-emit this content via process_text() if we already received # streaming chunks (run_content events). Doing so would cause the full answer to be # appended a second time, resulting in visible duplication in the UI. # Only use RunOutput.content as a fallback when the stream produced nothing. 
if isinstance(run_event, RunOutput): saw_terminal_completion = True completed_content_fallback, _ = _extract_completed_content_and_output( run_event, completed_content_fallback or full_content, ) # Only emit if no content was streamed yet (sparse-event provider fallback). if not full_content: text_from_output = _extract_text_from_message_content( getattr(run_event, "content", None) ).strip() if text_from_output: for e in process_text(text_from_output): yield e continue # HITL Pause Check if hasattr(run_event, 'is_paused') and run_event.is_paused: logger.info(f"Agent paused again during continuation (multi-form chain, run_id: {run_id})") # Extract new requirements new_requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None) if new_requirements: form_requirements = [req for req in new_requirements if _is_interactive_form_requirement(req)] if form_requirements: # Save new form requirements (overwrites previous in memory) hitl_storage_multi = get_hitl_storage(request.database_provider) saved = await hitl_storage_multi.save_pending_run( run_id=run_id, requirements=form_requirements, conversation_id=request.conversation_id, user_id=request.user_id, agent_model=request.model, messages=saved_messages, # Reuse saved messages run_output=( { "run_id": getattr(run_event, "run_id", None) or run_id, "session_id": getattr(run_event, "session_id", None) or request.conversation_id, "user_id": request.user_id, "messages": saved_messages or [], "tools": [ tool.to_dict() if hasattr(tool, "to_dict") else tool for tool in (getattr(run_event, "tools", None) or []) if tool is not None ], "requirements": [ req.to_dict() if hasattr(req, "to_dict") else req for req in form_requirements ], "status": "PAUSED", } ), ) if not saved: raise RuntimeError("Failed to persist chained HITL pending run") # Extract form fields and notify frontend for req in form_requirements: if (hasattr(req, 'needs_external_execution') and req.needs_external_execution) 
or \ (req.tool_execution and req.tool_execution.tool_name == "interactive_form"): form_id, title, fields = _extract_interactive_form_payload( req, default_title="Please provide additional information", ) yield FormRequestEvent( run_id=run_id, form_id=form_id, title=title, fields=fields ).model_dump() # Send partial done event yield DoneEvent( content=full_content or "", thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() logger.info(f"Multi-form: saved second form, waiting for user (run_id: {run_id})") paused_again = True # Mark as paused again to skip cleanup return # If no form requirements, just continue logger.warning(f"Agent paused again but no interactive_form found (run_id: {run_id})") yield ErrorEvent(error="Agent paused unexpectedly").model_dump() return # Check if this is a detailed event (from stream_events=True or implicit) if hasattr(run_event, 'event'): match run_event.event: case RunEvent.run_content.value: raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) if had_protocol: trace_stream( "strip_tool_protocol", depth=inline_tool_trace_depth, tail_len=len(inline_protocol_tail), cleaned_preview=_preview(raw_content_chunk), ) raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) reasoning = raw_reasoning if reasoning and content_chunk and _is_reasoning_duplicate_of_content( str(reasoning), str(content_chunk), ): trace_stream( "suppress_reasoning_overlap", event="run_content", 
reasoning_preview=_preview(reasoning), content_preview=_preview(content_chunk), ) reasoning = "" has_content_chunk = bool(content_chunk) has_inline_thought = bool(inline_thought_chunk) trace_stream( "run_content", has_content=has_content_chunk, has_reasoning=bool(reasoning) or has_inline_thought, reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), ) has_any_thought = bool(reasoning) or has_inline_thought if has_any_thought and reasoning_closed_for_current_cycle: reasoning_closed_for_current_cycle = False in_reasoning_phase = False should_break_next_thought = True if reasoning: for event in emit_thought_part(str(reasoning)): yield event if has_inline_thought: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e trace_stream( "emit_content", reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), ) case RunEvent.reasoning_content_delta.value: raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) if had_protocol: trace_stream( "strip_tool_protocol", depth=inline_tool_trace_depth, tail_len=len(inline_protocol_tail), cleaned_preview=_preview(raw_content_chunk), ) raw_reasoning = _extract_reasoning_chunk(run_event) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) reasoning = raw_reasoning if reasoning and content_chunk and _is_reasoning_duplicate_of_content( str(reasoning), 
str(content_chunk), ): trace_stream( "suppress_reasoning_overlap", event="reasoning_content_delta", reasoning_preview=_preview(reasoning), content_preview=_preview(content_chunk), ) reasoning = "" has_content_chunk = bool(content_chunk) has_inline_thought = bool(inline_thought_chunk) trace_stream( "reasoning_delta", has_content=has_content_chunk, has_reasoning=bool(reasoning) or has_inline_thought, reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), ) has_any_thought = bool(reasoning) or has_inline_thought if has_any_thought and reasoning_closed_for_current_cycle: reasoning_closed_for_current_cycle = False in_reasoning_phase = False should_break_next_thought = True if reasoning: for event in emit_thought_part(str(reasoning)): yield event if has_inline_thought: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e trace_stream( "emit_content", reasoning_closed=reasoning_closed_for_current_cycle, content_preview=_preview(content_chunk), ) case RunEvent.tool_call_started.value: tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment] tool = tool_event.tool if tool: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = False in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" if tool.tool_call_id: tool_start_times[tool.tool_call_id] = time.time() trace_stream( "tool_call_started", tool_name=tool.tool_name or "", tool_call_id=tool.tool_call_id, ) current_text_index = len(full_content) yield _build_tool_call_event(tool, current_text_index) case RunEvent.tool_call_completed.value: tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment] tool = tool_event.tool if tool: in_reasoning_phase = False should_break_next_thought = True reasoning_closed_for_current_cycle = False 
in_content_think_block = False inline_tool_trace_depth = 0 inline_protocol_tail = "" duration_ms = None if tool.tool_call_id and tool.tool_call_id in tool_start_times: duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000) trace_stream( "tool_call_completed", tool_name=tool.tool_name or "", tool_call_id=tool.tool_call_id, is_error=bool(tool.tool_call_error), ) tool_result_event, output = _build_tool_result_event( tool, duration_ms, self._normalize_tool_output, ) yield tool_result_event self._collect_search_sources(output, sources_map) case RunEvent.run_completed.value: saw_terminal_completion = True completed_content_fallback, _ = _extract_completed_content_and_output( run_event, completed_content_fallback or full_content, ) case RunEvent.run_error.value: error_msg = _extract_best_error_message(run_event) stream_had_error = True yield ErrorEvent(error=error_msg).model_dump() return else: # Simple event Fallback raw_content_chunk = _extract_text_chunk(run_event) raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol( raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, ) raw_reasoning = _extract_reasoning_chunk(run_event) content_segments, in_content_think_block = _split_content_by_think_tags( raw_content_chunk, in_content_think_block, ) content_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "text" ) inline_thought_chunk = "".join( seg_text for seg_type, seg_text in content_segments if seg_type == "thought" ) if raw_reasoning: for event in emit_thought_part(str(raw_reasoning)): yield event if inline_thought_chunk: for event in emit_thought_part(str(inline_thought_chunk)): yield event if content_chunk: for e in process_text(content_chunk): yield e def _build_fallback_messages(): if not saved_messages: return None updated_messages = list(saved_messages) for req in requirements: tool_exec = getattr(req, 'tool_execution', None) tool_name = 
getattr(tool_exec, 'tool_name', None) if tool_exec else None if tool_name != "interactive_form": continue tool_args = getattr(tool_exec, 'tool_args', {}) if tool_exec else {} tool_call_id = getattr(req, "id", None) or tool_args.get("id") or f"form-{int(time.time() * 1000)}" updated_messages.append({ "role": "assistant", "content": None, "tool_calls": [{ "id": tool_call_id, "type": "function", "function": { "name": "interactive_form", "arguments": json.dumps(tool_args or {}), } }], }) updated_messages.append({ "role": "tool", "content": json.dumps(field_values), "tool_call_id": tool_call_id, }) return updated_messages def _apply_field_values_to_requirements() -> list[Any]: """ Resolve pending HITL requirements with the submitted form payload. Prefer Agno-native requirement resolution so we can use acontinue_run(). """ resolved_requirements: list[Any] = [] serialized_values = json.dumps(field_values, ensure_ascii=False) for req in requirements or []: try: if hasattr(req, "needs_external_execution") and req.needs_external_execution: req.set_external_execution_result(serialized_values) tool_exec = getattr(req, "tool_execution", None) if tool_exec is not None and getattr(tool_exec, "result", None) is None: tool_exec.result = serialized_values elif hasattr(req, "needs_user_input") and req.needs_user_input: req.provide_user_input(field_values) elif hasattr(req, "needs_confirmation") and req.needs_confirmation: req.confirm() except Exception as req_err: logger.warning( f"[HITL] Failed to resolve requirement {getattr(req, 'id', None)} for run_id={run_id}: {req_err}" ) resolved_requirements.append(req) return resolved_requirements def _build_continuation_agent_input(base_messages: list[dict[str, Any]]) -> list[dict[str, Any]]: messages = self._inject_local_time_context(list(base_messages), request, []) system_messages = [m for m in messages if m.get("role") == "system"] chat_messages = [m for m in messages if m.get("role") != "system"] raw_turn_limit = 
request.context_turn_limit turn_limit = ( max(1, min(50, int(raw_turn_limit))) if isinstance(raw_turn_limit, int) and raw_turn_limit > 0 else 2 ) user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"] user_turn_count = len(user_indices) if user_turn_count > turn_limit: cutoff_index = user_indices[-turn_limit] recent_history = chat_messages[cutoff_index:] else: recent_history = chat_messages should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit) if should_inject_summary: summary_prompt = ( "\n\nSession memory summary:\n" "Here is a summary of the conversation so far. Use this to understand long-term context, " "but prioritize the details in the recent messages below.\n" f"{session_summary_text}\n" ) if system_messages: last_sys = system_messages[-1] if "Session memory summary:" not in str(last_sys.get("content", "")): last_sys["content"] = str(last_sys.get("content", "")) + summary_prompt else: system_messages.append({"role": "system", "content": summary_prompt}) return system_messages + recent_history resolved_requirements = _apply_field_values_to_requirements() stream = None try: restored_run_output = None if isinstance(saved_run_output, dict): try: restored_run_output = RunOutput.from_dict(dict(saved_run_output)) # Ensure external-execution tool results are concretely attached to # run_response.tools before acontinue_run() processes updates. 
if restored_run_output and isinstance(restored_run_output.tools, list): serialized_values = json.dumps(field_values, ensure_ascii=False) tool_result_by_id: dict[str, Any] = {} for req in resolved_requirements or []: tool_exec = getattr(req, "tool_execution", None) if not tool_exec: continue tcid = getattr(tool_exec, "tool_call_id", None) if tcid: tool_result_by_id[str(tcid)] = getattr(tool_exec, "result", None) for tool in restored_run_output.tools: if getattr(tool, "result", None) is not None: continue tcid = getattr(tool, "tool_call_id", None) if tcid and str(tcid) in tool_result_by_id: tool.result = tool_result_by_id[str(tcid)] continue if getattr(tool, "tool_name", None) == "interactive_form": tool.result = serialized_values # OpenAI-compatible tool flow requires an assistant message # with matching tool_calls before any tool message can appear. if restored_run_output and isinstance(restored_run_output.tools, list): existing_messages = ( list(restored_run_output.messages) if isinstance(restored_run_output.messages, list) else [] ) existing_tool_call_ids: set[str] = set() for msg in existing_messages: msg_tool_calls = getattr(msg, "tool_calls", None) or [] for tc in msg_tool_calls: if isinstance(tc, dict) and tc.get("id"): existing_tool_call_ids.add(str(tc.get("id"))) for tool in restored_run_output.tools: tool_call_id = getattr(tool, "tool_call_id", None) tool_name = getattr(tool, "tool_name", None) or "interactive_form" if not tool_call_id or str(tool_call_id) in existing_tool_call_ids: continue tool_args = getattr(tool, "tool_args", None) or {} existing_messages.append( Message( role="assistant", content="", tool_calls=[ { "id": str(tool_call_id), "type": "function", "function": { "name": str(tool_name), "arguments": json.dumps(tool_args, ensure_ascii=False), }, } ], ) ) existing_tool_call_ids.add(str(tool_call_id)) restored_run_output.messages = existing_messages if resolved_requirements: restored_run_output.requirements = resolved_requirements except 
Exception as restore_err: logger.warning( f"[HITL] Failed to restore RunOutput for run_id={run_id}, fallback to run_id path: {restore_err}" ) _log_verbose_info( f"[HITL] continuation restore mode: {'run_response' if restored_run_output is not None else 'run_id'} " f"(has_saved_run_output={isinstance(saved_run_output, dict)})" ) _log_verbose_info( f"Running HITL continuation via continue_run (run_id: {run_id}, session_id: {request.conversation_id})" ) if restored_run_output is not None: stream = agent.continue_run( run_response=restored_run_output, stream=True, stream_events=True, yield_run_output=True, user_id=request.user_id, session_id=request.conversation_id, output_schema=request.output_schema, ) else: stream = agent.continue_run( run_id=run_id, requirements=resolved_requirements, stream=True, stream_events=True, yield_run_output=True, user_id=request.user_id, session_id=request.conversation_id, output_schema=request.output_schema, ) except Exception as continue_err: logger.warning( f"[HITL] continue_run failed for run_id={run_id}, fallback to rebuilt arun: {continue_err}" ) fallback_messages = _build_fallback_messages() if not fallback_messages: yield ErrorEvent(error="Form session cannot be resumed (missing state)").model_dump() return agent_input = _build_continuation_agent_input(fallback_messages) stream = agent.arun( input=agent_input, stream=True, stream_events=True, user_id=request.user_id, session_id=request.conversation_id, output_schema=request.output_schema, ) async for event in _stream_events(stream): yield event _log_verbose_info( "[HITL] continuation stream summary: " f"run_id={run_id}, " f"events={continuation_event_count}, " f"last_event={last_event_name}, " f"last_event_type={last_event_type}, " f"last_event_run_id={last_event_run_id}, " f"saw_terminal_completion={saw_terminal_completion}, " f"stream_had_error={stream_had_error}, " f"paused_again={paused_again}" ) if stream_had_error: logger.warning(f"HITL run {run_id} ended with stream error; 
skipping done/cleanup") return if not saw_terminal_completion: logger.warning( f"HITL run {run_id} stream ended without terminal completion event; skipping done/cleanup" ) yield ErrorEvent(error="HITL continuation ended before completion").model_dump() return # Stream completed, send done event yield DoneEvent( content=full_content or completed_content_fallback, thought=full_thought.strip() or None, sources=list(sources_map.values()) or None, ).model_dump() # Clean up Supabase (skip if paused again for multi-form) if not paused_again: await hitl_storage.delete_pending_run(run_id) logger.info(f"HITL run {run_id} completed and cleaned up") else: logger.info(f"HITL run {run_id} paused again (multi-form), skipping cleanup") # 6. Trigger Async Session Summary Update if request.conversation_id and not paused_again: # For HITL resumption, extract the last turn's context from saved_messages # (matching normal flow: only last user + assistant, not full history) summary_messages = [] # Extract only the last user-assistant turn from saved_messages if saved_messages: last_user_idx = -1 for i in range(len(saved_messages) - 1, -1, -1): if saved_messages[i].get("role") == "user": last_user_idx = i break if last_user_idx >= 0: # Include from last user message to end of saved_messages # This captures: user question -> assistant form(s) -> any intermediate interactions for msg in saved_messages[last_user_idx:]: role = msg.get("role") content = msg.get("content") # Only include user/assistant messages with content for summary if role in ("user", "assistant") and content: summary_messages.append({"role": role, "content": content}) # Add the form submission as user input (provides structured data context) form_submission_text = f"[Form Submitted] Values: {json.dumps(field_values)}" summary_messages.append({"role": "user", "content": form_submission_text}) # Add the new assistant response (based on form data) summary_messages.append({"role": "assistant", "content": full_content}) 
_log_verbose_info(f"Triggering async summary update for {request.conversation_id} (Resumed HITL flow, {len(summary_messages)} messages)") asyncio.create_task(update_session_summary( conversation_id=request.conversation_id, old_summary=old_summary_json, new_messages=summary_messages, database_provider=request.database_provider, memory_provider=request.memory_provider, memory_model=request.memory_model, memory_api_key=request.memory_api_key, memory_base_url=request.memory_base_url, summary_provider=request.summary_provider, summary_model=request.summary_model, summary_api_key=request.summary_api_key, summary_base_url=request.summary_base_url, rebuild_from_scratch=False, # HITL resumption is usually incremental )) except Exception as exc: import traceback error_details = traceback.format_exc() logger.error(f"HITL continuation error: {exc}\n{error_details}") yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump() def _collect_enabled_tool_names(self, request: StreamChatRequest) -> set[str]: names: list[str] = [] if request.provider != "gemini": for tool_id in request.tool_ids or []: names.append(resolve_tool_name(str(tool_id))) for tool_def in request.tools or []: if hasattr(tool_def, "model_dump"): tool_def = tool_def.model_dump() if not isinstance(tool_def, dict): continue name = tool_def.get("function", {}).get("name") or tool_def.get("name") if name: names.append(resolve_tool_name(str(name))) for user_tool in request.user_tools or []: if hasattr(user_tool, "name") and user_tool.name: names.append(str(user_tool.name)) return set(names) def _inject_local_time_context( self, messages: list[dict[str, Any]], request: StreamChatRequest, pre_events: list[dict[str, Any]], ) -> list[dict[str, Any]]: if not messages: return messages timezone = request.user_timezone or "UTC" locale = request.user_locale or "en-US" time_result = self._compute_local_time(timezone, locale) try: local_date = datetime.fromisoformat(str(time_result.get("iso"))).strftime("%Y-%m-%d") 
except Exception: local_date = str(time_result.get("formatted", "")).split(" ")[0] or datetime.now().strftime( "%Y-%m-%d" ) tz_label = str(time_result.get("timezone") or timezone) note = ( f"\n\n[Time note for this query]\n" f"Note: local date is {local_date} ({tz_label}). " "Interpret relative time terms using this date." ) updated: list[dict[str, Any]] = [] for msg in messages: if not isinstance(msg, dict) or msg.get("role") != "user": updated.append(msg) continue content = msg.get("content") if isinstance(content, str): if "[Time note for this query]" in content: updated.append(msg) else: updated.append({**msg, "content": f"{content}{note}"}) continue if isinstance(content, list): has_note = False for part in content: if isinstance(part, dict): if "[Time note for this query]" in str(part.get("text", "")) or "[Time note for this query]" in str(part.get("content", "")): has_note = True break if has_note: updated.append(msg) else: updated.append({**msg, "content": [*content, {"type": "text", "text": note}]}) continue updated.append(msg) return updated def _compute_local_time(self, timezone: str, locale: str) -> dict[str, Any]: try: tzinfo = ZoneInfo(timezone) now = datetime.now(tzinfo) except Exception: now = datetime.now() return { "timezone": timezone, "locale": locale, "formatted": now.strftime("%Y-%m-%d %H:%M:%S"), "iso": now.isoformat(), "now": now, } def _inject_tool_guidance( self, messages: list[dict[str, Any]], enabled_tools: set[str], request: Any | None = None, ) -> list[dict[str, Any]]: if not enabled_tools: return messages updated = list(messages) system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1) no_tool_narration_guidance = ( "\n\n[OUTPUT DIRECTIVES]\n" "1. The main text (Answer) must contain ONLY the final helpful content and necessary explanations for the user.\n" "2. 
def _inject_tool_guidance(
    self,
    messages: list[dict[str, Any]],
    enabled_tools: set[str],
    request: Any | None = None,
) -> list[dict[str, Any]]:
    """Merge tool-specific system guidance into the message list.

    Builds layered guidance into the system prompt (inserting one if absent):
    output-style directives for every tool-enabled run, plus conditional
    sections for the interactive_form tool, citation-requiring search tools,
    the local_time tool, and the long-term memory skill.

    Args:
        messages: Chat messages in OpenAI dict format.
        enabled_tools: Canonical names of tools enabled for this request.
        request: Optional request object; only ``enable_long_term_memory``
            is read from it here.

    Returns:
        A new message list with guidance appended to the system prompt.
    """
    # No tools enabled: nothing to inject.
    if not enabled_tools:
        return messages
    updated = list(messages)
    # Index of the first system message, or -1 when none exists yet.
    system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
    no_tool_narration_guidance = (
        "\n\n[OUTPUT DIRECTIVES]\n"
        "1. The main text (Answer) must contain ONLY the final helpful content and necessary explanations for the user.\n"
        "2. In the main text, NEVER describe that you are going to, are currently, or have already called any tools, searched, browsed, retrieved memory, or queried databases. These are internal traces (Trace).\n"
        "3. In the main text, do NOT refer to yourself performing actions (e.g., \"Let me check\", \"I will search\", \"I have retrieved\").\n"
        " Instead, directly present results as established information.\n"
        "4. If citing sources, use neutral phrasing such as: \"According to available data\", \"Based on public information\", \"According to the returned data\".\n"
        " Never mention tool names or the calling process.\n"
        "5. If information is insufficient, directly state the missing gap and ask clarifying questions.\n"
        " Do NOT say \"Let me check again\" or similar transitional action phrases.\n"
        "6. Once you start presenting the final answer, do not switch back to planning, searching, or tool-calling language.\n"
        "7. The final answer should begin naturally with the content itself, without meta commentary or transitional phrases.\n"
    )
    updated = self._append_system_message(updated, no_tool_narration_guidance, system_index)
    # Re-locate the system message: _append_system_message may have inserted one at index 0.
    system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
    if "interactive_form" in enabled_tools:
        form_guidance = (
            "\n[TOOL USE GUIDANCE]\n"
            "When you need to collect structured information from the user (e.g. preferences, requirements, "
            "booking details), use the 'interactive_form' tool.\n"
            "CRITICAL: DO NOT list questions in text or markdown. YOU MUST USE the 'interactive_form' tool to "
            "display fields.\n"
            "CRITICAL: If the user explicitly asks you to confirm something via a form, interactive form, or form tool, "
            "you MUST call 'interactive_form' in this response instead of asking in plain text.\n"
            "CRITICAL: If you need approval before installing a dependency, you MUST use 'interactive_form'. "
            "Do NOT ask yes/no approval questions in normal prose when the tool is available.\n"
            "CRITICAL: For approval forms, ask only for information the model does NOT already know. "
            "If skill_id or package_name are already known from the current tool result or failure context, "
            "do NOT include text inputs for them in the form.\n"
            "CRITICAL: For dependency installation approval, prefer a single required field such as "
            "'approve_install', and place the package name in the form title or description.\n"
            "Keep forms concise (3-6 fields).\n\n"
            "[SIMPLIFIED PAYLOAD]\n"
            "You may use a minimal payload to reduce tool-call size.\n"
            "- 'id' and 'title' are optional.\n"
            "- Each field may be minimal (e.g., {'name':'budget'}) or even a short string label.\n"
            "- Backend will auto-fill missing label/type defaults.\n\n"
            "[MANDATORY TEXT-FIRST RULE]\n"
            "CRITICAL: You MUST output meaningful introductory text BEFORE calling 'interactive_form'.\n"
            "- NEVER call 'interactive_form' as the very first thing in your response\n"
            "- ALWAYS explain the context, acknowledge the user's request, or provide guidance BEFORE the form\n"
            "- Minimum: Output at least 1-2 sentences before the form call\n"
            '- Example: "I can help you with that. To provide the best recommendation, please share some '
            'details below:"\n\n'
            "[SINGLE FORM PER RESPONSE]\n"
            "CRITICAL: You may call 'interactive_form' ONLY ONCE per response. Do NOT call it multiple times in "
            "the same answer.\n"
            "If you need to collect information, design ONE comprehensive form that gathers all necessary "
            "details at once.\n\n"
            "[MULTI-TURN INTERACTIONS]\n"
            "1. If the information from a submitted form is insufficient, you MAY present another "
            "'interactive_form' in your NEXT response (after the user submits the first form).\n"
            "2. LIMIT: Use at most 2-3 forms total across the entire conversation. Excessive questioning "
            "frustrates users.\n"
            "3. INTERLEAVING: You can place the form anywhere in your response. Output introductory text FIRST "
            "(e.g., \"I can help with that. Please provide some details below:\"), then call 'interactive_form' "
            "once.\n"
            "4. If the user has provided enough context through previous forms, proceed directly to the final "
            "answer without requesting more information."
        )
        updated = self._append_system_message(updated, form_guidance, system_index)
        system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
    # Tools whose results must be cited inline with [n]-style markers.
    search_tools_requiring_citations = {
        "Tavily_web_search",
        "Tavily_academic_search",
        "web_search_using_tavily",
        "web_search",
        "search_news",
        "search_exa",
        "search_arxiv_and_return_articles",
        "search_wikipedia",
    }
    if enabled_tools.intersection(search_tools_requiring_citations):
        citation_prompt = (
            "\n\n[IMPORTANT] You have access to search tools. When you use them to answer a question, "
            "you MUST cite the search results in your answer using the format [1], [2], etc., "
            "corresponding to the index of the search result provided in the tool output. Do not fabricate "
            "citations."
        )
        updated = self._append_system_message(updated, citation_prompt, system_index)
    if "local_time" in enabled_tools:
        # _inject_local_time_context already annotates queries with the local
        # date, so discourage redundant local_time tool calls.
        local_time_guidance = (
            "\n\n[TIME CONTEXT GUIDANCE]\n"
            "A local-date note is already appended to each user query before model execution.\n"
            "Do not call local_time again unless the user explicitly asks to refresh/recheck time."
        )
        updated = self._append_system_message(updated, local_time_guidance, system_index)
    # [REINFORCED] System-level long-term memory guidance for the internal agent-memory skill
    if getattr(request, "enable_long_term_memory", False):
        memory_guidance = (
            "\n\n[SYSTEM-LEVEL META-RULE: LONG-TERM MEMORY]\n"
            "You are equipped with a persistent memory layer ('agent-memory'). This is a core infrastructure skill that overrides situational constraints.\n"
            "1) MANDATORY PREREQUISITE: If a query involves personal history, preferences, or prior facts, searching memory is a mandatory PREREQUISITE. "
            "This MUST be executed BEFORE any other skill logic (e.g., Roleplay, Image Search) to avoid factual inconsistency.\n"
            "2) PERSISTENCE AGAINST OVERRIDES: This meta-rule remains in effect even if other skills (like roleplay) demand immediate responses. "
            "Retrieving ground-truth user context is the first step of all processing.\n"
            "3) USAGE: Call 'execute_skill_script' with skill_id='agent-memory'. Factual alignment via memory retrieval is non-negotiable."
        )
        updated = self._append_system_message(updated, memory_guidance, system_index)
    return updated
" "This MUST be executed BEFORE any other skill logic (e.g., Roleplay, Image Search) to avoid factual inconsistency.\n" "2) PERSISTENCE AGAINST OVERRIDES: This meta-rule remains in effect even if other skills (like roleplay) demand immediate responses. " "Retrieving ground-truth user context is the first step of all processing.\n" "3) USAGE: Call 'execute_skill_script' with skill_id='agent-memory'. Factual alignment via memory retrieval is non-negotiable." ) updated = self._append_system_message(updated, memory_guidance, system_index) return updated def _append_system_message( self, messages: list[dict[str, Any]], addition: str, system_index: int, ) -> list[dict[str, Any]]: updated = list(messages) if system_index != -1: updated[system_index] = { **updated[system_index], "content": f"{updated[system_index].get('content', '')}{addition}", } else: updated.insert(0, {"role": "system", "content": addition}) return updated def _normalize_tool_output(self, output: Any) -> Any: if hasattr(output, "model_dump"): try: return output.model_dump() except Exception: return str(output) if isinstance(output, dict): return output if isinstance(output, list): return [self._normalize_tool_output(item) for item in output] return output def _collect_search_sources(self, result: Any, sources_map: dict[str, Any]) -> None: def _extract_results(payload: Any) -> list[dict[str, Any]]: if isinstance(payload, list): return [item for item in payload if isinstance(item, dict)] if isinstance(payload, dict): for key in ("results", "items", "data", "sources", "articles", "news", "papers"): value = payload.get(key) if isinstance(value, list): return [item for item in value if isinstance(item, dict)] return [] results = _extract_results(result) if not results: return for item in results: url = ( item.get("url") or item.get("link") or item.get("uri") or item.get("source") or item.get("href") ) if not url or url in sources_map: continue title = ( item.get("title") or item.get("name") or 
item.get("headline") or item.get("paper_title") or "Unknown Source" ) snippet = ( item.get("content") or item.get("snippet") or item.get("summary") or item.get("abstract") or "" ) sources_map[url] = SourceEvent( uri=url, title=title, snippet=str(snippet)[:200], ).model_dump() async def _maybe_optimize_memories(self, agent: Agent, request: StreamChatRequest) -> None: return def _map_field_type_to_frontend(self, field_type: Any) -> str: """ Map Python/Agno field types to frontend form types. Args: field_type: Python type (class or string) Returns: Frontend form field type (text, number, checkbox, etc.) """ # Handle cases where field_type is a class/type instead of a string field_type_str = "" if isinstance(field_type, type): field_type_str = field_type.__name__ elif not isinstance(field_type, str): field_type_str = str(field_type) else: field_type_str = field_type type_mapping = { "str": "text", "int": "number", "float": "number", "bool": "checkbox", "date": "date", "time": "time", "datetime": "datetime", "list": "text", "dict": "textarea", } return type_mapping.get(field_type_str.lower(), "text") _stream_chat_service: StreamChatService | None = None def get_stream_chat_service() -> StreamChatService: global _stream_chat_service if _stream_chat_service is None: _stream_chat_service = StreamChatService() return _stream_chat_service