| | """ |
| | 📧📅 Google Integration for Olivia |
| | Provides Gmail and Calendar access for the household manager persona. |
| | |
| | SETUP REQUIRED: |
| | 1. Go to https://console.cloud.google.com/ |
| | 2. Create a new project (or use existing) |
| | 3. Enable Gmail API and Google Calendar API |
| | 4. Create OAuth 2.0 credentials (Desktop app type) |
| | 5. Download credentials.json and place in this directory |
| | 6. Run this file directly to authenticate: python google_services.py |
| | """ |
| |
|
| | import os |
| | import pickle |
| | import datetime |
| | from pathlib import Path |
| |
|
| | |
# The Google client libraries are optional. When they are missing the module
# still imports; GOOGLE_AVAILABLE is the flag the rest of the file checks
# before touching any of the names imported here.
try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
except ImportError:
    GOOGLE_AVAILABLE = False
    print("⚠️ Google API libraries not installed. Run: pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib")
else:
    # Only reached when every import above succeeded.
    GOOGLE_AVAILABLE = True
| |
|
| | |
# OAuth scopes requested during consent: Gmail read + send, Calendar read +
# event management.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/calendar.readonly",
    "https://www.googleapis.com/auth/calendar.events",
]

# Both the downloaded client secrets and the cached token live next to this
# module rather than in the current working directory.
SCRIPT_DIR = Path(__file__).parent
CREDENTIALS_FILE = SCRIPT_DIR.joinpath("credentials.json")
TOKEN_FILE = SCRIPT_DIR.joinpath("token.pickle")
| |
|
| |
|
def get_google_credentials():
    """Get or refresh Google OAuth credentials.

    Resolution order:
      1. Load cached credentials from TOKEN_FILE.
      2. If they are expired and carry a refresh token, refresh them.
      3. Otherwise run the interactive installed-app flow from
         CREDENTIALS_FILE (starts a local server on a free port).

    Refreshed or newly obtained credentials are written back to TOKEN_FILE.

    Returns:
        The credentials object, or None when the Google libraries are not
        installed or CREDENTIALS_FILE is missing and a new login is required.
    """
    if not GOOGLE_AVAILABLE:
        return None

    creds = None

    if TOKEN_FILE.exists():
        # SECURITY: pickle.load executes arbitrary code from the file. This is
        # only acceptable because TOKEN_FILE is written by this function;
        # never point it at a file from an untrusted source.
        try:
            with open(TOKEN_FILE, 'rb') as token:
                creds = pickle.load(token)
        except (pickle.UnpicklingError, EOFError, AttributeError,
                ImportError, IndexError, OSError) as e:
            # A truncated or stale cache should trigger a new login, not a crash.
            print(f"⚠️ Ignoring unreadable token cache {TOKEN_FILE}: {e}")
            creds = None

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        from google.auth.exceptions import RefreshError
        try:
            creds.refresh(Request())
        except RefreshError as e:
            # Revoked or expired refresh token: fall through to a new login.
            print(f"⚠️ Token refresh failed, re-authenticating: {e}")
            creds = None

    if not creds or not creds.valid:
        if not CREDENTIALS_FILE.exists():
            print(f"⚠️ Missing {CREDENTIALS_FILE}")
            print(" Download from Google Cloud Console → APIs & Services → Credentials")
            return None

        flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_FILE), SCOPES)
        creds = flow.run_local_server(port=0)

    with open(TOKEN_FILE, 'wb') as token:
        pickle.dump(creds, token)
    try:
        # The cache holds a long-lived refresh token; keep it owner-only.
        os.chmod(TOKEN_FILE, 0o600)
    except OSError:
        # Best effort: some filesystems do not support POSIX permissions.
        pass

    return creds
