"""Gradio application for the "MCP Playground" chat UI.

The module defines the event handlers (submit / retry / cancel, MCP config
load / save / reset), builds the layout and launches the app.
"""
import json

import gradio as gr
import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.pro as pro
from exceptiongroup import ExceptionGroup
from langchain.chat_models import init_chat_model
from modelscope_studio.components.pro.multimodal_input import MultimodalInputUploadConfig

from config import bot_config, default_mcp_config, default_mcp_prompts, default_mcp_servers, model_options, user_config, welcome_config, default_theme, default_locale, bot_avatars, primary_color, mcp_prompt_model, model_options_map
from env import api_key, internal_mcp_config
from mcp_client import generate_with_mcp, get_mcp_prompts
from tools.oss import file_path_to_oss_url
from ui_components.config_form import ConfigForm
from ui_components.mcp_servers_button import McpServersButton


def merge_mcp_config(mcp_config1, mcp_config2):
    """Merge the ``mcpServers`` maps of two MCP configs.

    Servers from ``mcp_config2`` override same-named servers from
    ``mcp_config1``. Only the ``mcpServers`` key is kept in the result.
    """
    return {
        "mcpServers": {
            **mcp_config1.get("mcpServers", {}),
            **mcp_config2.get("mcpServers", {})
        }
    }


def format_messages(messages, oss_cache):
    """Flatten chatbot messages into ``{"role", "content"}`` dicts.

    Args:
        messages: Chatbot history; each item has a ``role`` and a list of
            typed ``content`` parts (``text`` / ``file`` are read here).
        oss_cache: Mutable dict mapping a local file path to the URL returned
            by ``file_path_to_oss_url``. It is updated in place.

    Returns:
        A list with one dict per user/assistant message. Messages with any
        other role are dropped.
    """
    formatted_messages = []
    for message in messages:
        if message["role"] == "user":
            contents = ''
            for content in message["content"]:
                if content["type"] == "text":
                    contents += content["content"]
                elif content["type"] == "file":
                    files = []
                    for file_path in content["content"]:
                        # Convert only on a cache miss. The previous
                        # `dict.get(key, convert(key))` form evaluated the
                        # default eagerly and re-converted every file on
                        # every call.
                        if file_path not in oss_cache:
                            oss_cache[file_path] = file_path_to_oss_url(
                                file_path)
                        files.append(oss_cache[file_path])
                    contents += f"\n\nAttachment links: [{','.join(files)}]\n\n"
            formatted_messages.append({"role": "user", "content": contents})
        elif message["role"] == "assistant":
            formatted_messages.append({
                "role":
                "assistant",
                "content":
                "\n".join([
                    content["content"] for content in message["content"]
                    if content["type"] == "text"
                ])
            })
    return formatted_messages


def format_chatbot_header(model, model_config):
    """Build the assistant message header: model name plus optional tag label.

    ``model`` is expected to look like ``"<owner>/<name>"``; the second
    path segment is shown. An id without a slash is shown unchanged instead
    of raising ``IndexError``.
    """
    tag_label = model_config.get("tag", {}).get("label")
    parts = model.split('/')
    model_name = parts[1] if len(parts) > 1 else model
    if tag_label:
        return f"{model_name} `{tag_label}`"
    return model_name


