File size: 10,388 Bytes
d7b3d84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
"""
Advanced example of building an AI assistant that uses browser-use MCP server.

This example shows how to build a more sophisticated MCP client that:
- Connects to multiple MCP servers (browser-use + filesystem)
- Orchestrates complex multi-step workflows
- Handles errors and retries
- Provides a conversational interface

Prerequisites:
1. Install required packages:
   pip install 'browser-use[cli]'

2. Start the browser-use MCP server:
   uvx 'browser-use[cli]' --mcp

3. Run this example:
   python advanced_server.py

This demonstrates real-world usage patterns for the MCP protocol.
"""

import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import TextContent, Tool


@dataclass
class TaskResult:
	"""Result of executing a task."""

	success: bool
	data: Any
	error: str | None = None
	timestamp: datetime | None = None

	def __post_init__(self):
		if self.timestamp is None:
			self.timestamp = datetime.now()


class AIAssistant:
	"""An AI assistant that uses MCP servers to perform complex tasks."""

	def __init__(self):
		self.servers: dict[str, ClientSession] = {}
		self.tools: dict[str, Tool] = {}
		self.history: list[TaskResult] = []

	async def connect_server(self, name: str, command: str, args: list[str], env: dict[str, str] | None = None):
		"""Connect to an MCP server and discover its tools."""
		print(f'\nπŸ”Œ Connecting to {name} server...')

		server_params = StdioServerParameters(command=command, args=args, env=env or {})

		try:
			# Create connection
			read, write = await stdio_client(server_params).__aenter__()
			session = ClientSession(read, write)
			await session.__aenter__()
			await session.initialize()

			# Store session
			self.servers[name] = session

			# Discover tools
			tools_result = await session.list_tools()
			tools = tools_result.tools
			for tool in tools:
				# Prefix tool names with server name to avoid conflicts
				prefixed_name = f'{name}.{tool.name}'
				self.tools[prefixed_name] = tool
				print(f'  βœ“ Discovered: {prefixed_name}')

			print(f'βœ… Connected to {name} with {len(tools)} tools')

		except Exception as e:
			print(f'❌ Failed to connect to {name}: {e}')
			raise

	async def disconnect_all(self):
		"""Disconnect from all MCP servers."""
		for name, session in self.servers.items():
			try:
				await session.__aexit__(None, None, None)
				print(f'πŸ“΄ Disconnected from {name}')
			except Exception as e:
				print(f'⚠️ Error disconnecting from {name}: {e}')

	async def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> TaskResult:
		"""Call a tool on the appropriate MCP server."""
		# Parse server and tool name
		if '.' not in tool_name:
			return TaskResult(False, None, "Invalid tool name format. Use 'server.tool'")

		server_name, actual_tool_name = tool_name.split('.', 1)

		# Check if server is connected
		if server_name not in self.servers:
			return TaskResult(False, None, f"Server '{server_name}' not connected")

		# Call the tool
		try:
			session = self.servers[server_name]
			result = await session.call_tool(actual_tool_name, arguments)

			# Extract text content
			text_content = [c.text for c in result.content if isinstance(c, TextContent)]
			data = text_content[0] if text_content else str(result.content)

			task_result = TaskResult(True, data)
			self.history.append(task_result)
			return task_result

		except Exception as e:
			error_result = TaskResult(False, None, str(e))
			self.history.append(error_result)
			return error_result

	async def search_and_save(self, query: str, output_file: str) -> TaskResult:
		"""Search for information and save results to a file."""
		print(f'\nπŸ” Searching for: {query}')

		# Step 1: Navigate to search engine
		print('  1️⃣ Opening DuckDuckGo...')
		nav_result = await self.call_tool('browser.browser_navigate', {'url': f'https://duckduckgo.com/?q={query}'})
		if not nav_result.success:
			return nav_result

		await asyncio.sleep(2)  # Wait for page load

		# Step 2: Get search results
		print('  2️⃣ Extracting search results...')
		extract_result = await self.call_tool(
			'browser.browser_extract_content',
			{'query': 'Extract the top 5 search results with titles and descriptions', 'extract_links': True},
		)
		if not extract_result.success:
			return extract_result

		# Step 3: Save to file (if filesystem server is connected)
		if 'filesystem' in self.servers:
			print(f'  3️⃣ Saving results to {output_file}...')
			save_result = await self.call_tool(
				'filesystem.write_file',
				{'path': output_file, 'content': f'Search Query: {query}\n\nResults:\n{extract_result.data}'},
			)
			if save_result.success:
				print(f'  βœ… Results saved to {output_file}')
		else:
			print('  ⚠️ Filesystem server not connected, skipping save')

		return extract_result

	async def monitor_page_changes(self, url: str, duration: int = 10, interval: int = 2):
		"""Monitor a webpage for changes over time."""
		print(f'\nπŸ“Š Monitoring {url} for {duration} seconds...')

		# Navigate to page
		await self.call_tool('browser.browser_navigate', {'url': url})
		await asyncio.sleep(2)

		changes = []
		start_time = datetime.now()

		while (datetime.now() - start_time).seconds < duration:
			# Get current state
			state_result = await self.call_tool('browser.browser_get_state', {'include_screenshot': False})

			if state_result.success:
				state = json.loads(state_result.data)
				changes.append(
					{
						'timestamp': datetime.now().isoformat(),
						'title': state.get('title', ''),
						'element_count': len(state.get('interactive_elements', [])),
					}
				)
				print(f'  πŸ“Έ Captured state at {changes[-1]["timestamp"]}')

			await asyncio.sleep(interval)

		return TaskResult(True, changes)

	async def fill_form_workflow(self, form_url: str, form_data: dict[str, str]):
		"""Navigate to a form and fill it out."""
		print(f'\nπŸ“ Form filling workflow for {form_url}')

		# Step 1: Navigate to form
		print('  1️⃣ Navigating to form...')
		nav_result = await self.call_tool('browser.browser_navigate', {'url': form_url})
		if not nav_result.success:
			return nav_result

		await asyncio.sleep(2)

		# Step 2: Get form elements
		print('  2️⃣ Analyzing form elements...')
		state_result = await self.call_tool('browser.browser_get_state', {'include_screenshot': False})

		if not state_result.success:
			return state_result

		state = json.loads(state_result.data)

		# Step 3: Fill form fields
		print('  3️⃣ Filling form fields...')
		filled_fields = []

		for element in state.get('interactive_elements', []):
			# Look for input fields
			if element.get('tag') in ['input', 'textarea']:
				# Try to match field by placeholder or nearby text
				for field_name, field_value in form_data.items():
					element_text = str(element).lower()
					if field_name.lower() in element_text:
						print(f'    ✏️ Filling {field_name}...')
						type_result = await self.call_tool(
							'browser.browser_type', {'index': element['index'], 'text': field_value}
						)
						if type_result.success:
							filled_fields.append(field_name)
						await asyncio.sleep(0.5)
						break

		return TaskResult(True, {'filled_fields': filled_fields, 'form_data': form_data, 'url': form_url})


