Spaces:
Paused
Paused
| # -*- coding: utf-8 -*- | |
| """MCP Tools - Grok AI 对话工具""" | |
| import json | |
| from typing import Optional | |
| from app.services.grok.client import GrokClient | |
| from app.core.logger import logger | |
| from app.core.exception import GrokApiException | |
| async def ask_grok_impl( | |
| query: str, | |
| model: str = "grok-3-fast", | |
| system_prompt: Optional[str] = None | |
| ) -> str: | |
| """ | |
| 内部实现: 调用Grok API并收集完整响应 | |
| Args: | |
| query: 用户问题 | |
| model: 模型名称 | |
| system_prompt: 系统提示词 | |
| Returns: | |
| str: 完整的Grok响应内容 | |
| """ | |
| try: | |
| # 构建消息列表 | |
| messages = [] | |
| if system_prompt: | |
| messages.append({"role": "system", "content": system_prompt}) | |
| messages.append({"role": "user", "content": query}) | |
| # 构建请求 | |
| request_data = { | |
| "model": model, | |
| "messages": messages, | |
| "stream": True | |
| } | |
| logger.info(f"[MCP] ask_grok 调用, 模型: {model}") | |
| # 调用Grok客户端(流式) | |
| response_iterator = await GrokClient.openai_to_grok(request_data) | |
| # 收集所有流式响应块 | |
| content_parts = [] | |
| async for chunk in response_iterator: | |
| if isinstance(chunk, bytes): | |
| chunk = chunk.decode('utf-8') | |
| # 解析SSE格式 | |
| if chunk.startswith("data: "): | |
| data_str = chunk[6:].strip() | |
| if data_str == "[DONE]": | |
| break | |
| try: | |
| data = json.loads(data_str) | |
| choices = data.get("choices", []) | |
| if choices: | |
| delta = choices[0].get("delta", {}) | |
| if content := delta.get("content"): | |
| content_parts.append(content) | |
| except json.JSONDecodeError: | |
| continue | |
| result = "".join(content_parts) | |
| logger.info(f"[MCP] ask_grok 完成, 响应长度: {len(result)}") | |
| return result | |
| except GrokApiException as e: | |
| logger.error(f"[MCP] Grok API错误: {str(e)}") | |
| raise Exception(f"Grok API调用失败: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"[MCP] ask_grok异常: {str(e)}", exc_info=True) | |
| raise Exception(f"处理请求时出错: {str(e)}") | |