Slack_App / all_tools.py
ujalaarshad17's picture
first commit
93dae80 verified
# import json
# import os
# import requests
# import base64
# import msal
# import time
# from typing import Type, Optional, List
# from dotenv import load_dotenv
# from pydantic.v1 import BaseModel, Field
# from msal import ConfidentialClientApplication
# from langchain_core.tools import BaseTool
# from slack_sdk.errors import SlackApiError
# from datetime import datetime
# from google_auth_oauthlib.flow import InstalledAppFlow
# from googleapiclient.discovery import build
# from google.oauth2.credentials import Credentials
# from google.auth.transport.requests import Request
# from services import construct_google_calendar_client
# from config import client
# from datetime import datetime, timedelta
# import pytz
# from typing import List, Dict, Optional
# from collections import defaultdict
# from slack_sdk.errors import SlackApiError
# from config import owner_id_pref, all_users_preload, GetAllUsers
# load_dotenv()
# calendar_service = None
# MICROSOFT_CLIENT_SECRET = os.getenv("MICROSOFT_CLIENT_SECRET")
# MICROSOFT_AUTHORITY = "https://login.microsoftonline.com/common"
# MICROSOFT_SCOPES = ["User.Read", "Calendars.ReadWrite"]
# MICROSOFT_REDIRECT_URI = os.getenv("MICROSOFT_REDIRECT_URI", "https://clear-muskox-grand.ngrok-free.app/microsoft_callback")
# # Enhanced GetAllUsers function (not a tool) to fetch email as well
# MICROSOFT_CLIENT_ID = "855e4571-d92a-4d51-802e-e712a879c00b"
# # Pydantic models for tool arguments
# class DirectDMArgs(BaseModel):
# message: str = Field(description="The message to be sent to the Slack user")
# user_id: str = Field(description="The Slack user ID")
# class DateTimeTool(BaseTool):
# name: str = "current_date_time"
# description: str = "Provides the current date and time."
# def _run(self):
# return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# # Tool to get a single user's Slack ID based on their name
# class GetSingleUserSlackIDArgs(BaseModel):
# name: str = Field(description="The real name of the user whose Slack ID is needed")
# class GetSingleUserSlackID(BaseTool):
# name: str = "gets_slack_id_single_user"
# description: str = "Gets the Slack ID of a user based on their real name"
# args_schema: Type[BaseModel] = GetSingleUserSlackIDArgs
# def _run(self, name: str):
# if not all_users_preload:
# print("Getting the users")
# all_users = GetAllUsers() # Fetch all users again
# else:
# print("Fetching the users")
# all_users = all_users_preload
# # Iterate through all_users to find a matching name
# for uid, info in all_users.items():
# if info["name"].lower() == name.lower():
# return uid, info['email']
# return "User not found"
# # Tool to get a single user's Slack name based on their ID
# class GetSingleUserSlackNameArgs(BaseModel):
# id: str = Field(description="The Slack user ID")
# class GetSingleUserSlackName(BaseTool):
# name: str = "gets_slack_name_single_user"
# description: str = "Gets the Slack real name of a user based on their slack ID"
# args_schema: Type[BaseModel] = GetSingleUserSlackNameArgs
# def _run(self, id: str):
# # Check if preload returns empty dict or "User not found"
# if not all_users_preload or all_users_preload == {}:
# all_users = GetAllUsers() # Fetch all users again
# else:
# all_users = all_users_preload
# user = all_users.get(id)
# print(all_users)
# if user:
# return user["name"], user['email']
# return "User not found"
# class MultiDMArgs(BaseModel):
# message: str
# user_ids: List[str]
# class MultiDirectDMTool(BaseTool):
# name: str = "send_multiple_dms"
# description: str = "Sends direct messages to multiple Slack users"
# args_schema: Type[BaseModel] = MultiDMArgs
# def _run(self, message: str, user_ids: List[str]):
# results = {}
# for user_id in user_ids:
# try:
# client.chat_postMessage(channel=user_id, text=message)
# results[user_id] = "Message sent successfully"
# except SlackApiError as e:
# results[user_id] = f"Error: {e.response['error']}"
# return results
# # Direct DM tool for sending messages within Slack
# class DirectDMTool(BaseTool):
# name: str = "send_direct_dm"
# description: str = "Sends direct messages to Slack users"
# args_schema: Type[BaseModel] = DirectDMArgs
# def _run(self, message: str, user_id: str):
# try:
# client.chat_postMessage(channel=user_id, text=message)
# return "Message sent successfully"
# except SlackApiError as e:
# return f"Error sending message: {e.response['error']}"
# def send_dm(user_id: str, message: str) -> bool:
# """Send a direct message to a user"""
# try:
# client.chat_postMessage(channel=user_id, text=message)
# return True
# except SlackApiError as e:
# print(f"Error sending DM: {e.response['error']}")
# return False
# def handle_event_modification(event_id: str, action: str) -> str:
# """Handle event modification (update/delete)"""
# # You'll need to implement the actual modification logic
# if action == "delete":
# result = GoogleDeleteCalendarEvent().run(event_id=event_id)
# else:
# result = GoogleUpdateCalendarEvent().run(event_id=event_id)
# if result.get("status") == "success":
# return f"Event {action}d successfully!"
# return f"Failed to {action} event: {result.get('message', 'Unknown error')}"
# # Google Calendar Tools
# PT = pytz.timezone('America/Los_Angeles')
# def convert_to_pt(dt: datetime) -> datetime:
# """Convert a datetime object to PT timezone"""
# if dt.tzinfo is None:
# dt = pytz.utc.localize(dt)
# return dt.astimezone(PT)
# class GoogleCalendarList(BaseTool):
# name: str = "list_calendar_list"
# description: str = "Lists available calendars in the user's Google Calendar account"
# def _run(self, user_id: str, max_capacity: int = 200):
# if not calendar_service:
# calendar_service = construct_google_calendar_client(user_id)
# else:
# return "Token should be refreshed in Google Calendar"
# all_calendars = []
# next_page_token = None
# capacity_tracker = 0
# while capacity_tracker < max_capacity:
# results = calendar_service.calendarList().list(
# maxResults=min(200, max_capacity - capacity_tracker),
# pageToken=next_page_token
# ).execute()
# calendars = results.get('items', [])
# all_calendars.extend(calendars)
# capacity_tracker += len(calendars)
# next_page_token = results.get('nextPageToken')
# if not next_page_token:
# break
# return [{
# 'id': cal['id'],
# 'name': cal['summary'],
# 'description': cal.get('description', '')
# } for cal in all_calendars]
# class GoogleCalendarEvents(BaseTool):
# name: str = "list_calendar_events"
# description: str = "Lists and gets events from a specific Google Calendar"
# def _run(self, user_id: str, calendar_id: str = "primary", max_capacity: int = 20):
# calendar_service = construct_google_calendar_client(user_id)
# all_events = []
# next_page_token = None
# capacity_tracker = 0
# while capacity_tracker < max_capacity:
# results = calendar_service.events().list(
# calendarId=calendar_id,
# maxResults=min(250, max_capacity - capacity_tracker),
# pageToken=next_page_token
# ).execute()
# events = results.get('items', [])
# all_events.extend(events)
# capacity_tracker += len(events)
# next_page_token = results.get('nextPageToken')
# if not next_page_token:
# break
# return all_events
# class GoogleCreateCalendar(BaseTool):
# name: str = "create_calendar_list"
# description: str = "Creates a new calendar in Google Calendar"
# def _run(self, user_id: str, calendar_name: str):
# calendar_service = construct_google_calendar_client(user_id)
# calendar_body = {'summary': calendar_name}
# created_calendar = calendar_service.calendars().insert(body=calendar_body).execute()
# return f"Created calendar: {created_calendar['id']}"
# # Updated Event Creation Tool with guest options, meeting agenda, and invite link support
# class GoogleAddCalendarEventArgs(BaseModel):
# calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
# summary: str = Field(description="Event title (should include meeting agenda if needed)")
# user_id: str = Field(default="user", description="User slack Id which should be matched to name")
# description: str = Field(default="", description="Event description or agenda")
# start_time: str = Field(description="Start time in ISO 8601 format")
# end_time: str = Field(description="End time in ISO 8601 format")
# location: str = Field(default="", description="Event location")
# invite_link: str = Field(default="", description="Invite link for the meeting")
# guests: List[str] = Field(default=None, description="List of guest emails to invite")
# class GoogleAddCalendarEvent(BaseTool):
# name: str = "google_add_calendar_event"
# description: str = "Creates an event in a Google Calendar with comprehensive meeting details and guest options"
# args_schema: Type[BaseModel] = GoogleAddCalendarEventArgs
# def _run(self, user_id: str, summary: str, start_time: str, end_time: str,
# description: str = "", calendar_id: str = 'primary', location: str = "",
# invite_link: str = "", guests: List[str] = None):
# calendar_service = construct_google_calendar_client(user_id)
# # Append invite link to description if provided
# if invite_link:
# description = f"{description}\nInvite Link: {invite_link}"
# event = {
# 'summary': summary,
# 'description': description,
# 'start': {'dateTime': start_time, 'timeZone': 'America/Los_Angeles'},
# 'end': {'dateTime': end_time, 'timeZone': 'America/Los_Angeles'},
# 'location': location,
# }
# # Add guests if provided
# if guests:
# event['attendees'] = [{'email': guest} for guest in guests]
# try:
# print("I am here registering the event")
# created_event = calendar_service.events().insert(
# calendarId=calendar_id,
# body=event,
# sendUpdates='all' # Send invitations to guests
# ).execute()
# return {
# "status": "success",
# "event_id": created_event['id'],
# "link": created_event.get('htmlLink', '')
# }
# except Exception as e:
# return {"status": "error", "message": str(e)}
# # Updated Tool: Update an existing calendar event including guest options
# class GoogleUpdateCalendarEventArgs(BaseModel):
# calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
# user_id: str = Field(default="user", description="User slack Id which should be matched to name")
# event_id: str = Field(description="The event ID to update")
# summary: str = Field(default=None, description="Updated event title or agenda")
# description: Optional[str] = Field(default=None, description="Updated event description or agenda")
# start_time: Optional[str] = Field(default=None, description="Updated start time in ISO 8601 format")
# end_time: Optional[str] = Field(default=None, description="Updated end time in ISO 8601 format")
# location: Optional[str] = Field(default=None, description="Updated event location")
# invite_link: str = Field(default=None, description="Updated invite link for the meeting")
# guests: List[str] = Field(default=None, description="Updated list of guest emails")
# class GoogleUpdateCalendarEvent(BaseTool):
# name: str = "google_update_calendar_event"
# description: str = "Updates an existing event in a Google Calendar, including guest options"
# args_schema: Type[BaseModel] = GoogleUpdateCalendarEventArgs
# def _run(self, user_id: str, event_id: str, calendar_id: str = "primary",
# summary: Optional[str] = None, description: Optional[str] = None,
# start_time: Optional[str] = None, end_time: Optional[str] = None,
# location: Optional[str] = None, invite_link: Optional[str] = None,
# guests: Optional[List[str]] = None):
# calendar_service = construct_google_calendar_client(user_id)
# # Retrieve the existing event
# try:
# event = calendar_service.events().get(calendarId=calendar_id, eventId=event_id).execute()
# except Exception as e:
# return {"status": "error", "message": f"Event retrieval failed: {str(e)}"}
# # Update fields if provided
# if summary:
# event['summary'] = summary
# if description:
# event['description'] = description
# if invite_link:
# # Append invite link to the description
# current_desc = event.get('description', '')
# event['description'] = f"{current_desc}\nInvite Link: {invite_link}"
# if start_time:
# event['start'] = {'dateTime': start_time, 'timeZone': 'UTC'}
# if end_time:
# event['end'] = {'dateTime': end_time, 'timeZone': 'UTC'}
# if location:
# event['location'] = location
# if guests is not None:
# event['attendees'] = [{'email': guest} for guest in guests]
# try:
# updated_event = calendar_service.events().update(
# calendarId=calendar_id,
# eventId=event_id,
# body=event
# ).execute()
# return {"status": "success", "event_id": updated_event['id']}
# except Exception as e:
# return {"status": "error", "message": f"Update failed: {str(e)}"}
# # New Tool: Delete a calendar event
# class GoogleDeleteCalendarEventArgs(BaseModel):
# calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
# event_id: str = Field(description="The event ID to delete")
# user_id: str = Field(default="user", description="User slack Id which should be matched to name")
# class GoogleDeleteCalendarEvent(BaseTool):
# name: str = "google_delete_calendar_event"
# description: str = "Deletes an event from a Google Calendar"
# args_schema: Type[BaseModel] = GoogleDeleteCalendarEventArgs
# def _run(self, user_id: str, event_id: str, calendar_id: str = "primary"):
# calendar_service = construct_google_calendar_client(user_id)
# try:
# calendar_service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
# return {"status": "success", "message": f"Deleted event {event_id}"}
# except Exception as e:
# return {"status": "error", "message": f"Deletion failed: {str(e)}"}
# # New Tool: Search Events by User
# class SearchUserEventsArgs(BaseModel):
# user_id: str
# lookback_days: int = Field(default=30)
# class SearchUserEventsTool(BaseTool):
# name: str = "search_events_by_user"
# description: str = "Finds calendar events associated with a specific user"
# args_schema: Type[BaseModel] = SearchUserEventsArgs
# def _run(self, user_id: str, lookback_days: int = 30):
# # Get user info which may be a tuple (name, email) or "User not found"
# user_info = GetSingleUserSlackName().run(user_id)
# # Handle case where user is not found
# if user_info == "User not found":
# return []
# # Extract user_name from tuple
# user_name, user_email = user_info
# # Fetch events for the user
# events = GoogleCalendarEvents().run(user_id)
# # Ensure 'now' is timezone-aware (using UTC)
# now = datetime.now(pytz.UTC)
# relevant_events = []
# for event in events:
# # Ensure event_time is timezone-aware
# event_time_str = event['start'].get('dateTime')
# if not event_time_str:
# continue # Skip events without a valid start time
# event_time = datetime.fromisoformat(event_time_str)
# if event_time.tzinfo is None:
# # If event_time is naive, make it timezone-aware (assuming UTC)
# event_time = pytz.UTC.localize(event_time)
# # Check if the event is within the lookback period
# if (now - event_time).days > lookback_days:
# continue
# # Check if the user's name is in the event summary or description
# if user_name in event.get('summary', '') or user_name in event.get('description', ''):
# relevant_events.append({
# 'id': event['id'],
# 'title': event['summary'],
# 'time': event_time.strftime("%Y-%m-%d %H:%M"),
# 'calendar_id': event['organizer']['email']
# })
# return relevant_events
# # Enhanced Agent Logic
# def handle_update_delete(user_id, text):
# events = SearchUserEventsTool().run(user_id)
# if not events:
# return "No recent events found for you."
# if len(events) > 1:
# options = [{"text": f"{e['title']} ({e['time']})", "value": e['id']} for e in events]
# return {
# "response_type": "ephemeral",
# "blocks": [{
# "type": "section",
# "text": {"type": "mrkdwn", "text": "Multiple events found:"},
# "accessory": {
# "type": "static_select",
# "options": options,
# "action_id": "select_event_to_modify"
# }
# }]
# }
# # If single event, proceed directly
# return handle_event_modification(events[0]['id'])
# # State Management
# from collections import defaultdict
# meeting_coordination = defaultdict(dict)
# class CreatePollArgs(BaseModel):
# time_slots: List[str]
# channel_id: str
# initiator_id: str
# class CoordinateDMsArgs(BaseModel):
# user_ids: List[str]
# time_slots: List[str]
# # Poll Creation Tool
# class CoordinateDMsTool(BaseTool):
# name:str = "coordinate_dm_responses"
# description:str = "Manages DM responses for meeting coordination"
# args_schema: Type[BaseModel] = CoordinateDMsArgs
# def _run(self, user_ids: List[str], time_slots: List[str]) -> Dict:
# session_id = f"dm_coord_{datetime.now().timestamp()}"
# message = "Please choose a time slot by replying with the number:\n" + \
# "\n".join([f"{i+1}. {slot}" for i, slot in enumerate(time_slots)])
# for uid in user_ids:
# if not send_dm(uid, message):
# return {"status": "error", "message": f"Failed to send DM to user {uid}"}
# meeting_coordination[session_id] = {
# "responses": {},
# "required": len(user_ids),
# "slots": time_slots,
# "participants": user_ids,
# "created_at": datetime.now()
# }
# return {
# "status": "success",
# "session_id": session_id,
# "message": "DMs sent to all participants"
# }
# # Poll Management
# # Zoom Meeting Tool
# # class ZoomCreateMeetingArgs(BaseModel):
# # topic: str = Field(description="Meeting topic")
# # start_time: str = Field(description="Start time in ISO 8601 format and PT timezone")
# # duration: int = Field(description="Duration in minutes")
# # agenda: Optional[str] = Field(default="", description="Meeting agenda")
# # timezone: str = Field(default="UTC", description="Timezone for the meeting")
# # class ZoomCreateMeetingTool(BaseTool):
# # name:str = "create_zoom_meeting"
# # description:str = "Creates a Zoom meeting using configured credentials"
# # args_schema: Type[BaseModel] = ZoomCreateMeetingArgs
# # def _run(self, topic: str, start_time: str, duration: int = 30,
# # agenda: str = "", timezone: str = "UTC"):
# # # Get owner's credentials
# # owner_id = "owner_id_pref"
# # if not owner_id:
# # return "Workspace owner not found"
# # prefs_path = os.path.join('preferences', f'preferences_{owner_id}.json')
# # if not os.path.exists(prefs_path):
# # return "Zoom credentials not configured"
# # with open(prefs_path) as f:
# # prefs = json.load(f)
# # if prefs['zoom_config']['mode'] == 'manual':
# # return {"link": prefs.get('zoom_link'), "message": "Using manual Zoom link"}
# # # Automatic Zoom creation
# # auth_str = f"{prefs['zoom_config']['client_id']}:{prefs['zoom_config']['client_secret']}"
# # auth_bytes = base64.b64encode(auth_str.encode()).decode()
# # headers = {
# # "Authorization": f"Basic {auth_bytes}",
# # "Content-Type": "application/x-www-form-urlencoded"
# # }
# # data = {
# # "grant_type": "account_credentials",
# # "account_id": prefs['zoom_config']['account_id']
# # }
# # # Get access token
# # token_res = requests.post(
# # "https://zoom.us/oauth/token",
# # headers=headers,
# # data=data
# # )
# # if token_res.status_code != 200:
# # return "Zoom authentication failed"
# # access_token = token_res.json()["access_token"]
# # # Create meeting
# # meeting_data = {
# # "topic": topic,
# # "type": 2,
# # "start_time": start_time,
# # "duration": duration,
# # "timezone": timezone,
# # "agenda": agenda,
# # "settings": {
# # "host_video": True,
# # "participant_video": True,
# # "join_before_host": False
# # }
# # }
# # headers = {
# # "Authorization": f"Bearer {access_token}",
# # "Content-Type": "application/json"
# # }
# # meeting_res = requests.post(
# # "https://api.zoom.us/v2/users/me/meetings",
# # headers=headers,
# # json=meeting_data
# # )
# # if meeting_res.status_code == 201:
# # return {
# # "meeting_id": meeting_res.json()["id"],
# # "join_url": meeting_res.json()["join_url"],
# # 'passcode':meeting_res.json()["password"],
# # 'duration':duration
# # }
# # return f"Zoom meeting creation failed: {meeting_res.text}"
# from langchain.tools import BaseTool
# from pydantic import BaseModel, Field
# from typing import Optional, Type
# import os
# import json
# import time
# import requests
# import base64
# class ZoomCreateMeetingArgs(BaseModel):
# topic: str = Field(description="Meeting topic")
# start_time: str = Field(description="Start time in ISO 8601 format and PT timezone")
# duration: int = Field(description="Duration in minutes")
# agenda: Optional[str] = Field(default="", description="Meeting agenda")
# timezone: str = Field(default="UTC", description="Timezone for the meeting")
# from config import get_workspace_owner_id, load_preferences
# import os
# # ZOOM_REDIRECT_URI = os.environ['ZOOM_REDIRECT_URI']
# CLIENT_SECRET = os.environ['ZOOM_CLIENT_SECRET']
# CLIENT_ID = os.environ['ZOOM_CLIENT_ID']
# ZOOM_TOKEN_API = os.environ['ZOOM_TOKEN_API']
# # ZOOM_OAUTH_AUTHORIZE_API = os.environ['ZOOM_OAUTH_AUTHORIZE_API']
# class ZoomCreateMeetingTool(BaseTool):
# name: str = "create_zoom_meeting"
# description: str = "Creates a Zoom meeting using configured credentials"
# args_schema: Type[BaseModel] = ZoomCreateMeetingArgs
# def _run(self, topic: str, start_time: str, duration: int = 30,
# agenda: str = "", timezone: str = "UTC"):
# # Get workspace owner's ID
# owner_id = get_workspace_owner_id()
# if not owner_id:
# return "Workspace owner not found"
# # Load preferences to check Zoom mode
# prefs = load_preferences(owner_id)
# zoom_config = prefs.get("zoom_config", {"mode": "manual", "link": None})
# # Handle manual mode
# if zoom_config["mode"] == "manual":
# link = zoom_config.get("link")
# if link:
# return f"Please join the meeting using this link: {link}"
# else:
# return "Manual Zoom link not configured"
# # Automatic mode
# token_path = f'token_files/zoom_{owner_id}.json'
# if not os.path.exists(token_path):
# return "Zoom not configured for automatic mode"
# with open(token_path) as f:
# token_data = json.load(f)
# access_token = token_data.get("access_token")
# if not access_token:
# return "Invalid Zoom token"
# # Check if token is expired and refresh if necessary
# expires_at = token_data.get("expires_at")
# if expires_at and time.time() > expires_at:
# refresh_token = token_data.get("refresh_token")
# if not refresh_token:
# return "Zoom token expired and no refresh token available"
# params = {
# "grant_type": "refresh_token",
# "refresh_token": refresh_token
# }
# auth_str = f"{CLIENT_ID}:{CLIENT_SECRET}"
# auth_bytes = base64.b64encode(auth_str.encode()).decode()
# headers = {
# "Authorization": f"Basic {auth_bytes}",
# "Content-Type": "application/x-www-form-urlencoded"
# }
# response = requests.post(ZOOM_TOKEN_API, data=params, headers=headers)
# if response.status_code == 200:
# new_token_data = response.json()
# token_data.update(new_token_data)
# token_data["expires_at"] = time.time() + new_token_data["expires_in"]
# with open(token_path, 'w') as f:
# json.dump(token_data, f)
# access_token = new_token_data["access_token"]
# else:
# return "Failed to refresh Zoom token"
# # Create Zoom meeting
# meeting_data = {
# "topic": topic,
# "type": 2, # Scheduled meeting
# "start_time": start_time,
# "duration": duration,
# "timezone": timezone,
# "agenda": agenda,
# "settings": {
# "host_video": True,
# "participant_video": True,
# "join_before_host": False
# }
# }
# headers = {
# "Authorization": f"Bearer {access_token}",
# "Content-Type": "application/json"
# }
# meeting_res = requests.post(
# "https://api.zoom.us/v2/users/me/meetings",
# headers=headers,
# json=meeting_data
# )
# if meeting_res.status_code == 201:
# meeting_info = meeting_res.json()
# # Extract meeting details
# meeting_id = meeting_info["id"]
# join_url = meeting_info["join_url"]
# password = meeting_info.get("password", "")
# dial_in_numbers = meeting_info["settings"].get("global_dial_in_numbers", [])
# print(meeting_info['settings'])
# print(dial_in_numbers)
# # Format one-tap mobile numbers (up to 2 US numbers)
# us_numbers = [num for num in dial_in_numbers if num["country"] == "US"]
# one_tap_strs = []
# for num in us_numbers[:2]:
# clean_number = ''.join(num["number"].split()) # Remove spaces, e.g., "+1 305 224 1968" -> "+13052241968"
# one_tap = f"{clean_number},,{meeting_id}#,,,,*{password}# US"
# one_tap_strs.append(one_tap)
# # Format dial-in numbers
# dial_in_strs = []
# for num in dial_in_numbers:
# country = num["country"]
# city = num.get("city", "")
# number = num["number"]
# if city:
# dial_in_strs.append(f"• {number} {country} ({city})")
# else:
# dial_in_strs.append(f"• {number} {country}")
# # Construct the invitation text
# invitation = "Citrusbug Technolabs is inviting you to a scheduled Zoom meeting.\n\n"
# invitation += f"Join Zoom Meeting\n{join_url}\n\n"
# invitation += f"Meeting ID: {meeting_id}\nPasscode: {password}\n\n"
# invitation += "---\n\n"
# if one_tap_strs:
# invitation += "One tap mobile\n"
# invitation += "\n".join(one_tap_strs) + "\n\n"
# invitation += "---\n\n"
# invitation += "Dial by your location\n"
# invitation += "\n".join(dial_in_strs) + "\n\n"
# invitation += f"Meeting ID: {meeting_id}\nPasscode: {password}\n"
# invitation += "Find your local number: https://zoom.us/zoomconference"
# return invitation
# else:
# return f"Zoom meeting creation failed: {meeting_res.text}"
# class MicrosoftBaseTool(BaseTool):
# """Base class for Microsoft tools with common auth handling"""
# def get_microsoft_client(self, user_id: str):
# """Get authenticated Microsoft client for a user"""
# token_path = os.path.join('token_files', f'microsoft_{user_id}.json')
# if not os.path.exists(token_path):
# return None, "Microsoft credentials not configured"
# with open(token_path) as f:
# token_data = json.load(f)
# if time.time() > token_data['expires_at']:
# # Handle token refresh
# app = ConfidentialClientApplication(
# MICROSOFT_CLIENT_ID,
# authority=MICROSOFT_AUTHORITY,
# client_credential=MICROSOFT_CLIENT_SECRET
# )
# result = app.acquire_token_by_refresh_token(
# token_data['refresh_token'],
# scopes=MICROSOFT_SCOPES
# )
# if "access_token" not in result:
# return None, "Token refresh failed"
# token_data.update(result)
# with open(token_path, 'w') as f:
# json.dump(token_data, f)
# headers = {
# "Authorization": f"Bearer {token_data['access_token']}",
# "Content-Type": "application/json"
# }
# return headers, None
# # Pydantic models for Microsoft tools
# class MicrosoftAddCalendarEventArgs(BaseModel):
# user_id: str = Field(description="Slack user ID of the calendar owner")
# subject: str = Field(description="Event title/subject")
# start_time: str = Field(description="Start time in ISO 8601 format")
# end_time: str = Field(description="End time in ISO 8601 format")
# content: str = Field(default="", description="Event description/content")
# location: str = Field(default="", description="Event location")
# attendees: List[str] = Field(default=[], description="List of attendee emails")
# class MicrosoftUpdateCalendarEventArgs(MicrosoftAddCalendarEventArgs):
# event_id: str = Field(description="Microsoft event ID to update")
# class MicrosoftDeleteCalendarEventArgs(BaseModel):
# user_id: str = Field(description="Slack user ID of the calendar owner")
# event_id: str = Field(description="Microsoft event ID to delete")
# # Microsoft Calendar Tools
# class MicrosoftListCalendarEvents(MicrosoftBaseTool):
# name:str = "microsoft_calendar_list_events"
# description:str = "Lists events from Microsoft Calendar"
# def _run(self, user_id: str, max_results: int = 10):
# headers, error = self.get_microsoft_client(user_id)
# if error:
# return error
# endpoint = "https://graph.microsoft.com/v1.0/me/events"
# params = {
# "$top": max_results,
# "$orderby": "start/dateTime desc"
# }
# response = requests.get(endpoint, headers=headers, params=params)
# if response.status_code != 200:
# return f"Error fetching events: {response.text}"
# events = response.json().get('value', [])
# return [{
# 'id': e['id'],
# 'subject': e.get('subject'),
# 'start': e['start'].get('dateTime'),
# 'end': e['end'].get('dateTime'),
# 'webLink': e.get('webUrl')
# } for e in events]
# class MicrosoftAddCalendarEvent(MicrosoftBaseTool):
# name:str = "microsoft_calendar_add_event"
# description:str = "Creates an event in Microsoft Calendar"
# args_schema: Type[BaseModel] = MicrosoftAddCalendarEventArgs
# def _run(self, user_id: str, subject: str, start_time: str, end_time: str,
# content: str = "", location: str = "", attendees: List[str] = []):
# headers, error = self.get_microsoft_client(user_id)
# if error:
# return error
# event_payload = {
# "subject": subject,
# "body": {
# "contentType": "HTML",
# "content": content
# },
# "start": {
# "dateTime": start_time,
# "timeZone": "America/Los_Angeles"
# },
# "end": {
# "dateTime": end_time,
# "timeZone": "America/Los_Angeles"
# },
# "location": {"displayName": location},
# "attendees": [{"emailAddress": {"address": email}} for email in attendees]
# }
# response = requests.post(
# "https://graph.microsoft.com/v1.0/me/events",
# headers=headers,
# json=event_payload
# )
# if response.status_code == 201:
# return {
# "status": "success",
# "event_id": response.json()['id'],
# "link": response.json().get('webUrl')
# }
# return f"Error creating event: {response.text}"
# class MicrosoftUpdateCalendarEvent(MicrosoftBaseTool):
# name:str = "microsoft_calendar_update_event"
# description:str = "Updates an existing Microsoft Calendar event"
# args_schema: Type[BaseModel] = MicrosoftUpdateCalendarEventArgs
# def _run(self, user_id: str, event_id: str, **kwargs):
# headers, error = self.get_microsoft_client(user_id)
# if error:
# return error
# get_response = requests.get(
# f"https://graph.microsoft.com/v1.0/me/events/{event_id}",
# headers=headers
# )
# if get_response.status_code != 200:
# return f"Error finding event: {get_response.text}"
# existing_event = get_response.json()
# update_payload = {
# "subject": kwargs.get('subject', existing_event.get('subject')),
# "body": {
# "content": kwargs.get('content', existing_event.get('body', {}).get('content')),
# "contentType": "HTML"
# },
# "start": {
# "dateTime": kwargs.get('start_time', existing_event['start']['dateTime']),
# "timeZone": "UTC"
# },
# "end": {
# "dateTime": kwargs.get('end_time', existing_event['end']['dateTime']),
# "timeZone": "UTC"
# },
# "location": {"displayName": kwargs.get('location', existing_event.get('location', {}).get('displayName'))},
# "attendees": [{"emailAddress": {"address": email}} for email in
# kwargs.get('attendees', [a['emailAddress']['address'] for a in existing_event.get('attendees', [])])]
# }
# response = requests.patch(
# f"https://graph.microsoft.com/v1.0/me/events/{event_id}",
# headers=headers,
# json=update_payload
# )
# if response.status_code == 200:
# return {"status": "success", "event_id": event_id}
# return f"Error updating event: {response.text}"
# class MicrosoftDeleteCalendarEvent(MicrosoftBaseTool):
# name:str = "microsoft_calendar_delete_event"
# description:str = "Deletes an event from Microsoft Calendar"
# args_schema: Type[BaseModel] = MicrosoftDeleteCalendarEventArgs
# def _run(self, user_id: str, event_id: str):
# headers, error = self.get_microsoft_client(user_id)
# if error:
# return error
# response = requests.delete(
# f"https://graph.microsoft.com/v1.0/me/events/{event_id}",
# headers=headers
# )
# if response.status_code == 204:
# return {"status": "success", "message": f"Deleted event {event_id}"}
# return f"Error deleting event: {response.text}"
# tools = [
# DirectDMTool(),
# ZoomCreateMeetingTool(),
# # GetSingleUserSlackName(),
# # GetSingleUserSlackID(),
# # CoordinateDMsTool(),
# SearchUserEventsTool(),
# # DateTimeTool(),
# GoogleCalendarList(),
# GoogleCalendarEvents(),
# GoogleCreateCalendar(),
# GoogleAddCalendarEvent(),
# GoogleUpdateCalendarEvent(),
# GoogleDeleteCalendarEvent(),
# # MicrosoftListCalendarEvents(),
# MicrosoftAddCalendarEvent(),
# MicrosoftUpdateCalendarEvent(),
# MicrosoftDeleteCalendarEvent(),
# MultiDirectDMTool()
# ]
# calendar_prompt_tools = [
# MicrosoftListCalendarEvents(),
# GoogleCalendarEvents()
# ]
# dm_tools = [
# DirectDMTool(),
# ZoomCreateMeetingTool(),
# # CoordinateDMsTool(),
# SearchUserEventsTool(),
# GetSingleUserSlackName(),
# GetSingleUserSlackID(),
# # DateTimeTool(),
# GoogleCalendarList(),
# GoogleCalendarEvents(),
# GoogleCreateCalendar(),
# GoogleAddCalendarEvent(),
# GoogleUpdateCalendarEvent(),
# GoogleDeleteCalendarEvent(),
# MicrosoftListCalendarEvents(),
# MicrosoftAddCalendarEvent(),
# MicrosoftUpdateCalendarEvent(),
# MicrosoftDeleteCalendarEvent()
# ]
# dm_group_tools = [
# GoogleCalendarEvents(),
# MicrosoftListCalendarEvents(),
# DateTimeTool(),
# ]
import json
import os
import requests
import base64
import msal
import time
from typing import Type, Optional, List
from dotenv import load_dotenv
from pydantic.v1 import BaseModel, Field
from msal import ConfidentialClientApplication
from langchain_core.tools import BaseTool
from slack_sdk.errors import SlackApiError
from datetime import datetime
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from config import client, get_workspace_owner_id, load_preferences, load_token, save_token
from datetime import datetime, timedelta
import pytz
from collections import defaultdict
from config import all_users_preload, GetAllUsers
load_dotenv()
# Load credentials from environment variables
# Microsoft identity platform settings. MicrosoftBaseTool passes the client ID,
# authority, secret and scopes to MSAL when it refreshes an expired token.
MICROSOFT_CLIENT_SECRET = os.getenv("MICROSOFT_CLIENT_SECRET")  # None when the variable is unset
MICROSOFT_AUTHORITY = "https://login.microsoftonline.com/common"
MICROSOFT_SCOPES = ["User.Read", "Calendars.ReadWrite"]
# Falls back to an ngrok URL when MICROSOFT_REDIRECT_URI is unset. Not referenced
# in the part of this file that defines the tools.
MICROSOFT_REDIRECT_URI = os.getenv("MICROSOFT_REDIRECT_URI", "https://clear-muskox-grand.ngrok-free.app/microsoft_callback")
# NOTE(review): the client ID is hard-coded while the secret is read from the
# environment; confirm this is intentional.
MICROSOFT_CLIENT_ID = "855e4571-d92a-4d51-802e-e712a879c00b"
# Zoom OAuth settings. ZoomCreateMeetingTool uses them to build the Basic auth
# header and to call the token endpoint when refreshing an expired token.
ZOOM_CLIENT_SECRET = os.getenv("ZOOM_CLIENT_SECRET")
ZOOM_CLIENT_ID = os.getenv("ZOOM_CLIENT_ID")
ZOOM_TOKEN_API = os.getenv("ZOOM_TOKEN_API", "https://zoom.us/oauth/token")
# Pydantic models for tool arguments
# Argument schema for DirectDMTool: the message text and the Slack user to DM.
class DirectDMArgs(BaseModel):
    message: str = Field(description="The message to be sent to the Slack user")
    user_id: str = Field(description="The Slack user ID")