| |
|
| |
|
class GmailService:
    """Gmail integration for Olivia.

    Wraps a Gmail API v1 client. ``service`` stays None when credentials are
    unavailable or the client cannot be built; the public methods then return
    their "unavailable" value (None or an empty list) instead of raising.
    """

    def __init__(self):
        self.service = None
        self._connect()

    def _connect(self):
        """Connect to Gmail API, leaving ``service`` as None on failure."""
        creds = get_google_credentials()
        if not creds:
            return
        try:
            self.service = build('gmail', 'v1', credentials=creds)
        except Exception as e:
            # Deliberately broad: a failed connection must not break the caller.
            print(f"⚠️ Gmail connection failed: {e}")

    @staticmethod
    def _display_sender(from_header):
        """Return a speakable sender name from a raw ``From`` header.

        Prefers the display name, falls back to the bare address, and finally
        to the text before the first ``<`` if the header cannot be parsed.
        """
        from email.utils import parseaddr

        name, address = parseaddr(from_header)
        return name or address or from_header.split('<')[0].strip()

    def get_unread_count(self):
        """Get count of unread emails.

        Returns:
            The API's ``resultSizeEstimate`` for ``is:unread`` (an estimate,
            not an exact count), or None if Gmail is unreachable.
        """
        if not self.service:
            return None
        try:
            results = self.service.users().messages().list(
                userId='me', q='is:unread', maxResults=1
            ).execute()
            return results.get('resultSizeEstimate', 0)
        except HttpError as e:
            print(f"⚠️ Gmail error: {e}")
            return None

    def get_recent_emails(self, max_results=5, query=None):
        """Get recent email summaries.

        Args:
            max_results: Maximum number of messages to fetch.
            query: Optional Gmail search expression (e.g. ``'is:unread'``);
                when omitted the most recent messages are listed unfiltered.

        Returns:
            List of dicts with keys id, from, subject, date, snippet, unread.
            Empty when Gmail is unreachable or an API call fails.
        """
        if not self.service:
            return []

        list_kwargs = {'userId': 'me', 'maxResults': max_results}
        if query:
            list_kwargs['q'] = query

        try:
            listing = self.service.users().messages().list(**list_kwargs).execute()

            emails = []
            for ref in listing.get('messages', []):
                detail = self.service.users().messages().get(
                    userId='me', id=ref['id'], format='metadata',
                    metadataHeaders=['From', 'Subject', 'Date']
                ).execute()

                # Header names are case-insensitive, so normalise before lookup.
                headers = {
                    h['name'].lower(): h['value']
                    for h in detail.get('payload', {}).get('headers', [])
                }
                emails.append({
                    'id': ref['id'],
                    'from': headers.get('from', 'Unknown'),
                    'subject': headers.get('subject', '(no subject)'),
                    'date': headers.get('date', ''),
                    'snippet': detail.get('snippet', '')[:100],
                    'unread': 'UNREAD' in detail.get('labelIds', []),
                })

            return emails
        except HttpError as e:
            print(f"⚠️ Gmail error: {e}")
            return []

    def get_email_summary(self):
        """Get a natural language summary for Olivia to speak."""
        unread = self.get_unread_count()
        if unread is None:
            return "I couldn't access your Gmail right now."

        if unread == 0:
            return "Your inbox is all clear - no unread emails!"

        if unread == 1:
            summary = "You have 1 unread email"
        else:
            summary = f"You have {unread} unread emails"

        # Ask for unread mail directly; filtering the latest few messages
        # would miss unread mail that is older than those.
        newest_unread = self.get_recent_emails(1, query='is:unread')
        if newest_unread:
            first = newest_unread[0]
            sender = self._display_sender(first['from'])
            summary += f". The most recent is from {sender} about '{first['subject'][:50]}'"

        return summary + "."
