Spaces:
Running
Running
| """ | |
| Simple generation services (title, daily tip, related questions, agent selection). | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime | |
| import re | |
| from typing import Any | |
| from zoneinfo import ZoneInfo | |
| from ..models.generation import ( | |
| DailyTipResponse, | |
| TitleSpaceResponse, | |
| ) | |
| from ..models.stream_chat import StreamChatRequest | |
| from .llm_utils import run_agent_completion, safe_json_parse | |
| def _get_output_value(output_obj: Any, *keys: str) -> Any: | |
| if not output_obj: | |
| return None | |
| if isinstance(output_obj, dict): | |
| for key in keys: | |
| if key in output_obj: | |
| return output_obj.get(key) | |
| return None | |
| for key in keys: | |
| if hasattr(output_obj, key): | |
| return getattr(output_obj, key) | |
| return None | |
| def _build_time_context(user_timezone: str | None, user_locale: str | None) -> str: | |
| timezone = user_timezone or "UTC" | |
| locale = user_locale or "en-US" | |
| try: | |
| tzinfo = ZoneInfo(timezone) | |
| now = datetime.now(tzinfo) | |
| except Exception: | |
| timezone = "UTC" | |
| now = datetime.utcnow() | |
| formatted = now.strftime("%Y-%m-%d %H:%M:%S") | |
| return ( | |
| "\n\nLocal time context:\n" | |
| f"##today local time: {formatted} ({timezone})\n" | |
| f"locale: {locale}\n" | |
| f"iso: {now.isoformat()}\n" | |
| ) | |
def _append_time_context(
    messages: list[dict[str, Any]],
    user_timezone: str | None,
    user_locale: str | None,
    reference_text: str | None = None,
) -> list[dict[str, Any]]:
    """Return a copy of *messages* with local-time context merged in.

    The context is appended to the first system message when present,
    otherwise a fresh system message is prepended. ``reference_text`` is
    currently unused here — kept for interface compatibility with callers.
    """
    time_context = _build_time_context(user_timezone, user_locale)
    if not messages:
        return [{"role": "system", "content": time_context.strip()}]
    result = list(messages)
    for idx, message in enumerate(result):
        if message.get("role") == "system":
            result[idx] = {
                **message,
                "content": f"{message.get('content', '')}{time_context}",
            }
            break
    else:
        result.insert(0, {"role": "system", "content": time_context.strip()})
    return result
| def _normalize_title_text_output(raw: str) -> str: | |
| text = str(raw or "").strip() | |
| if not text: | |
| return "" | |
| # Common wrappers from models | |
| text = text.replace("\r", "\n").strip() | |
| first_line = next((line.strip() for line in text.split("\n") if line.strip()), "") | |
| text = first_line or text | |
| for prefix in ("title:", "标题:", "标题:", "- ", "* ", "# "): | |
| if text.lower().startswith(prefix.lower()): | |
| text = text[len(prefix):].strip() | |
| for prefix in ( | |
| "suggested title:", | |
| "suggested title:", | |
| "generated title:", | |
| "generated title:", | |
| "conversation title:", | |
| "conversation title:", | |
| ): | |
| if text.lower().startswith(prefix.lower()): | |
| text = text[len(prefix):].strip() | |
| text = text.replace("**", "").replace("__", "").strip() | |
| text = text.strip().strip("\"'“”‘’") | |
| text = re.sub(r"^(?:user|assistant|system)\s*:\s*", "", text, flags=re.IGNORECASE) | |
| text = re.sub( | |
| r"^(?:the user(?:'s)? (?:is asking|asks|wants|needs) (?:about|for)\s+|" | |
| r"this conversation is about\s+|the conversation is about\s+|" | |
| r"topic\s*:\s*|summary\s*:\s*)", | |
| "", | |
| text, | |
| flags=re.IGNORECASE, | |
| ) | |
| text = re.split(r"[.?!。!?]\s*", text, maxsplit=1)[0].strip() | |
| text = re.sub(r"\s+", " ", text).strip(" ,;:[](){}") | |
| text = text.strip().strip("\"'“”‘’") | |
| return text[:120].strip() | |
| def _truncate_title_phrase(text: str, max_words: int = 5, max_chars: int = 36) -> str: | |
| normalized = re.sub(r"\s+", " ", str(text or "")).strip() | |
| if not normalized: | |
| return "" | |
| if " " not in normalized: | |
| return normalized[:max_chars].strip(" ,;:-") | |
| words = normalized.split(" ") | |
| return " ".join(words[:max_words]).strip(" ,;:-") | |
def _finalize_title(raw: Any, fallback: str = "New Conversation") -> str:
    """Normalize raw model output into a final short title.

    Returns *fallback* when normalization yields nothing, or when the
    result reads like a chat reply / role transcript rather than a title.
    """
    chatty_prefixes = (
        "here is",
        "here's",
        "i can",
        "i would",
        "sure",
        "certainly",
        "let me",
        "user:",
        "assistant:",
        "system:",
    )
    title = _normalize_title_text_output(str(raw or ""))
    if not title or title.lower().startswith(chatty_prefixes):
        return fallback
    return _truncate_title_phrase(title) or fallback
| def _title_prompt(task_suffix: str = "") -> str: | |
| suffix = f"\n{task_suffix.strip()}" if task_suffix and task_suffix.strip() else "" | |
| return ( | |
| "Generate a short conversation title from the provided text.\n" | |
| "The text may be a user message or a short transcript.\n\n" | |
| "Rules:\n" | |
| "- Return only the title.\n" | |
| "- Maximum 5 words.\n" | |
| "- Do not answer the user.\n" | |
| "- Do not explain.\n" | |
| "- Do not use quotes or markdown." | |
| f"{suffix}\n\n" | |
| "Return only the title text." | |
| ) | |
def _extract_single_emoji_text(raw: str) -> str:
    """Extract a single emoji from free-form model output.

    Tries, in order: a JSON object with an ``emojis`` key, a repr-style
    ``emojis=[...]`` fragment, the first emoji-looking grapheme, and
    finally the first whitespace-separated token (truncated to 8 chars).
    Returns "" for empty input.
    """
    # Uses the module-level `re` import; the previous function-local
    # `import re` was redundant.
    text = str(raw or "").strip()
    if not text:
        return ""
    # JSON / key-value fallback parsing still allowed, but not required.
    parsed = safe_json_parse(text)
    if isinstance(parsed, dict):
        emojis = parsed.get("emojis")
        if isinstance(emojis, list) and emojis:
            return str(emojis[0]).strip()
        if isinstance(emojis, str) and emojis.strip():
            return emojis.strip()
    m = re.search(r"emojis=\[[\"']?(.*?)[\"']?\]", text)
    if m:
        return m.group(1).strip()
    # Extract first likely emoji grapheme-ish token.
    # Covers most symbols/pictographs and optional variation selector / ZWJ tails.
    m = re.search(
        r"([\u2600-\u27BF\U0001F300-\U0001FAFF](?:\uFE0F)?(?:\u200D[\u2600-\u27BF\U0001F300-\U0001FAFF](?:\uFE0F)?)*)",
        text,
    )
    if m:
        return m.group(1).strip()
    # Last resort: first non-empty token.
    token = next((part for part in re.split(r"\s+", text) if part), "")
    return token[:8].strip()
async def generate_daily_tip(
    *,
    provider: str,
    language: str | None,
    category: str | None,
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> str:
    """Generate a short, practical daily tip via the LLM provider.

    Builds a one-shot prompt (optionally constrained by *category* and
    reply *language*), runs it through the agent pipeline, and extracts
    the tip from structured output, JSON content, or plain text, in that
    order. Returns "" when nothing usable came back.
    """
    language_block = f"\n\n## Language\nReply in {language}." if language else ""
    category_block = f"\n\n## Category\n{category}" if category else ""
    messages = [
        {
            "role": "system",
            "content": (
                "## Task\n"
                "Generate a short, practical tip for today. Keep it to 1-2 sentences and avoid emojis."
                f"{category_block}{language_block}\n\n"
                "## Output\n"
                "Return only the tip text."
            ),
        },
        {"role": "user", "content": "Daily tip."},
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, "daily tip")
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        output_schema=DailyTipResponse,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    thought = result.get("thought", "").strip()
    output_obj = result.get("output")
    tip = None
    # Preferred source: structured output object.
    if output_obj:
        tip = _get_output_value(output_obj, "tip")
    # Fallback: JSON embedded in the content, then in the thought.
    if not tip:
        parsed = safe_json_parse(content)
        if isinstance(parsed, dict):
            tip = parsed.get("tip")
    if not tip and thought:
        parsed = safe_json_parse(thought)
        if isinstance(parsed, dict):
            tip = parsed.get("tip")
    if not tip:
        tip = content  # Fallback for plain text
    # Cleanup if model included repr-style "tip='...'" output.
    # (uses the module-level `re`; the previous nested `import re` was redundant)
    if tip and isinstance(tip, str) and tip.startswith("tip="):
        match = re.search(r"tip=['\"](.*?)['\"]", tip)
        if match:
            tip = match.group(1)
    return (tip or content or "").strip()
async def generate_title(
    *,
    provider: str,
    first_message: str,
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> dict[str, Any]:
    """Generate a short conversation title for *first_message*.

    Returns ``{"title": <str>}``. Title extraction tries JSON in the
    content, then JSON in the model's thought, then a repr-style
    ``title='...'`` fragment, and finally normalizes the raw content.
    """
    messages = [
        {"role": "system", "content": _title_prompt()},
        {"role": "user", "content": first_message},
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, first_message)
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=None,
        output_schema=None,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    thought = result.get("thought", "").strip()
    title = ""
    parsed = safe_json_parse(content) or {}
    if isinstance(parsed, dict):
        title = str(parsed.get("title") or "").strip()
    if not title and thought:
        parsed_thought = safe_json_parse(thought) or {}
        if isinstance(parsed_thought, dict):
            title = str(parsed_thought.get("title") or "").strip()
    # Robust cleanup for models like GLM that emit repr-style output.
    # (uses the module-level `re`; the previous nested `import re` was redundant)
    if (not title or str(title).strip().startswith("title=")) and content:
        m_title = re.search(r"title=['\"](.*?)['\"]", content, flags=re.DOTALL)
        if m_title:
            title = m_title.group(1)
    # Final normalization
    title = _finalize_title(title or content)
    return {"title": title}
async def generate_emoji(
    *,
    provider: str,
    first_message: str,
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> dict[str, Any]:
    """Pick a single emoji matching the topic of *first_message*.

    Returns ``{"emojis": [<emoji>]}``, or an empty list when no emoji
    could be extracted from the model's content or thought.
    """
    system_prompt = (
        "## Task\n"
        "Select exactly 1 emoji that best matches the user's message topic.\n"
        "Do NOT answer the message.\n\n"
        "## Output\n"
        "Return only 1 emoji character. No JSON. No words."
    )
    chat_messages = _append_time_context(
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": first_message},
        ],
        user_timezone,
        user_locale,
        first_message,
    )
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=chat_messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=None,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    # Prefer the visible content; fall back to the model's thought stream.
    emoji = _extract_single_emoji_text(result.get("content", "").strip())
    if not emoji:
        emoji = _extract_single_emoji_text(result.get("thought", "").strip())
    return {"emojis": [emoji] if emoji else []}
async def generate_title_and_space(
    *,
    provider: str,
    first_message: str,
    spaces: list[dict[str, Any]],
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> dict[str, Any]:
    """Generate a title, pick a matching space, and select one emoji.

    Returns ``{"title": str, "space": dict | None, "emojis": list[str]}``
    where ``space`` is the entry of *spaces* whose label matches the
    model's choice (case-insensitive, trimmed), or ``None``.
    """
    space_labels = ", ".join([str(s.get("label", "")).strip() for s in spaces or []]).strip()
    messages = [
        {
            "role": "system",
            "content": (
                f"{_title_prompt()}\n\n"
                "## Additional Tasks\n"
                f'1. Select the most appropriate space from the following list: [{space_labels}]. '
                "If none fit well, return null.\n"
                "2. Select 1 emoji that best matches the conversation.\n\n"
                "## Output\n"
                'Return the result as a JSON object with keys "title", "spaceLabel", and "emojis".'
            ),
        },
        {"role": "user", "content": first_message},
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, first_message)
    # NOTE(review): this deliberately overrides the `response_format` parameter;
    # presumably gemini rejects json_object here — confirm before changing.
    response_format = {"type": "json_object"} if provider != "gemini" else None
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        output_schema=TitleSpaceResponse,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    thought = result.get("thought", "").strip()
    # Try structured output first
    output_obj = result.get("output")
    title = None
    space_label = None
    emojis = []
    if output_obj:
        title = _get_output_value(output_obj, "title")
        space_label = _get_output_value(output_obj, "space_label", "spaceLabel")
        emojis = _get_output_value(output_obj, "emojis") or []
    # If above failed, fall back to manual parsing of content, then thought.
    if not title:
        parsed = safe_json_parse(content) or {}
        if isinstance(parsed, dict):
            title = parsed.get("title")
            space_label = space_label or parsed.get("spaceLabel") or parsed.get("space_label")
            emojis = emojis or parsed.get("emojis")
    if not title and thought:
        parsed = safe_json_parse(thought) or {}
        if isinstance(parsed, dict):
            title = parsed.get("title")
            space_label = space_label or parsed.get("spaceLabel") or parsed.get("space_label")
            emojis = emojis or parsed.get("emojis")
    # Robust cleanup for models like GLM that emit repr-style output.
    # (uses the module-level `re`; the previous nested `import re` was redundant)
    if (not title or (isinstance(title, str) and title.strip().startswith("title="))) and content:
        m_title = re.search(r"title=['\"](.*?)['\"]", content, flags=re.DOTALL)
        if m_title:
            title = m_title.group(1)
        if not emojis:
            m_emojis = re.search(r"emojis=\[[\"']?(.*?)[\"']?\]", content)
            if m_emojis:
                emojis = [m_emojis.group(1)]
        if not space_label:
            m_space = re.search(
                r"space_?label=['\"](.*?)['\"]", content, re.IGNORECASE
            ) or re.search(r"space=['\"](.*?)['\"]", content, re.IGNORECASE)
            if m_space:
                space_label = m_space.group(1)
    # Normalize values
    title = _finalize_title(title or content)
    # Case-insensitive, trimmed matching. Guard against None labels
    # (a label key present with value None would crash .strip()), and do
    # not match anything when the model returned no/blank space label.
    search_label = str(space_label or "").strip().lower()
    selected_space = None
    if search_label:
        selected_space = next(
            (s for s in spaces if str(s.get("label") or "").strip().lower() == search_label),
            None,
        )
    if not isinstance(emojis, list):
        emojis = []
    emojis = [str(item).strip().strip("'").strip('"') for item in emojis if str(item).strip()][:1]
    return {"title": title, "space": selected_space, "emojis": emojis}
| def _sanitize_option_text(text: Any) -> str: | |
| return ( | |
| str(text or "") | |
| .replace("\r", " ") | |
| .replace("\n", " ") | |
| .replace("{", "") | |
| .replace("}", "") | |
| .split() | |
| ) | |
async def generate_title_space_and_agent(
    *,
    provider: str,
    first_message: str,
    spaces_with_agents: list[dict[str, Any]],
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> dict[str, Any]:
    """Generate a title, choose a space and agent, and pick one emoji.

    *spaces_with_agents* entries are dicts with "label", optional
    "description", and optional "agents" (strings or name/description
    dicts). Returns ``{"title", "spaceLabel", "agentName", "emojis"}``;
    label/name values are raw model strings, not validated against input.
    """
    # Render each space as `label - description:{agent,agent,...}` with
    # newlines/braces stripped so the line format stays parseable.
    space_lines = []
    for space in spaces_with_agents or []:
        agent_entries = []
        for agent in space.get("agents") or []:
            if isinstance(agent, str):
                agent_entries.append({"name": agent})
            else:
                agent_entries.append(
                    {"name": agent.get("name", ""), "description": agent.get("description", "")}
                )
        agent_tokens = []
        for agent in agent_entries:
            name = " ".join(_sanitize_option_text(agent.get("name")))
            description = " ".join(_sanitize_option_text(agent.get("description")))
            if name and description:
                agent_tokens.append(f"{name} - {description}")
            elif name:
                agent_tokens.append(name)
        space_label = " ".join(_sanitize_option_text(space.get("label")))
        space_description = " ".join(_sanitize_option_text(space.get("description")))
        space_token = f"{space_label} - {space_description}" if space_description else space_label
        space_lines.append(f"{space_token}:{{{','.join(agent_tokens)}}}")
    messages = [
        {
            "role": "system",
            "content": (
                f"{_title_prompt()}\n\n"
                "## Additional Tasks\n"
                "1. Select the most appropriate space from the list below and return its spaceLabel.\n"
                "2. If the chosen space has agents, select the best matching agent by agentName. Otherwise return null.\n"
                "3. Select 1 emoji that best matches the conversation.\n\n"
                "## Output\n"
                'Return the result as JSON with keys "title", "spaceLabel", "agentName", and "emojis". '
                '"emojis" must be an array with 1 emoji character.'
            ),
        },
        {
            "role": "user",
            "content": f"{first_message}\n\nSpaces and agents:\n" + "\n".join(space_lines),
        },
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, first_message)
    # NOTE(review): deliberately overrides the `response_format` parameter;
    # presumably gemini rejects json_object here — confirm before changing.
    response_format = {"type": "json_object"} if provider != "gemini" else None
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        # Avoid provider-side grammar cache collisions for lightweight JSON routes.
        # We parse JSON content locally instead of forcing native structured output.
        output_schema=None,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    thought = result.get("thought", "").strip()
    # Try structured output first
    output_obj = result.get("output")
    title = None
    space_label = None
    agent_name = None
    emojis = []
    if output_obj:
        title = _get_output_value(output_obj, "title")
        space_label = _get_output_value(output_obj, "space_label", "spaceLabel")
        agent_name = _get_output_value(output_obj, "agent_name", "agentName")
        emojis = _get_output_value(output_obj, "emojis") or []
    # Fallback to manual parsing of content, then thought.
    if not title:
        parsed = safe_json_parse(content) or {}
        if isinstance(parsed, dict):
            title = parsed.get("title")
            space_label = space_label or parsed.get("spaceLabel") or parsed.get("space_label")
            agent_name = agent_name or parsed.get("agentName") or parsed.get("agent_name")
            emojis = emojis or parsed.get("emojis")
    if not title and thought:
        parsed = safe_json_parse(thought) or {}
        if isinstance(parsed, dict):
            title = parsed.get("title")
            space_label = space_label or parsed.get("spaceLabel") or parsed.get("space_label")
            agent_name = agent_name or parsed.get("agentName") or parsed.get("agent_name")
            emojis = emojis or parsed.get("emojis")
    # Robust cleanup for models like GLM that emit repr-style output.
    # (uses the module-level `re`; the previous nested `import re` was redundant)
    if (not title or (isinstance(title, str) and title.strip().startswith("title="))) and content:
        m_title = re.search(r"title=['\"](.*?)['\"]", content, flags=re.DOTALL)
        if m_title:
            title = m_title.group(1)
        if not emojis:
            m_emojis = re.search(r"emojis=\[[\"']?(.*?)[\"']?\]", content)
            if m_emojis:
                emojis = [m_emojis.group(1)]
        if not space_label:
            m_space = re.search(
                r"space_?label=['\"](.*?)['\"]", content, re.IGNORECASE
            ) or re.search(r"space=['\"](.*?)['\"]", content, re.IGNORECASE)
            if m_space:
                space_label = m_space.group(1)
        if not agent_name:
            m_agent = re.search(
                r"agent_?name=['\"](.*?)['\"]", content, re.IGNORECASE
            ) or re.search(r"agent=['\"](.*?)['\"]", content, re.IGNORECASE)
            if m_agent:
                agent_name = m_agent.group(1)
    # Normalize values
    title = _finalize_title(title or content)
    # Strip descriptions if model included them (e.g. "Coding - Help" -> "Coding")
    if space_label and " - " in space_label:
        space_label = space_label.split(" - ")[0].strip()
    if agent_name and " - " in agent_name:
        agent_name = agent_name.split(" - ")[0].strip()
    if not isinstance(emojis, list):
        emojis = []
    emojis = [str(item).strip().strip("'").strip('"') for item in emojis if str(item).strip()][:1]
    return {
        "title": title,
        "spaceLabel": space_label,
        "agentName": agent_name,
        "emojis": emojis,
    }
async def generate_space_and_agent(
    *,
    provider: str,
    first_message: str,
    spaces_with_agents: list[dict[str, Any]],
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> dict[str, Any]:
    """Select the best matching space and agent for *first_message*.

    Returns ``{"spaceLabel": str | None, "agentName": str | None}`` with
    any trailing " - description" suffix stripped from both values.
    """
    # Render each space as `label - description:{agent,agent,...}` with
    # newlines/braces stripped so the line format stays parseable.
    space_lines: list[str] = []
    for space in spaces_with_agents or []:
        agent_entries = []
        for agent in space.get("agents") or []:
            if isinstance(agent, str):
                agent_entries.append({"name": agent})
            else:
                agent_entries.append(
                    {"name": agent.get("name", ""), "description": agent.get("description", "")}
                )
        agent_tokens = []
        for agent in agent_entries:
            name = " ".join(_sanitize_option_text(agent.get("name")))
            description = " ".join(_sanitize_option_text(agent.get("description")))
            if name and description:
                agent_tokens.append(f"{name} - {description}")
            elif name:
                agent_tokens.append(name)
        space_label = " ".join(_sanitize_option_text(space.get("label")))
        space_description = " ".join(_sanitize_option_text(space.get("description")))
        space_token = f"{space_label} - {space_description}" if space_description else space_label
        space_lines.append(f"{space_token}:{{{','.join(agent_tokens)}}}")
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful assistant.\n"
                "## Task\n"
                "1. Select the most appropriate space from the list below and return its spaceLabel.\n"
                "2. If the chosen space has agents, select the best matching agent by agentName. Otherwise return null.\n\n"
                "## Output\n"
                'Return the result as JSON with keys "spaceLabel" and "agentName".'
            ),
        },
        {
            "role": "user",
            "content": f"{first_message}\n\nSpaces and agents:\n" + "\n".join(space_lines),
        },
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, first_message)
    # NOTE(review): deliberately overrides the `response_format` parameter;
    # presumably gemini rejects json_object here — confirm before changing.
    response_format = {"type": "json_object"} if provider != "gemini" else None
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
        # Avoid provider-side grammar cache collisions for lightweight JSON routes.
        # We parse JSON content locally instead of forcing native structured output.
        output_schema=None,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    # (removed a duplicated `thought = ...` statement that appeared twice)
    thought = result.get("thought", "").strip()
    output_obj = result.get("output")
    space_label = None
    agent_name = None
    if output_obj:
        space_label = _get_output_value(output_obj, "space_label", "spaceLabel")
        agent_name = _get_output_value(output_obj, "agent_name", "agentName")
    # Fallback to manual parsing of content, then thought.
    if not space_label:
        parsed = safe_json_parse(content) or {}
        if isinstance(parsed, dict):
            space_label = parsed.get("spaceLabel") or parsed.get("space_label")
            agent_name = agent_name or parsed.get("agentName") or parsed.get("agent_name")
    if not space_label and thought:
        parsed = safe_json_parse(thought) or {}
        if isinstance(parsed, dict):
            space_label = parsed.get("spaceLabel") or parsed.get("space_label")
            agent_name = agent_name or parsed.get("agentName") or parsed.get("agent_name")
    # Strip descriptions if the model echoed them back (e.g. "Coding - Help").
    if space_label and " - " in str(space_label):
        space_label = str(space_label).split(" - ")[0].strip()
    if agent_name and " - " in str(agent_name):
        agent_name = str(agent_name).split(" - ")[0].strip()
    return {
        "spaceLabel": space_label,
        "agentName": agent_name,
    }
async def generate_agent_for_auto(
    *,
    provider: str,
    user_message: str,
    current_space: dict[str, Any] | None,
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> str | None:
    """Select the best-matching agent for a user message via a lightweight LLM call.

    Builds a prompt listing the current space's agents (``name - description``
    tokens), asks the model to answer as JSON with an ``agentName`` key, and
    extracts the chosen name from the structured output, the content, or the
    model's thought text — in that order.

    Returns the selected agent name, or ``None`` when no agent matched or the
    response could not be parsed.
    """
    space = current_space or {}
    # Normalize agent declarations: space entries may be bare name strings or
    # dicts carrying name/description.
    agent_entries: list[dict[str, str]] = []
    for agent in space.get("agents") or []:
        if isinstance(agent, str):
            agent_entries.append({"name": agent})
        else:
            agent_entries.append(
                {"name": agent.get("name", ""), "description": agent.get("description", "")}
            )
    # Render prompt tokens. NOTE(review): _sanitize_option_text appears to
    # return an iterable of word tokens (it is space-joined here) — defined
    # elsewhere in this module; confirm if changing.
    agent_tokens: list[str] = []
    for agent in agent_entries:
        name = " ".join(_sanitize_option_text(agent.get("name")))
        description = " ".join(_sanitize_option_text(agent.get("description")))
        if name and description:
            agent_tokens.append(f"{name} - {description}")
        elif name:
            agent_tokens.append(name)
    space_label = space.get("label") or "Default"
    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful assistant.\n"
                f"## Task\nSelect the best matching agent for the user's message from the \"{space_label}\" space. "
                "Consider the agent's name and description to determine which one is most appropriate. "
                "If no agent is a good match, return null.\n\n"
                "## Output\nReturn the result as JSON with key \"agentName\" (agent name only, or null)."
            ),
        },
        {
            "role": "user",
            "content": f"{user_message}\n\nAvailable agents in {space_label}:\n" + "\n".join(agent_tokens),
        },
    ]
    messages = _append_time_context(messages, user_timezone, user_locale, user_message)
    # Gemini rejects the OpenAI-style response_format; rely on prompt-enforced JSON there.
    response_format = {"type": "json_object"} if provider != "gemini" else None
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
        # Avoid provider-side grammar cache collisions for this lightweight route.
        # We parse JSON content locally instead of forcing native structured output.
        output_schema=None,
    )
    result = await run_agent_completion(request)
    # Extraction order: structured output -> content JSON -> thought JSON.
    output_obj = result.get("output")
    if output_obj:
        selected_agent = _get_output_value(output_obj, "agentName", "agent_name")
        if selected_agent:
            return selected_agent
    parsed = safe_json_parse(result.get("content", "")) or {}
    if isinstance(parsed, dict):
        selected_agent = parsed.get("agentName") or parsed.get("agent_name")
        if selected_agent:
            return selected_agent
    # Fall back to the reasoning text; do not bail out just because the
    # content parsed to a dict without an agent name.
    thought_parsed = safe_json_parse(result.get("thought", "")) or {}
    if isinstance(thought_parsed, dict):
        return thought_parsed.get("agentName") or thought_parsed.get("agent_name") or None
    return None
| def _normalize_related_questions(parsed: Any) -> list[str]: | |
| def _sanitize_related_question_text(value: Any) -> str: | |
| if not isinstance(value, str): | |
| return "" | |
| cleaned = " ".join(value.split()).strip() | |
| if not cleaned: | |
| return "" | |
| lowered = cleaned.lower() | |
| if ( | |
| "<|tool_" in lowered | |
| or "tool_calls_section" in lowered | |
| or "tool_call_begin" in lowered | |
| or "tool_call_end" in lowered | |
| or "endgroup" in lowered | |
| ): | |
| return "" | |
| if cleaned in {"{", "}", "[", "]"}: | |
| return "" | |
| if len(cleaned) > 240: | |
| return "" | |
| return cleaned | |
| def _normalize_list(items: list[Any]) -> list[str]: | |
| out: list[str] = [] | |
| seen: set[str] = set() | |
| for item in items or []: | |
| normalized = _sanitize_related_question_text(item) | |
| if not normalized or normalized in seen: | |
| continue | |
| seen.add(normalized) | |
| out.append(normalized) | |
| if len(out) >= 3: | |
| break | |
| return out | |
| if isinstance(parsed, list): | |
| return _normalize_list(parsed) | |
| if isinstance(parsed, dict): | |
| if isinstance(parsed.get("questions"), list): | |
| return _normalize_list(parsed["questions"]) | |
| if isinstance(parsed.get("related_questions"), list): | |
| return _normalize_list(parsed["related_questions"]) | |
| return [] | |
async def generate_related_questions(
    *,
    provider: str,
    messages: list[dict[str, Any]],
    api_key: str,
    base_url: str | None = None,
    model: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_ids: list[str] | None = None,
    user_tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    search_provider: str | None = None,
    tavily_api_key: str | None = None,
    user_timezone: str | None = None,
    user_locale: str | None = None,
) -> list[str]:
    """Suggest up to 3 follow-up questions for the conversation in *messages*.

    Appends an instruction turn asking the model for user-perspective
    follow-ups as JSON, then extracts the questions from (in order): the
    structured output, the content JSON, the thought JSON, and finally two
    regex fallbacks for pseudo-repr / numbered-list formats.

    Returns a normalized list of at most 3 question strings (possibly empty).
    """
    prompt_messages = [
        *(messages or []),
        {
            "role": "user",
            "content": (
                "Based on our conversation, suggest 3 short, relevant follow-up questions I (the user) might ask you next. "
                "The questions MUST be from the user's perspective (e.g., 'How does X work?', 'Tell me more about Y'). "
                "Do NOT generate questions from the assistant's perspective (e.g., 'Do you want to know about...?'). "
                "Return the result as JSON with a 'questions' key containing the array of strings."
            ),
        },
    ]
    prompt_messages = _append_time_context(prompt_messages, user_timezone, user_locale, "related questions")
    # Gemini rejects the OpenAI-style response_format; rely on prompt-enforced JSON there.
    response_format = {"type": "json_object"} if provider != "gemini" else None
    request = StreamChatRequest(
        provider=provider,
        apiKey=api_key,
        baseUrl=base_url,
        model=model,
        messages=prompt_messages,
        tools=tools or [],
        toolChoice=tool_choice,
        toolIds=tool_ids or [],
        userTools=user_tools or [],
        responseFormat=response_format,
        # Avoid provider-side grammar cache collisions for this lightweight route.
        # We parse JSON content locally instead of forcing native structured output.
        output_schema=None,
        thinking=thinking,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        contextMessageLimit=context_message_limit,
        searchProvider=search_provider,
        tavilyApiKey=tavily_api_key,
        skipDefaultTools=True,
        stream=True,
    )
    result = await run_agent_completion(request)
    content = result.get("content", "").strip()
    thought = result.get("thought", "").strip()
    output_obj = result.get("output")
    questions: list[str] = []
    if output_obj:
        # Normalize immediately so an unusable structured payload does not
        # suppress the later fallbacks.
        questions = _normalize_related_questions(_get_output_value(output_obj, "questions") or [])
    if not questions:
        # _normalize_related_questions handles both a bare JSON array and a
        # {"questions": [...]} / {"related_questions": [...]} dict.
        questions = _normalize_related_questions(safe_json_parse(content))
    if not questions and thought:
        questions = _normalize_related_questions(safe_json_parse(thought))
    # Robust cleanup for weird non-JSON formats.
    if not questions and content:
        # Try to find questions=["..."] / questions=['...'] pseudo-repr output.
        match = re.search(r"questions=\[([\s\S]*?)\]", content)
        if match:
            raw_list = match.group(1)
            questions = [q.strip().strip("'").strip('"') for q in re.findall(r"['\"](.*?)['\"]", raw_list)]
        if not questions:
            # Numbered lists like: 1. "Question" or 1. **Question**
            numbered_matches = re.findall(r"^\d+\.\s+[*\"']+(.*?)[*\"']+", content, re.MULTILINE)
            if numbered_matches:
                questions = [q.strip() for q in numbered_matches]
            else:
                # Last ditch: plain numbered list without quotes/stars.
                numbered_matches_simple = re.findall(r"^\d+\.\s+(.*?)$", content, re.MULTILINE)
                if numbered_matches_simple:
                    questions = [q.strip().strip('*').strip('"').strip("'") for q in numbered_matches_simple]
    # Final pass covers the regex fallbacks; it is a no-op on already-clean lists.
    return _normalize_related_questions(questions)