async def submit(input_value, config_form_value, mcp_config_value,
                 mcp_servers_btn_value, chatbot_value, oss_state_value):
    """Stream one assistant turn into the chatbot.

    Appends the user message (when ``input_value`` is given) and a pending
    assistant message to ``chatbot_value``, then consumes chunks from
    ``generate_with_mcp`` and folds them into the assistant message.

    Yields:
        3-tuples of updates for (input, clear button, chatbot).

    Raises:
        gr.Error: If the selected model is unknown or generation fails.
    """
    model_value = config_form_value.get("model", "")
    model = model_value.split(":")[0]
    # A bare `next(gen)` would raise StopIteration here, which an async
    # generator turns into an opaque RuntimeError.
    model_config = next(
        (x for x in model_options if x["value"] == model_value), None)
    if model_config is None:
        raise gr.Error(f"Unknown model: {model_value}")
    model_params = model_config.get("model_params", {})
    sys_prompt = config_form_value.get("sys_prompt", "")
    enabled_mcp_servers = [
        item["name"] for item in mcp_servers_btn_value["data_source"]
        if item.get("enabled") and not item.get("disabled")
    ]
    if input_value:
        user_content = [{"type": "text", "content": input_value["text"]}]
        if len(input_value["files"]) > 0:
            user_content.append({
                "type": "file",
                "content": list(input_value["files"])
            })
        chatbot_value.append({
            "role": "user",
            "content": user_content,
            "class_names": dict(content="user-message-content")
        })

    # Placeholder that the streamed chunks below fill in. It is appended even
    # without new input because `retry` calls this with input_value=None.
    chatbot_value.append({
        "role": "assistant",
        "loading": True,
        "content": [],
        "header": format_chatbot_header(model, model_config),
        "avatar": bot_avatars.get(model.split("/")[0], None),
        "status": "pending"
    })

    yield (gr.update(loading=True, value=None), gr.update(disabled=True),
           gr.update(value=chatbot_value,
                     bot_config=bot_config(
                         disabled_actions=['edit', 'retry', 'delete']),
                     user_config=user_config(
                         disabled_actions=['edit', 'delete'])))
    try:
        prev_chunk_type = None
        tool_name = ""
        tool_args = ""
        tool_content = ""
        async for chunk in generate_with_mcp(
                # Exclude the assistant placeholder appended above.
                format_messages(chatbot_value[:-1],
                                oss_state_value["oss_cache"]),
                mcp_config=merge_mcp_config(json.loads(mcp_config_value),
                                            internal_mcp_config),
                enabled_mcp_servers=enabled_mcp_servers,
                sys_prompt=sys_prompt,
                get_llm=lambda: init_chat_model(
                    model=model,
                    model_provider="openai",
                    api_key=api_key,
                    base_url="https://api-inference.modelscope.cn/v1/",
                    **model_params)):
            chatbot_value[-1]["loading"] = False
            current_content = chatbot_value[-1]["content"]

            # Start a new content part whenever the chunk type changes,
            # except that a tool result is folded into the part of the tool
            # call that precedes it.
            if prev_chunk_type != chunk["type"] and not (
                    prev_chunk_type == "tool_call_chunks"
                    and chunk["type"] == "tool"):
                current_content.append({})
            prev_chunk_type = chunk["type"]
            if chunk["type"] == "content":
                current_content[-1]['type'] = "text"
                if not isinstance(current_content[-1].get("content"), str):
                    current_content[-1]['content'] = ''
                current_content[-1]['content'] += chunk['content']
            elif chunk["type"] == "tool":
                if not isinstance(current_content[-1].get("content"), str):
                    current_content[-1]['content'] = ''
                chunk_content = chunk["content"]
                current_content[-1][
                    "content"] += f'\n\n**🎯 结果**\n```\n{chunk_content}\n```'
                # The tool call is finished; reset the accumulators.
                tool_name = ""
                tool_args = ""
                tool_content = ""
                current_content[-1]['options']["status"] = "done"
            elif chunk["type"] == "tool_call_chunks":
                current_content[-1]['type'] = "tool"
                current_content[-1]['editable'] = False
                current_content[-1]['copyable'] = False
                if not isinstance(current_content[-1].get("options"), dict):
                    current_content[-1]['options'] = {
                        "title": "",
                        "status": "pending"
                    }
                if chunk["next_tool"]:
                    # Another tool call begins in the same part: freeze the
                    # arguments gathered so far and start a new args buffer.
                    tool_name += ' '
                    tool_content = tool_content + f"**📝 参数**\n```json\n{tool_args}\n```\n\n"
                    tool_args = ""
                # NOTE(review): title/content are refreshed only when the
                # chunk carries a name/content; this nesting was inferred
                # from a whitespace-collapsed source - confirm.
                if chunk["name"]:
                    tool_name += chunk["name"]
                    current_content[-1]['options'][
                        "title"] = f"**🔧 调用 MCP 工具** `{tool_name}`"
                if chunk["content"]:
                    tool_args += chunk["content"]
                    current_content[-1][
                        'content'] = tool_content + f"**📝 参数**\n```json\n{tool_args}\n```"
            yield gr.skip(), gr.skip(), gr.update(value=chatbot_value)
    except Exception as exc:
        # For an exception group, report its first sub-exception.
        err = exc.exceptions[0] if isinstance(exc, ExceptionGroup) else exc
        chatbot_value[-1]["loading"] = False
        chatbot_value[-1]["content"] += [{
            "type": "text",
            "content": f'{str(err)}'
        }]
        print('Error: ', err)
        raise gr.Error(str(err)) from err
    finally:
        # Always re-enable the input and message actions.
        chatbot_value[-1]["status"] = "done"
        yield (gr.update(loading=False), gr.update(disabled=False),
               gr.update(value=chatbot_value,
                         bot_config=bot_config(),
                         user_config=user_config()))