class DateTimeTool(BaseTool):
    """Tool that reports the current date and time as a formatted string."""

    name: str = "current_date_time"
    description: str = "Provides the current date and time."

    def _run(self):
        """Return ``datetime.now()`` rendered as ``YYYY-MM-DD HH:MM:SS``."""
        current_moment = datetime.now()
        return format(current_moment, "%Y-%m-%d %H:%M:%S")
# Slack Tools
# Argument schema for GetSingleUserSlackID: the real name to look up.
class GetSingleUserSlackIDArgs(BaseModel):
    name: str = Field(description="The real name of the user whose Slack ID is needed")
class GetSingleUserSlackID(BaseTool):
    """Tool that resolves a user's real name to their Slack ID and email."""

    name: str = "gets_slack_id_single_user"
    description: str = "Gets the Slack ID of a user based on their real name"
    args_schema: Type[BaseModel] = GetSingleUserSlackIDArgs

    def _run(self, name: str):
        """Look up ``name`` case-insensitively in the user directory.

        Returns:
            A ``(slack_id, email)`` tuple for the first matching entry, or the
            string ``"User not found"`` when nothing matches.
        """
        # Prefer the preloaded directory; fetch a fresh one only when it is empty.
        directory = all_users_preload if all_users_preload else GetAllUsers()
        for slack_id, profile in directory.items():
            if profile["name"].lower() != name.lower():
                continue
            return slack_id, profile['email']
        return "User not found"
