Spaces:
Running
Running
| """Stdout callback handler for pretty console output.""" | |
| from typing import Any | |
| from uuid import UUID | |
| from ..base import BaseCallbackHandler | |
| __all__ = ["StdoutCallbackHandler"] | |
| class StdoutCallbackHandler(BaseCallbackHandler): | |
| """ | |
| Prints events to stdout with colors and emojis. | |
| Similar to LangChain's verbose mode. | |
| """ | |
| def __init__( | |
| self, | |
| color: bool = True, | |
| show_prompts: bool = False, | |
| show_outputs: bool = True, | |
| truncate_length: int = 200, | |
| ): | |
| self.color = color | |
| self.show_prompts = show_prompts | |
| self.show_outputs = show_outputs | |
| self.truncate_length = truncate_length | |
| self._indent = 0 | |
| def _truncate(self, text: str) -> str: | |
| if len(text) <= self.truncate_length: | |
| return text | |
| return text[: self.truncate_length] + "..." | |
| def _print(self, message: str) -> None: | |
| _ = message # Used in subclass implementations | |
| " " * self._indent | |
| # === Run lifecycle === | |
| def on_run_start( | |
| self, | |
| *, | |
| run_id: UUID, | |
| query: str, | |
| num_agents: int = 0, | |
| execution_order: list[str] | None = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, query, kwargs) # Unused but required by interface | |
| self._print(f"π Run started: {num_agents} agents") | |
| if execution_order: | |
| self._print(f" Order: {' β '.join(execution_order)}") | |
| self._indent += 1 | |
| def on_run_end( | |
| self, | |
| *, | |
| run_id: UUID, | |
| output: str, | |
| success: bool = True, | |
| error: BaseException | None = None, | |
| total_tokens: int = 0, | |
| total_time_ms: float = 0.0, | |
| executed_agents: list[str] | None = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, output, executed_agents, kwargs) # Unused but required by interface | |
| self._indent = max(0, self._indent - 1) | |
| if success: | |
| self._print(f"β Run completed: {total_tokens} tokens, {total_time_ms:.0f}ms") | |
| else: | |
| self._print(f"β Run failed: {error}") | |
| # === Agent lifecycle === | |
| def on_agent_start( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| agent_name: str = "", | |
| step_index: int = 0, | |
| prompt: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, kwargs) # Unused but required by interface | |
| name = agent_name or agent_id | |
| self._print(f"βΆοΈ [{step_index}] {name} started") | |
| if self.show_prompts and prompt: | |
| self._print(f" Prompt: {self._truncate(prompt)}") | |
| self._indent += 1 | |
| def on_agent_end( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| output: str, | |
| agent_name: str = "", | |
| step_index: int = 0, | |
| tokens_used: int = 0, | |
| duration_ms: float = 0.0, | |
| is_final: bool = False, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, kwargs) # Unused but required by interface | |
| self._indent = max(0, self._indent - 1) | |
| name = agent_name or agent_id | |
| final_marker = " [FINAL]" if is_final else "" | |
| self._print(f"β [{step_index}] {name} completed: {tokens_used} tokens, {duration_ms:.0f}ms{final_marker}") | |
| if self.show_outputs and output: | |
| self._print(f" Output: {self._truncate(output)}") | |
| def on_agent_error( | |
| self, | |
| error: BaseException, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| error_type: str = "", | |
| will_retry: bool = False, | |
| attempt: int = 0, | |
| max_attempts: int = 0, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, error_type, kwargs) # Unused but required by interface | |
| retry_info = f" (retry {attempt}/{max_attempts})" if will_retry else "" | |
| self._print(f"β {agent_id} error: {error}{retry_info}") | |
| # === Retry === | |
| def on_retry( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| attempt: int, | |
| max_attempts: int = 0, | |
| delay_ms: float = 0.0, | |
| error: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, error, kwargs) # Unused but required by interface | |
| self._print(f"π {agent_id} retry {attempt}/{max_attempts} (delay: {delay_ms:.0f}ms)") | |
| # === Token streaming === | |
| def on_llm_new_token( | |
| self, | |
| token: str, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| is_first: bool = False, | |
| is_last: bool = False, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (token, run_id, agent_id, kwargs) # Unused but required by interface | |
| if is_first: | |
| pass | |
| if is_last: | |
| pass # newline | |
| # === Planning === | |
| def on_plan_created( | |
| self, | |
| *, | |
| run_id: UUID, | |
| num_steps: int, | |
| execution_order: list[str], | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, execution_order, kwargs) # Unused but required by interface | |
| self._print(f"π Plan: {num_steps} steps") | |
| def on_topology_changed( | |
| self, | |
| *, | |
| run_id: UUID, | |
| reason: str, | |
| old_remaining: list[str], | |
| new_remaining: list[str], | |
| change_count: int = 0, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, old_remaining, new_remaining, kwargs) # Unused but required by interface | |
| self._print(f"π Topology changed #{change_count}: {reason}") | |
| # === Pruning/Fallback === | |
| def on_prune( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str, | |
| reason: str, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, kwargs) # Unused but required by interface | |
| self._print(f"βοΈ Pruned {agent_id}: {reason}") | |
| def on_fallback( | |
| self, | |
| *, | |
| run_id: UUID, | |
| failed_agent_id: str, | |
| fallback_agent_id: str, | |
| reason: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, reason, kwargs) # Unused but required by interface | |
| self._print(f"π Fallback: {failed_agent_id} β {fallback_agent_id}") | |
| # === Parallel execution === | |
| def on_parallel_start( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_ids: list[str], | |
| group_index: int = 0, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, kwargs) # Unused but required by interface | |
| agents = ", ".join(agent_ids) | |
| self._print(f"β‘ Parallel group {group_index}: [{agents}]") | |
| self._indent += 1 | |
| def on_parallel_end( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_ids: list[str], | |
| group_index: int = 0, | |
| successful: list[str] | None = None, | |
| failed: list[str] | None = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, failed, kwargs) # Unused but required by interface | |
| self._indent = max(0, self._indent - 1) | |
| success_count = len(successful or []) | |
| total = len(agent_ids) | |
| self._print(f"β‘ Parallel group {group_index} done: {success_count}/{total}") | |
| # === Budget === | |
| def on_budget_warning( | |
| self, | |
| *, | |
| run_id: UUID, | |
| budget_type: str, | |
| current: float, | |
| limit: float, | |
| ratio: float = 0.0, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, current, limit, kwargs) # Unused but required by interface | |
| self._print(f"β οΈ Budget warning: {budget_type} at {ratio:.0%}") | |
| def on_budget_exceeded( | |
| self, | |
| *, | |
| run_id: UUID, | |
| budget_type: str, | |
| current: float, | |
| limit: float, | |
| action_taken: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, action_taken, kwargs) # Unused but required by interface | |
| self._print(f"π Budget exceeded: {budget_type} ({current:.0f}/{limit:.0f})") | |
| # === Tool lifecycle === | |
| def on_tool_start( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str = "", | |
| tool_name: str, | |
| action: str = "", | |
| arguments: dict[str, Any] | None = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, agent_id, kwargs) # Unused but required by interface | |
| args_preview = "" | |
| if arguments: | |
| args_str = str(arguments) | |
| args_preview = f" ({self._truncate(args_str)})" | |
| self._print(f"π§ Tool {tool_name}.{action} started{args_preview}") | |
| self._indent += 1 | |
| def on_tool_end( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str = "", | |
| tool_name: str, | |
| action: str = "", | |
| success: bool = True, | |
| output_size: int = 0, | |
| duration_ms: float = 0.0, | |
| result_summary: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, agent_id, kwargs) # Unused but required by interface | |
| self._indent = max(0, self._indent - 1) | |
| icon = "β " if success else "β" | |
| summary = f": {result_summary}" if result_summary else "" | |
| self._print(f"{icon} Tool {tool_name}.{action} done ({duration_ms:.0f}ms, {output_size}b){summary}") | |
| def on_tool_error( | |
| self, | |
| *, | |
| run_id: UUID, | |
| agent_id: str = "", | |
| tool_name: str, | |
| action: str = "", | |
| error_type: str = "", | |
| error_message: str = "", | |
| **kwargs: Any, | |
| ) -> None: | |
| _ = (run_id, agent_id, kwargs) # Unused but required by interface | |
| self._print(f"β Tool {tool_name}.{action} error: {error_type}: {self._truncate(error_message)}") | |