Spaces:
Paused
Paused
| import os | |
| import json | |
| import asyncio | |
| import aiohttp | |
| import logging | |
| import requests | |
| from typing import List, Dict, Any | |
| from datetime import datetime, timezone | |
logger = logging.getLogger(__name__)


class MCPClient:
    """Client that queries a Pipedream webhook and formats the result.

    ``fetch_app_data`` posts a query for one provider to the webhook URL read
    from the ``PIPEDREAM_WEBHOOK_URL`` environment variable, and
    ``format_as_context`` turns the returned JSON into a plain-text block
    intended to be handed to an LLM as context.

    NOTE(review): the section icons in the header strings were reconstructed
    from mis-encoded bytes; a few of them are best guesses and should be
    confirmed against the original file.
    """

    def __init__(self):
        self.webhook_url = os.getenv("PIPEDREAM_WEBHOOK_URL")
        if not self.webhook_url:
            logger.warning("PIPEDREAM_WEBHOOK_URL not set in environment variables")
        # Overall per-request budget (seconds) used for every webhook call.
        self.timeout = aiohttp.ClientTimeout(total=60)

    async def fetch_app_data(
        self,
        provider: str,
        services: List[str],
        query: str,
        user_id: str,
        access_token: str
    ) -> Dict[str, Any]:
        """Fetch app data for ``provider`` from the Pipedream webhook.

        Args:
            provider: Provider name forwarded to the webhook (e.g. "google").
            services: Service names forwarded to the webhook.
            query: The user's query, forwarded verbatim.
            user_id: Identifier of the requesting user.
            access_token: Provider token forwarded as ``token``.

        Returns:
            The decoded JSON response on success. Every failure (missing
            configuration, missing token, non-200 status, invalid JSON,
            timeout, network error, provider auth error) is reported as a
            dict with a single ``"error"`` key instead of raising.
        """
        if not self.webhook_url:
            logger.error("Pipedream webhook URL not configured")
            return {"error": "Pipedream integration not configured"}

        if not access_token:
            logger.error(f"No access token for {provider}! Cannot proceed.")
            return {"error": f"No authentication token for {provider}"}

        payload = {
            "provider": provider,
            "services": services,
            "query": query,
            "user_id": user_id,
            "token": access_token,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        headers = {'Content-Type': 'application/json'}
        # Never log the token itself, only the non-secret routing fields.
        logger.debug("Fetching %s data for services: %s", provider, services)

        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # NOTE(review): the body shape ({'data': <json string>}) follows
                # the previously disabled aiohttp implementation; confirm it
                # matches what the Pipedream workflow parses.
                async with session.post(
                    self.webhook_url,
                    json={'data': json.dumps(payload)},
                    headers=headers,
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"Pipedream request failed: {response.status} - {error_text}")
                        return {"error": f"Failed to fetch data: {response.status} - {error_text}"}
                    try:
                        data = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Wrong content type, or a body that is not valid JSON.
                        logger.error("Pipedream returned a non-JSON or empty response.")
                        return {"error": "Invalid response from the provider."}
        except asyncio.TimeoutError:
            logger.error("Pipedream request timed out")
            return {"error": "Request timed out. Please try again."}
        except aiohttp.ClientError as e:
            logger.error(f"Network error calling Pipedream: {str(e)}")
            return {"error": "Network error. Please check your connection."}
        except Exception as e:
            # Boundary handler: callers expect an error dict, never an exception.
            logger.error(f"Unexpected error calling Pipedream: {str(e)}")
            return {"error": "An unexpected error occurred"}

        if data is None:
            logger.warning("Pipedream returned a null response.")
            return {"error": "Received no data from the provider."}

        if self._has_auth_error(data):
            logger.error(f"Authentication failed for {provider}")
            return {"error": f"Authentication failed for {provider}. Please reconnect your account."}

        logger.info(f"Successfully fetched {provider} data")
        return data

    @staticmethod
    def _has_auth_error(data: Any) -> bool:
        """Return True if any per-service result reports an authentication error."""
        if not isinstance(data, dict):
            return False
        markers = ("401", "authError", "UNAUTHENTICATED")
        for result in data.values():
            if isinstance(result, dict) and result.get('error'):
                details = str(result.get('details', ''))
                if any(marker in details for marker in markers):
                    return True
        return False

    def format_as_context(self, provider: str, data: Dict[str, Any]) -> str:
        """Format raw app data into a context string for the LLM.

        Returns an empty string when ``data`` is empty or carries an
        ``"error"`` key; otherwise a banner, the provider-specific sections
        and a closing rule.
        """
        if not data or "error" in data:
            return ""

        formatters = {
            "google": self._format_google_data,
            "microsoft": self._format_microsoft_data,
            "slack": self._format_slack_data,
        }
        formatter = formatters.get(provider)
        body = formatter(data) if formatter else f"Unknown provider: {provider}\n"

        rule = "=" * 50 + "\n"
        return f"\n[{provider.upper()} APP DATA]\n" + rule + body + rule

    @staticmethod
    def _section_header(title: str) -> str:
        """Return the heading line plus dashed rule that opens a section."""
        return f"\n{title}:\n" + "-" * 30 + "\n"

    @staticmethod
    def _preview(text: Any, limit: int) -> str:
        """Return ``text`` cut to ``limit`` characters, with "..." only if cut."""
        text = "" if text is None else str(text)
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _section_items(data: Any, service: str, key: str):
        """Return the list at ``data[service][key]``.

        Returns None when the section is absent (so the caller skips it) and
        an empty list when the key is present but holds no usable list.
        """
        if not isinstance(data, dict):
            return None
        section = data.get(service)
        if not isinstance(section, dict) or key not in section:
            return None
        items = section[key]
        return items if isinstance(items, list) else []

    @staticmethod
    def _nested(record: Dict[str, Any], key: str, sub_key: str, default: Any = None) -> Any:
        """Return ``record[key][sub_key]`` if ``record[key]`` is a dict, else ``default``."""
        inner = record.get(key)
        if isinstance(inner, dict):
            return inner.get(sub_key, default)
        return default

    def _format_mail(self, messages: List[Dict[str, Any]]) -> List[str]:
        """Format up to 10 mail messages (shared by Gmail and Outlook)."""
        out: List[str] = []
        for i, msg in enumerate(messages[:10], 1):
            out.append(f"\n{i}. From: {msg.get('from', 'Unknown')}\n")
            out.append(f" Subject: {msg.get('subject', 'No subject')}\n")
            # A missing or null body yields an empty preview instead of raising.
            out.append(f" Preview: {self._preview(msg.get('body'), 300)}\n")
        return out

    def _format_google_data(self, data: Dict[str, Any]) -> str:
        """Format the Google sections (Drive, Gmail, Calendar, Docs, Sheets, Tasks)."""
        out: List[str] = []

        # Google Drive
        files = self._section_items(data, "drive", "files")
        if files is not None:
            out.append(self._section_header("📁 GOOGLE DRIVE FILES"))
            if not files:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(files[:10], 1):  # Limit to 10 files
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Type: {file.get('mimeType', 'Unknown')}\n")
                out.append(f" Modified: {file.get('modifiedTime', 'Unknown')}\n")
                if file.get('webViewLink'):
                    out.append(f" Link: {file['webViewLink']}\n")
                if file.get('content'):
                    out.append(f" Content Preview:\n {self._preview(file['content'], 500)}\n")
                    # NOTE(review): the original nesting of this blank line was
                    # lost; it is assumed to follow the content preview.
                    out.append("\n")

        # Gmail
        messages = self._section_items(data, "gmail", "messages")
        if messages is not None:
            out.append(self._section_header("📧 GMAIL MESSAGES"))
            if not messages:
                out.append("No messages found matching the query.\n")
            out.extend(self._format_mail(messages))

        # Google Calendar
        events = self._section_items(data, "calendar", "events")
        if events is not None:
            out.append(self._section_header("📅 GOOGLE CALENDAR EVENTS"))
            if not events:
                out.append("No calendar events found matching the query.\n")
            for i, event in enumerate(events[:10], 1):
                out.append(f"\n{i}. Event: {event.get('summary', 'No title')}\n")
                out.append(f" Time: {event.get('start', 'Unknown')}\n")
                if event.get('location'):
                    out.append(f" Location: {event['location']}\n")
                if event.get('description'):
                    out.append(f" Description: {self._preview(event['description'], 200)}\n")

        # Google Docs
        docs = self._section_items(data, "docs", "docs")
        if docs is not None:
            out.append(self._section_header("📄 GOOGLE DOCS"))
            if not docs:
                out.append("No documents found matching the query.\n")
            for i, doc in enumerate(docs[:5], 1):
                out.append(f"\n{i}. Document: {doc.get('name', 'Unknown')}\n")
                out.append(f" Modified: {doc.get('modifiedTime', 'Unknown')}\n")
                if doc.get('content'):
                    out.append(f" Content Preview:\n {self._preview(doc['content'], 500)}\n")

        # Google Sheets
        sheets = self._section_items(data, "sheets", "sheets")
        if sheets is not None:
            out.append(self._section_header("📊 GOOGLE SHEETS"))
            if not sheets:
                out.append("No spreadsheets found matching the query.\n")
            for i, sheet in enumerate(sheets[:5], 1):
                out.append(f"\n{i}. Spreadsheet: {sheet.get('name', 'Unknown')}\n")
                out.append(f" Modified: {sheet.get('modifiedTime', 'Unknown')}\n")
                if sheet.get('content'):
                    out.append(f" Data Preview:\n {self._preview(sheet['content'], 300)}\n")

        # Google Tasks
        tasks = self._section_items(data, "tasks", "tasks")
        if tasks is not None:
            out.append(self._section_header("✅ GOOGLE TASKS"))
            if not tasks:
                out.append("No tasks found matching the query.\n")
            for i, task in enumerate(tasks[:10], 1):
                out.append(f"\n{i}. Task: {task.get('title', 'No title')}\n")
                out.append(f" List: {task.get('listTitle', 'Unknown')}\n")
                out.append(f" Status: {task.get('status', 'Unknown')}\n")
                if task.get('notes'):
                    out.append(f" Notes: {self._preview(task['notes'], 200)}\n")
                if task.get('due'):
                    out.append(f" Due: {task['due']}\n")

        # Add other Google services as needed
        return "".join(out)

    def _format_microsoft_data(self, data: Dict[str, Any]) -> str:
        """Format the Microsoft sections (Office files, OneDrive, Outlook, OneNote, To Do, Exchange)."""
        out: List[str] = []

        # Word, Excel and PowerPoint share one layout: (service/key, title, label).
        office_sections = (
            ("word", "documents", "📄 MICROSOFT WORD DOCUMENTS", "Document"),
            ("excel", "workbooks", "📊 MICROSOFT EXCEL WORKBOOKS", "Workbook"),
            ("powerpoint", "presentations", "📊 MICROSOFT POWERPOINT PRESENTATIONS", "Presentation"),
        )
        for service, key, title, label in office_sections:
            items = self._section_items(data, service, key)
            if items is None:
                continue
            out.append(self._section_header(title))
            if not items:
                out.append(f"No {key} found matching the query.\n")
            for i, item in enumerate(items[:5], 1):
                out.append(f"\n{i}. {label}: {item.get('name', 'Unknown')}\n")
                out.append(f" Modified: {item.get('lastModifiedDateTime', 'Unknown')}\n")
                if item.get('content'):
                    out.append(f" Content Preview:\n {self._preview(item['content'], 500)}\n")

        # OneDrive/Files
        files = self._section_items(data, "onedrive", "files")
        if files is not None:
            out.append(self._section_header("📁 ONEDRIVE FILES"))
            if not files:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(files[:10], 1):
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Modified: {file.get('lastModified', 'Unknown')}\n")
                if file.get('webUrl'):
                    out.append(f" URL: {file['webUrl']}\n")
                if file.get('content'):
                    out.append(f" Content Preview:\n {self._preview(file['content'], 500)}\n")

        # Outlook
        messages = self._section_items(data, "outlook", "messages")
        if messages is not None:
            out.append(self._section_header("📧 OUTLOOK MESSAGES"))
            if not messages:
                out.append("No messages found matching the query.\n")
            out.extend(self._format_mail(messages))

        # OneNote
        pages = self._section_items(data, "onenote", "pages")
        if pages is not None:
            out.append(self._section_header("📓 ONENOTE PAGES"))
            if not pages:
                out.append("No pages found matching the query.\n")
            for i, page in enumerate(pages[:10], 1):
                out.append(f"\n{i}. Page: {page.get('title', 'Unknown')}\n")
                out.append(f" Section: {page.get('parentSection', 'Unknown')}\n")
                out.append(f" Modified: {page.get('lastModifiedDateTime', 'Unknown')}\n")
                if page.get('contentPreview'):
                    out.append(f" Preview: {self._preview(page['contentPreview'], 200)}\n")

        # Microsoft To Do
        tasks = self._section_items(data, "todo", "tasks")
        if tasks is not None:
            out.append(self._section_header("✅ MICROSOFT TO DO"))
            if not tasks:
                out.append("No tasks found matching the query.\n")
            for i, task in enumerate(tasks[:10], 1):
                out.append(f"\n{i}. Task: {task.get('title', 'No title')}\n")
                out.append(f" List: {task.get('listName', 'Unknown')}\n")
                out.append(f" Status: {'Completed' if task.get('isCompleted') else 'Pending'}\n")
                # 'body' / 'dueDateTime' may be null or incomplete; skip quietly.
                notes = self._nested(task, 'body', 'content')
                if notes:
                    out.append(f" Notes: {self._preview(notes, 200)}\n")
                due = self._nested(task, 'dueDateTime', 'dateTime')
                if due:
                    out.append(f" Due: {due}\n")

        # Exchange Calendar
        events = self._section_items(data, "exchange", "events")
        if events is not None:
            out.append(self._section_header("📅 EXCHANGE CALENDAR"))
            if not events:
                out.append("No calendar events found matching the query.\n")
            for i, event in enumerate(events[:10], 1):
                out.append(f"\n{i}. Event: {event.get('subject', 'No subject')}\n")
                out.append(f" Start: {self._nested(event, 'start', 'dateTime', 'Unknown')}\n")
                out.append(f" End: {self._nested(event, 'end', 'dateTime', 'Unknown')}\n")
                location = self._nested(event, 'location', 'displayName')
                if location:
                    out.append(f" Location: {location}\n")
                if event.get('bodyPreview'):
                    out.append(f" Preview: {self._preview(event['bodyPreview'], 200)}\n")

        return "".join(out)

    def _format_slack_data(self, data: Dict[str, Any]) -> str:
        """Format Slack messages, channels and files.

        The messages heading is always emitted; the channels and files
        sections appear only when the corresponding key holds a list.
        """
        if not isinstance(data, dict):
            data = {}
        out: List[str] = [self._section_header("💬 SLACK MESSAGES")]

        messages = data.get("messages")
        if isinstance(messages, list):
            if not messages:
                out.append("No messages found matching the query.\n")
            for i, msg in enumerate(messages[:15], 1):
                out.append(f"\n{i}. User: {msg.get('user', 'Unknown')}\n")
                out.append(f" Channel: #{msg.get('channel', 'Unknown')}\n")
                out.append(f" Message: {msg.get('text', '')}\n")
                if msg.get('ts'):
                    # The raw timestamp value is emitted as-is.
                    out.append(f" Time: {msg['ts']}\n")

        # Slack channels with messages
        channels = data.get("channels")
        if isinstance(channels, list):
            out.append(self._section_header("📢 SLACK CHANNELS"))
            for channel in channels[:10]:
                out.append(f"\nChannel: #{channel.get('name', 'Unknown')}\n")
                for msg in (channel.get('messages') or [])[:5]:
                    out.append(f" • {msg.get('user', 'Unknown')}: {msg.get('text', '')}\n")

        # Slack files
        files = data.get("files")
        if isinstance(files, list):
            out.append(self._section_header("📎 SLACK FILES"))
            if not files:
                out.append("No files found matching the query.\n")
            for i, file in enumerate(files[:10], 1):
                out.append(f"\n{i}. File: {file.get('name', 'Unknown')}\n")
                out.append(f" Type: {file.get('mimetype', 'Unknown')}\n")
                out.append(f" Size: {self._format_file_size(file.get('size', 0))}\n")
                if file.get('preview'):
                    out.append(f" Preview: {self._preview(file['preview'], 200)}\n")

        return "".join(out)

    def _format_file_size(self, size_bytes: int) -> str:
        """Render a byte count as B / KB / MB / GB (1024-based, one decimal).

        A missing size (``None``) is rendered as ``"0 B"``.
        """
        size_bytes = size_bytes or 0
        if size_bytes < 1024:
            return f"{size_bytes} B"
        elif size_bytes < 1024 * 1024:
            return f"{size_bytes / 1024:.1f} KB"
        elif size_bytes < 1024 * 1024 * 1024:
            return f"{size_bytes / (1024 * 1024):.1f} MB"
        else:
            return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
if __name__ == "__main__":
    # Example usage.
    # The access token is read from the environment instead of being
    # hard-coded: a token committed to source control must be treated as
    # compromised and revoked. MCP_DEMO_ACCESS_TOKEN / MCP_DEMO_USER_ID are
    # only read by this demo block.
    client = MCPClient()
    print(asyncio.run(client.fetch_app_data(
        provider="google",
        services=["gmail"],
        query="summarize the information from my last 5 emails",
        user_id=os.getenv("MCP_DEMO_USER_ID", "4c1d92c5-ecec-45d5-b8fd-cb0ce5292403"),
        access_token=os.getenv("MCP_DEMO_ACCESS_TOKEN", ""),
    )))