# Argument schema for GetSingleUserSlackName: the Slack user ID to resolve.
class GetSingleUserSlackNameArgs(BaseModel):
    id: str = Field(description="The Slack user ID")
class GetSingleUserSlackName(BaseTool):
    """Tool that resolves a Slack user ID to the user's real name and email."""

    name: str = "gets_slack_name_single_user"
    description: str = "Gets the Slack real name of a user based on their Slack ID"
    args_schema: Type[BaseModel] = GetSingleUserSlackNameArgs

    def _run(self, id: str):
        """Fetch the directory entry stored under ``id``.

        Returns:
            A ``(name, email)`` tuple, or the string ``"User not found"`` when
            the ID is absent from the directory (or maps to a falsy entry).
        """
        # The preloaded directory is used only when it is non-empty.
        cache_usable = bool(all_users_preload) and not (all_users_preload == {})
        directory = all_users_preload if cache_usable else GetAllUsers()
        record = directory.get(id)
        if not record:
            return "User not found"
        return record["name"], record['email']
# Argument schema for MultiDirectDMTool: one message and the Slack user IDs
# that should each receive it.
class MultiDMArgs(BaseModel):
    message: str
    user_ids: List[str]
class MultiDirectDMTool(BaseTool):
    """Tool that posts the same direct message to several Slack users."""

    name: str = "send_multiple_dms"
    description: str = "Sends direct messages to multiple Slack users"
    args_schema: Type[BaseModel] = MultiDMArgs

    def _run(self, message: str, user_ids: List[str]):
        """Send ``message`` to every ID in ``user_ids``.

        Returns:
            A dict mapping each user ID to either a success string or an
            ``"Error: ..."`` string built from the Slack API error code.
        """

        def deliver(recipient):
            # A Slack API failure for one recipient must not stop the others.
            try:
                client.chat_postMessage(channel=recipient, text=message)
            except SlackApiError as e:
                return f"Error: {e.response['error']}"
            return "Message sent successfully"

        return {recipient: deliver(recipient) for recipient in user_ids}