def cancel(chatbot_value):
    """Mark the last message as stopped and re-enable the input controls."""
    chatbot_value[-1]["loading"] = False
    chatbot_value[-1]["status"] = "done"
    chatbot_value[-1]["footer"] = "对话已暂停"
    yield (gr.update(loading=False), gr.update(disabled=False),
           gr.update(value=chatbot_value,
                     bot_config=bot_config(),
                     user_config=user_config()))


async def retry(config_form_value, mcp_config_value, mcp_servers_btn_value,
                chatbot_value, oss_state_value, e: gr.EventData):
    """Regenerate the message at the event's index.

    The history is truncated just before that index and ``submit`` is
    re-run without new user input.
    """
    index = e._data["payload"][0]["index"]
    chatbot_value = chatbot_value[:index]

    async for chunk in submit(None, config_form_value, mcp_config_value,
                              mcp_servers_btn_value, chatbot_value,
                              oss_state_value):
        yield chunk


def clear():
    """Reset the target component's value to ``None``."""
    return gr.update(value=None)


def select_welcome_prompt(input_value, e: gr.EventData):
    """Copy the selected welcome prompt's description into the input text."""
    input_value["text"] = e._data["payload"][0]["value"]["description"]
    return gr.update(value=input_value)


def select_model(e: gr.EventData):
    """Set visibility from the ``thought`` flag of the event's second payload item."""
    return gr.update(visible=e._data["payload"][1].get("thought", False))


async def reset_mcp_config(mcp_servers_btn_value):
    """Restore the default MCP config, servers, prompts and browser state.

    Returns:
        Updates for (MCP config editor, servers button, chatbot welcome
        config, browser state).
    """
    mcp_servers_btn_value["data_source"] = default_mcp_servers
    return (gr.update(value=default_mcp_config),
            gr.update(value=mcp_servers_btn_value),
            gr.update(welcome_config=welcome_config(default_mcp_prompts)),
            gr.update(
                value={
                    "mcp_config": default_mcp_config,
                    "mcp_prompts": default_mcp_prompts,
                    "mcp_servers": default_mcp_servers
                }))


def has_mcp_config_changed(old_config: dict, new_config: dict) -> bool:
    """Return True when the server set of two MCP configs differs.

    Two configs are equal only if they name the same servers and every
    server is of type ``sse`` with an unchanged ``url``. Any server that is
    not ``sse`` on both sides counts as changed.
    """
    old_servers = old_config.get("mcpServers", {})
    new_servers = new_config.get("mcpServers", {})

    if set(old_servers.keys()) != set(new_servers.keys()):
        return True

    for server_name in old_servers:
        old_server = old_servers[server_name]
        new_server = new_servers.get(server_name)
        if new_server is None:
            return True

        if old_server.get("type") == "sse" and new_server.get("type") == "sse":
            if old_server.get("url") != new_server.get("url"):
                return True
        else:
            return True
    return False


