Spaces:
Running
Running
| """ | |
| Stream chat service implemented with Agno SDK (Agent + tools + DB). | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import time | |
| from collections.abc import AsyncGenerator | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any | |
| from zoneinfo import ZoneInfo | |
| from agno.agent import Agent, RunEvent | |
| from agno.models.message import Message | |
| from agno.run.agent import RunOutput, ToolCallCompletedEvent, ToolCallStartedEvent | |
| from agno.run.team import TeamRunEvent | |
| from agno.utils.log import logger | |
| from ..models.stream_chat import ( | |
| AgentStatusEvent, | |
| DoneEvent, | |
| ErrorEvent, | |
| FormRequestEvent, # New: HITL form request event | |
| SourceEvent, | |
| StreamChatRequest, | |
| TextEvent, | |
| ThoughtEvent, | |
| ToolCallEvent, | |
| ToolResultEvent, | |
| ) | |
| from .agent_registry import get_agent_for_provider, build_team, resolve_agent_config | |
| from .hitl_storage import get_hitl_storage | |
| from .summary_service import update_session_summary | |
| from .tool_registry import resolve_tool_name | |
# Session-memory maintenance knobs (consumed elsewhere in this module;
# presumably a message-count trigger and a minimum interval between
# optimization passes — TODO confirm against StreamChatService usage).
MEMORY_OPTIMIZE_THRESHOLD = 50
MEMORY_OPTIMIZE_INTERVAL_SECONDS = 60 * 60 * 12  # 12 hours
# Opening/closing <think>/<thought> tags delimiting model reasoning text.
THINK_TAG_REGEX = re.compile(r"</?(?:think|thought)>", re.IGNORECASE)
# Inline protocol markers: <|tag|> tokens and <|DSML|...> wrappers.
# NOTE(review): the [||] character classes each match only "|"; if fullwidth
# pipe variants were intended, characters may have been lost — confirm.
PROTOCOL_TAG_REGEX = re.compile(
    r"(?:<\s*[||]\s*(?P<tag>[a-zA-Z0-9_]+)\s*[||]\s*>)"
    r"|(?:<\s*(?P<dsml_close>/?)\s*[||]\s*DSML\s*[||]\s*(?P<dsml_body>[^>]+)>)",
    re.IGNORECASE,
)
# Marker tag names that open a tool-call trace section in model output.
TOOL_TRACE_BEGIN_TAGS = {
    "tool_calls_begin",
    "tool_calls_section_begin",
    "tool_call_begin",
    "tool_argument_begin",
    "tool_call_argument_begin",
}
# Marker tag names that close a tool-call trace section.
TOOL_TRACE_END_TAGS = {
    "tool_argument_end",
    "tool_call_argument_end",
    "tool_call_end",
    "tool_calls_end",
    "tool_calls_section_end",
}
| def _strip_internal_tool_trace(text: str) -> str: | |
| """Remove explicit protocol marker tokens without truncating normal text.""" | |
| if not text: | |
| return "" | |
| cleaned = str(text) | |
| cleaned = re.sub(r"</?(?:think|thought)>", "", cleaned, flags=re.IGNORECASE) | |
| # Remove protocol markers like <|tool_calls_begin|> and spaced variants. | |
| cleaned = re.sub(r"<\s*[||]\s*[^|>||]*\s*[||]\s*>", "", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"</?\s*[||]\s*DSML\s*[||]\s*[^>]*>", "", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"</?(?:session_memory|today_local_time)>", "", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"\[SYSTEM INJECTED CONTEXT\]", "", cleaned, flags=re.IGNORECASE) | |
| # Remove inline tool-call-like snippets leaked by some models. | |
| cleaned = re.sub( | |
| r"([::]\s*)?[a-zA-Z_][a-zA-Z0-9_]{1,80}\s*\{[^{}\n]{0,1200}\}", | |
| "", | |
| cleaned, | |
| flags=re.IGNORECASE, | |
| ) | |
| return cleaned | |
def _split_content_by_think_tags(text: str, in_think: bool) -> tuple[list[tuple[str, str]], bool]:
    """Split a content chunk into ordered thought/text segments by <think>/<thought> tags."""
    if not text:
        return [], in_think
    segments: list[tuple[str, str]] = []
    thinking = in_think
    pos = 0

    def _emit(piece: str) -> None:
        # Label each non-empty piece with the mode active when it was produced.
        if piece:
            segments.append(("thought" if thinking else "text", piece))

    for tag_match in THINK_TAG_REGEX.finditer(text):
        _emit(text[pos:tag_match.start()])
        # An opening tag enters thought mode; a closing tag ("</...") leaves it.
        thinking = not tag_match.group(0).lower().startswith("</")
        pos = tag_match.end()
    _emit(text[pos:])
    return segments, thinking
def _strip_inline_tool_protocol(
    text: str,
    tool_trace_depth: int,
    protocol_tail: str,
) -> tuple[str, int, str, bool]:
    """
    Strip inline tool-protocol payloads from content chunks safely across chunk boundaries.
    Returns:
        cleaned_text, next_tool_trace_depth, next_protocol_tail, had_protocol_tokens
    """
    # Re-attach the tail carried over from the previous chunk so markers split
    # across chunk boundaries are reassembled before matching.
    combined = f"{protocol_tail}{text or ''}"
    if not combined:
        return "", tool_trace_depth, "", False
    # Keep trailing incomplete protocol marker for next chunk.
    tail = ""
    tail_start = -1
    # Find the LAST marker opener; only it can be an incomplete trailing token.
    for m in re.finditer(r"<\s*/?\s*[||]", combined):
        tail_start = m.start()
    if tail_start != -1 and ">" not in combined[tail_start:]:
        # The opener has no closing ">" yet — defer that suffix to the next chunk.
        tail = combined[tail_start:]
        combined = combined[:tail_start]
    if not combined:
        return "", tool_trace_depth, tail, bool(tail)
    # Non-destructive stripping: remove marker tokens only.
    matches = list(PROTOCOL_TAG_REGEX.finditer(combined))
    had_protocol = bool(matches)
    if not had_protocol:
        # NOTE(review): depth is reset to 0 on this path while the early
        # returns above preserve `tool_trace_depth` — confirm the reset is
        # intentional rather than an oversight.
        return combined, 0, tail, bool(tail)
    cleaned = PROTOCOL_TAG_REGEX.sub("", combined)
    return cleaned, 0, tail, True
| def _squash_whitespace(text: Any) -> str: | |
| return re.sub(r"\s+", "", str(text or "")) | |
def _extract_agent_info_from_event(
    run_event: Any,
    leader_id: str | None = None,
    leader_name: str | None = None,
    leader_emoji: str | None = None,
    agent_metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Extract agent identification from a run event.
    For Team mode, member events have agent_id and agent_name directly on the event.
    If these are set, it's a member event; otherwise it's from the leader.
    Returns:
        dict with 'agent_id', 'agent_name', 'agent_role', 'agent_emoji', 'model', 'provider' keys
    """
    agent_id = getattr(run_event, "agent_id", None)
    agent_name = getattr(run_event, "agent_name", None)
    agent_emoji = getattr(run_event, "agent_emoji", None)
    # Check if this matches the leader.
    # Important: some providers might use slightly different names, but if IDs match it's definitely leader.
    is_leader = False
    if leader_id and agent_id == leader_id:
        is_leader = True
    elif not agent_id and not agent_name:
        # Default to leader if no info is present
        is_leader = True
    elif not leader_id and agent_name == leader_name:
        is_leader = True
    # If the ID starts with 'qurio-' (default Agno IDs often follow this pattern)
    # and we are in team mode, and it's not explicitly a member ID in our metadata,
    # it's highly likely the leader's initialization event.
    elif str(agent_id or "").startswith("qurio-") and agent_metadata:
        is_leader = agent_id not in agent_metadata
    if is_leader:
        # Leader result: identity comes from the caller-supplied leader fields;
        # model/provider are enriched from metadata keyed by leader_id.
        res = {
            "agent_id": leader_id,
            "agent_name": leader_name,
            "agent_role": "leader",
            "agent_emoji": leader_emoji,
            "model": None,
            "provider": None,
        }
        if agent_metadata and leader_id in agent_metadata:
            res.update(agent_metadata[leader_id])
        return res
    # Otherwise treat as member
    if agent_id or agent_name:
        # Log at DEBUG level to reduce main stream noise, as switch logs will provide context.
        logger.debug(f"[TEAM] Member event detected: agent_id={agent_id}, agent_name={agent_name}")
        res = {
            "agent_id": agent_id,
            "agent_name": agent_name,
            "agent_role": "member",
            "agent_emoji": agent_emoji,
            "model": None,
            "provider": None,
        }
        # Enrich from metadata if possible
        if agent_metadata:
            # Lookup order: by id, then by name key, then by scanning entries
            # whose "name" value matches (covers id-less member events).
            if agent_id in agent_metadata:
                res.update(agent_metadata[agent_id])
            elif agent_name in agent_metadata:
                res.update(agent_metadata[agent_name])
            elif not agent_id and agent_name:  # Fallback lookup by name if ID missing on event
                for meta_id, meta in agent_metadata.items():
                    if meta.get("name") == agent_name:
                        res.update(meta)
                        res["agent_id"] = meta_id
                        break
        return res
    # Fallback to leader if totally ambiguous
    res = {
        "agent_id": leader_id,
        "agent_name": leader_name,
        "agent_role": "leader",
        "agent_emoji": leader_emoji,
        "model": None,
        "provider": None,
    }
    if agent_metadata and leader_id in agent_metadata:
        res.update(agent_metadata[leader_id])
    return res
def _is_reasoning_duplicate_of_content(reasoning: str, content: str) -> bool:
    """
    Detect provider chunks where answer text is mirrored in reasoning_content.
    This prevents answer paragraphs from being rendered as a second thought block.
    """
    flat_reasoning = _squash_whitespace(reasoning)
    flat_content = _squash_whitespace(content)
    if not (flat_reasoning and flat_content):
        return False
    if flat_reasoning == flat_content:
        return True
    # Order the flattened strings so we compare the shorter against the longer.
    if len(flat_reasoning) <= len(flat_content):
        shorter, longer = flat_reasoning, flat_content
    else:
        shorter, longer = flat_content, flat_reasoning
    if len(shorter) < 12:
        # Too short to call a duplicate with any confidence.
        return False
    # Near-duplicate: shorter text is embedded in the longer one and covers
    # at least ~75% of its length.
    return shorter in longer and len(shorter) >= int(len(longer) * 0.75)
| def _is_stream_trace_enabled() -> bool: | |
| value = str(os.getenv("QURIO_STREAM_TRACE", "")).strip().lower() | |
| return value in {"1", "true", "yes", "on", "debug"} | |
| def _is_verbose_logs_enabled() -> bool: | |
| value = str(os.getenv("QURIO_VERBOSE_LOGS", "0")).strip().lower() | |
| return value in {"1", "true", "yes", "on", "debug"} | |
def _log_verbose_info(message: str) -> None:
    """Log at INFO when verbose logs are enabled, otherwise demote to DEBUG."""
    emit = logger.info if _is_verbose_logs_enabled() else logger.debug
    emit(message)
| def _preview(text: Any, limit: int = 140) -> str: | |
| raw = str(text or "").replace("\n", "\\n") | |
| return raw[:limit] + ("..." if len(raw) > limit else "") | |
def _extract_message_from_payload(payload: Any) -> str | None:
    """Recursively dig a human-readable message out of an arbitrary error payload.

    Handles pydantic-style objects (via model_dump), dicts (preferring the
    message/error/detail/msg keys), sequences, and raw strings that embed
    JSON or Python-literal fragments. Returns None when nothing usable is found.
    """
    if payload is None:
        return None
    if hasattr(payload, "model_dump"):
        # Pydantic-style objects: prefer their dict form; fall back to repr.
        try:
            payload = payload.model_dump()
        except Exception:
            payload = str(payload)
    if isinstance(payload, dict):
        # Well-known message keys first, then recurse into every value.
        for key in ("message", "error", "detail", "msg"):
            value = payload.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
            nested = _extract_message_from_payload(value)
            if nested:
                return nested
        for value in payload.values():
            nested = _extract_message_from_payload(value)
            if nested:
                return nested
        return None
    if isinstance(payload, (list, tuple)):
        for item in payload:
            nested = _extract_message_from_payload(item)
            if nested:
                return nested
        return None
    text = str(payload).strip()
    if not text:
        return None
    # Agno RunErrorEvent repr: RunErrorEvent(..., content='Unknown model error', ...)
    run_error_content_match = re.search(
        r"""content\s*=\s*(['"])(.*?)\1""",
        text,
        re.IGNORECASE | re.DOTALL,
    )
    if run_error_content_match and run_error_content_match.group(2).strip():
        return run_error_content_match.group(2).strip()
    # Whole-string parse attempts: strict JSON first, then Python literals.
    for parser in (json.loads, ast.literal_eval):
        try:
            parsed = parser(text)
            nested = _extract_message_from_payload(parsed)
            if nested:
                return nested
        except Exception:
            pass
    # Try an embedded {...} fragment before giving up on structured parsing.
    json_like = re.search(r"(\{[\s\S]*\})", text)
    if json_like:
        snippet = json_like.group(1)
        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(snippet)
                nested = _extract_message_from_payload(parsed)
                if nested:
                    return nested
            except Exception:
                pass
    # Last resort: grep for a "message": "..." pair, else return the raw text.
    message_match = re.search(r"""['"]message['"]\s*:\s*['"](.+?)['"]""", text, re.IGNORECASE)
    if message_match and message_match.group(1).strip():
        return message_match.group(1).strip()
    return text
