| import gradio as gr |
| import os |
| import asyncio |
| from typing import List |
| from google import genai |
| from google.genai import types |
| from mcp import ClientSession, StdioServerParameters |
| from mcp.client.stdio import stdio_client |
|
|
# MCP server configuration: the parameters below are handed to `stdio_client`,
# which runs `npx mcp-remote <url>` and talks to that process over stdio.
server_params = StdioServerParameters(
    command="npx",
    args=["mcp-remote", "https://mgokg-db-timetable-api.hf.space/gradio_api/mcp/"],
)
|
|
def _build_function_declarations(mcp_tools):
    """Return Gemini function declarations for the MCP tools plus `google_search`.

    Each MCP tool keeps its own name, description and input schema; a tool
    without a schema gets an empty object schema. A synthetic `google_search`
    function is appended so the model can request a web search through the
    same function-calling channel as the MCP tools.
    """
    declarations = [
        types.FunctionDeclaration(
            name=tool.name,
            description=tool.description or "MCP Tool",
            parameters=tool.inputSchema
            or {"type": "object", "properties": {}, "required": []},
        )
        for tool in mcp_tools
    ]
    declarations.append(
        types.FunctionDeclaration(
            name="google_search",
            description="Search the internet for current information, news, facts, and general knowledge. Use this for questions about current events, weather, general knowledge, or when you need up-to-date information from the web.",
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query to look up on Google",
                    }
                },
                "required": ["query"],
            },
        )
    )
    return declarations


def _build_system_instruction(mcp_tool_names):
    """Return the system prompt that tells the model which tool family to use."""
    return (
        "You are a helpful assistant with access to two types of tools:\n\n\n"
        "1. TRAIN CONNECTION TOOLS (MCP): Use these ONLY when the user asks about train connections, schedules, routes, or German railway (DB) information. Available tools: "
        + ", ".join(mcp_tool_names)
        + "\n\n\n"
        "2. GOOGLE SEARCH: Use this for ALL other queries - general knowledge, current events, weather, news, calculations, or any topic not related to train connections.\n\n\n"
        "Choose the appropriate tool based on the user's intent. For train connections, use the MCP tools. For everything else, use Google Search."
    )


def _mcp_result_text(tool_result):
    """Flatten an MCP tool result into one string.

    All content items that expose a non-empty `text` attribute are joined with
    newlines; items without text are skipped instead of raising. Error results
    are prefixed with "Error: ".
    """
    texts = [
        item.text
        for item in (tool_result.content or [])
        if getattr(item, "text", None)
    ]
    joined = "\n".join(texts)
    if tool_result.isError:
        return f"Error: {joined or 'Unknown error'}"
    return joined or "No result"


async def _run_google_search(client, model, query):
    """Answer `query` with a separate Gemini request that has Google Search enabled."""
    search_response = await client.aio.models.generate_content(
        model=model,
        contents=[
            types.Content(
                role="user",
                parts=[types.Part.from_text(text=f"Search Google for: {query}")],
            )
        ],
        config=types.GenerateContentConfig(
            tools=[types.Tool(google_search=types.GoogleSearch())],
            temperature=0.4,
        ),
    )
    # `.text` can be None when the response carries no text part.
    return search_response.text or "No result"