| |
|
| |
|
class CalendarService:
    """Google Calendar integration for Olivia.

    Wraps a Calendar API v3 client. ``service`` stays None when credentials
    are unavailable or the client cannot be built; the public methods then
    return their "unavailable" value (None, False or an empty list).

    Read methods span every calendar in the user's calendar list; create,
    update, delete and search operate on the primary calendar.
    """

    # IANA zone attached to events this class creates or reschedules when the
    # caller does not pass ``timezone``.
    DEFAULT_TIMEZONE = 'America/Denver'

    # Friendly recurrence names -> recurrence rules sent to the API.
    _RRULES = {
        'yearly': 'RRULE:FREQ=YEARLY',
        'annually': 'RRULE:FREQ=YEARLY',
        'monthly': 'RRULE:FREQ=MONTHLY',
        'weekly': 'RRULE:FREQ=WEEKLY',
        'daily': 'RRULE:FREQ=DAILY',
    }

    def __init__(self):
        self.service = None
        self._connect()

    def _connect(self):
        """Connect to Calendar API, leaving ``service`` as None on failure."""
        creds = get_google_credentials()
        if not creds:
            return
        try:
            self.service = build('calendar', 'v3', credentials=creds)
        except Exception as e:
            # Deliberately broad: a failed connection must not break the caller.
            print(f"⚠️ Calendar connection failed: {e}")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_rfc3339(moment):
        """Format a datetime as an RFC 3339 timestamp with a UTC offset.

        Naive datetimes are treated as local time; ``astimezone()`` applies
        the offset in force on that date, so results stay correct across DST
        changes. Aware datetimes keep their own offset.
        """
        if moment.tzinfo is None:
            moment = moment.astimezone()
        return moment.replace(microsecond=0).isoformat()

    @staticmethod
    def _normalize_event(raw):
        """Reduce a raw API event to id/summary/start/end/location strings."""
        start = raw.get('start', {})
        end = raw.get('end', {})
        return {
            'id': raw.get('id'),
            'summary': raw.get('summary', 'Untitled'),
            # Timed events carry 'dateTime'; all-day events carry 'date'.
            'start': start.get('dateTime', start.get('date', '')),
            'end': end.get('dateTime', end.get('date', '')),
            'location': raw.get('location', ''),
        }

    @staticmethod
    def _spoken_time(start):
        """Return e.g. '9:30 AM' for a timed start, or None for an all-day one."""
        if 'T' not in start:
            return None
        try:
            parsed = datetime.datetime.fromisoformat(start.replace('Z', '+00:00'))
        except ValueError:
            # Unparseable timestamp: read the raw value rather than fail.
            return start
        return parsed.strftime('%I:%M %p').lstrip('0')

    @classmethod
    def _summarize_events(cls, events, day_label):
        """Build the spoken summary shared by the today/tomorrow methods."""
        if not events:
            return f"Your calendar is clear for {day_label} - no scheduled events!"

        count = len(events)
        noun = "event" if count == 1 else "events"

        # Only the first three events are read out; the rest are counted.
        phrases = []
        for event in events[:3]:
            title = event.get('summary', 'Untitled')
            when = cls._spoken_time(event.get('start', ''))
            phrases.append(f"{title} at {when}" if when else f"{title} all day")

        summary = f"You have {count} {noun} {day_label}: " + ", then ".join(phrases)
        if count > 3:
            summary += f", and {count - 3} more"
        return summary + "."

    @classmethod
    def _all_day_event_body(cls, summary, date, recurrence, description):
        """Build the request body for an all-day event."""
        # The API treats 'end' as exclusive, so a one-day event ends on the
        # following date.
        next_day = date + datetime.timedelta(days=1)
        body = {
            'summary': summary,
            'description': description,
            'start': {'date': date.strftime('%Y-%m-%d')},
            'end': {'date': next_day.strftime('%Y-%m-%d')},
        }
        if recurrence:
            rule = cls._RRULES.get(recurrence.lower())
            if rule:
                body['recurrence'] = [rule]
        return body

    def _get_all_calendar_ids(self):
        """Get all calendar IDs the user has access to.

        Falls back to ``['primary']`` if the calendar list cannot be read.
        """
        try:
            calendar_list = self.service.calendarList().list().execute()
            return [cal['id'] for cal in calendar_list.get('items', [])]
        except HttpError:
            return ['primary']

    def _collect_events(self, time_min, time_max, **list_kwargs):
        """Fetch raw events in [time_min, time_max) from every calendar."""
        collected = []
        for cal_id in self._get_all_calendar_ids():
            try:
                result = self.service.events().list(
                    calendarId=cal_id,
                    timeMin=time_min,
                    timeMax=time_max,
                    singleEvents=True,
                    orderBy='startTime',
                    **list_kwargs
                ).execute()
            except HttpError:
                # One unreadable calendar should not hide the others.
                continue
            collected.extend(result.get('items', []))
        return collected

    # ------------------------------------------------------------------
    # Reading
    # ------------------------------------------------------------------

    def get_today_events(self):
        """Get today's calendar events from ALL calendars."""
        return self.get_events_for_date(days_offset=0)

    def get_upcoming_events(self, days=7, max_results=10):
        """Get upcoming events for the next N days from ALL calendars.

        Returns:
            Up to ``max_results`` raw API event dicts, soonest first.
        """
        if not self.service:
            return []

        now = datetime.datetime.now(datetime.timezone.utc)
        horizon = now + datetime.timedelta(days=days)

        all_events = self._collect_events(
            self._to_rfc3339(now),
            self._to_rfc3339(horizon),
            maxResults=max_results,
        )
        all_events.sort(
            key=lambda x: x.get('start', {}).get('dateTime', x.get('start', {}).get('date', ''))
        )
        return all_events[:max_results]

    def get_events_for_date(self, days_offset=0):
        """Get events for a specific day (0=today, 1=tomorrow, etc.) from ALL calendars.

        The day is bounded by local midnights, each given the UTC offset that
        applies on its own date.

        Returns:
            List of dicts with keys id, summary, start, end, location,
            sorted by start string.
        """
        if not self.service:
            return []

        today_midnight = datetime.datetime.now().replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        # Do the day arithmetic on naive values, then localise each bound.
        day_start = today_midnight + datetime.timedelta(days=days_offset)
        day_end = day_start + datetime.timedelta(days=1)

        raw_events = self._collect_events(
            self._to_rfc3339(day_start),
            self._to_rfc3339(day_end),
        )
        all_events = [self._normalize_event(e) for e in raw_events]
        all_events.sort(key=lambda x: x.get('start', ''))
        return all_events

    def find_events_by_name(self, search_term, days_ahead=30):
        """Find events by name/summary on the primary calendar.

        Args:
            search_term: Text to search for in event titles
            days_ahead: How many days ahead to search (default 30)

        Returns:
            List of matching events with id, summary, start, end, location
        """
        if not self.service:
            return []

        try:
            local_now = datetime.datetime.now()
            horizon = local_now + datetime.timedelta(days=days_ahead)

            events_result = self.service.events().list(
                calendarId='primary',
                timeMin=self._to_rfc3339(local_now),
                timeMax=self._to_rfc3339(horizon),
                singleEvents=True,
                orderBy='startTime',
                q=search_term
            ).execute()

            return [self._normalize_event(e) for e in events_result.get('items', [])]
        except HttpError as e:
            print(f"⚠️ Failed to search events: {e}")
            return []

    # ------------------------------------------------------------------
    # Spoken summaries
    # ------------------------------------------------------------------

    def get_schedule_summary(self):
        """Get a natural language summary of today's events for Olivia to speak."""
        return self._summarize_events(self.get_today_events(), "today")

    def get_tomorrow_summary(self):
        """Get a natural language summary of tomorrow's events."""
        return self._summarize_events(self.get_events_for_date(days_offset=1), "tomorrow")

    # ------------------------------------------------------------------
    # Writing (primary calendar)
    # ------------------------------------------------------------------

    def create_event(self, summary, start_time, end_time, description="", location="",
                     timezone=None):
        """Create a new calendar event.

        Args:
            summary: Event title
            start_time: datetime object for start (naive values are local time)
            end_time: datetime object for end (naive values are local time)
            description: Optional event description
            location: Optional location
            timezone: Optional IANA zone name; defaults to DEFAULT_TIMEZONE

        Returns:
            Created event dict or None if failed
        """
        if not self.service:
            return None

        zone = timezone or self.DEFAULT_TIMEZONE
        event = {
            'summary': summary,
            'location': location,
            'description': description,
            'start': {'dateTime': self._to_rfc3339(start_time), 'timeZone': zone},
            'end': {'dateTime': self._to_rfc3339(end_time), 'timeZone': zone},
        }

        try:
            created_event = self.service.events().insert(
                calendarId='primary',
                body=event
            ).execute()
        except HttpError as e:
            print(f"⚠️ Failed to create event: {e}")
            return None

        print(f"✅ Created event: {created_event.get('summary')}")
        return created_event

    def create_all_day_event(self, summary, date, recurrence=None, description=""):
        """Create an all-day event (like birthdays, holidays).

        Args:
            summary: Event title
            date: datetime (or date) object for the day
            recurrence: Optional - 'yearly', 'annually', 'monthly', 'weekly',
                'daily', or None; unrecognised values are ignored
            description: Optional description

        Returns:
            Created event dict or None if failed
        """
        if not self.service:
            return None

        event = self._all_day_event_body(summary, date, recurrence, description)

        try:
            created_event = self.service.events().insert(
                calendarId='primary',
                body=event
            ).execute()
        except HttpError as e:
            print(f"⚠️ Failed to create all-day event: {e}")
            return None

        print(f"✅ Created all-day event: {created_event.get('summary')}")
        return created_event

    def update_event(self, event_id, summary=None, location=None, description=None,
                     start_time=None, end_time=None, timezone=None):
        """Update an existing calendar event.

        Only arguments with a truthy value are applied; the rest of the event
        is sent back as fetched.

        Args:
            event_id: The event ID to update
            summary: New title (optional)
            location: New location (optional)
            description: New description (optional)
            start_time: New start datetime (optional)
            end_time: New end datetime (optional)
            timezone: Optional IANA zone name for new start/end values;
                defaults to DEFAULT_TIMEZONE

        Returns:
            Updated event dict or None if failed
        """
        if not self.service:
            return None

        zone = timezone or self.DEFAULT_TIMEZONE

        try:
            # Fetch first so untouched fields survive the full-body update.
            event = self.service.events().get(
                calendarId='primary',
                eventId=event_id
            ).execute()

            if summary:
                event['summary'] = summary
            if location:
                event['location'] = location
            if description:
                event['description'] = description
            if start_time:
                event['start'] = {'dateTime': self._to_rfc3339(start_time), 'timeZone': zone}
            if end_time:
                event['end'] = {'dateTime': self._to_rfc3339(end_time), 'timeZone': zone}

            updated_event = self.service.events().update(
                calendarId='primary',
                eventId=event_id,
                body=event
            ).execute()
        except HttpError as e:
            print(f"⚠️ Failed to update event: {e}")
            return None

        print(f"✅ Updated event: {updated_event.get('summary')}")
        return updated_event

    def delete_event(self, event_id):
        """Delete a calendar event by ID.

        Returns:
            True if the API call succeeded, False otherwise.
        """
        if not self.service:
            return False

        try:
            self.service.events().delete(
                calendarId='primary',
                eventId=event_id
            ).execute()
        except HttpError as e:
            print(f"⚠️ Failed to delete event: {e}")
            return False

        print(f"✅ Deleted event: {event_id}")
        return True
