# seekr / src / integrations / mcp_client.py
# Hemang Thakur
# demo is ready
# 44ebcd1
import os
import json
import asyncio
import aiohttp
import logging
import requests
from typing import List, Dict, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)


class MCPClient:
    """Client for interacting with the MCP service through a Pipedream webhook.

    The webhook URL is read from the ``PIPEDREAM_WEBHOOK_URL`` environment
    variable when the client is constructed. ``fetch_app_data`` posts a query
    to that webhook, and ``format_as_context`` turns the returned dictionary
    into a plain-text block intended for an LLM prompt.
    """

    # Horizontal rules used in the rendered context.
    _BANNER = "=" * 50 + "\n"
    _RULE = "-" * 30 + "\n"

    # Section icons, written as escapes so the source stays ASCII and cannot
    # be corrupted again by an encoding round-trip.
    _ICON_FOLDER = "\U0001F4C1"     # file folder
    _ICON_MAIL = "\U0001F4E7"       # e-mail symbol
    _ICON_CALENDAR = "\U0001F4C5"   # calendar
    _ICON_PAGE = "\U0001F4C4"       # page facing up
    _ICON_CHART = "\U0001F4CA"      # bar chart
    _ICON_CHECK = "\u2705"          # white heavy check mark
    _ICON_NOTEBOOK = "\U0001F4D3"   # notebook
    _ICON_SPEECH = "\U0001F4AC"     # speech balloon
    _ICON_LOUDSPEAKER = "\U0001F4E2"  # public address loudspeaker
    _ICON_PAPERCLIP = "\U0001F4CE"  # paperclip
    _BULLET = "\u2022"              # bullet

    def __init__(self):
        """Read the webhook URL from the environment and set the HTTP timeout."""
        self.webhook_url = os.getenv("PIPEDREAM_WEBHOOK_URL")
        if not self.webhook_url:
            logger.warning("PIPEDREAM_WEBHOOK_URL not set in environment variables")

        # Total timeout (seconds) applied to every webhook request.
        self.timeout = aiohttp.ClientTimeout(total=60)

    async def fetch_app_data(
        self,
        provider: str,
        services: List[str],
        query: str,
        user_id: str,
        access_token: str
    ) -> Dict[str, Any]:
        """Fetch app data for *provider* from the MCP service.

        Args:
            provider: Provider identifier, e.g. ``"google"``.
            services: Names of the provider services to query.
            query: The user's natural-language query.
            user_id: Identifier of the requesting user.
            access_token: OAuth access token for the provider.

        Returns:
            The decoded JSON object returned by the webhook on success, or a
            dictionary with a single ``"error"`` key describing the failure.
            This method does not raise for network or decoding failures.
        """
        if not self.webhook_url:
            logger.error("Pipedream webhook URL not configured")
            return {"error": "Pipedream integration not configured"}

        if not access_token:
            logger.error("No access token for %s! Cannot proceed.", provider)
            return {"error": f"No authentication token for {provider}"}

        payload = {
            "provider": provider,
            "services": services,
            "query": query,
            "user_id": user_id,
            "token": access_token,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        headers = {"Content-Type": "application/json"}

        # The token is deliberately never logged.
        logger.debug("Fetching %s data for services: %s", provider, services)

        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # Wire format ({"data": <JSON string>}) follows the previously
                # disabled aiohttp implementation of this method.
                # NOTE(review): confirm against the Pipedream workflow.
                async with session.post(
                    self.webhook_url,
                    json={"data": json.dumps(payload)},
                    headers=headers,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(
                            "Pipedream request failed: %s - %s",
                            response.status,
                            error_text,
                        )
                        return {
                            "error": f"Failed to fetch data: {response.status} - {error_text}"
                        }
                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Wrong content type, or a body that is not valid JSON.
                        logger.error("Pipedream returned a non-JSON or empty response.")
                        return {"error": "Invalid response from the provider."}
        except asyncio.TimeoutError:
            logger.error("Pipedream request timed out")
            return {"error": "Request timed out. Please try again."}
        except aiohttp.ClientError as exc:
            logger.error("Network error calling Pipedream: %s", exc)
            return {"error": "Network error. Please check your connection."}
        except Exception:
            # Boundary handler: callers expect an error dict, not an exception.
            logger.exception("Unexpected error calling Pipedream")
            return {"error": "An unexpected error occurred"}

        if data is None:
            logger.warning("Pipedream returned a null response.")
            return {"error": "Received no data from the provider."}
        if not isinstance(data, dict):
            logger.error("Pipedream returned a JSON value that is not an object.")
            return {"error": "Invalid response from the provider."}

        if self._has_auth_error(data):
            logger.error("Authentication failed for %s", provider)
            return {
                "error": f"Authentication failed for {provider}. Please reconnect your account."
            }

        logger.info("Successfully fetched %s data", provider)
        return data

    @staticmethod
    def _has_auth_error(data: Dict[str, Any]) -> bool:
        """Return True if any service result in *data* reports an auth failure."""
        markers = ("401", "authError", "UNAUTHENTICATED")
        for result in data.values():
            if isinstance(result, dict) and result.get("error"):
                details = str(result.get("details", ""))
                if any(marker in details for marker in markers):
                    return True
        return False

    def format_as_context(self, provider: str, data: Dict[str, Any]) -> str:
        """Format raw app data into a context string for an LLM.

        Returns an empty string when *data* is empty or contains an
        ``"error"`` key; otherwise a banner-delimited text block.
        """
        if not data or "error" in data:
            return ""

        formatters = {
            "google": self._format_google_data,
            "microsoft": self._format_microsoft_data,
            "slack": self._format_slack_data,
        }
        formatter = formatters.get(provider)
        body = formatter(data) if formatter else f"Unknown provider: {provider}\n"

        header = f"\n[{provider.upper()} APP DATA]\n"
        return header + self._BANNER + body + self._BANNER

    # ------------------------------------------------------------------
    # Shared rendering helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _preview(text: Any, limit: int) -> str:
        """Return *text* cut to *limit* characters, adding "..." only if cut."""
        if not text:
            return ""
        text = str(text)
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _nested(item: Dict[str, Any], outer: str, inner: str, default: Any = None) -> Any:
        """Return ``item[outer][inner]``, or *default* if the path is missing."""
        value = item.get(outer)
        if isinstance(value, dict):
            return value.get(inner, default)
        return default

    @staticmethod
    def _service_items(data: Dict[str, Any], service: str, key: str) -> Any:
        """Return the list at ``data[service][key]``.

        Returns ``None`` when the service section or key is absent (the
        section is then not rendered at all), and an empty list when the key
        is present but does not hold a list.
        """
        section = data.get(service)
        if not isinstance(section, dict) or key not in section:
            return None
        items = section[key]
        return items if isinstance(items, list) else []

    @staticmethod
    def _top_level_list(data: Dict[str, Any], key: str) -> Any:
        """Return ``data[key]`` if it is a list, else ``None``."""
        value = data.get(key)
        return value if isinstance(value, list) else None

    def _section(self, icon: str, title: str, items: Any, empty_message: str,
                 limit: int, render: Any) -> str:
        """Render one titled section.

        *items* of ``None`` means the section is absent and yields "".
        Otherwise the header is emitted, followed by either *empty_message*
        or up to *limit* entries rendered by ``render(index, entry)``.
        Entries that are not dictionaries are skipped.
        """
        if items is None:
            return ""
        parts = [f"\n{icon} {title}:\n", self._RULE]
        entries = [entry for entry in items if isinstance(entry, dict)]
        if not entries:
            parts.append(f"{empty_message}\n")
        else:
            parts.extend(
                render(index, entry)
                for index, entry in enumerate(entries[:limit], 1)
            )
        return "".join(parts)

    def _render_document(self, index: int, doc: Dict[str, Any], label: str,
                         modified_key: str, preview_label: str,
                         preview_limit: int) -> str:
        """Render a document-like entry: name, modified time, content preview."""
        lines = [
            f"\n{index}. {label}: {doc.get('name', 'Unknown')}\n",
            f" Modified: {doc.get(modified_key, 'Unknown')}\n",
        ]
        preview = self._preview(doc.get("content"), preview_limit)
        if preview:
            lines.append(f" {preview_label}:\n {preview}\n")
        return "".join(lines)

    def _render_mail(self, index: int, msg: Dict[str, Any]) -> str:
        """Render an e-mail entry (shared by Gmail and Outlook)."""
        return (
            f"\n{index}. From: {msg.get('from', 'Unknown')}\n"
            f" Subject: {msg.get('subject', 'No subject')}\n"
            f" Preview: {self._preview(msg.get('body'), 300)}\n"
        )

    # ------------------------------------------------------------------
    # Google
    # ------------------------------------------------------------------

    def _render_drive_file(self, index: int, file_info: Dict[str, Any]) -> str:
        """Render one Google Drive file entry."""
        lines = [
            f"\n{index}. File: {file_info.get('name', 'Unknown')}\n",
            f" Type: {file_info.get('mimeType', 'Unknown')}\n",
            f" Modified: {file_info.get('modifiedTime', 'Unknown')}\n",
        ]
        if file_info.get("webViewLink"):
            lines.append(f" Link: {file_info['webViewLink']}\n")
        preview = self._preview(file_info.get("content"), 500)
        if preview:
            lines.append(f" Content Preview:\n {preview}\n")
        # Drive entries carry an extra trailing blank line.
        # NOTE(review): the nesting level of this line was not recoverable
        # from the original source; confirm it belongs to each entry.
        lines.append("\n")
        return "".join(lines)

    def _render_calendar_event(self, index: int, event: Dict[str, Any]) -> str:
        """Render one Google Calendar event entry."""
        lines = [
            f"\n{index}. Event: {event.get('summary', 'No title')}\n",
            f" Time: {event.get('start', 'Unknown')}\n",
        ]
        if event.get("location"):
            lines.append(f" Location: {event['location']}\n")
        description = self._preview(event.get("description"), 200)
        if description:
            lines.append(f" Description: {description}\n")
        return "".join(lines)

    def _render_google_task(self, index: int, task: Dict[str, Any]) -> str:
        """Render one Google Tasks entry."""
        lines = [
            f"\n{index}. Task: {task.get('title', 'No title')}\n",
            f" List: {task.get('listTitle', 'Unknown')}\n",
            f" Status: {task.get('status', 'Unknown')}\n",
        ]
        notes = self._preview(task.get("notes"), 200)
        if notes:
            lines.append(f" Notes: {notes}\n")
        if task.get("due"):
            lines.append(f" Due: {task['due']}\n")
        return "".join(lines)

    def _format_google_data(self, data: Dict[str, Any]) -> str:
        """Format Drive, Gmail, Calendar, Docs, Sheets and Tasks results."""
        sections = [
            self._section(
                self._ICON_FOLDER, "GOOGLE DRIVE FILES",
                self._service_items(data, "drive", "files"),
                "No files found matching the query.", 10,
                self._render_drive_file,
            ),
            self._section(
                self._ICON_MAIL, "GMAIL MESSAGES",
                self._service_items(data, "gmail", "messages"),
                "No messages found matching the query.", 10,
                self._render_mail,
            ),
            self._section(
                self._ICON_CALENDAR, "GOOGLE CALENDAR EVENTS",
                self._service_items(data, "calendar", "events"),
                "No calendar events found matching the query.", 10,
                self._render_calendar_event,
            ),
            self._section(
                self._ICON_PAGE, "GOOGLE DOCS",
                self._service_items(data, "docs", "docs"),
                "No documents found matching the query.", 5,
                lambda i, d: self._render_document(
                    i, d, "Document", "modifiedTime", "Content Preview", 500),
            ),
            self._section(
                self._ICON_CHART, "GOOGLE SHEETS",
                self._service_items(data, "sheets", "sheets"),
                "No spreadsheets found matching the query.", 5,
                lambda i, d: self._render_document(
                    i, d, "Spreadsheet", "modifiedTime", "Data Preview", 300),
            ),
            self._section(
                self._ICON_CHECK, "GOOGLE TASKS",
                self._service_items(data, "tasks", "tasks"),
                "No tasks found matching the query.", 10,
                self._render_google_task,
            ),
            # Add other Google services as needed
        ]
        return "".join(sections)

    # ------------------------------------------------------------------
    # Microsoft
    # ------------------------------------------------------------------

    def _render_onedrive_file(self, index: int, file_info: Dict[str, Any]) -> str:
        """Render one OneDrive file entry."""
        lines = [
            f"\n{index}. File: {file_info.get('name', 'Unknown')}\n",
            f" Modified: {file_info.get('lastModified', 'Unknown')}\n",
        ]
        if file_info.get("webUrl"):
            lines.append(f" URL: {file_info['webUrl']}\n")
        preview = self._preview(file_info.get("content"), 500)
        if preview:
            lines.append(f" Content Preview:\n {preview}\n")
        return "".join(lines)

    def _render_onenote_page(self, index: int, page: Dict[str, Any]) -> str:
        """Render one OneNote page entry."""
        lines = [
            f"\n{index}. Page: {page.get('title', 'Unknown')}\n",
            f" Section: {page.get('parentSection', 'Unknown')}\n",
            f" Modified: {page.get('lastModifiedDateTime', 'Unknown')}\n",
        ]
        preview = self._preview(page.get("contentPreview"), 200)
        if preview:
            lines.append(f" Preview: {preview}\n")
        return "".join(lines)

    def _render_todo_task(self, index: int, task: Dict[str, Any]) -> str:
        """Render one Microsoft To Do entry."""
        status = "Completed" if task.get("isCompleted") else "Pending"
        lines = [
            f"\n{index}. Task: {task.get('title', 'No title')}\n",
            f" List: {task.get('listName', 'Unknown')}\n",
            f" Status: {status}\n",
        ]
        notes = self._preview(self._nested(task, "body", "content"), 200)
        if notes:
            lines.append(f" Notes: {notes}\n")
        due = self._nested(task, "dueDateTime", "dateTime")
        if due:
            lines.append(f" Due: {due}\n")
        return "".join(lines)

    def _render_exchange_event(self, index: int, event: Dict[str, Any]) -> str:
        """Render one Exchange calendar event entry."""
        lines = [
            f"\n{index}. Event: {event.get('subject', 'No subject')}\n",
            f" Start: {self._nested(event, 'start', 'dateTime', 'Unknown')}\n",
            f" End: {self._nested(event, 'end', 'dateTime', 'Unknown')}\n",
        ]
        location = self._nested(event, "location", "displayName")
        if location:
            lines.append(f" Location: {location}\n")
        preview = self._preview(event.get("bodyPreview"), 200)
        if preview:
            lines.append(f" Preview: {preview}\n")
        return "".join(lines)

    def _format_microsoft_data(self, data: Dict[str, Any]) -> str:
        """Format Word, Excel, PowerPoint, OneDrive, Outlook, OneNote,
        To Do and Exchange results."""
        def office_doc(label: str) -> Any:
            # Word, Excel and PowerPoint entries share one layout.
            return lambda i, d: self._render_document(
                i, d, label, "lastModifiedDateTime", "Content Preview", 500)

        sections = [
            self._section(
                self._ICON_PAGE, "MICROSOFT WORD DOCUMENTS",
                self._service_items(data, "word", "documents"),
                "No documents found matching the query.", 5,
                office_doc("Document"),
            ),
            self._section(
                self._ICON_CHART, "MICROSOFT EXCEL WORKBOOKS",
                self._service_items(data, "excel", "workbooks"),
                "No workbooks found matching the query.", 5,
                office_doc("Workbook"),
            ),
            self._section(
                self._ICON_CHART, "MICROSOFT POWERPOINT PRESENTATIONS",
                self._service_items(data, "powerpoint", "presentations"),
                "No presentations found matching the query.", 5,
                office_doc("Presentation"),
            ),
            self._section(
                self._ICON_FOLDER, "ONEDRIVE FILES",
                self._service_items(data, "onedrive", "files"),
                "No files found matching the query.", 10,
                self._render_onedrive_file,
            ),
            self._section(
                self._ICON_MAIL, "OUTLOOK MESSAGES",
                self._service_items(data, "outlook", "messages"),
                "No messages found matching the query.", 10,
                self._render_mail,
            ),
            self._section(
                self._ICON_NOTEBOOK, "ONENOTE PAGES",
                self._service_items(data, "onenote", "pages"),
                "No pages found matching the query.", 10,
                self._render_onenote_page,
            ),
            self._section(
                self._ICON_CHECK, "MICROSOFT TO DO",
                self._service_items(data, "todo", "tasks"),
                "No tasks found matching the query.", 10,
                self._render_todo_task,
            ),
            self._section(
                self._ICON_CALENDAR, "EXCHANGE CALENDAR",
                self._service_items(data, "exchange", "events"),
                "No calendar events found matching the query.", 10,
                self._render_exchange_event,
            ),
        ]
        return "".join(sections)

    # ------------------------------------------------------------------
    # Slack
    # ------------------------------------------------------------------

    def _render_slack_message(self, index: int, msg: Dict[str, Any]) -> str:
        """Render one Slack message entry."""
        lines = [
            f"\n{index}. User: {msg.get('user', 'Unknown')}\n",
            f" Channel: #{msg.get('channel', 'Unknown')}\n",
            f" Message: {msg.get('text', '')}\n",
        ]
        if msg.get("ts"):
            # The timestamp is emitted as received, without conversion.
            lines.append(f" Time: {msg['ts']}\n")
        return "".join(lines)

    def _render_slack_file(self, index: int, file_info: Dict[str, Any]) -> str:
        """Render one Slack file entry."""
        size = self._format_file_size(file_info.get("size", 0))
        lines = [
            f"\n{index}. File: {file_info.get('name', 'Unknown')}\n",
            f" Type: {file_info.get('mimetype', 'Unknown')}\n",
            f" Size: {size}\n",
        ]
        preview = self._preview(file_info.get("preview"), 200)
        if preview:
            lines.append(f" Preview: {preview}\n")
        return "".join(lines)

    def _render_slack_channels(self, channels: Any) -> str:
        """Render the channels section: up to 10 channels, 5 messages each.

        Unlike the other sections, an empty channel list yields only the
        header (there is no "nothing found" line).
        """
        if channels is None:
            return ""
        parts = [f"\n{self._ICON_LOUDSPEAKER} SLACK CHANNELS:\n", self._RULE]
        for channel in channels[:10]:
            if not isinstance(channel, dict):
                continue
            parts.append(f"\nChannel: #{channel.get('name', 'Unknown')}\n")
            messages = channel.get("messages")
            if isinstance(messages, list):
                for msg in messages[:5]:
                    if isinstance(msg, dict):
                        parts.append(
                            f" {self._BULLET} {msg.get('user', 'Unknown')}: "
                            f"{msg.get('text', '')}\n"
                        )
        return "".join(parts)

    def _format_slack_data(self, data: Dict[str, Any]) -> str:
        """Format Slack messages, channels and files.

        Each section is rendered only when its top-level key holds a list.
        """
        sections = [
            self._section(
                self._ICON_SPEECH, "SLACK MESSAGES",
                self._top_level_list(data, "messages"),
                "No messages found matching the query.", 15,
                self._render_slack_message,
            ),
            self._render_slack_channels(self._top_level_list(data, "channels")),
            self._section(
                self._ICON_PAPERCLIP, "SLACK FILES",
                self._top_level_list(data, "files"),
                "No files found matching the query.", 10,
                self._render_slack_file,
            ),
        ]
        return "".join(sections)

    def _format_file_size(self, size_bytes: int) -> str:
        """Return *size_bytes* as a human-readable size in B, KB, MB or GB.

        A missing or non-numeric size is reported as ``"0 B"``.
        """
        if not isinstance(size_bytes, (int, float)):
            size_bytes = 0

        kib = 1024
        mib = kib * 1024
        gib = mib * 1024

        if size_bytes < kib:
            return f"{size_bytes} B"
        if size_bytes < mib:
            return f"{size_bytes / kib:.1f} KB"
        if size_bytes < gib:
            return f"{size_bytes / mib:.1f} MB"
        return f"{size_bytes / gib:.1f} GB"
if __name__ == "__main__":
    # Example usage.
    # The OAuth access token is read from the GOOGLE_ACCESS_TOKEN environment
    # variable instead of being hard-coded: credentials must never be
    # committed to source control. Any token previously committed in this
    # file should be treated as leaked and revoked.
    client = MCPClient()
    print(asyncio.run(client.fetch_app_data(
        provider="google",
        services=["gmail"],
        query="summarize the information from my last 5 emails",
        user_id="4c1d92c5-ecec-45d5-b8fd-cb0ce5292403",
        access_token=os.getenv("GOOGLE_ACCESS_TOKEN", ""),
    )))