class DirectDMTool(BaseTool):
    """Tool that posts a single direct message to one Slack user."""

    name: str = "send_direct_dm"
    description: str = "Sends direct messages to Slack users"
    args_schema: Type[BaseModel] = DirectDMArgs

    def _run(self, message: str, user_id: str):
        """Post ``message`` to ``user_id`` and report the outcome as a string."""
        try:
            client.chat_postMessage(channel=user_id, text=message)
        except SlackApiError as e:
            return f"Error sending message: {e.response['error']}"
        else:
            return "Message sent successfully"
def send_dm(user_id: str, message: str) -> bool:
    """Post ``message`` to the Slack user ``user_id``.

    Returns:
        True when the message was posted, False when Slack reported an API
        error (the error code is printed).
    """
    delivered = False
    try:
        client.chat_postMessage(channel=user_id, text=message)
        delivered = True
    except SlackApiError as e:
        print(f"Error sending DM: {e.response['error']}")
    return delivered
# Google Calendar Tools
# pytz zone object for Pacific time. The calendar tools in this file pass the
# same zone name to the Google and Microsoft APIs as a literal string and do not
# reference this object in the portion of the file reviewed here.
PT = pytz.timezone('America/Los_Angeles')
def construct_google_calendar_client(team_id: str, user_id: str):
    """Build an authorized Google Calendar v3 client for one user.

    Loads the stored Google token for ``(team_id, user_id)``, refreshes it when
    it has expired and a refresh token is available, persists the refreshed
    values with ``save_token`` and returns the discovery client.

    Args:
        team_id: Workspace/team identifier used as part of the token key.
        user_id: User identifier used as part of the token key.

    Returns:
        The client created by ``build('calendar', 'v3', ...)``, or ``None``
        when no Google token is stored for the user.
    """
    token_data = load_token(team_id, user_id, 'google')
    if not token_data:
        return None

    # google-auth only reports ``expired`` when it knows the expiry, and it
    # compares that expiry against a naive UTC "now". Without it the refresh
    # branch below can never run.
    # NOTE(review): assumes ``expires_at`` is stored as epoch seconds, which is
    # what this function writes back; other formats are ignored.
    expiry = None
    expires_at = token_data.get('expires_at')
    if isinstance(expires_at, (int, float)) and not isinstance(expires_at, bool):
        expiry = datetime.fromtimestamp(expires_at, tz=pytz.UTC).replace(tzinfo=None)

    creds = Credentials(
        token=token_data.get('access_token'),
        refresh_token=token_data.get('refresh_token'),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=os.getenv("GOOGLE_CLIENT_ID"),
        client_secret=os.getenv("GOOGLE_CLIENT_SECRET"),
        scopes=["https://www.googleapis.com/auth/calendar"],
        expiry=expiry,
    )
    if creds.expired and creds.refresh_token:
        creds.refresh(Request())
        token_data.update({
            "access_token": creds.token,
            "refresh_token": creds.refresh_token,
        })
        if creds.expiry is not None:
            # ``creds.expiry`` is naive UTC; calling ``timestamp()`` on it
            # directly would interpret it as local time and skew the epoch.
            token_data["expires_at"] = creds.expiry.replace(tzinfo=pytz.UTC).timestamp()
        save_token(team_id, user_id, 'google', token_data)
    return build('calendar', 'v3', credentials=creds)