| |
|
| |
|
| | |
# Module-level holders for the lazily created service objects; they stay None
# until get_gmail() / get_calendar() populate them on first use.
_gmail = None
_calendar = None
| |
|
def get_gmail():
    """Return the shared GmailService, creating it on first use.

    Returns None when the Google libraries are not installed.
    """
    global _gmail
    needs_creation = GOOGLE_AVAILABLE and _gmail is None
    if needs_creation:
        _gmail = GmailService()
    return _gmail
| |
|
def get_calendar():
    """Return the shared CalendarService, creating it on first use.

    Returns None when the Google libraries are not installed.
    """
    global _calendar
    needs_creation = GOOGLE_AVAILABLE and _calendar is None
    if needs_creation:
        _calendar = CalendarService()
    return _calendar
| |
|
| |
|
| | |
def get_daily_briefing():
    """Compose Olivia's daily briefing: today's schedule, then the inbox.

    Each section is included only when its service is connected. If neither
    is, a prompt offering setup help is returned instead.
    """
    # Calendar first, then mail; each accessor is called just before its
    # summary is built.
    sources = (
        (get_calendar, 'get_schedule_summary'),
        (get_gmail, 'get_email_summary'),
    )

    sections = []
    for accessor, summary_method in sources:
        client = accessor()
        if client and client.service:
            sections.append(getattr(client, summary_method)())

    if sections:
        return " ".join(sections)
    return "I don't have access to your calendar or email yet. Would you like help setting that up?"