def _extract_best_error_message(exc: Exception | Any) -> str:
    """Extract the most actionable provider message from nested exceptions."""
    # Candidates matching these markers are kept only as a last resort.
    generic_markers = ("unknown model error", "unknown error", "model provider error")
    def _is_generic(text: str) -> bool:
        lowered = text.strip().lower()
        return any(marker in lowered for marker in generic_markers)
    candidates: list[str] = []
    # Breadth-first walk over the exception graph: causes/contexts, args and
    # common payload attributes, guarded against cycles by object identity.
    queue: list[Any] = [exc]
    seen: set[int] = set()
    while queue:
        current = queue.pop(0)
        if current is None:
            continue
        marker = id(current)
        if marker in seen:
            continue
        seen.add(marker)
        extracted = _extract_message_from_payload(current)
        if extracted and extracted.strip():
            candidates.append(extracted.strip())
        if isinstance(current, BaseException):
            queue.append(getattr(current, "__cause__", None))
            queue.append(getattr(current, "__context__", None))
        args = getattr(current, "args", None)
        if isinstance(args, tuple):
            queue.extend(args)
        for attr in ("content", "error", "message", "detail", "model_provider_data"):
            if hasattr(current, attr):
                queue.append(getattr(current, attr, None))
    # Prefer the first non-generic candidate in discovery order.
    for msg in candidates:
        if not _is_generic(msg):
            return msg
    if candidates:
        # Avoid dumping full event repr like "RunErrorEvent(...)" to UI.
        filtered = [msg for msg in candidates if not msg.strip().lower().startswith("runerrorevent(")]
        if filtered:
            return min(filtered, key=len)
        return min(candidates, key=len)
    return str(exc or "Unknown error")
| def _extract_text_chunk(run_event: Any) -> str: | |
| """Extract assistant text only from explicit content fields. | |
| Shared between stream_chat() and _continue_hitl_run() to avoid duplication. | |
| """ | |
| provider_data = getattr(run_event, "model_provider_data", None) | |
| if isinstance(provider_data, dict): | |
| choices = provider_data.get("choices") or [] | |
| if choices and isinstance(choices[0], dict): | |
| delta = choices[0].get("delta") or {} | |
| raw_content = delta.get("content") | |
| if isinstance(raw_content, str) and raw_content: | |
| return raw_content | |
| if isinstance(raw_content, list): | |
| parts: list[str] = [] | |
| for item in raw_content: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| parts.append(str(text_part)) | |
| elif isinstance(item, str) and item: | |
| parts.append(item) | |
| if parts: | |
| return "".join(parts) | |
| raw_reasoning = delta.get("reasoning_content") | |
| if isinstance(raw_reasoning, str) and raw_reasoning: | |
| return "" | |
| if isinstance(raw_reasoning, list): | |
| reasoning_parts: list[str] = [] | |
| for item in raw_reasoning: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| reasoning_parts.append(str(text_part)) | |
| elif isinstance(item, str) and item: | |
| reasoning_parts.append(item) | |
| if reasoning_parts: | |
| return "" | |
| content = getattr(run_event, "content", None) | |
| if isinstance(content, str) and content: | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for item in content: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if isinstance(text_part, str) and text_part: | |
| parts.append(text_part) | |
| elif isinstance(item, str) and item: | |
| parts.append(item) | |
| if parts: | |
| return "".join(parts) | |
| if isinstance(provider_data, dict): | |
| choices = provider_data.get("choices") or [] | |
| if choices and isinstance(choices[0], dict): | |
| delta = choices[0].get("delta") or {} | |
| raw_content = delta.get("content") | |
| if isinstance(raw_content, str) and raw_content: | |
| return raw_content | |
| if isinstance(raw_content, list): | |
| parts: list[str] = [] | |
| for item in raw_content: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| parts.append(str(text_part)) | |
| elif isinstance(item, str) and item: | |
| parts.append(item) | |
| if parts: | |
| return "".join(parts) | |
| return "" | |
| def _extract_reasoning_chunk( | |
| run_event: Any, | |
| trace_fn: Any = None, | |
| ) -> str: | |
| """Extract reasoning/thought content from a stream event. | |
| Shared between stream_chat() and _continue_hitl_run() to avoid duplication. | |
| ``trace_fn`` is an optional callable(stage, **kwargs) for trace logging. | |
| """ | |
| def _trace(stage: str, **kwargs: Any) -> None: | |
| if trace_fn: | |
| trace_fn(stage, **kwargs) | |
| provider_data = getattr(run_event, "model_provider_data", None) | |
| if isinstance(provider_data, dict): | |
| choices = provider_data.get("choices") or [] | |
| if choices and isinstance(choices[0], dict): | |
| delta = choices[0].get("delta") or {} | |
| raw_reasoning = delta.get("reasoning_content") | |
| if isinstance(raw_reasoning, str) and raw_reasoning: | |
| _trace("reasoning_source", source="provider_data.delta.reasoning_content") | |
| return raw_reasoning | |
| if isinstance(raw_reasoning, list): | |
| parts: list[str] = [] | |
| for item in raw_reasoning: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| parts.append(str(text_part)) | |
| elif isinstance(item, str) and item: | |
| parts.append(item) | |
| if parts: | |
| _trace("reasoning_source", source="provider_data.delta.reasoning_content[]") | |
| return "".join(parts) | |
| reasoning = getattr(run_event, "reasoning_content", None) | |
| if isinstance(reasoning, str) and reasoning: | |
| _trace("reasoning_source", source="run_event.reasoning_content") | |
| return reasoning | |
| if isinstance(reasoning, list): | |
| parts: list[str] = [] | |
| for item in reasoning: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| parts.append(str(text_part)) | |
| elif isinstance(item, str) and item: | |
| parts.append(item) | |
| if parts: | |
| _trace("reasoning_source", source="run_event.reasoning_content[]") | |
| return "".join(parts) | |
| if isinstance(provider_data, dict): | |
| choices = provider_data.get("choices") or [] | |
| if choices and isinstance(choices[0], dict): | |
| delta = choices[0].get("delta") or {} | |
| raw_reasoning = delta.get("reasoning_content") | |
| if isinstance(raw_reasoning, str) and raw_reasoning: | |
| _trace("reasoning_source", source="provider_data.delta.reasoning_content") | |
| return raw_reasoning | |
| if isinstance(raw_reasoning, list): | |
| parts: list[str] = [] | |
| for item in raw_reasoning: | |
| if isinstance(item, dict): | |
| text_part = item.get("text") or item.get("content") | |
| if text_part: | |
| parts.append(str(text_part)) | |
| if parts: | |
| _trace("reasoning_source", source="provider_data.delta.reasoning_content[]") | |
| return "".join(parts) | |
| # NOTE: | |
| # Some providers (e.g. DeepSeek-compatible streams) may place | |
| # assistant answer tokens under `delta.reasoning`. | |
| # Treating that field as reasoning can misclassify answer text as thought. | |
| # Keep reasoning extraction strict to `reasoning_content` only. | |
| return "" | |
| def _is_raw_events_log_enabled() -> bool: | |
| value = str(os.getenv("QURIO_RAW_EVENTS_LOG", "0")).strip().lower() | |
| return value not in {"0", "false", "off", "no"} | |
| def _raw_events_log_path() -> Path: | |
| configured = str(os.getenv("QURIO_RAW_EVENTS_LOG_PATH", "")).strip() | |
| if configured: | |
| return Path(configured) | |
| logs_dir = Path(__file__).resolve().parents[2] / "logs" | |
| logs_dir.mkdir(parents=True, exist_ok=True) | |
| date_tag = datetime.utcnow().strftime("%Y%m%d") | |
| return logs_dir / f"agno_raw_events_{date_tag}.jsonl" | |
def _append_raw_event_log(
    *,
    phase: str,
    request: StreamChatRequest,
    run_id: str | None,
    run_event: Any,
) -> None:
    """Append a one-line JSON diagnostic record for a raw stream event.

    No-op unless QURIO_RAW_EVENTS_LOG is enabled. All failures are swallowed
    so diagnostics can never interrupt an active stream.
    """
    if not _is_raw_events_log_enabled():
        return
    try:
        event_name = str(getattr(run_event, "event", "") or "")
        content_chunk = _extract_text_chunk(run_event)
        reasoning_chunk = _extract_reasoning_chunk(run_event)
        payload = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "phase": phase,
            "provider": request.provider,
            "model": request.model,
            "conversation_id": request.conversation_id,
            "run_id": run_id or getattr(run_event, "run_id", None),
            "event_name": event_name,
            "raw_event_type": type(run_event).__name__,
            "has_content": bool(str(content_chunk or "").strip()),
            "has_reasoning_content": bool(str(reasoning_chunk or "").strip()),
            "content_preview": _preview(content_chunk),
            "reasoning_preview": _preview(reasoning_chunk),
            # Full repr kept last; it can be large but is invaluable when
            # debugging provider-specific event shapes.
            "raw_event": repr(run_event),
        }
        log_path = _raw_events_log_path()
        with log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload, ensure_ascii=False) + "\n")
    except Exception:
        # Never break stream flow due to diagnostics logging failures.
        return
def _extract_completed_content_and_output(
    run_event: Any,
    streamed_content: str = "",
) -> tuple[str, Any]:
    """
    Extract final assistant content/output from RunCompleted-style events.
    Shared between normal stream_chat() and HITL continuation to keep behavior aligned.

    Returns:
        (final_content, output) where `output` is the structured payload
        (pydantic model / dict / list) when present, else None.
    """
    agn_content = getattr(run_event, "content", None)
    run_response = getattr(run_event, "run_response", None)
    if not agn_content and run_response is not None:
        agn_content = getattr(run_response, "content", None)
    final_content = streamed_content or ""
    output = None
    # Structured output should override streamed text to preserve canonical payload.
    if agn_content and hasattr(agn_content, "model_dump"):
        output = agn_content
        final_content = json.dumps(agn_content.model_dump(), ensure_ascii=False)
    elif isinstance(agn_content, (dict, list)):
        output = agn_content
        final_content = json.dumps(agn_content, ensure_ascii=False)
    elif isinstance(agn_content, str) and agn_content.strip() and not final_content:
        # Plain-string content only fills in when nothing was streamed.
        final_content = agn_content
    # Fallback for providers that only keep final assistant text in run_response.messages.
    if not final_content and run_response is not None:
        try:
            rr_messages = getattr(run_response, "messages", None) or []
            # Scan newest-first for the last assistant message with usable text.
            for rr_msg in reversed(rr_messages):
                rr_role = getattr(rr_msg, "role", None)
                rr_content = getattr(rr_msg, "content", None)
                if rr_role != "assistant":
                    continue
                extracted = _extract_text_from_message_content(rr_content).strip()
                if extracted:
                    final_content = extracted
                    break
        except Exception:
            pass
    return final_content, output
| def _extract_text_from_message_content(content: Any) -> str: | |
| """ | |
| Best-effort text extraction for provider-specific assistant message payloads. | |
| """ | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for item in content: | |
| if isinstance(item, str): | |
| parts.append(item) | |
| continue | |
| if isinstance(item, dict): | |
| text_part = item.get("text") | |
| if isinstance(text_part, str): | |
| parts.append(text_part) | |
| continue | |
| content_part = item.get("content") | |
| if isinstance(content_part, str): | |
| parts.append(content_part) | |
| continue | |
| if isinstance(content_part, (list, dict)): | |
| nested = _extract_text_from_message_content(content_part) | |
| if nested: | |
| parts.append(nested) | |
| parts_part = item.get("parts") | |
| if isinstance(parts_part, (list, dict)): | |
| nested = _extract_text_from_message_content(parts_part) | |
| if nested: | |
| parts.append(nested) | |
| return "".join(parts) | |
| if isinstance(content, dict): | |
| text_part = content.get("text") | |
| if isinstance(text_part, str): | |
| return text_part | |
| content_part = content.get("content") | |
| if isinstance(content_part, str): | |
| return content_part | |
| if isinstance(content_part, (list, dict)): | |
| nested = _extract_text_from_message_content(content_part) | |
| if nested: | |
| return nested | |
| parts_part = content.get("parts") | |
| if isinstance(parts_part, (list, dict)): | |
| nested = _extract_text_from_message_content(parts_part) | |
| if nested: | |
| return nested | |
| return "" | |
| def _coerce_tool_result_payload(output: Any) -> Any: | |
| """ | |
| Normalize tool output payload into JSON-friendly objects when possible. | |
| """ | |
| normalized = output | |
| if normalized and isinstance(normalized, str): | |
| try: | |
| normalized = json.loads(normalized) | |
| except json.JSONDecodeError: | |
| pass | |
| if isinstance(normalized, str): | |
| try: | |
| parsed = ast.literal_eval(normalized) | |
| if isinstance(parsed, dict): | |
| normalized = parsed | |
| except (ValueError, SyntaxError): | |
| pass | |
| return normalized | |
def _build_tool_result_event(
    tool: Any,
    duration_ms: int | None,
    normalize_tool_output_fn: Any,
    agent_info: dict[str, str | None] | None = None,
) -> tuple[dict[str, Any], Any]:
    """
    Build frontend ToolResultEvent payload and return parsed tool output.
    """
    info = agent_info or {}
    raw_result = normalize_tool_output_fn(getattr(tool, "result", None))
    output = _coerce_tool_result_payload(raw_result)
    status = "error" if getattr(tool, "tool_call_error", None) else "done"
    event_payload = ToolResultEvent(
        id=getattr(tool, "tool_call_id", None),
        name=getattr(tool, "tool_name", "") or "",
        status=status,
        output=output,
        durationMs=duration_ms,
        agent_id=info.get("agent_id"),
        agent_name=info.get("agent_name"),
        agent_role=info.get("agent_role"),
        agent_emoji=info.get("agent_emoji"),
    )
    return event_payload.model_dump(by_alias=True, exclude_none=True), output