class GoogleCalendarList(BaseTool):
    """Tool that lists the calendars in a user's Google Calendar account."""

    name: str = "list_calendar_list"
    description: str = "Lists available calendars in the user's Google Calendar account"

    def _run(self, team_id: str, user_id: str, max_capacity: int = 200):
        """Page through the calendar list until ``max_capacity`` entries are read.

        Returns:
            A list of ``{'id', 'name', 'description'}`` dicts, or an
            explanatory string when no Google client could be constructed.
        """
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."

        collected = []
        page_token = None
        while len(collected) < max_capacity:
            # Never ask for more than is still needed, capped at 200 per page.
            still_needed = max_capacity - len(collected)
            page = service.calendarList().list(
                maxResults=min(200, still_needed),
                pageToken=page_token,
            ).execute()
            collected += page.get('items', [])
            page_token = page.get('nextPageToken')
            if not page_token:
                break

        summaries = []
        for cal in collected:
            summaries.append({
                'id': cal['id'],
                'name': cal['summary'],
                'description': cal.get('description', ''),
            })
        return summaries
class GoogleCalendarEvents(BaseTool):
    """Tool that reads raw event resources from one Google Calendar."""

    name: str = "list_calendar_events"
    description: str = "Lists and gets events from a specific Google Calendar"

    def _run(self, team_id: str, user_id: str, calendar_id: str = "primary", max_capacity: int = 20):
        """Page through ``calendar_id`` until ``max_capacity`` events are read.

        Returns:
            The list of event dicts exactly as returned by the API, or an
            explanatory string when no Google client could be constructed.
        """
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."

        gathered = []
        page_token = None
        while len(gathered) < max_capacity:
            # Request only what is still missing, capped at 250 per page.
            still_needed = max_capacity - len(gathered)
            page = service.events().list(
                calendarId=calendar_id,
                maxResults=min(250, still_needed),
                pageToken=page_token,
            ).execute()
            gathered += page.get('items', [])
            page_token = page.get('nextPageToken')
            if not page_token:
                break
        return gathered
