Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| from datetime import datetime, timedelta | |
| import gradio as gr | |
# OAuth scopes requested when authorising against the Google Calendar API
SCOPES = [
    'https://www.googleapis.com/auth/calendar.readonly',
    'https://www.googleapis.com/auth/calendar',
]
def authenticate_google():
    """Authenticate with Google and return a Calendar API service object.

    Cached credentials are read from ``token.pickle`` in the current working
    directory. Expired credentials that carry a refresh token are refreshed
    and written back to the cache. A cache that cannot be read, is unusable,
    or fails to refresh is deleted, and the interactive OAuth flow is run
    from ``credentials.json`` instead.

    Returns:
        The object returned by ``build('calendar', 'v3', credentials=...)``.

    Raises:
        Exception: With the message ``"Authentication failed: <reason>"`` for
            any failure, including a missing ``credentials.json``. The
            original error is attached as ``__cause__``.
    """
    token_path = 'token.pickle'
    secrets_path = 'credentials.json'

    def _save_token(credentials):
        with open(token_path, 'wb') as token:
            pickle.dump(credentials, token)

    def _discard_token():
        # The cache may already have been removed; that is not an error.
        try:
            os.remove(token_path)
        except FileNotFoundError:
            pass

    creds = None
    refreshed = False
    try:
        if os.path.exists(token_path):
            try:
                # SECURITY: unpickling can execute arbitrary code. The token
                # file must only ever be one written by this application.
                with open(token_path, 'rb') as token:
                    creds = pickle.load(token)
                if not creds or not creds.valid:
                    if creds and creds.expired and creds.refresh_token:
                        creds.refresh(Request())
                        refreshed = True
                    else:
                        creds = None
            except Exception:
                # Unreadable cache or failed refresh: fall back to a fresh
                # OAuth flow below.
                creds = None
                refreshed = False
            if creds is None:
                _discard_token()

        if refreshed:
            # Persist the refreshed credentials so the next start does not
            # have to refresh again. Caching is best-effort here: the
            # credentials remain usable for this session if the write fails.
            try:
                _save_token(creds)
            except OSError:
                pass

        if not creds:
            if not os.path.exists(secrets_path):
                raise FileNotFoundError(
                    "credentials.json file not found. Please download it from Google Cloud Console."
                )
            flow = InstalledAppFlow.from_client_secrets_file(secrets_path, SCOPES)
            creds = flow.run_local_server(port=0)
            _save_token(creds)

        return build('calendar', 'v3', credentials=creds)
    except Exception as e:
        raise Exception(f"Authentication failed: {str(e)}") from e
def create_event(service, event_name, start_time, end_time, time_zone='Asia/Kolkata'):
    """Insert an event into the primary calendar and return a success message.

    Args:
        service: Calendar service object exposing
            ``events().insert(calendarId=..., body=...).execute()``.
        event_name: Text used as the event summary.
        start_time: Object with ``isoformat()`` giving the event start.
        end_time: Object with ``isoformat()`` giving the event end.
        time_zone: Time zone name sent with both start and end.

    Returns:
        ``'Event "<event_name>" created successfully!'``.

    Raises:
        Exception: With the message ``"Failed to create event: <reason>"``.
            When the reason mentions ``invalid_grant`` the cached
            ``token.pickle`` is removed (if present) and a hint to restart
            the application is appended. The original error is attached as
            ``__cause__``.
    """
    try:
        event = {
            'summary': event_name,
            'start': {
                'dateTime': start_time.isoformat(),
                'timeZone': time_zone,
            },
            'end': {
                'dateTime': end_time.isoformat(),
                'timeZone': time_zone,
            },
        }
        # The API response is not needed; only success or failure matters.
        service.events().insert(calendarId='primary', body=event).execute()
        return f'Event "{event_name}" created successfully!'
    except Exception as e:
        error_msg = f"Failed to create event: {str(e)}"
        if "invalid_grant" in str(e):
            # The stored token is no longer accepted; drop it so the next
            # start of the application runs the OAuth flow again.
            if os.path.exists('token.pickle'):
                os.remove('token.pickle')
            error_msg += "\nPlease restart the application to re-authenticate."
        raise Exception(error_msg) from e