def _build_tool_call_event(
    tool: Any,
    text_index: int,
    include_none: bool = False,
    agent_info: dict[str, str | None] | None = None,
) -> dict[str, Any]:
    """Build the frontend ToolCallEvent payload for a started tool call."""
    info = agent_info or {}
    event = ToolCallEvent(
        id=getattr(tool, "tool_call_id", None),
        name=getattr(tool, "tool_name", "") or "",
        arguments=json.dumps(getattr(tool, "tool_args", None) or {}),
        text_index=text_index,
        agent_id=info.get("agent_id"),
        agent_name=info.get("agent_name"),
        agent_role=info.get("agent_role"),
        agent_emoji=info.get("agent_emoji"),
    )
    # include_none keeps explicit nulls for consumers that distinguish
    # "absent" from "null".
    return event.model_dump(by_alias=True, exclude_none=not include_none)
def _normalize_interactive_form_fields(raw_fields: Any) -> list[dict[str, Any]]:
    """
    Normalize interactive_form fields to a strict list[dict].
    Some providers/tool runtimes return fields as a JSON string; this helper
    parses and sanitizes that shape so FormRequestEvent validation won't fail.
    """
    parsed = raw_fields
    if isinstance(parsed, str):
        # String payloads: try JSON first, then Python-literal syntax.
        text = parsed.strip()
        if not text:
            return []
        try:
            parsed = json.loads(text)
        except Exception:
            try:
                parsed = ast.literal_eval(text)
            except Exception:
                logger.warning("interactive_form fields is invalid string, fallback to empty list")
                return []
    if isinstance(parsed, dict):
        # Accept a {"fields": [...]} wrapper; any other dict shape is invalid.
        maybe_fields = parsed.get("fields")
        if isinstance(maybe_fields, list):
            parsed = maybe_fields
        else:
            return []
    if not isinstance(parsed, list):
        return []
    normalized: list[dict[str, Any]] = []
    used_names: set[str] = set()
    def _slugify_name(value: Any, fallback_index: int) -> str:
        # Build a unique snake_case field name; duplicates get _2, _3, ... suffixes.
        base = re.sub(r"[^a-zA-Z0-9_]+", "_", str(value or "").strip().lower()).strip("_")
        if not base:
            base = f"field_{fallback_index}"
        candidate = base
        suffix = 2
        while candidate in used_names:
            candidate = f"{base}_{suffix}"
            suffix += 1
        used_names.add(candidate)
        return candidate
    def _normalize_field_type(value: Any) -> str:
        # Unknown/missing types degrade to plain text inputs.
        candidate = str(value or "").strip().lower()
        if candidate in {"text", "number", "select", "checkbox", "range"}:
            return candidate
        return "text"
    for idx, item in enumerate(parsed, start=1):
        if isinstance(item, str):
            # Bare-string items become optional text fields labeled by the string.
            label = item.strip()
            if not label:
                continue
            normalized.append(
                {
                    "name": _slugify_name(label, idx),
                    "label": label,
                    "type": "text",
                    "required": False,
                }
            )
            continue
        if not isinstance(item, dict):
            logger.warning("interactive_form field item is not dict, skipped: %s", type(item).__name__)
            continue
        raw_name = item.get("name")
        raw_label = item.get("label")
        label = str(raw_label or raw_name or f"Field {idx}").strip() or f"Field {idx}"
        field_name = _slugify_name(raw_name or label, idx)
        field_type = _normalize_field_type(item.get("type"))
        # Copy the source dict so extra keys (options, min/max, defaults) survive,
        # then overwrite the sanitized core attributes.
        normalized_item = dict(item)
        normalized_item["name"] = field_name
        normalized_item["label"] = label
        normalized_item["type"] = field_type
        normalized_item["required"] = bool(item.get("required", False))
        normalized.append(normalized_item)
    return normalized
def _extract_interactive_form_payload(req: Any, default_title: str) -> tuple[str | None, str, list[dict[str, Any]]]:
    """Extract interactive form payload from requirement in a validation-safe way."""
    execution = getattr(req, "tool_execution", None)
    tool_args = execution.tool_args if execution else {}
    if not isinstance(tool_args, dict):
        # Defensive: some runtimes hand back non-dict args; treat as empty.
        tool_args = {}
    return (
        tool_args.get("id"),
        str(tool_args.get("title") or default_title),
        _normalize_interactive_form_fields(tool_args.get("fields", [])),
    )