class GoogleCreateCalendar(BaseTool):
    """Tool that creates a new calendar in the user's Google account."""

    name: str = "create_calendar_list"
    description: str = "Creates a new calendar in Google Calendar"

    def _run(self, team_id: str, user_id: str, calendar_name: str):
        """Insert a calendar titled ``calendar_name`` and report its ID."""
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."
        insert_request = service.calendars().insert(body={'summary': calendar_name})
        created_calendar = insert_request.execute()
        return f"Created calendar: {created_calendar['id']}"
# Argument schema for GoogleAddCalendarEvent. team_id/user_id select whose
# stored Google token is used; the remaining fields describe the event.
class GoogleAddCalendarEventArgs(BaseModel):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
    summary: str = Field(description="Event title")
    description: str = Field(default="", description="Event description or agenda")
    start_time: str = Field(description="Start time in ISO 8601 format")
    end_time: str = Field(description="End time in ISO 8601 format")
    location: str = Field(default="", description="Event location")
    # The tool appends a non-empty invite link to the description text.
    invite_link: str = Field(default="", description="Invite link for the meeting")
    # NOTE(review): annotated List[str] but defaults to None; confirm Optional is intended.
    guests: List[str] = Field(default=None, description="List of guest emails to invite")
class GoogleAddCalendarEvent(BaseTool):
    """Tool that inserts a new event into a Google Calendar."""

    name: str = "google_add_calendar_event"
    description: str = "Creates an event in a Google Calendar"
    args_schema: Type[BaseModel] = GoogleAddCalendarEventArgs

    def _run(self, team_id: str, user_id: str, summary: str, start_time: str, end_time: str,
             description: str = "", calendar_id: str = 'primary', location: str = "",
             invite_link: str = "", guests: List[str] = None):
        """Create the event and ask Google to notify all attendees.

        Start and end are sent with the 'America/Los_Angeles' time zone. A
        non-empty ``invite_link`` is appended to the description.

        Returns:
            ``{"status": "success", "event_id", "link"}`` on success,
            ``{"status": "error", "message"}`` when the insert raises, or an
            explanatory string when no Google client could be constructed.
        """
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."

        if invite_link:
            description = f"{description}\nInvite Link: {invite_link}"

        event = {'summary': summary, 'description': description}
        event['start'] = {'dateTime': start_time, 'timeZone': 'America/Los_Angeles'}
        event['end'] = {'dateTime': end_time, 'timeZone': 'America/Los_Angeles'}
        event['location'] = location
        if guests:
            attendee_entries = []
            for guest in guests:
                attendee_entries.append({'email': guest})
            event['attendees'] = attendee_entries

        try:
            insert_request = service.events().insert(
                calendarId=calendar_id,
                body=event,
                sendUpdates='all',
            )
            created_event = insert_request.execute()
            outcome = {
                "status": "success",
                "event_id": created_event['id'],
                "link": created_event.get('htmlLink', ''),
            }
        except Exception as e:
            outcome = {"status": "error", "message": str(e)}
        return outcome
# Argument schema for GoogleUpdateCalendarEvent. Only team_id, user_id and
# event_id are required; every other field defaults to None.
class GoogleUpdateCalendarEventArgs(BaseModel):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
    event_id: str = Field(description="The event ID to update")
    # NOTE(review): summary, invite_link and guests are annotated as non-Optional
    # but default to None, unlike the Optional[...] fields beside them.
    summary: str = Field(default=None, description="Updated event title")
    description: Optional[str] = Field(default=None, description="Updated event description")
    start_time: Optional[str] = Field(default=None, description="Updated start time in ISO 8601 format")
    end_time: Optional[str] = Field(default=None, description="Updated end time in ISO 8601 format")
    location: Optional[str] = Field(default=None, description="Updated event location")
    invite_link: str = Field(default=None, description="Updated invite link")
    guests: List[str] = Field(default=None, description="Updated list of guest emails")
class GoogleUpdateCalendarEvent(BaseTool):
    """Tool that edits selected fields of an existing Google Calendar event."""

    name: str = "google_update_calendar_event"
    description: str = "Updates an existing event in a Google Calendar"
    args_schema: Type[BaseModel] = GoogleUpdateCalendarEventArgs

    def _run(self, team_id: str, user_id: str, event_id: str, calendar_id: str = "primary",
             summary: Optional[str] = None, description: Optional[str] = None,
             start_time: Optional[str] = None, end_time: Optional[str] = None,
             location: Optional[str] = None, invite_link: Optional[str] = None,
             guests: Optional[List[str]] = None):
        """Fetch the event, overlay the supplied fields and write it back.

        Text and time fields are applied only when truthy. ``guests`` replaces
        the attendee list whenever it is not ``None`` (an empty list clears it).

        Returns:
            ``{"status": "success", "event_id"}`` on success, an error dict
            when the fetch or the update raises, or an explanatory string when
            no Google client could be constructed.
        """
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."

        try:
            fetch_request = service.events().get(calendarId=calendar_id, eventId=event_id)
            event = fetch_request.execute()
        except Exception as e:
            return {"status": "error", "message": f"Event retrieval failed: {str(e)}"}

        for field_name, replacement in (('summary', summary), ('description', description)):
            if replacement:
                event[field_name] = replacement
        if invite_link:
            # Appended after any description override so the link is kept.
            current_desc = event.get('description', '')
            event['description'] = f"{current_desc}\nInvite Link: {invite_link}"
        for field_name, moment in (('start', start_time), ('end', end_time)):
            if moment:
                event[field_name] = {'dateTime': moment, 'timeZone': 'America/Los_Angeles'}
        if location:
            event['location'] = location
        if guests is not None:
            attendee_entries = []
            for guest in guests:
                attendee_entries.append({'email': guest})
            event['attendees'] = attendee_entries

        try:
            update_request = service.events().update(
                calendarId=calendar_id,
                eventId=event_id,
                body=event,
            )
            updated_event = update_request.execute()
            outcome = {"status": "success", "event_id": updated_event['id']}
        except Exception as e:
            outcome = {"status": "error", "message": f"Update failed: {str(e)}"}
        return outcome
# Argument schema for GoogleDeleteCalendarEvent: whose token to use, which
# calendar to look in, and the ID of the event to remove.
class GoogleDeleteCalendarEventArgs(BaseModel):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    calendar_id: str = Field(default="primary", description="Calendar ID (default 'primary')")
    event_id: str = Field(description="The event ID to delete")
class GoogleDeleteCalendarEvent(BaseTool):
    """Tool that removes an event from a Google Calendar."""

    name: str = "google_delete_calendar_event"
    description: str = "Deletes an event from a Google Calendar"
    args_schema: Type[BaseModel] = GoogleDeleteCalendarEventArgs

    def _run(self, team_id: str, user_id: str, event_id: str, calendar_id: str = "primary"):
        """Delete ``event_id`` from ``calendar_id``.

        Returns:
            A success dict, an error dict when the delete call raises, or an
            explanatory string when no Google client could be constructed.
        """
        service = construct_google_calendar_client(team_id, user_id)
        if not service:
            return "Google Calendar not configured or token invalid."

        delete_request = None
        try:
            delete_request = service.events().delete(calendarId=calendar_id, eventId=event_id)
            delete_request.execute()
        except Exception as e:
            return {"status": "error", "message": f"Deletion failed: {str(e)}"}
        else:
            return {"status": "success", "message": f"Deleted event {event_id}"}
# Search Events Tool
class SearchUserEventsArgs(BaseModel):
    # ``team_id`` must be part of the schema: ``_run`` requires it, and arguments
    # that are not declared here are dropped before ``_run`` is called.
    team_id: str = Field(description="Team id here")
    user_id: str
    lookback_days: int = Field(default=30)