| |
|
| |
|
if __name__ == "__main__":
    # Run directly to set up authentication: checks prerequisites, performs
    # the OAuth login, then smoke-tests both services.
    print("🔐 Haven Google Authentication Setup")
    print("=" * 50)

    if not GOOGLE_AVAILABLE:
        print("\n❌ Google API libraries not installed!")
        print(" Run: pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib")
        # SystemExit instead of exit(): exit() is injected by the site module
        # and is not guaranteed to exist.
        raise SystemExit(1)

    if not CREDENTIALS_FILE.exists():
        print(f"\n❌ Missing credentials file: {CREDENTIALS_FILE}")
        print("\nTo set up Google integration:")
        print("1. Go to https://console.cloud.google.com/")
        print("2. Create a project and enable Gmail & Calendar APIs")
        print("3. Create OAuth 2.0 credentials (Desktop app)")
        print("4. Download and save as: credentials.json")
        print(f" Save it here: {SCRIPT_DIR}")
        raise SystemExit(1)

    print("\n📱 Opening browser for Google authentication...")
    creds = get_google_credentials()

    if creds:
        print("\n✅ Authentication successful!")
        print("\nTesting services...")

        gmail = get_gmail()
        if gmail and gmail.service:
            print(f"📧 Gmail: {gmail.get_email_summary()}")

        calendar = get_calendar()
        if calendar and calendar.service:
            print(f"📅 Calendar: {calendar.get_schedule_summary()}")

        print("\n🎉 Olivia now has access to your Gmail and Calendar!")
    else:
        print("\n❌ Authentication failed")
| |
|