| def _is_interactive_form_requirement(req: Any) -> bool: | |
| tool_exec = getattr(req, "tool_execution", None) | |
| tool_name = getattr(tool_exec, "tool_name", None) if tool_exec else None | |
| return tool_name == "interactive_form" | |
| class StreamChatService: | |
| """Stream chat service implemented using Agno Agent streaming events.""" | |
| def __init__(self) -> None: | |
| self._last_memory_optimization: dict[str, float] = {} | |
| async def stream_chat( | |
| self, | |
| request: StreamChatRequest, | |
| ) -> AsyncGenerator[dict[str, Any], None]: | |
| """ | |
| Stream chat completion with HITL support. | |
| If request.run_id is present, this is a resumption request after form submission. | |
| Otherwise, this is a normal chat request. | |
| """ | |
| # ================================================================ | |
| # HITL: Check if this is a resumption request | |
| # ================================================================ | |
| if request.run_id and request.field_values: | |
| _log_verbose_info(f"Detected HITL resumption request (run_id: {request.run_id})") | |
| async for event in self._continue_hitl_run(request): | |
| yield event | |
| return | |
| # Debug: Log request fields for expert mode | |
| if request.expert_mode: | |
| logger.info(f"[DEBUG] Expert mode request - leader_agent_id: {getattr(request, 'leader_agent_id', 'NOT_FOUND')}, team_agent_ids: {getattr(request, 'team_agent_ids', [])}") | |
| # ================================================================ | |
| # Normal chat flow | |
| # ================================================================ | |
| try: | |
| if not request.provider: | |
| raise ValueError("Missing required field: provider") | |
| if not request.messages: | |
| raise ValueError("Missing required field: messages") | |
| # Enable skills for the definitive user-facing chat agent | |
| request.enable_skills = True | |
| # Build standard agent or Team | |
| agent_metadata: dict[str, Any] = {} | |
| if request.expert_mode and getattr(request, "team_agent_ids", []): | |
| # Log leader configuration for debugging | |
| logger.info(f"[TEAM] Building team - leader_agent_id: {getattr(request, 'leader_agent_id', None)}, team_agent_ids: {request.team_agent_ids}") | |
| # 1. Resolve Leader Configuration if ID is provided | |
| if getattr(request, "leader_agent_id", None): | |
| request = resolve_agent_config(request.leader_agent_id, request) | |
| logger.info(f"[TEAM] Leader resolved - agent_id: {getattr(request, 'agent_id', None)}, agent_name: {getattr(request, 'agent_name', None)}") | |
| if request.agent_id: | |
| agent_metadata[request.agent_id] = { | |
| "model": request.model, | |
| "provider": request.provider, | |
| } | |
| # 2. Resolve Member Agents | |
| members = [] | |
| for a_id in request.team_agent_ids: | |
| import copy | |
| sub_req = copy.deepcopy(request) | |
| sub_req.expert_mode = False | |
| # Fetch actual member config from DB | |
| member_req = resolve_agent_config(a_id, sub_req) | |
| members.append(get_agent_for_provider(member_req)) | |
| # Capture member metadata | |
| if member_req.agent_id: | |
| agent_metadata[member_req.agent_id] = { | |
| "model": member_req.model, | |
| "provider": member_req.provider, | |
| } | |
| if member_req.agent_name: | |
| agent_metadata[member_req.agent_name] = { | |
| "model": member_req.model, | |
| "provider": member_req.provider, | |
| } | |
| # 3. Build the Team with resolved leader (request) and members | |
| agent = build_team(request, members) | |
| is_team_mode = True | |
| else: | |
| agent = get_agent_for_provider(request) | |
| is_team_mode = False | |
| sources_map: dict[str, Any] = {} | |
| full_content = "" | |
| full_thought = "" | |
| tool_start_times: dict[str, float] = {} | |
| should_break_next_thought = False | |
| in_reasoning_phase = False | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| stream_trace = _is_stream_trace_enabled() | |
| # Current agent info for Team mode (updated per event) | |
| current_agent_info: dict[str, Any] = {"agent_id": None, "agent_name": None} | |
| last_active_agent_id = None | |
| def trace_stream(stage: str, **kwargs: Any) -> None: | |
| if not stream_trace: | |
| return | |
| payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()]) | |
| logger.info(f"[STREAM_TRACE][main] {stage} | {payload}") | |
| def emit_thought_part(part: str): | |
| nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle | |
| text = _strip_internal_tool_trace(str(part or "")) | |
| if not text or not text.strip(): | |
| return | |
| should_break_next_thought = False | |
| in_reasoning_phase = True | |
| full_thought += text | |
| trace_stream("emit_reasoning", reasoning_preview=_preview(text)) | |
| current_text_index = len(full_content) | |
| yield ThoughtEvent( | |
| content=text, | |
| text_index=current_text_index, | |
| agent_id=current_agent_info.get("agent_id"), | |
| agent_name=current_agent_info.get("agent_name"), | |
| agent_status=current_agent_info.get("status"), | |
| ).model_dump(by_alias=True, exclude_none=True) | |
| def process_text(text: str): | |
| nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle | |
| clean_text = _strip_internal_tool_trace(text) | |
| if clean_text: | |
| has_visible_text = bool(clean_text.strip()) | |
| if has_visible_text: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = True | |
| full_content += clean_text | |
| yield TextEvent( | |
| content=clean_text, | |
| agent_id=current_agent_info.get("agent_id"), | |
| agent_name=current_agent_info.get("agent_name"), | |
| agent_status=current_agent_info.get("status"), | |
| ).model_dump(by_alias=True, exclude_none=True) | |
| # Agent status tracking for Team mode | |
| agent_statuses: dict[str, str] = {} | |
| def set_agent_status(agent_id: str | None, status: str): | |
| if not agent_id: return | |
| if agent_statuses.get(agent_id) == status: return | |
| agent_statuses[agent_id] = status | |
| return AgentStatusEvent(agentId=agent_id, status=status).model_dump(by_alias=True) | |
| async def update_status_and_yield(agent_id: str | None, status: str): | |
| event = set_agent_status(agent_id, status) | |
| if event: | |
| yield event | |
| # Context management now handled by Agno's num_history_runs parameter | |
| messages = request.messages | |
| pre_events: list[dict[str, Any]] = [] | |
| messages = self._inject_local_time_context(messages, request, pre_events) | |
| enabled_tool_names = self._collect_enabled_tool_names(request) | |
| messages = self._inject_tool_guidance(messages, enabled_tool_names, request) | |
| for event in pre_events: | |
| yield event | |
| # ================================================================ | |
| # MANUAL CONTEXT MANAGEMENT (Rolling Summary + Fixed Window) | |
| # ================================================================ | |
| # 1. Fetch Session Summary from DB | |
| session_summary_text = None | |
| old_summary_json = None | |
| if request.conversation_id: | |
| try: | |
| from ..models.db import DbFilter, DbQueryRequest | |
| from .db_service import execute_db_async, get_db_adapter | |
| adapter = get_db_adapter(request.database_provider) | |
| if adapter: | |
| req = DbQueryRequest( | |
| providerId=adapter.config.id, | |
| action="select", | |
| table="conversations", | |
| columns=["session_summary"], | |
| filters=[DbFilter(op="eq", column="id", value=request.conversation_id)], | |
| maybeSingle=True, | |
| ) | |
| result = await execute_db_async(adapter, req) | |
| if result.data and isinstance(result.data, dict): | |
| row = result.data | |
| raw_summary = row.get("session_summary") | |
| if raw_summary: | |
| # Parsing handled by adapter often, but double check | |
| if isinstance(raw_summary, str): | |
| try: | |
| old_summary_json = json.loads(raw_summary) | |
| except (ValueError, json.JSONDecodeError): | |
| pass | |
| elif isinstance(raw_summary, dict): | |
| old_summary_json = raw_summary | |
| if old_summary_json: | |
| session_summary_text = old_summary_json.get("summary") | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch session summary: {e}") | |
| logger.error(f"Failed to fetch session summary: {e}") | |
| # 2. Slice History (Turn-Based Window) | |
| # Strategy: Keep all System messages + Last N User turns (User + AI + Tools) | |
| # N comes from frontend context setting: contextTurns. | |
| raw_turn_limit = request.context_turn_limit | |
| turn_limit = ( | |
| max(1, min(50, int(raw_turn_limit))) | |
| if isinstance(raw_turn_limit, int) and raw_turn_limit > 0 | |
| else 2 | |
| ) | |
| # Separate System and Non-System | |
| system_messages = [m for m in messages if m.get("role") == "system"] | |
| chat_messages = [m for m in messages if m.get("role") != "system"] | |
| # Find the indices of User messages to determine run boundaries | |
| user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"] | |
| user_turn_count = len(user_indices) | |
| if user_turn_count > turn_limit: | |
| cutoff_index = user_indices[-turn_limit] | |
| recent_history = chat_messages[cutoff_index:] | |
| else: | |
| recent_history = chat_messages | |
| # For single-turn requests (common during first-turn regenerate), | |
| # using persisted summary can re-introduce stale assistant text. | |
| # In this case, use fresh request messages only and rebuild summary from this turn. | |
| is_single_user_turn = user_turn_count <= 1 | |
| # Force rebuild if it's the first turn OR if the user is editing/regenerating | |
| should_rebuild_summary = bool(is_single_user_turn or request.is_editing) | |
| # Inject summary only when history exceeds turn window and request is not rebuild flow. | |
| should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit) and (not should_rebuild_summary) | |
| if not should_inject_summary and session_summary_text: | |
| _log_verbose_info( | |
| "Skipping session summary injection (within turn window or single-turn rebuild context)." | |
| ) | |
| session_summary_text = None | |
| old_summary_json = None | |
| # 3. Inject Summary into System Prompt | |
| if session_summary_text: | |
| summary_prompt = ( | |
| "\n\nSession memory summary:\n" | |
| "Here is a summary of the conversation so far. Use this to understand long-term context, " | |
| "but prioritize the details in the recent messages below.\n" | |
| f"{session_summary_text}\n" | |
| ) | |
| # Inject into the LAST system message, or create a new one if none exist | |
| if system_messages: | |
| last_sys = system_messages[-1] | |
| # Avoid appending if already present (defensive) | |
| if "Session memory summary:" not in str(last_sys.get("content", "")): | |
| new_content = str(last_sys.get("content", "")) + summary_prompt | |
| # Update the dict (need to be careful not to mutate original request list in place if reused, but here it's fine) | |
| last_sys["content"] = new_content | |
| else: | |
| system_messages.append({"role": "system", "content": summary_prompt}) | |
| # Final Agent Input | |
| agent_input = system_messages + recent_history | |
| stream = agent.arun( | |
| input=agent_input, | |
| stream=True, | |
| stream_events=True, | |
| user_id=request.user_id, | |
| session_id=request.conversation_id, | |
| # Only pass explicit structured-output schema. | |
| # Do not fallback to response_format, otherwise {"type":"json_object"} | |
| # may be treated as grammar and trigger provider-side grammar cache errors. | |
| output_schema=request.output_schema, | |
| ) | |
| # ================================================================ | |
| # Stream processing with HITL support | |
| # ================================================================ | |
| async for run_event in stream: | |
| _append_raw_event_log( | |
| phase="main", | |
| request=request, | |
| run_id=getattr(run_event, "run_id", None), | |
| run_event=run_event, | |
| ) | |
| # ============================================================ | |
| # HITL: Check if agent paused for user input | |
| # ============================================================ | |
| if hasattr(run_event, 'is_paused') and run_event.is_paused: | |
| logger.info(f"Agent paused for HITL (run_id: {run_event.run_id})") | |
| # Extract requirements | |
| requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None) | |
| if requirements: | |
| form_requirements = [req for req in requirements if _is_interactive_form_requirement(req)] | |
| if not form_requirements: | |
| logger.info("Agent paused without interactive_form; skipping HITL form handling") | |
| yield DoneEvent( | |
| content=full_content or "", | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| return | |
| # Save to Supabase | |
| try: | |
| paused_tools = getattr(run_event, "tools", None) or [] | |
| serialized_tools = [ | |
| tool.to_dict() if hasattr(tool, "to_dict") else tool | |
| for tool in paused_tools | |
| if tool is not None | |
| ] | |
| serialized_requirements = [ | |
| req.to_dict() if hasattr(req, "to_dict") else req | |
| for req in form_requirements | |
| ] | |
| paused_run_output = { | |
| "run_id": getattr(run_event, "run_id", None), | |
| "session_id": getattr(run_event, "session_id", None) | |
| or request.conversation_id, | |
| "user_id": request.user_id, | |
| "messages": messages or [], | |
| "tools": serialized_tools, | |
| "requirements": serialized_requirements, | |
| "status": "PAUSED", | |
| } | |
| logger.info( | |
| f"[HITL] Saving pending run_id={run_event.run_id} " | |
| f"with database_provider={request.database_provider}" | |
| ) | |
| hitl_storage = get_hitl_storage(request.database_provider) | |
| saved = await hitl_storage.save_pending_run( | |
| run_id=run_event.run_id, | |
| requirements=form_requirements, | |
| conversation_id=request.conversation_id, | |
| user_id=request.user_id, | |
| agent_model=request.model, | |
| messages=messages, | |
| run_output=paused_run_output, | |
| ) | |
| if not saved: | |
| raise RuntimeError("Failed to persist HITL pending run") | |
| # Extract form fields for frontend | |
| for req in form_requirements: | |
| # Handle external execution (e.g., interactive_form with external_execution=True) | |
| if (hasattr(req, 'needs_external_execution') and req.needs_external_execution) or \ | |
| (req.tool_execution and req.tool_execution.tool_name == "interactive_form"): | |
| form_id, title, fields = _extract_interactive_form_payload( | |
| req, | |
| default_title="Please provide the following information", | |
| ) | |
| # Send form_request event to frontend | |
| yield FormRequestEvent( | |
| run_id=run_event.run_id, | |
| form_id=form_id, | |
| title=title, | |
| fields=fields | |
| ).model_dump() | |
| # Fallback handle traditional user input (e.g., get_user_input) | |
| elif req.needs_user_input and req.user_input_schema: | |
| # Convert from user_input_schema | |
| form_id = None | |
| title = "Please provide the following information" | |
| fields = [ | |
| { | |
| "name": field.name, | |
| "type": self._map_field_type_to_frontend(field.field_type), | |
| "label": field.description or field.name, | |
| "required": True, | |
| "value": field.value | |
| } | |
| for field in req.user_input_schema | |
| ] | |
| # Send form_request event to frontend | |
| yield FormRequestEvent( | |
| run_id=run_event.run_id, | |
| form_id=form_id, | |
| title=title, | |
| fields=fields | |
| ).model_dump() | |
| # Send done event to indicate pause | |
| yield DoneEvent( | |
| content=full_content or "", | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| _log_verbose_info(f"HITL pause successful, waiting for user submission (run_id: {run_event.run_id})") | |
| return # Exit stream, wait for user to submit form | |
| except Exception as e: | |
| logger.error(f"Failed to save HITL state: {e}") | |
| yield ErrorEvent(error=f"Failed to pause for form: {str(e)}").model_dump() | |
| return | |
| else: | |
| logger.warning("Agent paused but no requirements found") | |
| yield DoneEvent( | |
| content=full_content or "", | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| return | |
| # ============================================================ | |
| # Normal streaming events (use stream_events for details) | |
| # ============================================================ | |
| # Check if this is a detailed event (from stream_events=True) | |
| if hasattr(run_event, 'event'): | |
| # Extract agent info for Team mode (member vs leader identification) | |
| current_agent_info = _extract_agent_info_from_event( | |
| run_event, | |
| leader_id=request.agent_id, | |
| leader_name=request.agent_name, | |
| leader_emoji=request.agent_emoji, | |
| agent_metadata=agent_metadata, | |
| ) | |
| # Log active agent switch in Team mode | |
| if is_team_mode: | |
| current_id = current_agent_info.get("agent_id") | |
| if current_id != last_active_agent_id: | |
| last_active_agent_id = current_id | |
| active_name = current_agent_info.get("agent_name") | |
| active_role = current_agent_info.get("agent_role") | |
| active_model = current_agent_info.get("model") | |
| active_provider = current_agent_info.get("provider") | |
| logger.info( | |
| f"[TEAM] >>> Active Agent Switch: {active_name} ({active_role}) " | |
| f"| Model: {active_model} | Provider: {active_provider}" | |
| ) | |
| # Apply current tracked status to info for text/thought events | |
| current_agent_info["status"] = agent_statuses.get(current_id, "active") | |
| if current_agent_info.get("agent_role") == "member": | |
| trace_stream( | |
| "member_event", | |
| agent_id=current_agent_info.get("agent_id"), | |
| agent_name=current_agent_info.get("agent_name"), | |
| ) | |
| match run_event.event: | |
| case RunEvent.run_started.value | TeamRunEvent.run_started: | |
| if is_team_mode: | |
| active_id = current_agent_info.get("agent_id") | |
| active_name = current_agent_info.get("agent_name") | |
| active_role = current_agent_info.get("agent_role") | |
| active_model = current_agent_info.get("model") | |
| active_provider = current_agent_info.get("provider") | |
| logger.info( | |
| f"[TEAM] >>> run_started: {active_name} ({active_role}) " | |
| f"| Model: {active_model} | Provider: {active_provider}" | |
| ) | |
| # Member starts -> Leader waits, Member active | |
| if active_role == "member": | |
| # Ensure leader is set to waiting when member starts | |
| async for e in update_status_and_yield(request.agent_id, "waiting"): | |
| yield e | |
| async for e in update_status_and_yield(active_id, "active"): | |
| yield e | |
| else: | |
| # Leader starts -> Leader active | |
| async for e in update_status_and_yield(active_id, "active"): | |
| yield e | |
| continue | |
| case TeamRunEvent.run_completed: | |
| if is_team_mode: | |
| active_id = current_agent_info.get("agent_id") | |
| active_role = current_agent_info.get("agent_role") | |
| if active_role == "member": | |
| # Member finished -> Leader still waiting (until it resumes), Member ready | |
| async for e in update_status_and_yield(active_id, "ready"): | |
| yield e | |
| continue | |
| # Handle both Agent RunEvent and Team TeamRunEvent for content streaming | |
| case RunEvent.run_content.value | TeamRunEvent.run_content: | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| if had_protocol: | |
| trace_stream( | |
| "strip_tool_protocol", | |
| depth=inline_tool_trace_depth, | |
| tail_len=len(inline_protocol_tail), | |
| cleaned_preview=_preview(raw_content_chunk), | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| reasoning = raw_reasoning | |
| if reasoning and content_chunk and _is_reasoning_duplicate_of_content( | |
| str(reasoning), | |
| str(content_chunk), | |
| ): | |
| trace_stream( | |
| "suppress_reasoning_overlap", | |
| event="run_content", | |
| reasoning_preview=_preview(reasoning), | |
| content_preview=_preview(content_chunk), | |
| ) | |
| reasoning = "" | |
| has_content_chunk = bool(content_chunk) | |
| has_inline_thought = bool(inline_thought_chunk) | |
| trace_stream( | |
| "run_content", | |
| has_content=has_content_chunk, | |
| has_reasoning=bool(reasoning) or has_inline_thought, | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), | |
| ) | |
| has_any_thought = bool(reasoning) or has_inline_thought | |
| if has_any_thought and reasoning_closed_for_current_cycle: | |
| # Re-open reasoning phase (e.g. after tool call or interleaved model output). | |
| reasoning_closed_for_current_cycle = False | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| if reasoning: | |
| for event in emit_thought_part(str(reasoning)): | |
| yield event | |
| if has_inline_thought: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| case RunEvent.reasoning_content_delta.value | TeamRunEvent.reasoning_content_delta: | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| if had_protocol: | |
| trace_stream( | |
| "strip_tool_protocol", | |
| depth=inline_tool_trace_depth, | |
| tail_len=len(inline_protocol_tail), | |
| cleaned_preview=_preview(raw_content_chunk), | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| reasoning = raw_reasoning | |
| if reasoning and content_chunk and _is_reasoning_duplicate_of_content( | |
| str(reasoning), | |
| str(content_chunk), | |
| ): | |
| trace_stream( | |
| "suppress_reasoning_overlap", | |
| event="reasoning_content_delta", | |
| reasoning_preview=_preview(reasoning), | |
| content_preview=_preview(content_chunk), | |
| ) | |
| reasoning = "" | |
| has_content_chunk = bool(content_chunk) | |
| has_inline_thought = bool(inline_thought_chunk) | |
| trace_stream( | |
| "reasoning_delta", | |
| has_content=has_content_chunk, | |
| has_reasoning=bool(reasoning) or has_inline_thought, | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), | |
| ) | |
| has_any_thought = bool(reasoning) or has_inline_thought | |
| if has_any_thought and reasoning_closed_for_current_cycle: | |
| reasoning_closed_for_current_cycle = False | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| if reasoning: | |
| for event in emit_thought_part(str(reasoning)): | |
| yield event | |
| if has_inline_thought: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| case RunEvent.tool_call_started.value | TeamRunEvent.tool_call_started: | |
| tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment] | |
| tool = tool_event.tool | |
| if tool: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| if getattr(tool, "tool_call_id", None): | |
| tool_start_times[tool.tool_call_id] = time.time() | |
| current_id = current_agent_info.get("agent_id") | |
| # If leader calls a tool, it's either an internal tool (code, etc) or delegation. | |
| # During the tool call itself, the agent is "active". | |
| async for e in update_status_and_yield(current_id, "active"): | |
| yield e | |
| trace_stream( | |
| "tool_call_started", | |
| tool_name=getattr(tool, "tool_name", ""), | |
| tool_call_id=getattr(tool, "tool_call_id", None), | |
| ) | |
| current_text_index = len(full_content) | |
| yield _build_tool_call_event( | |
| tool, | |
| current_text_index, | |
| agent_info=current_agent_info | |
| ) | |
| case RunEvent.tool_call_completed.value | TeamRunEvent.tool_call_completed: | |
| tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment] | |
| tool = tool_event.tool | |
| if tool: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| duration_ms = None | |
| if tool.tool_call_id and tool.tool_call_id in tool_start_times: | |
| duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000) | |
| trace_stream( | |
| "tool_call_completed", | |
| tool_name=tool.tool_name or "", | |
| tool_call_id=tool.tool_call_id, | |
| is_error=bool(tool.tool_call_error), | |
| ) | |
| tool_result_event, output = _build_tool_result_event( | |
| tool, | |
| duration_ms, | |
| self._normalize_tool_output, | |
| agent_info=current_agent_info, | |
| ) | |
| yield tool_result_event | |
| self._collect_search_sources(output, sources_map) | |
| case RunEvent.run_completed.value | TeamRunEvent.run_completed: | |
| # Extract agent info to check if this is a member or leader | |
| event_agent_info = _extract_agent_info_from_event( | |
| run_event, | |
| leader_id=request.agent_id, | |
| leader_name=request.agent_name, | |
| leader_emoji=request.agent_emoji, | |
| agent_metadata=agent_metadata, | |
| ) | |
| is_member_completion = event_agent_info.get("agent_role") == "member" | |
| # In Team Mode, only terminate when the LEADER (no agent_id on event) completes. | |
| # Member completions should just let the main loop continue. | |
| if is_team_mode and is_member_completion: | |
| active_id = event_agent_info.get("agent_id") | |
| active_name = event_agent_info.get("agent_name") | |
| active_model = event_agent_info.get("model") | |
| active_provider = event_agent_info.get("provider") | |
| logger.info( | |
| f"[TEAM] Member {active_name} completed " | |
| f"(Model: {active_model} | Provider: {active_provider}). " | |
| "Continuing stream..." | |
| ) | |
| # Ensure member is marked as ready if not already handled by TeamRunEvent.run_completed | |
| async for e in update_status_and_yield(active_id, "ready"): | |
| yield e | |
| continue | |
| # Leader completed | |
| if is_team_mode: | |
| async for e in update_status_and_yield(request.agent_id, "idle"): | |
| yield e | |
| final_content, output = _extract_completed_content_and_output( | |
| run_event, | |
| full_content, | |
| ) | |
| yield DoneEvent( | |
| content=final_content or "", | |
| output=output, | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| if request: | |
| asyncio.create_task(self._maybe_optimize_memories(agent, request)) | |
| # 4. Trigger Async Session Summary Update | |
| # Only if conversation_id exists (Main Chat Flow) AND summary is enabled | |
| if request.conversation_id and request.enable_session_summary: | |
| # Prepare summary lines: | |
| # - Normal flow: incremental update with last user + new assistant | |
| # - Regenerate/Edit (or single-turn rebuild): rebuild from current request context + new assistant | |
| if should_rebuild_summary: | |
| new_lines = [ | |
| m for m in messages | |
| if m.get("role") in ("user", "assistant") | |
| ] | |
| new_lines.append({"role": "assistant", "content": final_content}) | |
| else: | |
| new_lines = [] | |
| last_user = next((m for m in reversed(messages) if m.get("role") == "user"), None) | |
| if last_user: | |
| new_lines.append(last_user) | |
| new_lines.append({"role": "assistant", "content": final_content}) | |
| _log_verbose_info(f"Triggering async summary update for {request.conversation_id} with {len(new_lines)} messages (rebuild: {should_rebuild_summary}, is_editing: {request.is_editing})") | |
| asyncio.create_task(update_session_summary( | |
| conversation_id=request.conversation_id, | |
| old_summary=old_summary_json, | |
| new_messages=new_lines, | |
| database_provider=request.database_provider, | |
| memory_provider=request.memory_provider, | |
| memory_model=request.memory_model, | |
| memory_api_key=request.memory_api_key, | |
| memory_base_url=request.memory_base_url, | |
| summary_provider=request.summary_provider, | |
| summary_model=request.summary_model, | |
| summary_api_key=request.summary_api_key, | |
| summary_base_url=request.summary_base_url, | |
| rebuild_from_scratch=should_rebuild_summary, | |
| )) | |
| return | |
| case RunEvent.run_error.value | TeamRunEvent.run_error: | |
| error_msg = _extract_best_error_message(run_event) | |
| active_id = current_agent_info.get("agent_id") | |
| if active_id: | |
| async for e in update_status_and_yield(active_id, "error"): | |
| yield e | |
| yield ErrorEvent(error=error_msg).model_dump() | |
| return | |
| else: | |
| # Simple event Fallback (no detailed event type), just check for content | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| if raw_reasoning: | |
| for event in emit_thought_part(str(raw_reasoning)): | |
| yield event | |
| if inline_thought_chunk: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| except Exception as exc: | |
| logger.error(f"Stream chat error: {exc}") | |
| yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump() | |
| async def _continue_hitl_run( | |
| self, | |
| request: StreamChatRequest, | |
| ) -> AsyncGenerator[dict[str, Any], None]: | |
| """ | |
| Continue a paused HITL run after user submits form. | |
| This method: | |
| 1. Retrieves requirements from storage | |
| 2. Rebuilds continuation messages with submitted form values | |
| 3. Runs agent.arun() and streams completion | |
| 4. Cleans up storage record | |
| """ | |
| try: | |
| run_id = request.run_id | |
| field_values = request.field_values or {} | |
| _log_verbose_info( | |
| f"[HITL] Continuing run_id={run_id!r} " | |
| f"with database_provider={request.database_provider} " | |
| f"and field_values={list(field_values.keys())}" | |
| ) | |
| # 1. Fetch Session Summary from DB | |
| session_summary_text = None | |
| old_summary_json = None | |
| if request.conversation_id: | |
| try: | |
| from ..models.db import DbFilter, DbQueryRequest | |
| from .db_service import execute_db_async, get_db_adapter | |
| adapter = get_db_adapter(request.database_provider) | |
| if adapter: | |
| req = DbQueryRequest( | |
| providerId=adapter.config.id, | |
| action="select", | |
| table="conversations", | |
| columns=["session_summary"], | |
| filters=[DbFilter(op="eq", column="id", value=request.conversation_id)], | |
| maybeSingle=True, | |
| ) | |
| result = await execute_db_async(adapter, req) | |
| if result.data and isinstance(result.data, dict): | |
| row = result.data | |
| raw_summary = row.get("session_summary") | |
| if raw_summary: | |
| if isinstance(raw_summary, str): | |
| try: | |
| old_summary_json = json.loads(raw_summary) | |
| except (ValueError, json.JSONDecodeError): | |
| pass | |
| elif isinstance(raw_summary, dict): | |
| old_summary_json = raw_summary | |
| if old_summary_json: | |
| session_summary_text = old_summary_json.get("summary") | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch session summary in HITL flow: {e}") | |
| # Retrieve requirements from Supabase | |
| hitl_storage = get_hitl_storage(request.database_provider) | |
| pending = await hitl_storage.get_pending_run(run_id) | |
| requirements = None | |
| saved_messages = None | |
| saved_run_output = None | |
| if isinstance(pending, dict): | |
| requirements = pending.get("requirements") | |
| saved_messages = pending.get("messages") | |
| saved_run_output = pending.get("run_output") | |
| else: | |
| requirements = pending | |
| _log_verbose_info( | |
| "[HITL] loaded pending payload: " | |
| f"has_requirements={bool(requirements)}, " | |
| f"messages_type={type(saved_messages).__name__ if saved_messages is not None else 'None'}, " | |
| f"has_run_output={saved_run_output is not None}, " | |
| f"run_output_type={type(saved_run_output).__name__ if saved_run_output is not None else 'None'}" | |
| ) | |
| if not requirements: | |
| logger.error( | |
| f"[HITL] No pending run found for run_id={run_id!r} " | |
| f"(database_provider={request.database_provider})" | |
| ) | |
| yield ErrorEvent(error="Form session expired or not found").model_dump() | |
| return | |
| # Enable skills for the definitive user-facing chat agent | |
| request.enable_skills = True | |
| # Get agent (same provider as original request) | |
| agent = get_agent_for_provider(request) | |
| _log_verbose_info(f"[HITL Continue] Agent instructions: {getattr(agent, 'instructions', None)}") | |
| full_content = "" | |
| full_thought = "" | |
| sources_map: dict[str, Any] = {} | |
| tool_start_times: dict[str, float] = {} | |
| should_break_next_thought = False | |
| in_reasoning_phase = False | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| stream_trace = _is_stream_trace_enabled() | |
| paused_again = False # Flag to prevent cleanup when multi-form chaining occurs | |
| stream_had_error = False | |
| completed_content_fallback = "" | |
| saw_terminal_completion = False | |
| continuation_event_count = 0 | |
| last_event_name: str | None = None | |
| last_event_type: str | None = None | |
| last_event_run_id: str | None = None | |
| # Current agent info for Team mode (updated per event) | |
| current_agent_info: dict[str, Any] = {"agent_id": request.agent_id, "agent_name": request.agent_name} | |
| def trace_stream(stage: str, **kwargs: Any) -> None: | |
| if not stream_trace: | |
| return | |
| payload = ", ".join([f"{k}={v}" for k, v in kwargs.items()]) | |
| logger.info(f"[STREAM_TRACE][hitl] {stage} | {payload}") | |
| def emit_thought_part(part: str): | |
| nonlocal full_thought, full_content, should_break_next_thought, in_reasoning_phase, reasoning_closed_for_current_cycle | |
| text = _strip_internal_tool_trace(str(part or "")) | |
| if not text or not text.strip(): | |
| return | |
| should_break_next_thought = False | |
| in_reasoning_phase = True | |
| full_thought += text | |
| trace_stream("emit_reasoning", reasoning_preview=_preview(text)) | |
| current_text_index = len(full_content) | |
| yield ThoughtEvent( | |
| content=text, | |
| text_index=current_text_index, | |
| agent_id=current_agent_info.get("agent_id"), | |
| agent_name=current_agent_info.get("agent_name"), | |
| ).model_dump(by_alias=True, exclude_none=True) | |
| def process_text(text: str): | |
| nonlocal full_content, in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle | |
| clean_text = _strip_internal_tool_trace(text) | |
| if clean_text: | |
| has_visible_text = bool(clean_text.strip()) | |
| if has_visible_text: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = True | |
| full_content += clean_text | |
| yield TextEvent( | |
| content=clean_text, | |
| agent_id=current_agent_info.get("agent_id"), | |
| agent_name=current_agent_info.get("agent_name"), | |
| ).model_dump(by_alias=True, exclude_none=True) | |
| async def _iterate_run_stream(stream: Any): | |
| """ | |
| Normalize both async and sync Agno run streams into an async iterator. | |
| """ | |
| if hasattr(stream, "__aiter__"): | |
| async for item in stream: | |
| yield item | |
| return | |
| iterator = iter(stream) | |
| sentinel = object() | |
| while True: | |
| item = await asyncio.to_thread(lambda: next(iterator, sentinel)) | |
| if item is sentinel: | |
| break | |
| yield item | |
| async def _stream_events(stream): | |
| nonlocal full_content, full_thought, sources_map, tool_start_times, paused_again, stream_had_error | |
| nonlocal in_reasoning_phase, should_break_next_thought, reasoning_closed_for_current_cycle | |
| nonlocal in_content_think_block, inline_tool_trace_depth, inline_protocol_tail | |
| nonlocal completed_content_fallback, saw_terminal_completion | |
| nonlocal continuation_event_count, last_event_name, last_event_type, last_event_run_id | |
| async for run_event in _iterate_run_stream(stream): | |
| _append_raw_event_log( | |
| phase="hitl_continuation", | |
| request=request, | |
| run_id=run_id, | |
| run_event=run_event, | |
| ) | |
| continuation_event_count += 1 | |
| last_event_type = type(run_event).__name__ | |
| last_event_name = str(getattr(run_event, "event", None) or last_event_type) | |
| last_event_run_id = str(raw_event_run_id) if raw_event_run_id else None | |
| # Extract agent info for Team mode (though Team HITL is currently disabled) | |
| current_agent_info = _extract_agent_info_from_event( | |
| run_event, | |
| leader_id=request.agent_id, | |
| leader_name=request.agent_name, | |
| leader_emoji=request.agent_emoji, | |
| ) | |
| # When yield_run_output=True, acontinue_run may yield the final RunOutput object. | |
| # Capture its canonical content as a robust fallback for providers that emit sparse events. | |
| # IMPORTANT: Do NOT re-emit this content via process_text() if we already received | |
| # streaming chunks (run_content events). Doing so would cause the full answer to be | |
| # appended a second time, resulting in visible duplication in the UI. | |
| # Only use RunOutput.content as a fallback when the stream produced nothing. | |
| if isinstance(run_event, RunOutput): | |
| saw_terminal_completion = True | |
| completed_content_fallback, _ = _extract_completed_content_and_output( | |
| run_event, | |
| completed_content_fallback or full_content, | |
| ) | |
| # Only emit if no content was streamed yet (sparse-event provider fallback). | |
| if not full_content: | |
| text_from_output = _extract_text_from_message_content( | |
| getattr(run_event, "content", None) | |
| ).strip() | |
| if text_from_output: | |
| for e in process_text(text_from_output): | |
| yield e | |
| continue | |
| # HITL Pause Check | |
| if hasattr(run_event, 'is_paused') and run_event.is_paused: | |
| logger.info(f"Agent paused again during continuation (multi-form chain, run_id: {run_id})") | |
| # Extract new requirements | |
| new_requirements = getattr(run_event, 'active_requirements', None) or getattr(run_event, 'requirements', None) | |
| if new_requirements: | |
| form_requirements = [req for req in new_requirements if _is_interactive_form_requirement(req)] | |
| if form_requirements: | |
| # Save new form requirements (overwrites previous in memory) | |
| hitl_storage_multi = get_hitl_storage(request.database_provider) | |
| saved = await hitl_storage_multi.save_pending_run( | |
| run_id=run_id, | |
| requirements=form_requirements, | |
| conversation_id=request.conversation_id, | |
| user_id=request.user_id, | |
| agent_model=request.model, | |
| messages=saved_messages, # Reuse saved messages | |
| run_output=( | |
| { | |
| "run_id": getattr(run_event, "run_id", None) or run_id, | |
| "session_id": getattr(run_event, "session_id", None) | |
| or request.conversation_id, | |
| "user_id": request.user_id, | |
| "messages": saved_messages or [], | |
| "tools": [ | |
| tool.to_dict() if hasattr(tool, "to_dict") else tool | |
| for tool in (getattr(run_event, "tools", None) or []) | |
| if tool is not None | |
| ], | |
| "requirements": [ | |
| req.to_dict() if hasattr(req, "to_dict") else req | |
| for req in form_requirements | |
| ], | |
| "status": "PAUSED", | |
| } | |
| ), | |
| ) | |
| if not saved: | |
| raise RuntimeError("Failed to persist chained HITL pending run") | |
| # Extract form fields and notify frontend | |
| for req in form_requirements: | |
| if (hasattr(req, 'needs_external_execution') and req.needs_external_execution) or \ | |
| (req.tool_execution and req.tool_execution.tool_name == "interactive_form"): | |
| form_id, title, fields = _extract_interactive_form_payload( | |
| req, | |
| default_title="Please provide additional information", | |
| ) | |
| yield FormRequestEvent( | |
| run_id=run_id, | |
| form_id=form_id, | |
| title=title, | |
| fields=fields | |
| ).model_dump() | |
| # Send partial done event | |
| yield DoneEvent( | |
| content=full_content or "", | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| logger.info(f"Multi-form: saved second form, waiting for user (run_id: {run_id})") | |
| paused_again = True # Mark as paused again to skip cleanup | |
| return | |
| # If no form requirements, just continue | |
| logger.warning(f"Agent paused again but no interactive_form found (run_id: {run_id})") | |
| yield ErrorEvent(error="Agent paused unexpectedly").model_dump() | |
| return | |
| # Check if this is a detailed event (from stream_events=True or implicit) | |
| if hasattr(run_event, 'event'): | |
| match run_event.event: | |
| case RunEvent.run_content.value: | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| if had_protocol: | |
| trace_stream( | |
| "strip_tool_protocol", | |
| depth=inline_tool_trace_depth, | |
| tail_len=len(inline_protocol_tail), | |
| cleaned_preview=_preview(raw_content_chunk), | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event, trace_fn=trace_stream) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| reasoning = raw_reasoning | |
| if reasoning and content_chunk and _is_reasoning_duplicate_of_content( | |
| str(reasoning), | |
| str(content_chunk), | |
| ): | |
| trace_stream( | |
| "suppress_reasoning_overlap", | |
| event="run_content", | |
| reasoning_preview=_preview(reasoning), | |
| content_preview=_preview(content_chunk), | |
| ) | |
| reasoning = "" | |
| has_content_chunk = bool(content_chunk) | |
| has_inline_thought = bool(inline_thought_chunk) | |
| trace_stream( | |
| "run_content", | |
| has_content=has_content_chunk, | |
| has_reasoning=bool(reasoning) or has_inline_thought, | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), | |
| ) | |
| has_any_thought = bool(reasoning) or has_inline_thought | |
| if has_any_thought and reasoning_closed_for_current_cycle: | |
| reasoning_closed_for_current_cycle = False | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| if reasoning: | |
| for event in emit_thought_part(str(reasoning)): | |
| yield event | |
| if has_inline_thought: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| trace_stream( | |
| "emit_content", | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| ) | |
| case RunEvent.reasoning_content_delta.value: | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, had_protocol = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| if had_protocol: | |
| trace_stream( | |
| "strip_tool_protocol", | |
| depth=inline_tool_trace_depth, | |
| tail_len=len(inline_protocol_tail), | |
| cleaned_preview=_preview(raw_content_chunk), | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| reasoning = raw_reasoning | |
| if reasoning and content_chunk and _is_reasoning_duplicate_of_content( | |
| str(reasoning), | |
| str(content_chunk), | |
| ): | |
| trace_stream( | |
| "suppress_reasoning_overlap", | |
| event="reasoning_content_delta", | |
| reasoning_preview=_preview(reasoning), | |
| content_preview=_preview(content_chunk), | |
| ) | |
| reasoning = "" | |
| has_content_chunk = bool(content_chunk) | |
| has_inline_thought = bool(inline_thought_chunk) | |
| trace_stream( | |
| "reasoning_delta", | |
| has_content=has_content_chunk, | |
| has_reasoning=bool(reasoning) or has_inline_thought, | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| reasoning_preview=_preview((reasoning or "") + (inline_thought_chunk or "")), | |
| ) | |
| has_any_thought = bool(reasoning) or has_inline_thought | |
| if has_any_thought and reasoning_closed_for_current_cycle: | |
| reasoning_closed_for_current_cycle = False | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| if reasoning: | |
| for event in emit_thought_part(str(reasoning)): | |
| yield event | |
| if has_inline_thought: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| trace_stream( | |
| "emit_content", | |
| reasoning_closed=reasoning_closed_for_current_cycle, | |
| content_preview=_preview(content_chunk), | |
| ) | |
| case RunEvent.tool_call_started.value: | |
| tool_event: ToolCallStartedEvent = run_event # type: ignore[assignment] | |
| tool = tool_event.tool | |
| if tool: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| if tool.tool_call_id: | |
| tool_start_times[tool.tool_call_id] = time.time() | |
| trace_stream( | |
| "tool_call_started", | |
| tool_name=tool.tool_name or "", | |
| tool_call_id=tool.tool_call_id, | |
| ) | |
| current_text_index = len(full_content) | |
| yield _build_tool_call_event(tool, current_text_index) | |
| case RunEvent.tool_call_completed.value: | |
| tool_event: ToolCallCompletedEvent = run_event # type: ignore[assignment] | |
| tool = tool_event.tool | |
| if tool: | |
| in_reasoning_phase = False | |
| should_break_next_thought = True | |
| reasoning_closed_for_current_cycle = False | |
| in_content_think_block = False | |
| inline_tool_trace_depth = 0 | |
| inline_protocol_tail = "" | |
| duration_ms = None | |
| if tool.tool_call_id and tool.tool_call_id in tool_start_times: | |
| duration_ms = int((time.time() - tool_start_times[tool.tool_call_id]) * 1000) | |
| trace_stream( | |
| "tool_call_completed", | |
| tool_name=tool.tool_name or "", | |
| tool_call_id=tool.tool_call_id, | |
| is_error=bool(tool.tool_call_error), | |
| ) | |
| tool_result_event, output = _build_tool_result_event( | |
| tool, | |
| duration_ms, | |
| self._normalize_tool_output, | |
| ) | |
| yield tool_result_event | |
| self._collect_search_sources(output, sources_map) | |
| case RunEvent.run_completed.value: | |
| saw_terminal_completion = True | |
| completed_content_fallback, _ = _extract_completed_content_and_output( | |
| run_event, | |
| completed_content_fallback or full_content, | |
| ) | |
| case RunEvent.run_error.value: | |
| error_msg = _extract_best_error_message(run_event) | |
| stream_had_error = True | |
| yield ErrorEvent(error=error_msg).model_dump() | |
| return | |
| else: | |
| # Simple event Fallback | |
| raw_content_chunk = _extract_text_chunk(run_event) | |
| raw_content_chunk, inline_tool_trace_depth, inline_protocol_tail, _ = _strip_inline_tool_protocol( | |
| raw_content_chunk, | |
| inline_tool_trace_depth, | |
| inline_protocol_tail, | |
| ) | |
| raw_reasoning = _extract_reasoning_chunk(run_event) | |
| content_segments, in_content_think_block = _split_content_by_think_tags( | |
| raw_content_chunk, | |
| in_content_think_block, | |
| ) | |
| content_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "text" | |
| ) | |
| inline_thought_chunk = "".join( | |
| seg_text for seg_type, seg_text in content_segments if seg_type == "thought" | |
| ) | |
| if raw_reasoning: | |
| for event in emit_thought_part(str(raw_reasoning)): | |
| yield event | |
| if inline_thought_chunk: | |
| for event in emit_thought_part(str(inline_thought_chunk)): | |
| yield event | |
| if content_chunk: | |
| for e in process_text(content_chunk): | |
| yield e | |
| def _build_fallback_messages(): | |
| if not saved_messages: | |
| return None | |
| updated_messages = list(saved_messages) | |
| for req in requirements: | |
| tool_exec = getattr(req, 'tool_execution', None) | |
| tool_name = getattr(tool_exec, 'tool_name', None) if tool_exec else None | |
| if tool_name != "interactive_form": | |
| continue | |
| tool_args = getattr(tool_exec, 'tool_args', {}) if tool_exec else {} | |
| tool_call_id = getattr(req, "id", None) or tool_args.get("id") or f"form-{int(time.time() * 1000)}" | |
| updated_messages.append({ | |
| "role": "assistant", | |
| "content": None, | |
| "tool_calls": [{ | |
| "id": tool_call_id, | |
| "type": "function", | |
| "function": { | |
| "name": "interactive_form", | |
| "arguments": json.dumps(tool_args or {}), | |
| } | |
| }], | |
| }) | |
| updated_messages.append({ | |
| "role": "tool", | |
| "content": json.dumps(field_values), | |
| "tool_call_id": tool_call_id, | |
| }) | |
| return updated_messages | |
| def _apply_field_values_to_requirements() -> list[Any]: | |
| """ | |
| Resolve pending HITL requirements with the submitted form payload. | |
| Prefer Agno-native requirement resolution so we can use acontinue_run(). | |
| """ | |
| resolved_requirements: list[Any] = [] | |
| serialized_values = json.dumps(field_values, ensure_ascii=False) | |
| for req in requirements or []: | |
| try: | |
| if hasattr(req, "needs_external_execution") and req.needs_external_execution: | |
| req.set_external_execution_result(serialized_values) | |
| tool_exec = getattr(req, "tool_execution", None) | |
| if tool_exec is not None and getattr(tool_exec, "result", None) is None: | |
| tool_exec.result = serialized_values | |
| elif hasattr(req, "needs_user_input") and req.needs_user_input: | |
| req.provide_user_input(field_values) | |
| elif hasattr(req, "needs_confirmation") and req.needs_confirmation: | |
| req.confirm() | |
| except Exception as req_err: | |
| logger.warning( | |
| f"[HITL] Failed to resolve requirement {getattr(req, 'id', None)} for run_id={run_id}: {req_err}" | |
| ) | |
| resolved_requirements.append(req) | |
| return resolved_requirements | |
| def _build_continuation_agent_input(base_messages: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| messages = self._inject_local_time_context(list(base_messages), request, []) | |
| system_messages = [m for m in messages if m.get("role") == "system"] | |
| chat_messages = [m for m in messages if m.get("role") != "system"] | |
| raw_turn_limit = request.context_turn_limit | |
| turn_limit = ( | |
| max(1, min(50, int(raw_turn_limit))) | |
| if isinstance(raw_turn_limit, int) and raw_turn_limit > 0 | |
| else 2 | |
| ) | |
| user_indices = [i for i, m in enumerate(chat_messages) if m.get("role") == "user"] | |
| user_turn_count = len(user_indices) | |
| if user_turn_count > turn_limit: | |
| cutoff_index = user_indices[-turn_limit] | |
| recent_history = chat_messages[cutoff_index:] | |
| else: | |
| recent_history = chat_messages | |
| should_inject_summary = bool(session_summary_text) and (user_turn_count > turn_limit) | |
| if should_inject_summary: | |
| summary_prompt = ( | |
| "\n\nSession memory summary:\n" | |
| "Here is a summary of the conversation so far. Use this to understand long-term context, " | |
| "but prioritize the details in the recent messages below.\n" | |
| f"{session_summary_text}\n" | |
| ) | |
| if system_messages: | |
| last_sys = system_messages[-1] | |
| if "Session memory summary:" not in str(last_sys.get("content", "")): | |
| last_sys["content"] = str(last_sys.get("content", "")) + summary_prompt | |
| else: | |
| system_messages.append({"role": "system", "content": summary_prompt}) | |
| return system_messages + recent_history | |
| resolved_requirements = _apply_field_values_to_requirements() | |
| stream = None | |
| try: | |
| restored_run_output = None | |
| if isinstance(saved_run_output, dict): | |
| try: | |
| restored_run_output = RunOutput.from_dict(dict(saved_run_output)) | |
| # Ensure external-execution tool results are concretely attached to | |
| # run_response.tools before acontinue_run() processes updates. | |
| if restored_run_output and isinstance(restored_run_output.tools, list): | |
| serialized_values = json.dumps(field_values, ensure_ascii=False) | |
| tool_result_by_id: dict[str, Any] = {} | |
| for req in resolved_requirements or []: | |
| tool_exec = getattr(req, "tool_execution", None) | |
| if not tool_exec: | |
| continue | |
| tcid = getattr(tool_exec, "tool_call_id", None) | |
| if tcid: | |
| tool_result_by_id[str(tcid)] = getattr(tool_exec, "result", None) | |
| for tool in restored_run_output.tools: | |
| if getattr(tool, "result", None) is not None: | |
| continue | |
| tcid = getattr(tool, "tool_call_id", None) | |
| if tcid and str(tcid) in tool_result_by_id: | |
| tool.result = tool_result_by_id[str(tcid)] | |
| continue | |
| if getattr(tool, "tool_name", None) == "interactive_form": | |
| tool.result = serialized_values | |
| # OpenAI-compatible tool flow requires an assistant message | |
| # with matching tool_calls before any tool message can appear. | |
| if restored_run_output and isinstance(restored_run_output.tools, list): | |
| existing_messages = ( | |
| list(restored_run_output.messages) | |
| if isinstance(restored_run_output.messages, list) | |
| else [] | |
| ) | |
| existing_tool_call_ids: set[str] = set() | |
| for msg in existing_messages: | |
| msg_tool_calls = getattr(msg, "tool_calls", None) or [] | |
| for tc in msg_tool_calls: | |
| if isinstance(tc, dict) and tc.get("id"): | |
| existing_tool_call_ids.add(str(tc.get("id"))) | |
| for tool in restored_run_output.tools: | |
| tool_call_id = getattr(tool, "tool_call_id", None) | |
| tool_name = getattr(tool, "tool_name", None) or "interactive_form" | |
| if not tool_call_id or str(tool_call_id) in existing_tool_call_ids: | |
| continue | |
| tool_args = getattr(tool, "tool_args", None) or {} | |
| existing_messages.append( | |
| Message( | |
| role="assistant", | |
| content="", | |
| tool_calls=[ | |
| { | |
| "id": str(tool_call_id), | |
| "type": "function", | |
| "function": { | |
| "name": str(tool_name), | |
| "arguments": json.dumps(tool_args, ensure_ascii=False), | |
| }, | |
| } | |
| ], | |
| ) | |
| ) | |
| existing_tool_call_ids.add(str(tool_call_id)) | |
| restored_run_output.messages = existing_messages | |
| if resolved_requirements: | |
| restored_run_output.requirements = resolved_requirements | |
| except Exception as restore_err: | |
| logger.warning( | |
| f"[HITL] Failed to restore RunOutput for run_id={run_id}, fallback to run_id path: {restore_err}" | |
| ) | |
| _log_verbose_info( | |
| f"[HITL] continuation restore mode: {'run_response' if restored_run_output is not None else 'run_id'} " | |
| f"(has_saved_run_output={isinstance(saved_run_output, dict)})" | |
| ) | |
| _log_verbose_info( | |
| f"Running HITL continuation via continue_run (run_id: {run_id}, session_id: {request.conversation_id})" | |
| ) | |
| if restored_run_output is not None: | |
| stream = agent.continue_run( | |
| run_response=restored_run_output, | |
| stream=True, | |
| stream_events=True, | |
| yield_run_output=True, | |
| user_id=request.user_id, | |
| session_id=request.conversation_id, | |
| output_schema=request.output_schema, | |
| ) | |
| else: | |
| stream = agent.continue_run( | |
| run_id=run_id, | |
| requirements=resolved_requirements, | |
| stream=True, | |
| stream_events=True, | |
| yield_run_output=True, | |
| user_id=request.user_id, | |
| session_id=request.conversation_id, | |
| output_schema=request.output_schema, | |
| ) | |
| except Exception as continue_err: | |
| logger.warning( | |
| f"[HITL] continue_run failed for run_id={run_id}, fallback to rebuilt arun: {continue_err}" | |
| ) | |
| fallback_messages = _build_fallback_messages() | |
| if not fallback_messages: | |
| yield ErrorEvent(error="Form session cannot be resumed (missing state)").model_dump() | |
| return | |
| agent_input = _build_continuation_agent_input(fallback_messages) | |
| stream = agent.arun( | |
| input=agent_input, | |
| stream=True, | |
| stream_events=True, | |
| user_id=request.user_id, | |
| session_id=request.conversation_id, | |
| output_schema=request.output_schema, | |
| ) | |
| async for event in _stream_events(stream): | |
| yield event | |
| _log_verbose_info( | |
| "[HITL] continuation stream summary: " | |
| f"run_id={run_id}, " | |
| f"events={continuation_event_count}, " | |
| f"last_event={last_event_name}, " | |
| f"last_event_type={last_event_type}, " | |
| f"last_event_run_id={last_event_run_id}, " | |
| f"saw_terminal_completion={saw_terminal_completion}, " | |
| f"stream_had_error={stream_had_error}, " | |
| f"paused_again={paused_again}" | |
| ) | |
| if stream_had_error: | |
| logger.warning(f"HITL run {run_id} ended with stream error; skipping done/cleanup") | |
| return | |
| if not saw_terminal_completion: | |
| logger.warning( | |
| f"HITL run {run_id} stream ended without terminal completion event; skipping done/cleanup" | |
| ) | |
| yield ErrorEvent(error="HITL continuation ended before completion").model_dump() | |
| return | |
| # Stream completed, send done event | |
| yield DoneEvent( | |
| content=full_content or completed_content_fallback, | |
| thought=full_thought.strip() or None, | |
| sources=list(sources_map.values()) or None, | |
| ).model_dump() | |
| # Clean up Supabase (skip if paused again for multi-form) | |
| if not paused_again: | |
| await hitl_storage.delete_pending_run(run_id) | |
| logger.info(f"HITL run {run_id} completed and cleaned up") | |
| else: | |
| logger.info(f"HITL run {run_id} paused again (multi-form), skipping cleanup") | |
| # 6. Trigger Async Session Summary Update | |
| if request.conversation_id and not paused_again: | |
| # For HITL resumption, extract the last turn's context from saved_messages | |
| # (matching normal flow: only last user + assistant, not full history) | |
| summary_messages = [] | |
| # Extract only the last user-assistant turn from saved_messages | |
| if saved_messages: | |
| last_user_idx = -1 | |
| for i in range(len(saved_messages) - 1, -1, -1): | |
| if saved_messages[i].get("role") == "user": | |
| last_user_idx = i | |
| break | |
| if last_user_idx >= 0: | |
| # Include from last user message to end of saved_messages | |
| # This captures: user question -> assistant form(s) -> any intermediate interactions | |
| for msg in saved_messages[last_user_idx:]: | |
| role = msg.get("role") | |
| content = msg.get("content") | |
| # Only include user/assistant messages with content for summary | |
| if role in ("user", "assistant") and content: | |
| summary_messages.append({"role": role, "content": content}) | |
| # Add the form submission as user input (provides structured data context) | |
| form_submission_text = f"[Form Submitted] Values: {json.dumps(field_values)}" | |
| summary_messages.append({"role": "user", "content": form_submission_text}) | |
| # Add the new assistant response (based on form data) | |
| summary_messages.append({"role": "assistant", "content": full_content}) | |
| _log_verbose_info(f"Triggering async summary update for {request.conversation_id} (Resumed HITL flow, {len(summary_messages)} messages)") | |
| asyncio.create_task(update_session_summary( | |
| conversation_id=request.conversation_id, | |
| old_summary=old_summary_json, | |
| new_messages=summary_messages, | |
| database_provider=request.database_provider, | |
| memory_provider=request.memory_provider, | |
| memory_model=request.memory_model, | |
| memory_api_key=request.memory_api_key, | |
| memory_base_url=request.memory_base_url, | |
| summary_provider=request.summary_provider, | |
| summary_model=request.summary_model, | |
| summary_api_key=request.summary_api_key, | |
| summary_base_url=request.summary_base_url, | |
| rebuild_from_scratch=False, # HITL resumption is usually incremental | |
| )) | |
| except Exception as exc: | |
| import traceback | |
| error_details = traceback.format_exc() | |
| logger.error(f"HITL continuation error: {exc}\n{error_details}") | |
| yield ErrorEvent(error=_extract_best_error_message(exc)).model_dump() | |
def _collect_enabled_tool_names(self, request: StreamChatRequest) -> set[str]:
    """
    Collect the canonical names of every tool enabled on this request.

    Names are gathered from three sources on the request:
    - ``tool_ids``: registry IDs, normalized via ``resolve_tool_name``.
      Skipped entirely for the ``gemini`` provider (see comment below).
    - ``tools``: OpenAI-style tool definitions (dicts or pydantic models);
      the name is read from the nested ``function`` object, falling back
      to a top-level ``name`` key, then normalized.
    - ``user_tools``: user-defined tool objects; their ``name`` attribute
      is used verbatim.

    Returns:
        A de-duplicated set of resolved tool names.
    """
    names: list[str] = []
    # Gemini-provider requests do not go through the ID-based tool
    # registry, so tool_ids are intentionally ignored for it.
    if request.provider != "gemini":
        for tool_id in request.tool_ids or []:
            names.append(resolve_tool_name(str(tool_id)))
    for tool_def in request.tools or []:
        # Pydantic models are converted to plain dicts first.
        if hasattr(tool_def, "model_dump"):
            tool_def = tool_def.model_dump()
        if not isinstance(tool_def, dict):
            continue
        # Defensive: "function" may be absent, or present but not a dict
        # (e.g. None) in malformed definitions — the original chained
        # .get("function", {}).get("name") would raise on the latter.
        function_block = tool_def.get("function")
        name = (
            function_block.get("name") if isinstance(function_block, dict) else None
        ) or tool_def.get("name")
        if name:
            names.append(resolve_tool_name(str(name)))
    for user_tool in request.user_tools or []:
        if hasattr(user_tool, "name") and user_tool.name:
            names.append(str(user_tool.name))
    return set(names)
def _inject_local_time_context(
    self,
    messages: list[dict[str, Any]],
    request: StreamChatRequest,
    pre_events: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """
    Append a local-time note to each user message that lacks one.

    The note tells the model the user's current local date (derived from
    ``request.user_timezone``) so that relative time expressions are
    interpreted against the right calendar day. Messages that are not
    dicts, not from the ``user`` role, or that already contain the
    marker string are passed through untouched. String content gets the
    note concatenated; list content gets an extra ``{"type": "text"}``
    part appended.
    """
    if not messages:
        return messages
    tz_name = request.user_timezone or "UTC"
    locale_name = request.user_locale or "en-US"
    local_time = self._compute_local_time(tz_name, locale_name)
    try:
        local_date = datetime.fromisoformat(str(local_time.get("iso"))).strftime("%Y-%m-%d")
    except Exception:
        # Fall back to the pre-formatted string, then to server "now".
        local_date = str(local_time.get("formatted", "")).split(" ")[0] or datetime.now().strftime(
            "%Y-%m-%d"
        )
    tz_label = str(local_time.get("timezone") or tz_name)
    note = (
        f"\n\n[Time note for this query]\n"
        f"Note: local date is {local_date} ({tz_label}). "
        "Interpret relative time terms using this date."
    )
    marker = "[Time note for this query]"

    def parts_have_marker(parts: list[Any]) -> bool:
        # Structured content already carries the note if any dict part
        # mentions the marker in its "text" or "content" field.
        for part in parts:
            if isinstance(part, dict) and (
                marker in str(part.get("text", ""))
                or marker in str(part.get("content", ""))
            ):
                return True
        return False

    result: list[dict[str, Any]] = []
    for message in messages:
        if not isinstance(message, dict) or message.get("role") != "user":
            result.append(message)
            continue
        body = message.get("content")
        if isinstance(body, str):
            if marker in body:
                result.append(message)
            else:
                result.append({**message, "content": f"{body}{note}"})
        elif isinstance(body, list):
            if parts_have_marker(body):
                result.append(message)
            else:
                result.append({**message, "content": [*body, {"type": "text", "text": note}]})
        else:
            # Unknown content shape: leave as-is.
            result.append(message)
    return result
| def _compute_local_time(self, timezone: str, locale: str) -> dict[str, Any]: | |
| try: | |
| tzinfo = ZoneInfo(timezone) | |
| now = datetime.now(tzinfo) | |
| except Exception: | |
| now = datetime.now() | |
| return { | |
| "timezone": timezone, | |
| "locale": locale, | |
| "formatted": now.strftime("%Y-%m-%d %H:%M:%S"), | |
| "iso": now.isoformat(), | |
| "now": now, | |
| } | |
    def _inject_tool_guidance(
        self,
        messages: list[dict[str, Any]],
        enabled_tools: set[str],
        request: Any | None = None,
    ) -> list[dict[str, Any]]:
        """Append tool-specific behavioral guidance to the system prompt.

        Depending on which tools are enabled, merges up to five guidance
        sections into the system message (creating one if absent):
        anti-narration output directives (always), interactive-form rules,
        citation rules for search tools, a local-time usage note, and
        long-term-memory rules (gated on ``request.enable_long_term_memory``).
        Returns a new list; the input ``messages`` list object is not mutated.
        """
        if not enabled_tools:
            return messages
        updated = list(messages)
        # Locate the current system message (-1 when none exists yet). After
        # the first append a system message is guaranteed to exist at index 0
        # or later, so the refreshes below keep the index valid; appends to an
        # existing message never shift indices.
        system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
        # Always-on directive: keep tool/trace narration out of the visible answer.
        no_tool_narration_guidance = (
            "\n\n[OUTPUT DIRECTIVES]\n"
            "1. The main text (Answer) must contain ONLY the final helpful content and necessary explanations for the user.\n"
            "2. In the main text, NEVER describe that you are going to, are currently, or have already called any tools, searched, browsed, retrieved memory, or queried databases. These are internal traces (Trace).\n"
            "3. In the main text, do NOT refer to yourself performing actions (e.g., \"Let me check\", \"I will search\", \"I have retrieved\").\n"
            " Instead, directly present results as established information.\n"
            "4. If citing sources, use neutral phrasing such as: \"According to available data\", \"Based on public information\", \"According to the returned data\".\n"
            " Never mention tool names or the calling process.\n"
            "5. If information is insufficient, directly state the missing gap and ask clarifying questions.\n"
            " Do NOT say \"Let me check again\" or similar transitional action phrases.\n"
            "6. Once you start presenting the final answer, do not switch back to planning, searching, or tool-calling language.\n"
            "7. The final answer should begin naturally with the content itself, without meta commentary or transitional phrases.\n"
        )
        updated = self._append_system_message(updated, no_tool_narration_guidance, system_index)
        # Refresh: the append above may have inserted a new system message at 0.
        system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
        # Human-in-the-loop form guidance: when to call the tool, payload shape,
        # text-first and one-form-per-response constraints.
        if "interactive_form" in enabled_tools:
            form_guidance = (
                "\n[TOOL USE GUIDANCE]\n"
                "When you need to collect structured information from the user (e.g. preferences, requirements, "
                "booking details), use the 'interactive_form' tool.\n"
                "CRITICAL: DO NOT list questions in text or markdown. YOU MUST USE the 'interactive_form' tool to "
                "display fields.\n"
                "CRITICAL: If the user explicitly asks you to confirm something via a form, interactive form, or form tool, "
                "you MUST call 'interactive_form' in this response instead of asking in plain text.\n"
                "CRITICAL: If you need approval before installing a dependency, you MUST use 'interactive_form'. "
                "Do NOT ask yes/no approval questions in normal prose when the tool is available.\n"
                "CRITICAL: For approval forms, ask only for information the model does NOT already know. "
                "If skill_id or package_name are already known from the current tool result or failure context, "
                "do NOT include text inputs for them in the form.\n"
                "CRITICAL: For dependency installation approval, prefer a single required field such as "
                "'approve_install', and place the package name in the form title or description.\n"
                "Keep forms concise (3-6 fields).\n\n"
                "[SIMPLIFIED PAYLOAD]\n"
                "You may use a minimal payload to reduce tool-call size.\n"
                "- 'id' and 'title' are optional.\n"
                "- Each field may be minimal (e.g., {'name':'budget'}) or even a short string label.\n"
                "- Backend will auto-fill missing label/type defaults.\n\n"
                "[MANDATORY TEXT-FIRST RULE]\n"
                "CRITICAL: You MUST output meaningful introductory text BEFORE calling 'interactive_form'.\n"
                "- NEVER call 'interactive_form' as the very first thing in your response\n"
                "- ALWAYS explain the context, acknowledge the user's request, or provide guidance BEFORE the form\n"
                "- Minimum: Output at least 1-2 sentences before the form call\n"
                '- Example: "I can help you with that. To provide the best recommendation, please share some '
                'details below:"\n\n'
                "[SINGLE FORM PER RESPONSE]\n"
                "CRITICAL: You may call 'interactive_form' ONLY ONCE per response. Do NOT call it multiple times in "
                "the same answer.\n"
                "If you need to collect information, design ONE comprehensive form that gathers all necessary "
                "details at once.\n\n"
                "[MULTI-TURN INTERACTIONS]\n"
                "1. If the information from a submitted form is insufficient, you MAY present another "
                "'interactive_form' in your NEXT response (after the user submits the first form).\n"
                "2. LIMIT: Use at most 2-3 forms total across the entire conversation. Excessive questioning "
                "frustrates users.\n"
                "3. INTERLEAVING: You can place the form anywhere in your response. Output introductory text FIRST "
                "(e.g., \"I can help with that. Please provide some details below:\"), then call 'interactive_form' "
                "once.\n"
                "4. If the user has provided enough context through previous forms, proceed directly to the final "
                "answer without requesting more information."
            )
            updated = self._append_system_message(updated, form_guidance, system_index)
            system_index = next((i for i, m in enumerate(updated) if m.get("role") == "system"), -1)
        # Tool names (canonical and legacy aliases) whose results must be cited.
        search_tools_requiring_citations = {
            "Tavily_web_search",
            "Tavily_academic_search",
            "web_search_using_tavily",
            "web_search",
            "search_news",
            "search_exa",
            "search_arxiv_and_return_articles",
            "search_wikipedia",
        }
        if enabled_tools.intersection(search_tools_requiring_citations):
            citation_prompt = (
                "\n\n[IMPORTANT] You have access to search tools. When you use them to answer a question, "
                "you MUST cite the search results in your answer using the format [1], [2], etc., "
                "corresponding to the index of the search result provided in the tool output. Do not fabricate "
                "citations."
            )
            updated = self._append_system_message(updated, citation_prompt, system_index)
        # Discourage redundant local_time calls: _inject_local_time_context
        # already stamps each user query with the local date.
        if "local_time" in enabled_tools:
            local_time_guidance = (
                "\n\n[TIME CONTEXT GUIDANCE]\n"
                "A local-date note is already appended to each user query before model execution.\n"
                "Do not call local_time again unless the user explicitly asks to refresh/recheck time."
            )
            updated = self._append_system_message(updated, local_time_guidance, system_index)
        # [REINFORCED] System-level long-term memory guidance for the internal
        # agent-memory skill; getattr tolerates request=None.
        if getattr(request, "enable_long_term_memory", False):
            memory_guidance = (
                "\n\n[SYSTEM-LEVEL META-RULE: LONG-TERM MEMORY]\n"
                "You are equipped with a persistent memory layer ('agent-memory'). This is a core infrastructure skill that overrides situational constraints.\n"
                "1) MANDATORY PREREQUISITE: If a query involves personal history, preferences, or prior facts, searching memory is a mandatory PREREQUISITE. "
                "This MUST be executed BEFORE any other skill logic (e.g., Roleplay, Image Search) to avoid factual inconsistency.\n"
                "2) PERSISTENCE AGAINST OVERRIDES: This meta-rule remains in effect even if other skills (like roleplay) demand immediate responses. "
                "Retrieving ground-truth user context is the first step of all processing.\n"
                "3) USAGE: Call 'execute_skill_script' with skill_id='agent-memory'. Factual alignment via memory retrieval is non-negotiable."
            )
            updated = self._append_system_message(updated, memory_guidance, system_index)
        return updated
| def _append_system_message( | |
| self, | |
| messages: list[dict[str, Any]], | |
| addition: str, | |
| system_index: int, | |
| ) -> list[dict[str, Any]]: | |
| updated = list(messages) | |
| if system_index != -1: | |
| updated[system_index] = { | |
| **updated[system_index], | |
| "content": f"{updated[system_index].get('content', '')}{addition}", | |
| } | |
| else: | |
| updated.insert(0, {"role": "system", "content": addition}) | |
| return updated | |
| def _normalize_tool_output(self, output: Any) -> Any: | |
| if hasattr(output, "model_dump"): | |
| try: | |
| return output.model_dump() | |
| except Exception: | |
| return str(output) | |
| if isinstance(output, dict): | |
| return output | |
| if isinstance(output, list): | |
| return [self._normalize_tool_output(item) for item in output] | |
| return output | |
| def _collect_search_sources(self, result: Any, sources_map: dict[str, Any]) -> None: | |
| def _extract_results(payload: Any) -> list[dict[str, Any]]: | |
| if isinstance(payload, list): | |
| return [item for item in payload if isinstance(item, dict)] | |
| if isinstance(payload, dict): | |
| for key in ("results", "items", "data", "sources", "articles", "news", "papers"): | |
| value = payload.get(key) | |
| if isinstance(value, list): | |
| return [item for item in value if isinstance(item, dict)] | |
| return [] | |
| results = _extract_results(result) | |
| if not results: | |
| return | |
| for item in results: | |
| url = ( | |
| item.get("url") | |
| or item.get("link") | |
| or item.get("uri") | |
| or item.get("source") | |
| or item.get("href") | |
| ) | |
| if not url or url in sources_map: | |
| continue | |
| title = ( | |
| item.get("title") | |
| or item.get("name") | |
| or item.get("headline") | |
| or item.get("paper_title") | |
| or "Unknown Source" | |
| ) | |
| snippet = ( | |
| item.get("content") | |
| or item.get("snippet") | |
| or item.get("summary") | |
| or item.get("abstract") | |
| or "" | |
| ) | |
| sources_map[url] = SourceEvent( | |
| uri=url, | |
| title=title, | |
| snippet=str(snippet)[:200], | |
| ).model_dump() | |
    async def _maybe_optimize_memories(self, agent: Agent, request: StreamChatRequest) -> None:
        """No-op placeholder for periodic long-term-memory optimization.

        Returns immediately, so the module-level MEMORY_OPTIMIZE_THRESHOLD and
        MEMORY_OPTIMIZE_INTERVAL_SECONDS constants are currently unused.
        NOTE(review): confirm whether optimization was disabled deliberately or
        this is a stub awaiting implementation.
        """
        return
| def _map_field_type_to_frontend(self, field_type: Any) -> str: | |
| """ | |
| Map Python/Agno field types to frontend form types. | |
| Args: | |
| field_type: Python type (class or string) | |
| Returns: | |
| Frontend form field type (text, number, checkbox, etc.) | |
| """ | |
| # Handle cases where field_type is a class/type instead of a string | |
| field_type_str = "" | |
| if isinstance(field_type, type): | |
| field_type_str = field_type.__name__ | |
| elif not isinstance(field_type, str): | |
| field_type_str = str(field_type) | |
| else: | |
| field_type_str = field_type | |
| type_mapping = { | |
| "str": "text", | |
| "int": "number", | |
| "float": "number", | |
| "bool": "checkbox", | |
| "date": "date", | |
| "time": "time", | |
| "datetime": "datetime", | |
| "list": "text", | |
| "dict": "textarea", | |
| } | |
| return type_mapping.get(field_type_str.lower(), "text") | |
# Lazily created process-wide singleton; access only via get_stream_chat_service().
_stream_chat_service: StreamChatService | None = None
def get_stream_chat_service() -> StreamChatService:
    """Return the shared StreamChatService, creating it on first call.

    NOTE(review): the check-then-set is not guarded against concurrent first
    calls; confirm single-threaded (or event-loop-only) startup is guaranteed.
    """
    global _stream_chat_service
    if _stream_chat_service is None:
        _stream_chat_service = StreamChatService()
    return _stream_chat_service