class SearchUserEventsTool(BaseTool):
    """Tool that finds Google Calendar events mentioning a given Slack user."""

    name: str = "search_events_by_user"
    description: str = "Finds calendar events associated with a specific user"
    args_schema: Type[BaseModel] = SearchUserEventsArgs

    def _run(self, team_id: str, user_id: str, lookback_days: int = 30):
        """Return events whose title or description contains the user's name.

        The user's real name is resolved through ``GetSingleUserSlackName`` and
        the events are read through ``GoogleCalendarEvents`` using the same
        ``team_id``/``user_id``. Events that started more than
        ``lookback_days`` days ago are skipped; events without a ``dateTime``
        start are skipped as well.

        Returns:
            A list of ``{'id', 'title', 'time', 'calendar_id'}`` dicts. The list
            is empty when the user is unknown or the events tool does not
            return a list (for example its "not configured" message).
        """
        user_info = GetSingleUserSlackName().run(user_id)
        if not isinstance(user_info, (tuple, list)) or len(user_info) != 2:
            # The lookup tool answers with the string "User not found" on a miss.
            return []
        user_name = user_info[0]

        # ``BaseTool.run`` takes a single tool input; a second positional
        # argument would be interpreted as ``verbose``, so pass a dict.
        events = GoogleCalendarEvents().run({"team_id": team_id, "user_id": user_id})
        if not isinstance(events, list):
            # The events tool returns an explanatory string when no Google
            # client is available; there is nothing to search in that case.
            return []

        now = datetime.now(pytz.UTC)
        relevant_events = []
        for event in events:
            event_time_str = event.get('start', {}).get('dateTime')
            if not event_time_str:
                continue
            if event_time_str.endswith('Z'):
                # ``fromisoformat`` rejects a trailing "Z" before Python 3.11.
                event_time_str = event_time_str[:-1] + '+00:00'
            event_time = datetime.fromisoformat(event_time_str)
            if event_time.tzinfo is None:
                event_time = pytz.UTC.localize(event_time)
            if (now - event_time).days > lookback_days:
                continue
            title = event.get('summary', '')
            if user_name in title or user_name in event.get('description', ''):
                relevant_events.append({
                    'id': event['id'],
                    'title': title,
                    'time': event_time.strftime("%Y-%m-%d %H:%M"),
                    'calendar_id': event.get('organizer', {}).get('email'),
                })
        return relevant_events
# Zoom Meeting Tool
# Argument schema for ZoomCreateMeetingTool.
class ZoomCreateMeetingArgs(BaseModel):
    team_id:str = Field(description="Team Id here")
    topic: str = Field(description="Meeting topic with all names not slack ids starting with U--")
    start_time: str = Field(description="Start time in ISO 8601 format")
    # Required here, although the tool's ``_run`` signature defaults it to 30.
    duration: int = Field(description="Duration in minutes")
    agenda: Optional[str] = Field(default="", description="Meeting agenda")
    timezone: str = Field(default="UTC", description="Timezone for the meeting")
class ZoomCreateMeetingTool(BaseTool):
    """Tool that produces Zoom join details for a meeting.

    The workspace owner's preferences decide the mode: in "manual" mode the
    configured static link is returned; otherwise a meeting is created through
    the Zoom REST API with the owner's stored OAuth token.
    """

    name: str = "create_zoom_meeting"
    description: str = "Creates a Zoom meeting using configured credentials"
    args_schema: Type[BaseModel] = ZoomCreateMeetingArgs

    def _run(self, team_id: str, topic: str, start_time: str, duration: int = 30,
             agenda: str = "", timezone: str = "UTC"):
        """Return an invitation string, or a string describing what went wrong.

        Args:
            team_id: Workspace whose owner's Zoom configuration is used.
            topic: Meeting topic.
            start_time: Start time, forwarded to Zoom unchanged.
            duration: Length in minutes.
            agenda: Optional agenda text.
            timezone: Time zone name forwarded to Zoom.

        Raises:
            requests.RequestException: If an HTTP call fails or exceeds the
                timeout.
        """
        # Bound every outbound HTTP call so a stalled connection cannot block
        # the tool indefinitely (requests has no default timeout).
        http_timeout = 30

        owner_id = get_workspace_owner_id(client, team_id)
        if not owner_id:
            return "Workspace owner not found"

        prefs = load_preferences(team_id, owner_id)
        zoom_config = prefs.get("zoom_config", {"mode": "manual", "link": None})
        if zoom_config["mode"] == "manual":
            link = zoom_config.get("link")
            if link:
                return f"Please join the meeting using this link: {link}"
            return "Manual Zoom link not configured"

        token_data = load_token(team_id, owner_id, 'zoom')
        if not token_data:
            return "Zoom token not found in database"
        access_token = token_data.get("access_token")
        if not access_token:
            return "Invalid Zoom token"

        expires_at = token_data.get("expires_at")
        if expires_at and time.time() > expires_at:
            refresh_token = token_data.get("refresh_token")
            if not refresh_token:
                return "Zoom token expired and no refresh token available"
            params = {
                "grant_type": "refresh_token",
                "refresh_token": refresh_token
            }
            # The token endpoint is called with HTTP Basic auth built from the
            # client ID and secret.
            auth_str = f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}"
            auth_bytes = base64.b64encode(auth_str.encode()).decode()
            headers = {
                "Authorization": f"Basic {auth_bytes}",
                "Content-Type": "application/x-www-form-urlencoded"
            }
            response = requests.post(ZOOM_TOKEN_API, data=params, headers=headers,
                                     timeout=http_timeout)
            if response.status_code != 200:
                return "Failed to refresh Zoom token"
            new_token_data = response.json()
            token_data.update(new_token_data)
            # Store an absolute expiry so the next call can compare it with time.time().
            token_data["expires_at"] = time.time() + new_token_data["expires_in"]
            save_token(team_id, owner_id, 'zoom', token_data)
            access_token = new_token_data["access_token"]

        meeting_data = {
            "topic": topic,
            "type": 2,
            "start_time": start_time,
            "duration": duration,
            "timezone": timezone,
            "agenda": agenda,
            "settings": {
                "host_video": True,
                "participant_video": True,
                "join_before_host": False
            }
        }
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
        meeting_res = requests.post(
            "https://api.zoom.us/v2/users/me/meetings",
            headers=headers,
            json=meeting_data,
            timeout=http_timeout
        )
        if meeting_res.status_code == 201:
            meeting_info = meeting_res.json()
            invitation = f"Join Zoom Meeting\n{meeting_info['join_url']}\nMeeting ID: {meeting_info['id']}\nPasscode: {meeting_info.get('password', '')}"
            return invitation
        return f"Zoom meeting creation failed: {meeting_res.text}"
# Microsoft Calendar Tools
class MicrosoftBaseTool(BaseTool):
    """Shared base for the Microsoft Calendar tools: builds Graph request headers."""

    def get_microsoft_client(self, team_id: str, user_id: str):
        """Return ``(headers, error)`` for calling Microsoft Graph as ``user_id``.

        Loads the stored Microsoft token, refreshes it through MSAL when it has
        expired, persists the refreshed token and builds bearer-auth headers.

        Returns:
            ``(headers_dict, None)`` on success, or ``(None, message)`` when no
            token is stored or the refresh fails.
        """
        token_data = load_token(team_id, user_id, 'microsoft')
        if not token_data:
            return None, "Microsoft token not found in database"

        # A missing expiry is treated as already expired so the token is refreshed.
        if time.time() > (token_data.get('expires_at') or 0):
            refresh_token = token_data.get('refresh_token')
            if not refresh_token:
                return None, "Token refresh failed"
            app = ConfidentialClientApplication(
                MICROSOFT_CLIENT_ID,
                authority=MICROSOFT_AUTHORITY,
                client_credential=MICROSOFT_CLIENT_SECRET
            )
            result = app.acquire_token_by_refresh_token(
                refresh_token,
                scopes=MICROSOFT_SCOPES
            )
            if "access_token" not in result:
                return None, "Token refresh failed"
            token_data.update(result)
            # MSAL reports a relative ``expires_in``. Without converting it to
            # an absolute ``expires_at`` the stored expiry stays in the past
            # and every later call refreshes the token again.
            if "expires_in" in result:
                token_data['expires_at'] = time.time() + int(result["expires_in"])
            save_token(team_id, user_id, 'microsoft', token_data)

        headers = {
            "Authorization": f"Bearer {token_data['access_token']}",
            "Content-Type": "application/json"
        }
        return headers, None