def parse_date_time(date_str, time_str):
    """Combine a ``DD-MM-YYYY`` date and an ``HH:MM AM/PM`` time.

    Args:
        date_str: Date as three dash-separated numbers (day, month, year).
            Whitespace around each number is ignored.
        time_str: 12-hour clock time such as ``"02:30 PM"``; case and
            surrounding whitespace are ignored.

    Returns:
        A naive ``datetime`` for the given day at the given hour and minute.

    Raises:
        ValueError: If the date is not three numeric parts, the time does not
            match ``HH:MM AM/PM``, or the numbers do not form a real calendar
            date (in which case the message comes from ``datetime``).
    """
    date_parts = [part.strip() for part in date_str.split('-')]
    # Check the parts are numeric up front so the caller gets the format
    # message instead of int()'s "invalid literal" text.
    if len(date_parts) != 3 or not all(part.isdecimal() for part in date_parts):
        raise ValueError("Date must be in DD-MM-YYYY format")
    day, month, year = map(int, date_parts)

    try:
        time_obj = datetime.strptime(time_str.strip().upper(), '%I:%M %p')
    except ValueError:
        raise ValueError("Time must be in HH:MM AM/PM format") from None

    try:
        return datetime(year, month, day, time_obj.hour, time_obj.minute)
    except OverflowError:
        # Absurdly large numbers overflow before datetime's range check;
        # report them as the ValueError callers already handle.
        raise ValueError("Date must be in DD-MM-YYYY format") from None
def get_conversation_stage(history):
    """Return the conversation stage, i.e. the number of "User" entries.

    ``history`` is an iterable of ``(role, message)`` pairs; an empty history
    or one without user messages yields 0.
    """
    return sum(1 for speaker, _ in history if speaker == "User")
