Agent_Calendar / autonomous_agent.py
atharv-16's picture
Upload 9 files
02c13f1 verified
import re
import pytz
from datetime import datetime, timedelta
from collections import Counter
from googleapiclient.errors import HttpError
class SchedulingAgent:
"""
Autonomous agent for intelligent calendar management.
Features:
- Context-aware scheduling (learns from past meetings)
- Conflict resolution
- Smart time slot suggestions
- Natural language processing for calendar commands
"""
def __init__(self, service, timezone="Asia/Kolkata"):
self.service = service
self.timezone = timezone
self.tz = pytz.timezone(timezone)
# Load user preferences and context
self.preferences = self._load_user_preferences()
def _load_user_preferences(self):
"""Load user scheduling preferences by analyzing past meetings"""
preferences = {
'preferred_days': None,
'preferred_times': None,
'common_meeting_duration': 60, # minutes
'meeting_blackout_times': [], # times to avoid
'frequent_contacts': [] # people the user meets with often
}
# Analyze past 3 months of meetings
now = datetime.now(self.tz)
three_months_ago = (now - timedelta(days=90)).isoformat()
try:
events = self.service.events().list(
calendarId='primary',
timeMin=three_months_ago,
maxResults=1000,
singleEvents=True,
orderBy='startTime'
).execute().get('items', [])
# Skip if no past meetings
if not events:
return preferences
# Analyze meeting patterns
day_counts = Counter()
hour_counts = Counter()
durations = []
contacts = Counter()
for event in events:
# Only process events with actual people (meetings)
if 'attendees' in event or 'meeting' in event.get('summary', '').lower():
start = event['start'].get('dateTime')
end = event['end'].get('dateTime')
if start and end:
start_dt = datetime.fromisoformat(start.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end.replace('Z', '+00:00'))
# Track day of week preferences
day_counts[start_dt.weekday()] += 1
# Track hour preferences
hour_counts[start_dt.hour] += 1
# Track typical meeting durations
duration = (end_dt - start_dt).total_seconds() / 60
durations.append(duration)
# Extract contacts
if 'attendees' in event:
for attendee in event['attendees']:
if 'email' in attendee and attendee.get('responseStatus') != 'declined':
contacts[attendee['email']] += 1
summary = event.get('summary', '').lower()
if 'with' in summary:
name = summary.split('with')[-1].strip()
contacts[name] += 1
# Set preferences based on analysis
if day_counts:
preferences['preferred_days'] = [day for day, _ in day_counts.most_common(3)]
if hour_counts:
preferences['preferred_times'] = [hour for hour, _ in hour_counts.most_common(3)]
if durations:
preferences['common_meeting_duration'] = int(sum(durations) / len(durations))
if contacts:
preferences['frequent_contacts'] = [contact for contact, _ in contacts.most_common(10)]
# Identify blackout times (times when meetings are rarely scheduled)
work_hours = list(range(9, 18)) # 9 AM to 6 PM
all_hours = set(work_hours)
common_hours = set(preferences['preferred_times']) if preferences['preferred_times'] else set()
preferences['meeting_blackout_times'] = list(all_hours - common_hours)
return preferences
except Exception as e:
print(f"Error analyzing past meetings: {e}")
return preferences
def find_optimal_meeting_time(self, person=None, start_date=None, end_date=None, duration_minutes=None):
"""
Find the optimal meeting time based on:
- User's scheduling preferences
- Calendar availability
- The other person's availability (if provided)
- Contextual factors (e.g., time of day, day of week)
"""
if not duration_minutes:
duration_minutes = self.preferences['common_meeting_duration']
now = datetime.now(self.tz)
if not start_date:
start_date = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
if not end_date:
end_date = start_date + timedelta(days=7)
# Convert to proper format
time_min = start_date.isoformat()
time_max = end_date.isoformat()
# Get busy times
freebusy_query = {
"timeMin": time_min,
"timeMax": time_max,
"items": [{"id": "primary"}]
}
try:
busy_times = self.service.freebusy().query(body=freebusy_query).execute()
busy_slots = busy_times['calendars']['primary']['busy']
except Exception as e:
print(f"Error fetching busy times: {e}")
busy_slots = []
# Prepare scoring for candidate slots
candidate_slots = []
current_date = start_date
# Look at each day in the range
while current_date.date() <= end_date.date():
# Skip days that don't match preferred days (if we have preferences)
if self.preferences['preferred_days'] and current_date.weekday() not in self.preferences['preferred_days']:
current_date = current_date + timedelta(days=1)
continue
# Look at each hour from 9 AM to 6 PM
for hour in range(9, 18):
# Skip hours in blackout times
if hour in self.preferences['meeting_blackout_times']:
continue
slot_start = current_date.replace(hour=hour, minute=0, second=0, microsecond=0)
slot_end = slot_start + timedelta(minutes=duration_minutes)
# Skip if in the past
if slot_start <= now:
continue
# Check if slot conflicts with busy times
is_free = True
for busy in busy_slots:
busy_start = datetime.fromisoformat(busy['start'].replace('Z', '+00:00'))
busy_end = datetime.fromisoformat(busy['end'].replace('Z', '+00:00'))
if (slot_start < busy_end and slot_end > busy_start):
is_free = False
break
if is_free:
# Score this time slot
score = 0
# Preferred hour bonus
if self.preferences['preferred_times'] and hour in self.preferences['preferred_times']:
score += 10
# Preferred day bonus
if self.preferences['preferred_days'] and slot_start.weekday() in self.preferences['preferred_days']:
score += 5
# Recency penalty (prefer sooner meetings)
days_from_now = (slot_start.date() - now.date()).days
score -= days_from_now
candidate_slots.append({
'start': slot_start.isoformat(),
'end': slot_end.isoformat(),
'score': score
})
current_date = current_date + timedelta(days=1)
# Sort by score, highest first
candidate_slots.sort(key=lambda x: x['score'], reverse=True)
# Return the top 3 options
return candidate_slots[:3] if candidate_slots else []
def schedule_meeting(self, person, datetime_str=None, duration_minutes=None):
"""
Schedule a meeting with intelligent slot selection
"""
if not duration_minutes:
duration_minutes = self.preferences['common_meeting_duration']
# If datetime specified, use it
if datetime_str:
try:
# Parse the provided date/time
dt = datetime.fromisoformat(datetime_str.replace('Z', '+00:00'))
slot_start = dt
slot_end = slot_start + timedelta(minutes=duration_minutes)
# Check if this time is available
freebusy_query = {
"timeMin": slot_start.isoformat(),
"timeMax": slot_end.isoformat(),
"items": [{"id": "primary"}]
}
busy_times = self.service.freebusy().query(body=freebusy_query).execute()
busy_slots = busy_times['calendars']['primary']['busy']
if busy_slots:
# Time is not available, find alternatives
optimal_slots = self.find_optimal_meeting_time(
person,
slot_start,
slot_start + timedelta(days=7),
duration_minutes
)
if not optimal_slots:
return {
"status": "error",
"message": f"Requested time is not available and no alternatives found."
}
# Use the best alternative
slot_start = datetime.fromisoformat(optimal_slots[0]['start'].replace('Z', '+00:00'))
slot_end = datetime.fromisoformat(optimal_slots[0]['end'].replace('Z', '+00:00'))
# Create event with the alternative time
event = {
'summary': f'Meeting with {person} (Rescheduled)',
'description': f'Originally requested for {datetime_str}',
'start': {'dateTime': slot_start.isoformat(), 'timeZone': self.timezone},
'end': {'dateTime': slot_end.isoformat(), 'timeZone': self.timezone},
}
event_result = self.service.events().insert(calendarId='primary', body=event).execute()
return {
"status": "rescheduled",
"message": f"Requested time was unavailable. Meeting rescheduled to {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
"event_link": event_result.get('htmlLink'),
"scheduled_time": slot_start.isoformat()
}
else:
# Time is available, create the event
event = {
'summary': f'Meeting with {person}',
'start': {'dateTime': slot_start.isoformat(), 'timeZone': self.timezone},
'end': {'dateTime': slot_end.isoformat(), 'timeZone': self.timezone},
}
event_result = self.service.events().insert(calendarId='primary', body=event).execute()
return {
"status": "created",
"message": f"Meeting scheduled for {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
"event_link": event_result.get('htmlLink'),
"scheduled_time": slot_start.isoformat()
}
except Exception as e:
return {
"status": "error",
"message": f"Error scheduling meeting: {e}"
}
# No datetime specified, find optimal time
else:
try:
now = datetime.now(self.tz)
optimal_slots = self.find_optimal_meeting_time(
person,
now + timedelta(days=1),
now + timedelta(days=14),
duration_minutes
)
if not optimal_slots:
return {
"status": "error",
"message": "No suitable meeting times found in the next 2 weeks."
}
# Use the best slot
slot_start = datetime.fromisoformat(optimal_slots[0]['start'].replace('Z', '+00:00'))
slot_end = datetime.fromisoformat(optimal_slots[0]['end'].replace('Z', '+00:00'))
# Create the event
event = {
'summary': f'Meeting with {person}',
'description': 'Automatically scheduled by Calendar Assistant',
'start': {'dateTime': slot_start.isoformat(), 'timeZone': self.timezone},
'end': {'dateTime': slot_end.isoformat(), 'timeZone': self.timezone},
}
event_result = self.service.events().insert(calendarId='primary', body=event).execute()
return {
"status": "created",
"message": f"Optimal meeting time found and scheduled: {slot_start.strftime('%Y-%m-%d %I:%M %p')}",
"event_link": event_result.get('htmlLink'),
"scheduled_time": slot_start.isoformat(),
"alternatives": optimal_slots[1:] if len(optimal_slots) > 1 else []
}
except Exception as e:
return {
"status": "error",
"message": f"Error finding optimal meeting time: {e}"
}
def process_command(self, command):
"""Process natural language commands with context awareness"""
command = command.lower().strip()
# Find a time/schedule with person
schedule_match = re.search(r'(schedule|find a time for|set up|arrange) (?:a )?(?:meeting )?with ([\w\s]+)', command)
if schedule_match:
action = schedule_match.group(1)
person = schedule_match.group(2).strip()
# Extract date/time if specified
datetime_match = re.search(r'(?:on|at) ([\w\s\d,:-]+(?:am|pm)?)', command)
datetime_str = None
if datetime_match:
datetime_text = datetime_match.group(1)
try:
# Try to parse the date/time
from dateutil import parser
dt = parser.parse(datetime_text)
# Localize the datetime
dt = self.tz.localize(dt)
datetime_str = dt.isoformat()
except:
return {
"status": "error",
"message": f"Could not understand the date/time: {datetime_text}"
}
# Extract duration if specified
duration_match = re.search(r'for (\d+) (minute|hour|min|hr)s?', command)
duration_minutes = None
if duration_match:
duration_val = int(duration_match.group(1))
duration_unit = duration_match.group(2)
if duration_unit in ['hour', 'hr']:
duration_minutes = duration_val * 60
else:
duration_minutes = duration_val
# Schedule the meeting
return self.schedule_meeting(person, datetime_str, duration_minutes)
# Suggest times without scheduling
suggest_match = re.search(r'suggest|recommend|what are good times for meeting with ([\w\s]+)', command)
if suggest_match:
person = suggest_match.group(1).strip()
optimal_slots = self.find_optimal_meeting_time(person=person)
if not optimal_slots:
return {
"status": "error",
"message": "No suitable meeting times found in the next week."
}
formatted_slots = []
for slot in optimal_slots:
start_dt = datetime.fromisoformat(slot['start'].replace('Z', '+00:00'))
formatted_slots.append(start_dt.strftime('%A, %b %d at %I:%M %p'))
return {
"status": "suggestions",
"message": f"Here are the best times to meet with {person}:",
"slots": formatted_slots
}
# If no matching intent was found
return {
"status": "error",
"message": "I couldn't understand that command. Try something like 'schedule a meeting with Alex' or 'suggest times for meeting with Taylor'."
}