mgokg commited on
Commit
cb60006
·
verified ·
1 Parent(s): eebf1aa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +40 -15
app.py CHANGED
@@ -1,19 +1,44 @@
1
  import asyncio
 
 
2
 
3
- async def connect_to_server(server):
4
- try:
5
- # Simulierte Verbindung zum Server
6
- await asyncio.sleep(1) # Simuliert eine Verzögerung
7
- raise ConnectionError("Verbindungsfehler") # Simuliert einen Verbindungsfehler
8
- except Exception as e:
9
- print(f"Fehler beim Verbinden mit {server}: {e}")
 
 
 
 
 
 
10
 
11
- async def main():
12
- servers = ["MCP-Server", "Gemini"]
13
- tasks = [connect_to_server(server) for server in servers]
14
-
15
- # Verwendung von asyncio.gather, um alle Aufgaben zu verwalten
16
- await asyncio.gather(*tasks, return_exceptions=True)
17
 
18
- # Ausführung des Hauptprogramms
19
- asyncio.run(main())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import asyncio
2
+ from mcp import ClientSession
3
+ from mcp.client.sse import sse_client
4
 
5
+ async def run_mcp_client():
6
+ # Die Basis-URL deines Hugging Face Spaces (mit /sse Endpunkt für das Python SDK)
7
+ server_url = "https://mgokg-db-timetable-api.hf.space"
8
+
9
+ print(f"Verbinde zu MCP Server: {server_url}...")
10
+
11
+ # 1. Transport-Layer aufbauen (SSE)
12
+ async with sse_client(url=server_url) as (read_stream, write_stream):
13
+
14
+ # 2. MCP-Session initialisieren
15
+ async with ClientSession(read_stream, write_stream) as session:
16
+ await session.initialize()
17
+ print("Verbindung erfolgreich hergestellt!\n")
18
 
19
+ # --- Beispiel 1: Verfügbare Tools auflisten ---
20
+ print("Verfügbare Tools:")
21
+ tools_result = await session.list_tools()
22
+ for tool in tools_result.tools:
23
+ print(f"- {tool.name}: {tool.description}")
 
24
 
25
+ # --- Beispiel 2: Ein Tool aufrufen ---
26
+ # Ersetze 'get_timetable' und die Argumente durch die echten Tool-Namen deiner API
27
+ if tools_result.tools:
28
+ tool_to_call = tools_result.tools[0].name
29
+ print(f"\nRufe Tool '{tool_to_call}' auf...")
30
+
31
+ # Beispielhafter Aufruf (Passe die Argumente an deine API an!)
32
+ result = await session.call_tool(
33
+ name=tool_to_call,
34
+ arguments={"station": "Berlin Hbf"}
35
+ )
36
+ print(f"Ergebnis: {result.content}")
37
+
38
+ if __name__ == "__main__":
39
+ try:
40
+ asyncio.run(run_mcp_client())
41
+ except KeyboardInterrupt:
42
+ pass
43
+ except Exception as e:
44
+ print(f"Fehler: {e}")