class MicrosoftListCalendarEvents(MicrosoftBaseTool):
    """Tool that lists events from the user's Microsoft calendar."""

    name: str = "microsoft_calendar_list_events"
    description: str = "Lists events from Microsoft Calendar"

    def _run(self, team_id: str, user_id: str, max_results: int = 10):
        """Fetch up to ``max_results`` events ordered by start time, newest first.

        Returns:
            A list of ``{'id', 'subject', 'start', 'end', 'webLink'}`` dicts,
            or an error string when authentication or the request fails.

        Raises:
            requests.RequestException: If the HTTP call fails or times out.
        """
        headers, error = self.get_microsoft_client(team_id, user_id)
        if error:
            return error
        endpoint = "https://graph.microsoft.com/v1.0/me/events"
        params = {"$top": max_results, "$orderby": "start/dateTime desc"}
        # requests has no default timeout; bound the call explicitly.
        response = requests.get(endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            return f"Error fetching events: {response.text}"
        events = response.json().get('value', [])
        return [{
            'id': e['id'],
            'subject': e.get('subject'),
            'start': e['start'].get('dateTime'),
            'end': e['end'].get('dateTime'),
            # The Graph event resource exposes its URL as ``webLink``.
            'webLink': e.get('webLink')
        } for e in events]
# Argument schema for MicrosoftAddCalendarEvent. team_id/user_id select whose
# stored Microsoft token is used; the remaining fields describe the event.
# MicrosoftUpdateCalendarEventArgs subclasses this model.
class MicrosoftAddCalendarEventArgs(BaseModel):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    subject: str = Field(description="Event title/subject")
    start_time: str = Field(description="Start time in ISO 8601 format")
    end_time: str = Field(description="End time in ISO 8601 format")
    # The tool sends this as the event body with contentType "HTML".
    content: str = Field(default="", description="Event description/content")
    location: str = Field(default="", description="Event location")
    attendees: List[str] = Field(default=[], description="List of attendee emails")
class MicrosoftAddCalendarEvent(MicrosoftBaseTool):
    """Tool that creates an event in the user's Microsoft calendar."""

    name: str = "microsoft_calendar_add_event"
    description: str = "Creates an event in Microsoft Calendar"
    args_schema: Type[BaseModel] = MicrosoftAddCalendarEventArgs

    def _run(self, team_id: str, user_id: str, subject: str, start_time: str, end_time: str,
             content: str = "", location: str = "", attendees: Optional[List[str]] = None):
        """Create the event through Microsoft Graph.

        Start and end are sent with the 'America/Los_Angeles' time zone and the
        body is sent as HTML. ``attendees`` may be omitted or ``None`` for an
        event without attendees.

        Returns:
            ``{"status": "success", "event_id", "link"}`` when Graph answers
            201, otherwise an error string.

        Raises:
            requests.RequestException: If the HTTP call fails or times out.
        """
        headers, error = self.get_microsoft_client(team_id, user_id)
        if error:
            return error
        event_payload = {
            "subject": subject,
            "body": {"contentType": "HTML", "content": content},
            "start": {"dateTime": start_time, "timeZone": "America/Los_Angeles"},
            "end": {"dateTime": end_time, "timeZone": "America/Los_Angeles"},
            "location": {"displayName": location},
            "attendees": [{"emailAddress": {"address": email}} for email in (attendees or [])]
        }
        # requests has no default timeout; bound the call explicitly.
        response = requests.post(
            "https://graph.microsoft.com/v1.0/me/events",
            headers=headers,
            json=event_payload,
            timeout=30
        )
        if response.status_code == 201:
            created = response.json()
            return {
                "status": "success",
                "event_id": created['id'],
                # The Graph event resource exposes its URL as ``webLink``.
                "link": created.get('webLink')
            }
        return f"Error creating event: {response.text}"
class MicrosoftUpdateCalendarEventArgs(MicrosoftAddCalendarEventArgs):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    event_id: str = Field(description="Microsoft event ID to update")
    # The inherited event fields are re-declared as optional: an update only
    # needs the values that actually change.
    subject: Optional[str] = Field(default=None, description="Updated event title/subject")
    start_time: Optional[str] = Field(default=None, description="Updated start time in ISO 8601 format")
    end_time: Optional[str] = Field(default=None, description="Updated end time in ISO 8601 format")
    content: Optional[str] = Field(default=None, description="Updated event description/content")
    location: Optional[str] = Field(default=None, description="Updated event location")
    attendees: Optional[List[str]] = Field(default=None, description="Updated list of attendee emails")


class MicrosoftUpdateCalendarEvent(MicrosoftBaseTool):
    """Tool that changes selected fields of an existing Microsoft calendar event."""

    name: str = "microsoft_calendar_update_event"
    description: str = "Updates an existing Microsoft Calendar event"
    args_schema: Type[BaseModel] = MicrosoftUpdateCalendarEventArgs

    def _run(self, team_id: str, user_id: str, event_id: str, **kwargs):
        """PATCH only the fields supplied in ``kwargs``.

        Recognised keys are ``subject``, ``content``, ``start_time``,
        ``end_time``, ``location`` and ``attendees``; keys that are absent or
        ``None`` are left untouched on the server. New start/end values are
        sent with the 'America/Los_Angeles' time zone.

        Returns:
            ``{"status": "success", "event_id"}`` on success (also when there
            was nothing to change), otherwise an error string.

        Raises:
            requests.RequestException: If an HTTP call fails or times out.
        """
        headers, error = self.get_microsoft_client(team_id, user_id)
        if error:
            return error

        event_url = f"https://graph.microsoft.com/v1.0/me/events/{event_id}"
        http_timeout = 30  # requests has no default timeout

        # Confirm the event exists so a bad ID is reported as such.
        get_response = requests.get(event_url, headers=headers, timeout=http_timeout)
        if get_response.status_code != 200:
            return f"Error finding event: {get_response.text}"

        # Send only what changed. Re-sending the stored start/end would pair a
        # ``dateTime`` in the zone Graph returned it in (UTC by default) with
        # 'America/Los_Angeles' and silently move the event.
        update_payload = {}
        subject = kwargs.get('subject')
        if subject is not None:
            update_payload["subject"] = subject
        content = kwargs.get('content')
        if content is not None:
            update_payload["body"] = {"content": content, "contentType": "HTML"}
        start_time = kwargs.get('start_time')
        if start_time is not None:
            update_payload["start"] = {"dateTime": start_time, "timeZone": "America/Los_Angeles"}
        end_time = kwargs.get('end_time')
        if end_time is not None:
            update_payload["end"] = {"dateTime": end_time, "timeZone": "America/Los_Angeles"}
        location = kwargs.get('location')
        if location is not None:
            update_payload["location"] = {"displayName": location}
        attendees = kwargs.get('attendees')
        if attendees is not None:
            update_payload["attendees"] = [{"emailAddress": {"address": email}} for email in attendees]

        if not update_payload:
            # Nothing to change; the event was already confirmed to exist.
            return {"status": "success", "event_id": event_id}

        response = requests.patch(
            event_url,
            headers=headers,
            json=update_payload,
            timeout=http_timeout
        )
        if response.status_code == 200:
            return {"status": "success", "event_id": event_id}
        return f"Error updating event: {response.text}"
# Argument schema for MicrosoftDeleteCalendarEvent: whose token to use and the
# ID of the event to remove.
class MicrosoftDeleteCalendarEventArgs(BaseModel):
    team_id: str = Field(description="Team id here")
    user_id: str = Field(description="User id here")
    event_id: str = Field(description="Microsoft event ID to delete")
class MicrosoftDeleteCalendarEvent(MicrosoftBaseTool):
    """Tool that deletes an event from the user's Microsoft calendar."""

    name: str = "microsoft_calendar_delete_event"
    description: str = "Deletes an event from Microsoft Calendar"
    args_schema: Type[BaseModel] = MicrosoftDeleteCalendarEventArgs

    def _run(self, team_id: str, user_id: str, event_id: str):
        """Delete ``event_id`` through Microsoft Graph.

        Returns:
            A success dict when Graph answers 204, otherwise an error string
            (authentication failure or the response body).

        Raises:
            requests.RequestException: If the HTTP call fails or times out.
        """
        headers, error = self.get_microsoft_client(team_id, user_id)
        if error:
            return error
        # requests has no default timeout; bound the call explicitly.
        response = requests.delete(
            f"https://graph.microsoft.com/v1.0/me/events/{event_id}",
            headers=headers,
            timeout=30
        )
        if response.status_code == 204:
            return {"status": "success", "message": f"Deleted event {event_id}"}
        return f"Error deleting event: {response.text}"
# Tool Lists
# Each list below instantiates its own tool objects at import time; how the
# lists are consumed is decided by the modules that import them.

# Messaging, Zoom, event search and full Google/Microsoft calendar management.
# Unlike dm_tools, this list includes MultiDirectDMTool and omits the Slack
# name/ID lookup tools.
tools = [
    DirectDMTool(),
    ZoomCreateMeetingTool(),
    SearchUserEventsTool(),
    GoogleCalendarList(),
    GoogleCalendarEvents(),
    GoogleCreateCalendar(),
    GoogleAddCalendarEvent(),
    GoogleUpdateCalendarEvent(),
    GoogleDeleteCalendarEvent(),
    MicrosoftListCalendarEvents(),
    MicrosoftAddCalendarEvent(),
    MicrosoftUpdateCalendarEvent(),
    MicrosoftDeleteCalendarEvent(),
    MultiDirectDMTool()
]
# Read-only calendar listing for both providers.
calendar_prompt_tools = [
    MicrosoftListCalendarEvents(),
    GoogleCalendarEvents()
]
# Same calendar/Zoom/search set as ``tools`` plus the Slack name/ID lookups,
# without the multi-recipient DM tool.
dm_tools = [
    DirectDMTool(),
    ZoomCreateMeetingTool(),
    SearchUserEventsTool(),
    GetSingleUserSlackName(),
    GetSingleUserSlackID(),
    GoogleCalendarList(),
    GoogleCalendarEvents(),
    GoogleCreateCalendar(),
    GoogleAddCalendarEvent(),
    GoogleUpdateCalendarEvent(),
    GoogleDeleteCalendarEvent(),
    MicrosoftListCalendarEvents(),
    MicrosoftAddCalendarEvent(),
    MicrosoftUpdateCalendarEvent(),
    MicrosoftDeleteCalendarEvent()
]
# Read-only calendar listing plus the current date/time tool.
dm_group_tools = [
    GoogleCalendarEvents(),
    MicrosoftListCalendarEvents(),
    DateTimeTool(),
]