async def main():
	"""Main demonstration of advanced MCP client usage."""
	print('Browser-Use MCP Client - Advanced Example')
	print('=' * 50)

	assistant = AIAssistant()

	try:
		# Connect to browser-use MCP server
		await assistant.connect_server(name='browser', command='uvx', args=['browser-use[cli]', '--mcp'])

		# Optionally connect to filesystem server
		# Note: Uncomment to enable file operations
		# await assistant.connect_server(
		#     name="filesystem",
		#     command="npx",
		#     args=["@modelcontextprotocol/server-filesystem", "."]
		# )

		print('\n' + '=' * 50)
		print('Starting demonstration workflows...')
		print('=' * 50)

		# Demo 1: Search and extract
		print('\nπŸ“Œ Demo 1: Web Search and Extraction')
		search_result = await assistant.search_and_save(query='MCP protocol browser automation', output_file='search_results.txt')
		print(f'Search completed: {"βœ…" if search_result.success else "❌"}')

		# Demo 2: Multi-tab comparison
		print('\nπŸ“Œ Demo 2: Multi-tab News Comparison')
		news_sites = [('BBC News', 'https://bbc.com/news'), ('CNN', 'https://cnn.com'), ('Reuters', 'https://reuters.com')]

		for i, (name, url) in enumerate(news_sites):
			print(f'\n  πŸ“° Opening {name}...')
			await assistant.call_tool('browser.browser_navigate', {'url': url, 'new_tab': i > 0})
			await asyncio.sleep(2)

		# List all tabs
		tabs_result = await assistant.call_tool('browser.browser_list_tabs', {})
		if tabs_result.success:
			tabs = json.loads(tabs_result.data)
			print(f'\n  πŸ“‘ Opened {len(tabs)} news sites:')
			for tab in tabs:
				print(f'    - Tab {tab["index"]}: {tab["title"]}')

		# Demo 3: Form filling
		print('\nπŸ“Œ Demo 3: Automated Form Filling')
		form_result = await assistant.fill_form_workflow(
			form_url='https://httpbin.org/forms/post',
			form_data={
				'custname': 'AI Assistant',
				'custtel': '555-0123',
				'custemail': 'ai@example.com',
				'comments': 'Testing MCP browser automation',
			},
		)
		if form_result.success:
			print(f'  βœ… Filled {len(form_result.data["filled_fields"])} fields')

		# Demo 4: Page monitoring
		print('\nπŸ“Œ Demo 4: Dynamic Page Monitoring')
		monitor_result = await assistant.monitor_page_changes(url='https://time.is/', duration=10, interval=3)
		if monitor_result.success:
			print(f'  πŸ“Š Collected {len(monitor_result.data)} snapshots')

		# Summary
		print('\n' + '=' * 50)
		print('πŸ“Š Session Summary')
		print('=' * 50)

		success_count = sum(1 for r in assistant.history if r.success)
		total_count = len(assistant.history)

		print(f'Total operations: {total_count}')
		print(f'Successful: {success_count}')
		print(f'Failed: {total_count - success_count}')
		print(f'Success rate: {success_count / total_count * 100:.1f}%')

	except Exception as e:
		print(f'\n❌ Fatal error: {e}')

	finally:
		# Always disconnect
		print('\n🧹 Cleaning up...')
		await assistant.disconnect_all()
		print('✨ Demo complete!')


if __name__ == '__main__':
	asyncio.run(main())