akashraut commited on
Commit
a1afbba
·
verified ·
1 Parent(s): 572d0f2

Update server.py

Browse files
Files changed (1) hide show
  1. server.py +41 -92
server.py CHANGED
@@ -1,120 +1,69 @@
1
  # server.py
2
- import sys
3
- import json
4
- import asyncio
5
  import yfinance as yf
6
- import pandas as pd
7
  from statsmodels.tsa.holtwinters import ExponentialSmoothing
8
  from langchain_core.tools import tool, BaseTool
9
 
10
- # --- MCP Server Logic (Handles communication) ---
11
- class MCPMultiToolServer:
12
- """A server that exposes multiple LangChain tools over JSON-RPC."""
13
  def __init__(self, tools: list[BaseTool]):
14
- self.tools = {tool.name: tool for tool in tools}
15
-
16
- async def _handle_request(self, request: dict):
17
- method = request.get("method")
18
- params = request.get("params", {})
19
- request_id = request.get("id")
20
 
 
 
21
  if method == "discover":
22
- # Send back the schema for all available tools
23
- tool_specs = [
24
- {"name": t.name, "description": t.description, "args_schema": t.args}
25
- for t in self.tools.values()
26
- ]
27
- return {"jsonrpc": "2.0", "result": tool_specs, "id": request_id}
28
-
29
  if method == "execute":
30
- tool_name = params.get("tool_name")
31
- tool_args = params.get("tool_args", {})
32
- tool_to_execute = self.tools.get(tool_name)
33
- if not tool_to_execute:
34
- return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": request_id}
35
-
36
- try:
37
- # Execute the tool with the provided arguments
38
- result = await tool_to_execute.ainvoke(tool_args)
39
- return {"jsonrpc": "2.0", "result": result, "id": request_id}
40
- except Exception as e:
41
- return {"jsonrpc": "2.0", "error": {"code": -32603, "message": f"Internal error: {e}"}, "id": request_id}
42
-
43
- return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": request_id}
44
 
45
  async def serve(self):
46
- """Listens to stdin for JSON-RPC requests and sends responses to stdout."""
47
  reader = asyncio.StreamReader()
48
  await asyncio.get_event_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
49
- writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
50
- writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
51
-
52
- while not reader.at_eof():
53
- line = await reader.readline()
54
- if not line: break
55
  try:
56
- request = json.loads(line)
57
- response = await self._handle_request(request)
58
  writer.write(json.dumps(response).encode() + b'\n')
59
  await writer.drain()
60
- except json.JSONDecodeError:
61
- continue
62
 
63
- # --- Tool Definitions (The actual work is done here) ---
64
  COMMODITY_TICKERS = {"gold": "GC=F", "silver": "SI=F"}
65
 
66
  @tool
67
  async def get_current_price(commodity_name: str) -> str:
68
- """
69
- Gets the most recent 'live' price for a given commodity.
70
- Use this for questions about the current or today's price of gold or silver.
71
- """
72
- commodity_name = commodity_name.lower()
73
- ticker = COMMODITY_TICKERS.get(commodity_name)
74
- if not ticker:
75
- return f"Error: '{commodity_name}' is not supported. Please use 'gold' or 'silver'."
76
-
77
  try:
78
- data = yf.Ticker(ticker).history(period="1d")
79
- if data.empty: return "Could not retrieve price data."
80
- price = data['Close'].iloc[-1]
81
- return f"The current price of {commodity_name} is approximately ${price:.2f} USD."
82
- except Exception as e:
83
- return f"An error occurred while fetching the price: {e}"
84
 
85
  @tool
86
  async def get_price_forecast(commodity_name: str, forecast_days: int) -> str:
87
- """
88
- Generates a price forecast for a commodity for 3 to 5 days.
89
- Use this for questions about forecasts, predictions, or future outlooks for gold or silver.
90
- """
91
- commodity_name = commodity_name.lower()
92
- ticker = COMMODITY_TICKERS.get(commodity_name)
93
- if not ticker:
94
- return f"Error: '{commodity_name}' is not supported. Please use 'gold' or 'silver'."
95
- if not 3 <= forecast_days <= 5:
96
- return "Error: Forecast can only be for 3, 4, or 5 days."
97
-
98
  try:
99
- data = yf.download(ticker, period="6mo", interval="1d", progress=False)
100
- if data.empty: return "Not enough historical data to generate a forecast."
101
-
102
- model = ExponentialSmoothing(data['Close'], trend='add').fit()
103
- forecast = model.forecast(steps=forecast_days)
104
-
105
- response = f"The {forecast_days}-day price forecast for {commodity_name} is:\n"
106
- for i, val in enumerate(forecast, 1):
107
- response += f"Day {i}: ${val:.2f} USD\n"
108
- return response.strip()
109
- except Exception as e:
110
- return f"An error occurred during forecasting: {e}"
111
-
112
- # --- Main entry point for the server process ---
113
- async def main():
114
- """Starts the tool server."""
115
- tools = [get_current_price, get_price_forecast]
116
- server = MCPMultiToolServer(tools)
117
- await server.serve()
118
 
 
119
  if __name__ == "__main__":
120
- asyncio.run(main())
 
 
1
  # server.py
2
+ import sys, json, asyncio
 
 
3
  import yfinance as yf
 
4
  from statsmodels.tsa.holtwinters import ExponentialSmoothing
5
  from langchain_core.tools import tool, BaseTool
6
 
7
+ # --- Compact MCP Server Logic ---
8
+ class MCPToolServer:
 
9
  def __init__(self, tools: list[BaseTool]):
10
+ self.tools = {t.name: t for t in tools}
 
 
 
 
 
11
 
12
+ async def _handle_request(self, req: dict):
13
+ method, params, req_id = req.get("method"), req.get("params", {}), req.get("id")
14
  if method == "discover":
15
+ result = [{"name": t.name, "description": t.description, "args_schema": t.args} for t in self.tools.values()]
16
+ return {"jsonrpc": "2.0", "result": result, "id": req_id}
 
 
 
 
 
17
  if method == "execute":
18
+ tool_name, tool_args = params.get("tool_name"), params.get("tool_args", {})
19
+ if tool_to_exec := self.tools.get(tool_name):
20
+ try:
21
+ result = await tool_to_exec.ainvoke(tool_args)
22
+ return {"jsonrpc": "2.0", "result": result, "id": req_id}
23
+ except Exception as e:
24
+ return {"jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": req_id}
25
+ return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": req_id}
 
 
 
 
 
 
26
 
27
  async def serve(self):
 
28
  reader = asyncio.StreamReader()
29
  await asyncio.get_event_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
30
+ writer_transport, _ = await asyncio.get_event_loop().connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
31
+ writer = asyncio.StreamWriter(writer_transport, _, None, asyncio.get_event_loop())
32
+ while line := await reader.readline():
 
 
 
33
  try:
34
+ response = await self._handle_request(json.loads(line))
 
35
  writer.write(json.dumps(response).encode() + b'\n')
36
  await writer.drain()
37
+ except json.JSONDecodeError: continue
 
38
 
39
+ # --- Compact Tool Definitions ---
40
  COMMODITY_TICKERS = {"gold": "GC=F", "silver": "SI=F"}
41
 
42
  @tool
43
  async def get_current_price(commodity_name: str) -> str:
44
+ """Gets the most recent 'live' price for gold or silver."""
45
+ ticker = COMMODITY_TICKERS.get(commodity_name.lower())
46
+ if not ticker: return f"Error: '{commodity_name}' is not supported. Use 'gold' or 'silver'."
 
 
 
 
 
 
47
  try:
48
+ price = yf.Ticker(ticker).history(period="1d")['Close'].iloc[-1]
49
+ return f"The current price of {commodity_name} is approx. ${price:.2f} USD."
50
+ except Exception as e: return f"Error fetching price: {e}"
 
 
 
51
 
52
  @tool
53
  async def get_price_forecast(commodity_name: str, forecast_days: int) -> str:
54
+ """Generates a 3 to 5 day price forecast for gold or silver."""
55
+ ticker = COMMODITY_TICKERS.get(commodity_name.lower())
56
+ if not ticker: return f"Error: '{commodity_name}' is not supported. Use 'gold' or 'silver'."
57
+ if not 3 <= forecast_days <= 5: return "Error: Forecast must be for 3, 4, or 5 days."
 
 
 
 
 
 
 
58
  try:
59
+ data = yf.download(ticker, period="6mo", progress=False)['Close']
60
+ if data.empty: return "Not enough data to forecast."
61
+ forecast = ExponentialSmoothing(data, trend='add').fit().forecast(steps=forecast_days)
62
+ forecast_lines = [f"Day {i}: ${val:.2f} USD" for i, val in enumerate(forecast, 1)]
63
+ return f"The {forecast_days}-day forecast for {commodity_name} is:\n" + "\n".join(forecast_lines)
64
+ except Exception as e: return f"Error during forecast: {e}"
 
 
 
 
 
 
 
 
 
 
 
 
 
65
 
66
+ # --- Main Server Entrypoint ---
67
  if __name__ == "__main__":
68
+ server = MCPToolServer(tools=[get_current_price, get_price_forecast])
69
+ asyncio.run(server.serve())