akashraut commited on
Commit
3b11c41
·
verified ·
1 Parent(s): a86b005

Create server.py

Browse files
Files changed (1) hide show
  1. server.py +120 -0
server.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # server.py
2
+ import sys
3
+ import json
4
+ import asyncio
5
+ import yfinance as yf
6
+ import pandas as pd
7
+ from statsmodels.tsa.holtwinters import ExponentialSmoothing
8
+ from langchain_core.tools import tool, BaseTool
9
+
10
+ # --- MCP Server Logic (Handles communication) ---
11
+ class MCPMultiToolServer:
12
+ """A server that exposes multiple LangChain tools over JSON-RPC."""
13
+ def __init__(self, tools: list[BaseTool]):
14
+ self.tools = {tool.name: tool for tool in tools}
15
+
16
+ async def _handle_request(self, request: dict):
17
+ method = request.get("method")
18
+ params = request.get("params", {})
19
+ request_id = request.get("id")
20
+
21
+ if method == "discover":
22
+ # Send back the schema for all available tools
23
+ tool_specs = [
24
+ {"name": t.name, "description": t.description, "args_schema": t.args}
25
+ for t in self.tools.values()
26
+ ]
27
+ return {"jsonrpc": "2.0", "result": tool_specs, "id": request_id}
28
+
29
+ if method == "execute":
30
+ tool_name = params.get("tool_name")
31
+ tool_args = params.get("tool_args", {})
32
+ tool_to_execute = self.tools.get(tool_name)
33
+ if not tool_to_execute:
34
+ return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": request_id}
35
+
36
+ try:
37
+ # Execute the tool with the provided arguments
38
+ result = await tool_to_execute.ainvoke(tool_args)
39
+ return {"jsonrpc": "2.0", "result": result, "id": request_id}
40
+ except Exception as e:
41
+ return {"jsonrpc": "2.0", "error": {"code": -32603, "message": f"Internal error: {e}"}, "id": request_id}
42
+
43
+ return {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": request_id}
44
+
45
+ async def serve(self):
46
+ """Listens to stdin for JSON-RPC requests and sends responses to stdout."""
47
+ reader = asyncio.StreamReader()
48
+ await asyncio.get_event_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
49
+ writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
50
+ writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
51
+
52
+ while not reader.at_eof():
53
+ line = await reader.readline()
54
+ if not line: break
55
+ try:
56
+ request = json.loads(line)
57
+ response = await self._handle_request(request)
58
+ writer.write(json.dumps(response).encode() + b'\n')
59
+ await writer.drain()
60
+ except json.JSONDecodeError:
61
+ continue
62
+
63
+ # --- Tool Definitions (The actual work is done here) ---
64
+ COMMODITY_TICKERS = {"gold": "GC=F", "silver": "SI=F"}
65
+
66
+ @tool
67
+ async def get_current_price(commodity_name: str) -> str:
68
+ """
69
+ Gets the most recent 'live' price for a given commodity.
70
+ Use this for questions about the current or today's price of gold or silver.
71
+ """
72
+ commodity_name = commodity_name.lower()
73
+ ticker = COMMODITY_TICKERS.get(commodity_name)
74
+ if not ticker:
75
+ return f"Error: '{commodity_name}' is not supported. Please use 'gold' or 'silver'."
76
+
77
+ try:
78
+ data = yf.Ticker(ticker).history(period="1d")
79
+ if data.empty: return "Could not retrieve price data."
80
+ price = data['Close'].iloc[-1]
81
+ return f"The current price of {commodity_name} is approximately ${price:.2f} USD."
82
+ except Exception as e:
83
+ return f"An error occurred while fetching the price: {e}"
84
+
85
+ @tool
86
+ async def get_price_forecast(commodity_name: str, forecast_days: int) -> str:
87
+ """
88
+ Generates a price forecast for a commodity for 3 to 5 days.
89
+ Use this for questions about forecasts, predictions, or future outlooks for gold or silver.
90
+ """
91
+ commodity_name = commodity_name.lower()
92
+ ticker = COMMODITY_TICKERS.get(commodity_name)
93
+ if not ticker:
94
+ return f"Error: '{commodity_name}' is not supported. Please use 'gold' or 'silver'."
95
+ if not 3 <= forecast_days <= 5:
96
+ return "Error: Forecast can only be for 3, 4, or 5 days."
97
+
98
+ try:
99
+ data = yf.download(ticker, period="6mo", interval="1d", progress=False)
100
+ if data.empty: return "Not enough historical data to generate a forecast."
101
+
102
+ model = ExponentialSmoothing(data['Close'], trend='add').fit()
103
+ forecast = model.forecast(steps=forecast_days)
104
+
105
+ response = f"The {forecast_days}-day price forecast for {commodity_name} is:\n"
106
+ for i, val in enumerate(forecast, 1):
107
+ response += f"Day {i}: ${val:.2f} USD\n"
108
+ return response.strip()
109
+ except Exception as e:
110
+ return f"An error occurred during forecasting: {e}"
111
+
112
+ # --- Main entry point for the server process ---
113
+ async def main():
114
+ """Starts the tool server."""
115
+ tools = [get_current_price, get_price_forecast]
116
+ server = MCPMultiToolServer(tools)
117
+ await server.serve()
118
+
119
+ if __name__ == "__main__":
120
+ asyncio.run(main())