async def generate1(input_text, max_turns=5):
    """Answer a user query with Gemini, using MCP tools and Google Search.

    Opens an MCP session via `stdio_client(server_params)`, exposes the
    session's tools plus a `google_search` function to the model, and then
    runs a tool-calling loop: every function call requested by the model is
    executed and its result is fed back until the model stops requesting
    tools or `max_turns` rounds have been used.

    Args:
        input_text: The user's query.
        max_turns: Maximum number of tool-calling rounds (default 5).

    Returns:
        A `(answer, status)` tuple. `status` is always an empty string; errors
        are reported as text in `answer` rather than raised.
    """
    # Reject empty input before spawning the MCP process or calling the API.
    if not input_text or not input_text.strip():
        return "Bitte eine Anfrage eingeben.", ""

    try:
        client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))
    except Exception as e:
        return f"Fehler bei der Initialisierung: {e}", ""

    model = "gemini-2.5-flash"

    try:
        # Set up the MCP session.
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                mcp_tools = (await session.list_tools()).tools

                # One config object is shared by every request of this run.
                config = types.GenerateContentConfig(
                    tools=[
                        types.Tool(
                            function_declarations=_build_function_declarations(mcp_tools)
                        )
                    ],
                    temperature=0.4,
                    system_instruction=_build_system_instruction(
                        [t.name for t in mcp_tools]
                    ),
                )

                contents = [
                    types.Content(
                        role="user",
                        parts=[types.Part.from_text(text=input_text)],
                    )
                ]

                # First API call.
                response = await client.aio.models.generate_content(
                    model=model, contents=contents, config=config
                )

                # Agentic loop for tool calls.
                turn_count = 0
                while response.candidates and turn_count < max_turns:
                    content = getattr(response.candidates[0], "content", None)
                    # `parts` may be None on a candidate; treat that as "no parts".
                    parts = (content.parts or []) if content else []
                    calls = [
                        part.function_call
                        for part in parts
                        if getattr(part, "function_call", None)
                    ]
                    if not calls:
                        break

                    turn_count += 1
                    contents.append(content)
                    response_parts = []

                    for fc in calls:
                        tool_args = dict(fc.args) if fc.args else {}
                        try:
                            if fc.name == "google_search":
                                result_text = await _run_google_search(
                                    client, model, tool_args.get("query", "")
                                )
                            else:
                                result_text = _mcp_result_text(
                                    await session.call_tool(fc.name, tool_args)
                                )
                            payload = {"result": result_text}
                        except Exception as e:
                            # Report the failure to the model instead of aborting
                            # the whole conversation.
                            payload = {"error": str(e)}

                        response_parts.append(
                            types.Part.from_function_response(
                                name=fc.name, response=payload
                            )
                        )

                    contents.append(types.Content(role="user", parts=response_parts))

                    # Next API call.
                    response = await client.aio.models.generate_content(
                        model=model, contents=contents, config=config
                    )

                answer = response.text
                if not answer:
                    # Happens when the turn cap is reached while the model still
                    # requests tools, or when the model returns no text at all.
                    return (
                        "Das Modell hat keine Textantwort geliefert "
                        "(Tool-Limit erreicht oder leere Antwort).",
                        "",
                    )
                return answer, ""

    except Exception as e:
        import traceback
        return f"Fehler während der Verarbeitung: {str(e)}\n\n{traceback.format_exc()}", ""
|
|
# Gradio UI wrapper
def generate(input_text):
    """Run the async `generate1` handler to completion and return its result.

    Any exception that escapes is converted into a `(message, "")` tuple whose
    message starts with "UI Fehler:" and includes the formatted traceback.
    """
    try:
        outcome = asyncio.run(generate1(input_text))
    except Exception as e:
        import traceback
        return f"UI Fehler: {str(e)}\n\n{traceback.format_exc()}", ""
    else:
        return outcome
|
|
if __name__ == '__main__':
    # Build the two-column UI: query input on the left, answer and status on the right.
    with gr.Blocks() as demo:
        gr.Markdown("# Gemini 2.5 Flash + Google Search + DB Timetable (MCP)")

        with gr.Row():
            with gr.Column():
                input_textbox = gr.Textbox(
                    lines=3,
                    label="Anfrage",
                    placeholder="z.B. Wie komme ich von Berlin nach Hamburg? oder Was ist die Hauptstadt von Frankreich?",
                )
                submit_button = gr.Button("Senden", variant="primary")

            with gr.Column():
                output_textbox = gr.Markdown(label="Antwort")
                status_textbox = gr.Textbox(label="Status", interactive=False)

        # The button click and the textbox submit event trigger the same
        # handler with the same inputs and outputs.
        for register in (submit_button.click, input_textbox.submit):
            register(
                fn=generate,
                inputs=input_textbox,
                outputs=[output_textbox, status_textbox],
            )

    demo.launch(show_error=True, share=False)