def save_mcp_config_wrapper(initial: bool):
    """Create the save handler for the MCP config.

    Args:
        initial: True for the automatic save after page load; the success
            toast is shown only when this is False.
    """

    async def save_mcp_config(mcp_config_value, mcp_servers_btn_value,
                              browser_state_value):
        """Persist the config and, if servers changed, refresh servers and prompts.

        Yields 3-tuples of updates for (chatbot, servers button, browser
        state).
        """
        mcp_config = json.loads(mcp_config_value)
        prev_mcp_config = json.loads(browser_state_value["mcp_config"])
        browser_state_value["mcp_config"] = mcp_config_value
        if has_mcp_config_changed(prev_mcp_config, mcp_config):
            # Rebuild the server list: user-configured servers (enabled)
            # followed by the defaults.
            mcp_servers_btn_value["data_source"] = [{
                "name": mcp_name,
                "enabled": True
            } for mcp_name in mcp_config.get("mcpServers", {}).keys()
                                                    ] + default_mcp_servers
            browser_state_value["mcp_servers"] = mcp_servers_btn_value[
                "data_source"]
            # Show a loading welcome panel while prompts are regenerated.
            yield (gr.update(welcome_config=welcome_config({}, loading=True)),
                   gr.update(value=mcp_servers_btn_value), gr.skip())
            if not initial:
                gr.Success("保存成功")
            prompts = await get_mcp_prompts(
                mcp_config=merge_mcp_config(mcp_config, internal_mcp_config),
                get_llm=lambda: init_chat_model(
                    model=mcp_prompt_model,
                    model_provider="openai",
                    api_key=api_key,
                    base_url="https://api-inference.modelscope.cn/v1/"))
            browser_state_value["mcp_prompts"] = prompts
            yield (gr.update(welcome_config=welcome_config(prompts)),
                   gr.skip(), gr.update(value=browser_state_value))
        else:
            yield gr.skip(), gr.skip(), gr.update(value=browser_state_value)
            if not initial:
                gr.Success("保存成功")

    return save_mcp_config


def save_mcp_servers(mcp_servers_btn_value, browser_state_value):
    """Store the servers button's ``data_source`` in the browser state."""
    browser_state_value["mcp_servers"] = mcp_servers_btn_value["data_source"]
    return gr.update(value=browser_state_value)


def _parse_url_mcp_config(raw):
    """Parse the MCP config passed via URL; return ``{}`` if unusable.

    The value comes from a URL query parameter, so it may be ``None``,
    malformed JSON, or JSON that is not an object.
    """
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


def load(mcp_servers_btn_value, browser_state_value, url_mcp_config_value):
    """Initialise the UI from browser state and/or a URL-provided MCP config.

    Returns:
        Updates for (MCP config editor, chatbot, servers button), or a
        single ``gr.skip()`` when there is nothing to restore.
    """
    if browser_state_value:
        mcp_servers_btn_value["data_source"] = browser_state_value[
            "mcp_servers"]
        url_mcp_config = _parse_url_mcp_config(url_mcp_config_value)
        # Servers from the URL override same-named stored servers.
        return (gr.update(value=json.dumps(merge_mcp_config(
            json.loads(browser_state_value["mcp_config"]), url_mcp_config),
                                           indent=4)),
                gr.update(welcome_config=welcome_config(
                    browser_state_value["mcp_prompts"])),
                gr.update(value=mcp_servers_btn_value))
    elif url_mcp_config_value:
        url_mcp_config = _parse_url_mcp_config(url_mcp_config_value)
        return (gr.update(value=json.dumps(merge_mcp_config(
            url_mcp_config, {}),
                                           indent=4)), gr.skip(), gr.skip())
    return gr.skip()


