Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from google.auth.transport.requests import Request as GoogleRequest | |
| import os | |
| import logging | |
# ---------- FastAPI Setup ----------
# Application instance; the CORS middleware below is registered on it.
app = FastAPI()

# Origins that are allowed to make cross-origin requests to this app.
origins = [
    "https://e04c-49-206-114-222.ngrok-free.app",
    "http://localhost:8501",
]

# CORS policy: credentialed requests from `origins`, any method, any header.
_cors_settings = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# ---------- Pydantic Schema ----------
class EventDetails(BaseModel):
    """Request payload describing the calendar event to create."""

    # Event title (required).
    summary: str

    # Optional free-text fields; both default to an empty string.
    location: str | None = ""
    description: str | None = ""

    # Optional list of attendee dicts; defaults to an empty list.
    # NOTE(review): the dict schema is not validated here -- presumably
    # entries look like {"email": ...}; confirm against the caller.
    attendees: list[dict] | None = []

    # Start and end values (required); schedule_event passes them through
    # as the event's `dateTime` fields.
    start: str
    end: str

    # Time zone name applied to both start and end.
    timeZone: str | None = "Asia/Kolkata"
# ---------- Schedule Endpoint ----------
# NOTE(review): no route decorator (e.g. ``@app.post(...)``) is visible on
# ``schedule_event`` in this view -- confirm it is registered with ``app``.
def _load_credentials(scopes):
    """Return Google OAuth credentials for *scopes*, renewing them if needed.

    Cached credentials are read from ``token.json`` when that file exists.
    If they are missing or not valid they are either refreshed (expired and
    a refresh token is present) or obtained through the console
    authorization-code flow driven by ``credentials.json``. Renewed
    credentials are written back to ``token.json``.
    """
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', scopes)
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(GoogleRequest())
    else:
        # NOTE(review): this branch prints a URL and blocks on input(), so it
        # needs an attached console; without one input() raises and the
        # caller reports the failure.
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', scopes)
        flow.redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
        auth_url, _ = flow.authorization_url(prompt='consent')
        print(f"Go to this URL: {auth_url}")
        code = input("Enter the authorization code: ")
        flow.fetch_token(code=code)
        creds = flow.credentials

    # Persist the renewed credentials so the next call can reuse them.
    with open('token.json', 'w') as token_file:
        token_file.write(creds.to_json())
    return creds


def _build_event_body(event_details):
    """Translate *event_details* into the body dict passed to ``events().insert``."""
    return {
        'summary': event_details.summary,
        # The optional fields accept an explicit None; fall back to the same
        # empty values the model uses as defaults instead of forwarding None.
        'location': event_details.location or '',
        'description': event_details.description or '',
        'attendees': event_details.attendees or [],
        'start': {
            'dateTime': event_details.start,
            'timeZone': event_details.timeZone,
        },
        'end': {
            'dateTime': event_details.end,
            'timeZone': event_details.timeZone,
        },
        'reminders': {
            'useDefault': False,
            'overrides': [
                {'method': 'email', 'minutes': 24 * 60},  # one day before
                {'method': 'popup', 'minutes': 10},
            ],
        },
    }


async def schedule_event(event_details: EventDetails):
    """Create an event on the ``primary`` Google Calendar.

    Args:
        event_details: Validated description of the event to insert.

    Returns:
        On success, a dict with ``"status": "success"``, the created event's
        ``"event_link"`` and a ``"message"``. On any failure (credential
        loading, authorization, or the Calendar API call), a dict with
        ``"status": "error"`` and the exception text in ``"error_message"``.

    NOTE(review): every call below is synchronous (nothing is awaited), so
    the coroutine does not yield while credentials are loaded or the API
    request is executed.
    """
    scopes = ['https://www.googleapis.com/auth/calendar']
    try:
        # Credential handling is inside the try so its failures are reported
        # through the same error dict as API failures.
        creds = _load_credentials(scopes)
        service = build('calendar', 'v3', credentials=creds)
        logging.info("Service type: %s", type(service))
        insert_request = service.events().insert(
            calendarId='primary',
            body=_build_event_body(event_details),
            sendUpdates='all',
        )
        created_event = insert_request.execute()
    except Exception as e:
        # Endpoint boundary: keep the traceback in the logs, return the text.
        logging.exception("Failed to schedule calendar event")
        return {
            "status": "error",
            "error_message": str(e)
        }
    return {
        "status": "success",
        "event_link": created_event.get('htmlLink'),
        "message": f"Event created: {created_event.get('summary')}"
    }