import re
from collections import Counter
from datetime import datetime, timedelta

import pytz
from googleapiclient.errors import HttpError

# Hours of the day (local time) in which meetings may be proposed: 9 AM-6 PM.
_WORK_HOURS = range(9, 18)

# A person's name ends at a date/time/duration keyword, at punctuation, or at
# the end of the command.
_PERSON_END = r'(?=\s+(?:on|at|for)\b|[^\w\s]|$)'
_SCHEDULE_RE = re.compile(
    r'\b(schedule|find a time for|set up|arrange) (?:a )?(?:meeting )?'
    r'with ([\w\s]+?)' + _PERSON_END
)
_SUGGEST_RE = re.compile(r'\b(?:suggest|recommend|what are good times)\b')
_WITH_PERSON_RE = re.compile(r'\bwith ([\w\s]+?)' + _PERSON_END)
_DURATION_RE = re.compile(r'\bfor\s+(\d+)\s*(minute|hour|min|hr)s?\b')
_DATETIME_RE = re.compile(r'\b(?:on|at)\b\s+([\w\s,:/-]+)')
_WITH_WORD_RE = re.compile(r'\bwith\b')


class SchedulingAgent:
    """
    Autonomous agent for intelligent calendar management.

    Features:
    - Context-aware scheduling (learns from past meetings)
    - Conflict resolution
    - Smart time slot suggestions
    - Natural language processing for calendar commands
    """

    def __init__(self, service, timezone="Asia/Kolkata"):
        """
        Args:
            service: Calendar API client; must expose ``events()`` and
                ``freebusy()`` resources as used by the methods below.
            timezone: IANA timezone name used for all local-time reasoning.
        """
        self.service = service
        self.timezone = timezone
        self.tz = pytz.timezone(timezone)

        # Load user preferences and context
        self.preferences = self._load_user_preferences()

    # ------------------------------------------------------------------
    # Small datetime helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_iso(value):
        """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    def _ensure_aware(self, dt):
        """Return *dt* unchanged if timezone-aware, else localize it to self.tz."""
        if dt.tzinfo is None or dt.utcoffset() is None:
            return self.tz.localize(dt)
        return dt

    def _to_local(self, dt):
        """Return *dt* expressed in the agent's timezone (naive = already local)."""
        return self._ensure_aware(dt).astimezone(self.tz)

    # ------------------------------------------------------------------
    # Preference learning
    # ------------------------------------------------------------------

    def _load_user_preferences(self):
        """
        Load user scheduling preferences by analyzing past meetings.

        Looks at timed events from the last 90 days that either have
        attendees or mention "meeting" in the title.

        Returns:
            dict with keys ``preferred_days`` (weekday ints or None),
            ``preferred_times`` (hours or None), ``common_meeting_duration``
            (minutes), ``meeting_blackout_times`` (hours) and
            ``frequent_contacts``. Defaults are returned if the calendar
            cannot be read.
        """
        preferences = {
            'preferred_days': None,
            'preferred_times': None,
            'common_meeting_duration': 60,  # minutes
            'meeting_blackout_times': [],   # times to avoid
            'frequent_contacts': []         # people the user meets with often
        }

        # Analyze past 3 months of meetings
        now = datetime.now(self.tz)
        three_months_ago = now - timedelta(days=90)

        try:
            events = self.service.events().list(
                calendarId='primary',
                timeMin=three_months_ago.isoformat(),
                # Without an upper bound the query also returns future
                # events, which are not "past meetings".
                timeMax=now.isoformat(),
                maxResults=1000,
                singleEvents=True,
                orderBy='startTime'
            ).execute().get('items', [])

            # Skip if no past meetings
            if not events:
                return preferences

            day_counts = Counter()
            hour_counts = Counter()
            durations = []
            contacts = Counter()

            for event in events:
                summary = (event.get('summary') or '').lower()

                # Only process events with actual people (meetings)
                if 'attendees' not in event and 'meeting' not in summary:
                    continue

                start = event['start'].get('dateTime')
                end = event['end'].get('dateTime')
                if not (start and end):
                    continue  # all-day events carry 'date', not 'dateTime'

                # Count weekday/hour in the user's timezone, not in whatever
                # offset the event happens to be stored with.
                start_dt = self._to_local(self._parse_iso(start))
                end_dt = self._to_local(self._parse_iso(end))

                day_counts[start_dt.weekday()] += 1
                hour_counts[start_dt.hour] += 1

                duration = (end_dt - start_dt).total_seconds() / 60
                if duration > 0:
                    durations.append(duration)

                # Extract contacts; skip the calendar owner's own entry so
                # the user is not their own most frequent contact.
                for attendee in event.get('attendees', []):
                    if attendee.get('self'):
                        continue
                    if 'email' in attendee and attendee.get('responseStatus') != 'declined':
                        contacts[attendee['email']] += 1

                # "... with <name>": split on the whole word so that
                # "without" is not mistaken for "with".
                parts = _WITH_WORD_RE.split(summary)
                if len(parts) > 1:
                    name = parts[-1].strip()
                    if name:
                        contacts[name] += 1

            # Set preferences based on analysis
            if day_counts:
                preferences['preferred_days'] = [day for day, _ in day_counts.most_common(3)]

            if hour_counts:
                preferences['preferred_times'] = [hour for hour, _ in hour_counts.most_common(3)]

            if durations:
                preferences['common_meeting_duration'] = int(sum(durations) / len(durations))

            if contacts:
                preferences['frequent_contacts'] = [contact for contact, _ in contacts.most_common(10)]

            # Identify blackout times (times when meetings are rarely scheduled)
            # NOTE(review): every work hour outside the top three preferred
            # hours is treated as a blackout - confirm this is intended.
            common_hours = set(preferences['preferred_times'] or ())
            preferences['meeting_blackout_times'] = list(set(_WORK_HOURS) - common_hours)

            return preferences

        except Exception as e:
            # Best effort: the agent must still be constructible when the
            # calendar history cannot be read.
            print(f"Error analyzing past meetings: {e}")
            return preferences

    # ------------------------------------------------------------------
    # Slot search
    # ------------------------------------------------------------------

    def find_optimal_meeting_time(self, person=None, start_date=None, end_date=None, duration_minutes=None):
        """
        Find the optimal meeting time based on:
        - User's scheduling preferences
        - Calendar availability
        - Contextual factors (e.g., time of day, day of week)

        Args:
            person: Name of the other participant (currently not consulted).
            start_date: Start of the search window; defaults to tomorrow
                00:00 local time. Naive values are taken as local time.
            end_date: End of the search window; defaults to start + 7 days.
            duration_minutes: Meeting length; defaults to the learned
                common meeting duration.

        Returns:
            Up to three dicts ``{'start', 'end', 'score'}`` (ISO strings),
            best first. Empty if nothing fits or availability is unknown.
        """
        if not duration_minutes:
            duration_minutes = self.preferences['common_meeting_duration']

        now = datetime.now(self.tz)

        if not start_date:
            start_date = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        if not end_date:
            end_date = start_date + timedelta(days=7)

        # Work in local time throughout so "9 AM-6 PM" means the user's
        # clock and naive inputs cannot be compared with aware datetimes.
        window_start = self._to_local(start_date)
        window_end = self._to_local(end_date)

        freebusy_query = {
            "timeMin": window_start.isoformat(),
            "timeMax": window_end.isoformat(),
            "items": [{"id": "primary"}]
        }

        try:
            busy_times = self.service.freebusy().query(body=freebusy_query).execute()
            busy_slots = busy_times['calendars']['primary']['busy']
        except Exception as e:
            # Without availability data every slot would look free, so
            # propose nothing rather than risk a double booking.
            print(f"Error fetching busy times: {e}")
            return []

        # Parse busy intervals once instead of once per candidate slot.
        busy_intervals = [
            (self._parse_iso(busy['start']), self._parse_iso(busy['end']))
            for busy in busy_slots
        ]

        preferred_days = self.preferences['preferred_days']
        preferred_times = self.preferences['preferred_times']
        blackout_hours = self.preferences['meeting_blackout_times']
        slot_length = timedelta(minutes=duration_minutes)

        candidate_slots = []
        first_day = window_start.date()
        day_count = (window_end.date() - first_day).days + 1

        for day_offset in range(max(day_count, 0)):
            day = first_day + timedelta(days=day_offset)

            # Skip days that don't match preferred days (if we have preferences)
            if preferred_days and day.weekday() not in preferred_days:
                continue

            for hour in _WORK_HOURS:
                if hour in blackout_hours:
                    continue

                # Localize a fresh naive datetime so the UTC offset is
                # correct on either side of a DST change.
                slot_start = self.tz.localize(datetime(day.year, day.month, day.day, hour))
                slot_end = slot_start + slot_length

                # Skip if in the past
                if slot_start <= now:
                    continue

                # Busy data only covers the queried window; a slot outside
                # it cannot be verified as free.
                if slot_start < window_start or slot_end > window_end:
                    continue

                if any(slot_start < busy_end and slot_end > busy_start
                       for busy_start, busy_end in busy_intervals):
                    continue

                score = 0

                # Preferred hour bonus
                if preferred_times and hour in preferred_times:
                    score += 10

                # Preferred day bonus
                if preferred_days and slot_start.weekday() in preferred_days:
                    score += 5

                # Recency penalty (prefer sooner meetings)
                score -= (slot_start.date() - now.date()).days

                candidate_slots.append({
                    'start': slot_start.isoformat(),
                    'end': slot_end.isoformat(),
                    'score': score
                })

        # Sort by score, highest first (stable, so ties stay chronological)
        candidate_slots.sort(key=lambda slot: slot['score'], reverse=True)

        # Return the top 3 options
        return candidate_slots[:3]

    # ------------------------------------------------------------------
    # Scheduling
    # ------------------------------------------------------------------

    def _create_event(self, summary, slot_start, slot_end, description=None):
        """Insert an event on the primary calendar and return the API response."""
        event = {
            'summary': summary,
            'start': {'dateTime': slot_start.isoformat(), 'timeZone': self.timezone},
            'end': {'dateTime': slot_end.isoformat(), 'timeZone': self.timezone},
        }
        if description is not None:
            event['description'] = description
        return self.service.events().insert(calendarId='primary', body=event).execute()

    def _schedule_at_requested_time(self, person, datetime_str, duration_minutes):
        """Book the requested time, or the best alternative if it is busy."""
        # A timestamp without an offset is interpreted as local time; the
        # API requires offsets and aware/naive values cannot be compared.
        slot_start = self._ensure_aware(self._parse_iso(datetime_str))
        slot_end = slot_start + timedelta(minutes=duration_minutes)

        # Check if this time is available
        freebusy_query = {
            "timeMin": slot_start.isoformat(),
            "timeMax": slot_end.isoformat(),
            "items": [{"id": "primary"}]
        }
        busy_times = self.service.freebusy().query(body=freebusy_query).execute()
        busy_slots = busy_times['calendars']['primary']['busy']

        if not busy_slots:
            # Time is available, create the event
            event_result = self._create_event(f'Meeting with {person}', slot_start, slot_end)
            return {
                "status": "created",
                "message": f"Meeting scheduled for {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
                "event_link": event_result.get('htmlLink'),
                "scheduled_time": slot_start.isoformat()
            }

        # Time is not available, find alternatives
        optimal_slots = self.find_optimal_meeting_time(
            person,
            slot_start,
            slot_start + timedelta(days=7),
            duration_minutes
        )

        if not optimal_slots:
            return {
                "status": "error",
                "message": "Requested time is not available and no alternatives found."
            }

        # Use the best alternative
        slot_start = self._parse_iso(optimal_slots[0]['start'])
        slot_end = self._parse_iso(optimal_slots[0]['end'])

        event_result = self._create_event(
            f'Meeting with {person} (Rescheduled)',
            slot_start,
            slot_end,
            description=f'Originally requested for {datetime_str}',
        )
        return {
            "status": "rescheduled",
            "message": f"Requested time was unavailable. Meeting rescheduled to {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
            "event_link": event_result.get('htmlLink'),
            "scheduled_time": slot_start.isoformat()
        }

    def _schedule_at_optimal_time(self, person, duration_minutes):
        """Book the best-scoring free slot within the next two weeks."""
        now = datetime.now(self.tz)
        optimal_slots = self.find_optimal_meeting_time(
            person,
            now + timedelta(days=1),
            now + timedelta(days=14),
            duration_minutes
        )

        if not optimal_slots:
            return {
                "status": "error",
                "message": "No suitable meeting times found in the next 2 weeks."
            }

        # Use the best slot
        slot_start = self._parse_iso(optimal_slots[0]['start'])
        slot_end = self._parse_iso(optimal_slots[0]['end'])

        event_result = self._create_event(
            f'Meeting with {person}',
            slot_start,
            slot_end,
            description='Automatically scheduled by Calendar Assistant',
        )
        return {
            "status": "created",
            "message": f"Optimal meeting time found and scheduled: {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
            "event_link": event_result.get('htmlLink'),
            "scheduled_time": slot_start.isoformat(),
            "alternatives": optimal_slots[1:]
        }

    def schedule_meeting(self, person, datetime_str=None, duration_minutes=None):
        """
        Schedule a meeting with intelligent slot selection.

        Args:
            person: Name used in the event title.
            datetime_str: Optional ISO-8601 start time. If it is busy, the
                best alternative in the following 7 days is booked instead.
                If omitted, the best slot in the next 14 days is booked.
            duration_minutes: Meeting length; defaults to the learned
                common meeting duration.

        Returns:
            dict with ``status`` ("created", "rescheduled" or "error") and
            ``message``; on success also ``event_link`` and
            ``scheduled_time`` (plus ``alternatives`` when auto-selected).
            Failures are reported in the dict rather than raised.
        """
        if not duration_minutes:
            duration_minutes = self.preferences['common_meeting_duration']

        # If datetime specified, use it
        if datetime_str:
            try:
                return self._schedule_at_requested_time(person, datetime_str, duration_minutes)
            except Exception as e:
                return {
                    "status": "error",
                    "message": f"Error scheduling meeting: {e}"
                }

        # No datetime specified, find optimal time
        try:
            return self._schedule_at_optimal_time(person, duration_minutes)
        except Exception as e:
            return {
                "status": "error",
                "message": f"Error finding optimal meeting time: {e}"
            }

    # ------------------------------------------------------------------
    # Natural-language front end
    # ------------------------------------------------------------------

    def process_command(self, command):
        """
        Process natural language commands with context awareness.

        Understands two intents:
        - "schedule / set up / arrange / find a time for (a meeting) with
          <person> [on|at <date/time>] [for <n> minutes|hours]"
        - "suggest / recommend / what are good times ... with <person>"

        Returns:
            The result dict of ``schedule_meeting``, a ``"suggestions"``
            dict with formatted ``slots``, or an ``"error"`` dict when the
            command or its date/time cannot be understood.
        """
        command = command.lower().strip()

        # Find a time/schedule with person
        schedule_match = _SCHEDULE_RE.search(command)
        if schedule_match and schedule_match.group(2).strip():
            person = schedule_match.group(2).strip()

            # Extract duration if specified
            duration_minutes = None
            duration_match = _DURATION_RE.search(command)
            if duration_match:
                duration_val = int(duration_match.group(1))
                if duration_match.group(2) in ('hour', 'hr'):
                    duration_minutes = duration_val * 60
                else:
                    duration_minutes = duration_val

            # Extract date/time if specified. Only look after the person's
            # name, and drop the duration clause so it is not fed to the
            # date parser.
            remainder = _DURATION_RE.sub(' ', command[schedule_match.end():])
            datetime_match = _DATETIME_RE.search(remainder)
            datetime_str = None

            if datetime_match:
                datetime_text = datetime_match.group(1).strip()
                from dateutil import parser
                try:
                    dt = parser.parse(datetime_text)
                except (ValueError, OverflowError):
                    return {
                        "status": "error",
                        "message": f"Could not understand the date/time: {datetime_text}"
                    }
                # Localize naive results; keep an explicitly given zone.
                datetime_str = self._ensure_aware(dt).isoformat()

            # Schedule the meeting
            return self.schedule_meeting(person, datetime_str, duration_minutes)

        # Suggest times without scheduling
        suggest_match = _SUGGEST_RE.search(command)
        if suggest_match:
            person_match = _WITH_PERSON_RE.search(command, suggest_match.end())
            person = person_match.group(1).strip() if person_match else None
            person = person or None

            optimal_slots = self.find_optimal_meeting_time(person=person)

            if not optimal_slots:
                return {
                    "status": "error",
                    "message": "No suitable meeting times found in the next week."
                }

            formatted_slots = [
                self._parse_iso(slot['start']).strftime('%A, %b %d at %I:%M %p')
                for slot in optimal_slots
            ]

            if person:
                message = f"Here are the best times to meet with {person}:"
            else:
                message = "Here are the best times to meet:"

            return {
                "status": "suggestions",
                "message": message,
                "slots": formatted_slots
            }

        # If no matching intent was found
        return {
            "status": "error",
            "message": "I couldn't understand that command. Try something like 'schedule a meeting with Alex' or 'suggest times for meeting with Taylor'."
        }