def lighten_color(hex_color, factor=0.2):
    """Blend a ``#rrggbb`` color toward white.

    Args:
        hex_color: Six-digit hex color, with or without a leading ``#``.
        factor: Fraction of the distance to white to move each channel.

    Returns:
        The lightened color as six hex digits without a leading ``#``.
    """
    hex_color = hex_color.lstrip("#")

    # Parse the RGB channels.
    r = int(hex_color[0:2], 16)
    g = int(hex_color[2:4], 16)
    b = int(hex_color[4:6], 16)

    # Move each channel toward white.
    r = min(255, r + int((255 - r) * factor))
    g = min(255, g + int((255 - g) * factor))
    b = min(255, b + int((255 - b) * factor))

    # Convert back to hexadecimal.
    return f"{r:02x}{g:02x}{b:02x}"


lighten_primary_color = lighten_color(primary_color, 0.4)

css = f"""
@media (max-width: 768px) {{
  .mcp-playground-header {{
    margin-top: 36px;
  }}
}}

.ms-gr-auto-loading-default-antd {{
  z-index: 1001 !important;
}}

.user-message-content {{
  background-color: #{lighten_primary_color};
}}
"""

# Expose the model options map to front-end code on page init.
js = "function init() { window.MODEL_OPTIONS_MAP=" + json.dumps(
    model_options_map) + "}"

