NeerajAhire committed on
Commit
dcc6beb
·
verified ·
1 Parent(s): 47f44f7

Upload mcp_client.py

Browse files
Files changed (1) hide show
  1. Client/mcp_client.py +167 -0
Client/mcp_client.py ADDED
@@ -0,0 +1,167 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # mcp_client.py
2
+ import asyncio
3
+ import threading
4
+ from fastmcp import Client
5
+ import re
6
+ import logging
7
+ logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(levelname)-7s %(message)s")
8
+ logger = logging.getLogger("pdf-rag-mcp-client")
9
+ class MCPClientExcelRAG:
10
+ """
11
+ SSE-based MCP client for the Excel RAG tool `ask_excel`.
12
+ """
13
+
14
+ def __init__(self, url: str = "http://127.0.0.1:8001/sse"):
15
+ #def __init__(self, url: str = "http://127.0.0.1:7860/gradio_api/mcp/"):
16
+ self.url = url
17
+ self._loop = None
18
+ self._thread = None
19
+ self._client: Client | None = None
20
+ self._ready = threading.Event()
21
+ self._available_tools = []
22
+
23
+ # ---------- Background runner ----------
24
+ def _runner(self):
25
+ self._loop = asyncio.new_event_loop()
26
+ asyncio.set_event_loop(self._loop)
27
+
28
+ async def start_client():
29
+ self._client = Client(self.url)
30
+ await self._client.__aenter__()
31
+ try:
32
+ tools = await self._client.list_tools()
33
+ self._available_tools = [t.name for t in tools]
34
+ print(f"Available MCP tools: {self._available_tools}")
35
+ except Exception as e:
36
+ print(f"Failed to list tools: {e}")
37
+ self._ready.set()
38
+ await asyncio.Event().wait() # keep loop alive
39
+
40
+ self._loop.run_until_complete(start_client())
41
+
42
+ def start(self, timeout: float = 10):
43
+ if self._thread and self._thread.is_alive():
44
+ return
45
+ self._thread = threading.Thread(target=self._runner, daemon=True)
46
+ self._thread.start()
47
+ if not self._ready.wait(timeout):
48
+ raise RuntimeError("SSE MCP client failed to start")
49
+
50
+ def stop(self):
51
+ if not self._loop or not self._thread.is_alive():
52
+ return
53
+ for task in asyncio.all_tasks(self._loop):
54
+ if not task.done():
55
+ self._loop.call_soon_threadsafe(task.cancel)
56
+ break
57
+ self._loop.call_soon_threadsafe(self._loop.stop)
58
+ self._thread.join(timeout=5)
59
+
60
+ # ---------- RAG tool call ----------
61
+
62
+
63
+ def ask_excel(self, question: str, top_k: int = 5, sheet: int | str = 0, temperature: float = 0.1) -> str:
64
+ if "ask_excel" not in self._available_tools:
65
+ return "❌ Tool 'ask_excel' not available on the server."
66
+ logger.info(f"[CLIENT] call ask_excel q='{question[:80]}...' top_k={top_k} sheet={sheet} temp={temperature}")
67
+ fut = asyncio.run_coroutine_threadsafe(
68
+ self._client.call_tool("ask_excel", {
69
+ "question": question,
70
+ "top_k": top_k,
71
+ "sheet": sheet,
72
+ "temperature": temperature,
73
+ }),
74
+ self._loop,
75
+ )
76
+ result = fut.result(timeout=60)
77
+ logger.info(f"[CLIENT] got result type={type(result)} has_content={hasattr(result, 'content')}")
78
+
79
+ # ✅ Extract text from content blocks if available
80
+ if hasattr(result, "content") and result.content:
81
+ texts = []
82
+ for block in result.content:
83
+ # Handle FastMCP TextContent objects or dict blocks
84
+ if hasattr(block, "text"):
85
+ texts.append(block.text)
86
+ elif isinstance(block, dict) and block.get("type") == "text":
87
+ texts.append(block.get("text", ""))
88
+ answer = "\n".join(texts).strip() if texts else str(result.content)
89
+ elif hasattr(result, "structured_content") and result.structured_content:
90
+ sc = result.structured_content
91
+ if isinstance(sc, dict) and "result" in sc:
92
+ answer = sc["result"]
93
+ else:
94
+ answer = str(sc)
95
+ elif hasattr(result, "data") and result.data:
96
+ answer = str(result.data)
97
+ else:
98
+ answer = str(result)
99
+
100
+ # ✅ Optional: Remove [Row N] citations if you don't want them
101
+ answer = re.sub(r"\s*\[Row\s+\d+\]\s*", "", answer).strip()
102
+ logger.info(f"[CLIENT] answer_len={len(answer)} preview: {answer[:200]}")
103
+
104
+ return answer
105
+
106
+
107
+ def ask_pdf(self, question: str, top_k: int = 5, temperature: float = 0.1) -> str:
108
+ if "ask_pdf" not in self._available_tools:
109
+ return "❌ Tool 'ask_pdf' not available on the server."
110
+ logger.info(f"[CLIENT] call ask_pdf q='{question[:80]}...' top_k={top_k} temp={temperature}")
111
+ fut = asyncio.run_coroutine_threadsafe(
112
+ self._client.call_tool("ask_pdf", {
113
+ "question": question,
114
+ "top_k": top_k,
115
+ "temperature": temperature,
116
+ }),
117
+ self._loop,
118
+ )
119
+ result = fut.result(timeout=60)
120
+ # Extract text similar to ask_excel
121
+ if hasattr(result, "content") and result.content:
122
+ texts = []
123
+ for block in result.content:
124
+ # Handle FastMCP TextContent objects or dict blocks
125
+ if hasattr(block, "text"):
126
+ texts.append(block.text)
127
+ elif isinstance(block, dict) and block.get("type") == "text":
128
+ texts.append(block.get("text", ""))
129
+ answer = "\n".join(texts).strip() if texts else str(result.content)
130
+ elif hasattr(result, "structured_content") and result.structured_content:
131
+ sc = result.structured_content
132
+ if isinstance(sc, dict) and "result" in sc:
133
+ answer = sc["result"]
134
+ else:
135
+ answer = str(sc)
136
+ elif hasattr(result, "data") and result.data:
137
+ answer = str(result.data)
138
+ else:
139
+ answer = str(result)
140
+
141
+ # ✅ Optional: Remove [Row N] citations if you don't want them
142
+ answer = re.sub(r"\s*\[Row\s+\d+\]\s*", "", answer).strip()
143
+ logger.info(f"[CLIENT] answer_len={len(answer)} preview: {answer[:200]}")
144
+
145
+ return answer
146
+
147
+
148
+ def ask_link(self, question: str, temperature: float = 0.1, subquery_context: str = None, top_k: int = 5) -> str:
149
+ if "ask_link" not in self._available_tools:
150
+ return "❌ Tool 'ask_link' not available on the server."
151
+ fut = asyncio.run_coroutine_threadsafe(
152
+ self._client.call_tool("ask_link", {
153
+ "query": question,
154
+ "temperature": temperature,
155
+ "subquery_context": subquery_context or "",
156
+ "top_k": top_k,
157
+ }),
158
+ self._loop,
159
+ )
160
+ result = fut.result(timeout=60)
161
+ # Extract text
162
+ if hasattr(result, "content") and result.content:
163
+ texts = [block.text for block in result.content if hasattr(block, "text")]
164
+ return "\n".join(texts).strip() if texts else str(result.content)
165
+ return str(result)
166
+
167
+