def conversational_flow(history, user_input):
    """Advance the booking conversation by one user message.

    The stage is the number of "User" entries in ``history`` after the new
    message has been appended:

    1. request to book, 2. purpose, 3. date, 4. start time, 5. end time
    (the event is created here), 6. yes/no for another booking.

    Input that is rejected at a stage is removed from ``history`` again so
    the stage does not advance and the user can retry.

    Args:
        history: List of ``(role, message)`` tuples; mutated in place.
        user_input: The text the user just sent.

    Returns:
        The same ``history`` list, updated with the user message (when
        accepted) and the bot's reply.
    """
    if not user_input:
        return history

    command = user_input.strip().lower()

    # The restart command is honoured at any point of the conversation.
    if command in ('restart', 'start over', 'reset'):
        history.clear()
        history.append(("Bot", "Conversation restarted. Hi! How may I assist you?"))
        return history

    history.append(("User", user_input))
    stage = get_conversation_stage(history)

    if stage == 1:  # Request to book an appointment
        if "appointment" in command or "book" in command:
            history.append(("Bot", "What is the purpose of the appointment?"))
        else:
            history.pop()  # Remove invalid input
            history.append(("Bot", "I can help you book an appointment. Please say 'Book an appointment' to start."))

    elif stage == 2:  # Purpose of the appointment
        history.append(("Bot", f"Great! You want to create an event for '{user_input}'. What is the date? (DD-MM-YYYY)"))

    elif stage == 3:  # Date
        try:
            date_parts = user_input.strip().split('-')
            if len(date_parts) != 3 or not all(part.isdigit() for part in date_parts):
                raise ValueError("Date must be in DD-MM-YYYY format")
            day, month, year = map(int, date_parts)
            # Reject impossible dates (e.g. 31-02-2024) now. Otherwise the
            # failure only surfaces at the end-time stage, where the user can
            # no longer correct the date.
            try:
                datetime(year, month, day)
            except OverflowError:
                raise ValueError("Date must be in DD-MM-YYYY format") from None
            history.append(("Bot", "What is the start time? (HH:MM AM/PM)"))
        except ValueError as e:
            history.pop()  # Remove invalid date
            history.append(("Bot", f"Invalid date format: {str(e)}. Please enter the date in DD-MM-YYYY format."))

    elif stage == 4:  # Start time
        try:
            datetime.strptime(user_input.strip().upper(), '%I:%M %p')
            history.append(("Bot", "What is the end time? (HH:MM AM/PM)"))
        except ValueError:
            history.pop()
            history.append(("Bot", "Invalid time format. Please enter the time in HH:MM AM/PM format (e.g., 02:30 PM)."))

    elif stage == 5:  # End time: validate, then create the event
        # User messages so far: [request, purpose, date, start time, end time]
        user_messages = [msg for role, msg in history if role == "User"]
        event_name, date_str, start_time_str = user_messages[1:4]
        try:
            start_time = parse_date_time(date_str, start_time_str)
            end_time = parse_date_time(date_str, user_input)
            if end_time <= start_time:
                raise ValueError("End time must be after start time")
        except ValueError as e:
            history.pop()
            history.append(("Bot", f"Invalid time: {str(e)}. Please enter a valid end time in HH:MM AM/PM format."))
        else:
            try:
                service = authenticate_google()
                result = create_event(service, event_name, start_time, end_time)
            except Exception as e:
                # Stay on this stage so the booking can be retried, instead
                # of moving on to a yes/no question that was never asked.
                history.pop()
                history.append(("Bot", f"Error: {str(e)}"))
                history.append(("Bot", "Please enter the end time again to retry, or type 'restart' to start over."))
            else:
                history.append(("Bot", result))
                history.append(("Bot", "Do you need to book any other appointments? (yes/no)"))

    elif stage == 6:  # Another booking? yes/no
        if command == "yes":
            history.clear()
            history.append(("Bot", "Hi! How may I assist you?"))
        elif command == "no":
            history.append(("Bot", "Thank you! Have a nice day."))
        else:
            history.pop()
            history.append(("Bot", "Please answer with 'yes' or 'no'."))

    else:  # Conversation already finished
        # Without a reply here any further input would be recorded silently.
        history.pop()
        history.append(("Bot", "This conversation has ended. Type 'restart' to book another appointment."))

    return history
def display_chat(history):
    """Convert ``(role, message)`` history entries into chatbot rows.

    A "User" entry becomes ``[message, None]``; an entry with any other role
    becomes ``[None, message]``.
    """
    return [
        [text, None] if speaker == "User" else [None, text]
        for speaker, text in history
    ]
# Build the Gradio UI and wire the chat callbacks
with gr.Blocks() as iface:
    gr.Markdown("""
    <h1>Google Calendar Appointment Bot</h1>
    <p>Type 'restart' at any time to start over.</p>
    """)
    chatbot = gr.Chatbot()
    user_input = gr.Textbox(placeholder="Type your response here...")
    send_button = gr.Button("Send")
    restart_button = gr.Button("Restart Conversation")

    def reset_conversation():
        """Return the chat rows and history for a brand-new conversation."""
        fresh_history = [("Bot", "Hi! How may I assist you?")]
        return display_chat(fresh_history), fresh_history

    def handle_input(user_input, history):
        """Process one submitted message and clear the textbox.

        Blank input leaves the history untouched. Returns the chat rows, the
        history, and an empty string for the textbox.
        """
        if user_input.strip():
            history = conversational_flow(history, user_input)
        return display_chat(history), history, ""

    history_state = gr.State([])

    # The Send button and pressing Enter in the textbox behave identically.
    for register in (send_button.click, user_input.submit):
        register(
            handle_input,
            inputs=[user_input, history_state],
            outputs=[chatbot, history_state, user_input],
        )

    # The Restart button and the initial page load both show the greeting.
    for register in (restart_button.click, iface.load):
        register(reset_conversation, outputs=[chatbot, history_state])

if __name__ == "__main__":
    iface.launch()