# NOTE(review): the nesting of the layout below was reconstructed from a
# whitespace-collapsed source; confirm parent/child placement against the
# rendered UI.
with gr.Blocks(css=css, js=js) as demo:
    browser_state = gr.BrowserState(
        {
            "mcp_config": default_mcp_config,
            "mcp_prompts": default_mcp_prompts,
            "mcp_servers": default_mcp_servers
        },
        storage_key="mcp_config")
    oss_state = gr.State({"oss_cache": {}})

    with ms.Application(), antdx.XProvider(
            locale=default_locale, theme=default_theme), ms.AutoLoading():
        with antd.Badge.Ribbon(placement="start"):
            with ms.Slot("text"):
                with antd.Typography.Link(elem_style=dict(color="#fff"),
                                          type="link",
                                          href="https://modelscope.cn/mcp",
                                          href_target="_blank"):
                    with antd.Flex(align="center",
                                   gap=2,
                                   elem_style=dict(padding=2)):
                        antd.Icon("ExportOutlined",
                                  elem_style=dict(marginRight=4))
                        ms.Text("前往")
                        antd.Image("./assets/modelscope-mcp.png",
                                   preview=False,
                                   width=20,
                                   height=20)
                        ms.Text("ModelScope MCP 广场")
            with ms.Div(elem_style=dict(overflow="hidden")):
                with antd.Flex(justify="center",
                               gap="small",
                               align="center",
                               elem_classes="mcp-playground-header"):
                    with ms.Div(elem_style=dict(flexShrink=0, display='flex')):
                        antd.Image("./assets/logo.png",
                                   preview=False,
                                   elem_style=dict(backgroundColor="#fff"),
                                   width=50,
                                   height=50)
                    antd.Typography.Title("MCP Playground",
                                          level=1,
                                          elem_style=dict(fontSize=28,
                                                          margin=0))
            with antd.Tabs():
                with antd.Tabs.Item(label="实验场"):
                    with antd.Flex(
                            vertical=True,
                            gap="middle",
                            elem_style=dict(
                                height=
                                'calc(100vh - 46px - 16px - 50px - 16px - 16px - 21px - 16px)',
                                maxHeight=1500)):
                        with antd.Card(
                                elem_style=dict(flex=1,
                                                height=0,
                                                display="flex",
                                                flexDirection="column"),
                                styles=dict(body=dict(flex=1,
                                                      height=0,
                                                      display='flex',
                                                      flexDirection='column'))):
                            chatbot = pro.Chatbot(
                                height=0,
                                bot_config=bot_config(),
                                user_config=user_config(),
                                welcome_config=welcome_config(
                                    default_mcp_prompts),
                                elem_style=dict(flex=1))
                        with pro.MultimodalInput(
                                upload_config=MultimodalInputUploadConfig(
                                    placeholder={
                                        "inline": {
                                            "title": "上传文件",
                                            "description": "拖拽文件到此处或点击录音按钮开始录音"
                                        },
                                        "drop": {
                                            "title": "将文件拖放到此处",
                                        }
                                    },
                                    title=
                                    "上传附件(只对部分支持远程文件 URL 的 MCP Server 生效,文件个数上限: 10)",
                                    multiple=True,
                                    allow_paste_file=True,
                                    allow_speech=True,
                                    max_count=10)) as input:
                            with ms.Slot("prefix"):
                                with antd.Button(value=None,
                                                 variant="text",
                                                 color="default") as clear_btn:
                                    with ms.Slot("icon"):
                                        antd.Icon("ClearOutlined")
                                mcp_servers_btn = McpServersButton(
                                    data_source=default_mcp_servers)

                with antd.Tabs.Item(label="配置"):
                    with antd.Flex(vertical=True, gap="small"):
                        with antd.Card():
                            config_form, mcp_config_confirm_btn, reset_mcp_config_btn, mcp_config = ConfigForm(
                            )

    # Hidden carrier for the MCP config passed through the page URL.
    url_mcp_config = gr.Textbox(visible=False)
    load_event = demo.load(
        fn=load,
        js=
        "(mcp_servers_btn_value, browser_state_value) => [mcp_servers_btn_value, browser_state_value, decodeURIComponent(new URLSearchParams(window.location.search).get('studio_additional_params') || '') || null]",
        inputs=[mcp_servers_btn, browser_state, url_mcp_config],
        outputs=[mcp_config, chatbot, mcp_servers_btn])
    chatbot.welcome_prompt_select(fn=select_welcome_prompt,
                                  inputs=[input],
                                  outputs=[input],
                                  queue=False)
    retry_event = chatbot.retry(
        fn=retry,
        inputs=[config_form, mcp_config, mcp_servers_btn, chatbot, oss_state],
        outputs=[input, clear_btn, chatbot])
    clear_btn.click(fn=clear, outputs=[chatbot], queue=False)
    mcp_servers_btn.change(fn=save_mcp_servers,
                           inputs=[mcp_servers_btn, browser_state],
                           outputs=[browser_state])

    # After the initial load, save once without showing the success toast.
    load_success_save_mcp_config_event = load_event.success(
        fn=save_mcp_config_wrapper(initial=True),
        inputs=[mcp_config, mcp_servers_btn, browser_state],
        outputs=[chatbot, mcp_servers_btn, browser_state])
    save_mcp_config_event = mcp_config_confirm_btn.click(
        fn=save_mcp_config_wrapper(initial=False),
        inputs=[mcp_config, mcp_servers_btn, browser_state],
        cancels=[load_success_save_mcp_config_event],
        outputs=[chatbot, mcp_servers_btn, browser_state])
    reset_mcp_config_btn.click(
        fn=reset_mcp_config,
        inputs=[mcp_servers_btn],
        outputs=[mcp_config, mcp_servers_btn, chatbot, browser_state],
        cancels=[save_mcp_config_event, load_success_save_mcp_config_event])
    submit_event = input.submit(fn=submit,
                                inputs=[
                                    input, config_form, mcp_config,
                                    mcp_servers_btn, chatbot, oss_state
                                ],
                                outputs=[input, clear_btn, chatbot])
    input.cancel(fn=cancel,
                 inputs=[chatbot],
                 outputs=[input, clear_btn, chatbot],
                 cancels=[submit_event, retry_event],
                 queue=False)

demo.queue(default_concurrency_limit=100,
           max_size=100).launch(ssr_mode=False, max_threads=100)