import gradio as gr
import torch
import datetime
import pytz
import uuid
import re
import json
import os
import gc
import logging

from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from google.oauth2 import service_account
from googleapiclient.discovery import build

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Log startup
logger.info("Starting appointment booking application...")

# Set up timezone
IST = pytz.timezone('Asia/Kolkata')
# ===== CONFIGURATION =====
# Model ID on Hugging Face
MODEL_ID = "meta-llama/Meta-Llama-3.1-8B-Instruct"

# Google Calendar API Configuration
SCOPES = ['https://www.googleapis.com/auth/calendar']
SERVICE_ACCOUNT_FILE = 'service-account-key.json'
CALENDAR_ID = '26f5856049fab3d6648a2f1dea57c70370de6bc1629a5182be1511b0e75d11d3@group.calendar.google.com'  # Update with your calendar ID if not using primary

# Local appointments database (for backup)
appointments_db = {}
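# Credentials setup (assumptions about the deployment, not verified here):
# - On Hugging Face Spaces, the service-account JSON is typically stored in a
#   secret named GOOGLE_CREDENTIALS, which get_calendar_service() reads below.
# - For local runs, placing the key file at SERVICE_ACCOUNT_FILE works instead.
# - The target calendar must be shared with the service account's email address
#   (with "Make changes to events" permission), or event inserts will fail.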
# ===== GOOGLE CALENDAR FUNCTIONS =====
def get_calendar_service():
    """Get Google Calendar service"""
    try:
        # Check if Google credentials are stored in an env variable
        google_credentials = os.environ.get('GOOGLE_CREDENTIALS')
        if google_credentials:
            logger.info("Using Google credentials from environment variable")
            # Write the credentials to a temporary file
            temp_file_path = 'temp_credentials.json'
            with open(temp_file_path, 'w') as f:
                f.write(google_credentials)
            credentials = service_account.Credentials.from_service_account_file(
                temp_file_path, scopes=SCOPES)
        elif os.path.exists(SERVICE_ACCOUNT_FILE):
            logger.info(f"Using Google credentials from file: {SERVICE_ACCOUNT_FILE}")
            # Use the file on disk
            credentials = service_account.Credentials.from_service_account_file(
                SERVICE_ACCOUNT_FILE, scopes=SCOPES)
        else:
            logger.warning("No Google Calendar credentials found")
            return None

        return build('calendar', 'v3', credentials=credentials)
    except Exception as e:
        logger.error(f"Error getting calendar service: {e}")
        return None
def add_to_google_calendar(appointment_details):
    """Add an appointment to Google Calendar"""
    try:
        service = get_calendar_service()
        if not service:
            return None

        # Parse the date ("YYYY-MM-DD") and time ("H:MM AM/PM") strings
        date_str = appointment_details["date"]
        time_str = appointment_details["time"]

        date_parts = date_str.split('-')
        year, month, day = int(date_parts[0]), int(date_parts[1]), int(date_parts[2])

        time_parts = time_str.split(' ')
        time_val = time_parts[0]
        meridian = time_parts[1] if len(time_parts) > 1 else 'AM'
        hours, minutes = map(int, time_val.split(':'))
        if meridian.upper() == 'PM' and hours != 12:
            hours += 12
        if meridian.upper() == 'AM' and hours == 12:
            hours = 0

        # Create timezone-aware datetime objects. Note: pytz timezones must be
        # attached with localize(); passing tzinfo=IST directly would apply the
        # historical LMT offset (+05:53) instead of IST (+05:30).
        start_time = IST.localize(datetime.datetime(year, month, day, hours, minutes, 0))
        end_time = start_time + datetime.timedelta(hours=1)  # Default 1-hour appointment

        # Create event
        event = {
            'summary': f"Appointment with {appointment_details['name']}",
            'location': 'Office',
            'description': 'Appointment booked via AI Assistant',
            'start': {
                'dateTime': start_time.isoformat(),
                'timeZone': 'Asia/Kolkata',
            },
            'end': {
                'dateTime': end_time.isoformat(),
                'timeZone': 'Asia/Kolkata',
            },
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'email', 'minutes': 24 * 60},
                    {'method': 'popup', 'minutes': 10},
                ],
            },
        }

        # Attach a unique ID so the event can be tracked later (e.g., for cancellation)
        appointment_id = appointment_details.get('appointment_id', str(uuid.uuid4()))
        event['extendedProperties'] = {
            'private': {
                'appointment_id': appointment_id
            }
        }

        # Insert event
        created_event = service.events().insert(calendarId=CALENDAR_ID, body=event).execute()
        return created_event['id']
    except Exception as e:
        logger.error(f"Error adding to Google Calendar: {e}")
        return None
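# A more compact parse (illustrative sketch, equivalent for well-formed input):
#
#   naive = datetime.datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %I:%M %p")
#   start_time = IST.localize(naive)
#
# strptime rejects malformed strings like "10 AM" (no minutes), which the
# manual parser above would also fail on at `time_val.split(':')`, so the
# behavior is comparable for valid input.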
# ===== FUNCTION DEFINITIONS =====
function_definitions = [
    {
        "name": "book_appointment",
        "description": "Book an appointment",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "The name of the person"
                },
                "date": {
                    "type": "string",
                    "description": "The date in YYYY-MM-DD format"
                },
                "time": {
                    "type": "string",
                    "description": "The time of the appointment (e.g., '10:00 AM')"
                }
            },
            "required": ["name", "date", "time"]
        }
    }
]
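# The model is instructed (in format_prompt_with_functions below) to emit a
# call matching this schema as a fenced JSON block, e.g.:
#
#   ```json
#   {"function_call": {"name": "book_appointment",
#                      "arguments": {"name": "Asha Rao",
#                                    "date": "2025-05-20",
#                                    "time": "2:30 PM"}}}
#   ```
#
# extract_function_call() parses exactly this shape; the example values here
# are illustrative only.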
# ===== FUNCTION IMPLEMENTATIONS =====
def book_appointment(appointment_details):
    """Book an appointment with just name, date and time"""
    try:
        # Generate a unique appointment ID (shortened for readability)
        appointment_id = str(uuid.uuid4())[:8]
        appointment_details['appointment_id'] = appointment_id

        # Store in the local database
        appointments_db[appointment_id] = appointment_details

        # Add to Google Calendar
        calendar_event_id = add_to_google_calendar(appointment_details)

        details = {
            "name": appointment_details["name"],
            "date": appointment_details["date"],
            "time": appointment_details["time"],
            "location": "Office"
        }

        if calendar_event_id:
            # Store the calendar event ID
            appointments_db[appointment_id]['calendar_event_id'] = calendar_event_id
            return {
                "success": True,
                "appointment_id": appointment_id,
                "message": "Appointment successfully booked and added to calendar",
                "details": details
            }
        else:
            return {
                "success": True,
                "appointment_id": appointment_id,
                "message": "Appointment booked but failed to add to calendar (offline mode)",
                "details": details
            }
    except Exception as e:
        logger.error(f"Error in book_appointment: {e}")
        return {
            "success": False,
            "message": f"Failed to book appointment: {str(e)}"
        }
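# Illustrative call and return shape (values are hypothetical):
#
#   result = book_appointment({"name": "Asha Rao",
#                              "date": "2025-05-20",
#                              "time": "2:30 PM"})
#   # -> {"success": True, "appointment_id": "a1b2c3d4",
#   #     "message": "...", "details": {...}}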
# ===== MODEL MANAGEMENT =====
# Global model and tokenizer - singleton pattern
model = None
tokenizer = None

def free_memory():
    """Free memory by clearing cache and running garbage collection"""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        logger.info(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
        logger.info(f"GPU memory reserved: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

def load_llama_model():
    """Load the Llama 3.1 model and tokenizer using the singleton pattern"""
    global model, tokenizer

    # If the model is already loaded, reuse the existing instances
    if model is not None and tokenizer is not None:
        return True

    logger.info("Loading Llama 3.1 model and tokenizer...")
    free_memory()
    try:
        # 4-bit quantization config for better memory efficiency
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True
        )

        # Load tokenizer
        tokenizer_local = AutoTokenizer.from_pretrained(MODEL_ID)
        logger.info("Tokenizer loaded successfully")

        # Load model with memory-optimized settings
        model_local = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            quantization_config=quantization_config,
            device_map="auto",
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True
        )
        logger.info("Model loaded successfully")

        # Publish to the globals only after both loads succeed
        model = model_local
        tokenizer = tokenizer_local
        free_memory()
        logger.info("Model and tokenizer initialization complete")
        return True
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        return False
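# Rough footprint (back-of-envelope, not measured): 8B parameters at 4-bit NF4
# is ~4 GB of weights, plus KV cache and activations, so this should fit
# comfortably on a 16 GB GPU such as a T4 Space. Note that meta-llama
# checkpoints are gated on the Hub, so an accepted license and an HF token
# are also required to download the model.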
# ===== CHAT PROCESSING =====
def format_prompt_with_functions(messages, system_prompt):
    """Format the prompt for Llama 3.1 with function definitions"""
    # Append the function definitions to the system prompt
    full_system_prompt = system_prompt + "\n\n"
    full_system_prompt += "You have access to the following functions that you MUST use for specific user queries:\n"
    for func in function_definitions:
        full_system_prompt += f"- {func['name']}: {func['description']}\n"
        full_system_prompt += " Parameters:\n"
        for param_name, param_info in func['parameters']['properties'].items():
            required = "required" if param_name in func['parameters'].get('required', []) else "optional"
            full_system_prompt += f" - {param_name} ({required}): {param_info.get('description', '')}\n"
    full_system_prompt += "\nIMPORTANT: When a user asks to book an appointment, you MUST respond using the following JSON format:\n"
    full_system_prompt += '```json\n{"function_call": {"name": "function_name", "arguments": {"arg1": "value1", "arg2": "value2"}}}\n```\n'
    full_system_prompt += "You MUST collect all required information first: name, date, and time."
    full_system_prompt += "\n\nFor non-function-calling queries, respond in a conversational manner."

    # Start with the system message
    formatted_messages = [
        {"role": "system", "content": full_system_prompt}
    ]

    # Add conversation history
    for message in messages:
        if message["role"] == "function":
            # Llama 3.1's chat template has no "function" role, so fold
            # function results into an assistant message
            formatted_messages.append({
                "role": "assistant",
                "content": f"I'll process the function result: {message['content']}"
            })
        else:
            formatted_messages.append(message)

    return formatted_messages
def extract_function_call(response_text):
    """Extract a function call from the model response, or return None"""
    # Look for a fenced JSON block in the response
    json_pattern = r'```json\s*(.*?)\s*```'
    json_matches = re.findall(json_pattern, response_text, re.DOTALL)

    if not json_matches:
        # Fall back to a bare JSON object containing "function_call"
        json_pattern = r'({.*"function_call".*})'
        json_matches = re.findall(json_pattern, response_text, re.DOTALL)

    for json_str in json_matches:
        try:
            parsed_json = json.loads(json_str.strip())
        except json.JSONDecodeError:
            # A malformed candidate shouldn't abort the remaining matches
            logger.error(f"Failed to parse JSON: {json_str}")
            continue
        if "function_call" in parsed_json:
            function_call = parsed_json["function_call"]
            return {
                "id": str(uuid.uuid4()),
                "name": function_call["name"],
                "arguments": function_call["arguments"]
            }

    return None
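# Caveat on the fallback pattern: `{.*"function_call".*}` is greedy, so if the
# reply contains stray braces around the JSON object the captured span can
# over-match and fail to parse; such candidates are simply skipped above.
# Plain conversational replies (e.g., "Sure! What date works for you?")
# match neither pattern and yield None.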
def safe_generate(inputs, max_new_tokens=512):
    """Safely generate text with error handling and memory management"""
    global model, tokenizer
    try:
        free_memory()
        # Generate with sampling settings suited to conversational replies
        outputs = model.generate(
            inputs,
            max_new_tokens=max_new_tokens,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
        # Decode only the newly generated tokens (skip the prompt)
        response_text = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
        free_memory()
        return response_text
    except Exception as e:
        logger.error(f"Error in generation: {e}")
        free_memory()
        return f"Error generating response: {str(e)}"
def process_chat(message, chat_history):
    """Process a chat message, calling functions when necessary"""
    global model, tokenizer

    if model is None or tokenizer is None:
        error_msg = "Model not loaded properly. Please click 'Reload Model' and try again."
        new_history = chat_history + [(message, error_msg)]
        return new_history, new_history

    try:
        # Create system prompt
        system_prompt = """You are a friendly appointment booking assistant. You help users book appointments by collecting their name, preferred date, and time.

CRITICALLY IMPORTANT: NEVER make up or hallucinate appointment details. If the user has not explicitly provided name, date, or time, you MUST ask for these details before calling any function.

Follow these strict rules for appointment booking:
1. When a user asks to book an appointment, first check if they've provided name, date, and time.
2. If ANY of these details are missing, do NOT call the book_appointment function. Instead, politely ask the user for the missing information.
3. ONLY call the book_appointment function when you have collected ALL required information directly from the user.
4. NEVER invent, assume, or hallucinate ANY details - even common names like "John Doe" or dates like "tomorrow".
5. Use YYYY-MM-DD format for dates (e.g., 2025-05-15) and clear time format with AM/PM (e.g., 10:00 AM).

If the user says something like "book an appointment" without providing details, your ONLY correct response is to ask for their name, preferred date, and time - NOT to make up this information or call the function."""

        # Convert Gradio chat history to message format,
        # keeping only the last 3 exchanges to save memory
        messages = []
        for user_msg, bot_msg in chat_history[-3:]:
            messages.append({"role": "user", "content": user_msg})
            messages.append({"role": "assistant", "content": bot_msg})

        # Add the current message
        messages.append({"role": "user", "content": message})

        # Format messages with function-calling info
        formatted_messages = format_prompt_with_functions(messages, system_prompt)

        # Generate the model response with error handling
        try:
            inputs = tokenizer.apply_chat_template(
                formatted_messages,
                tokenize=True,
                add_generation_prompt=True,
                return_tensors="pt"
            ).to(model.device)

            # First generation
            response_text = safe_generate(inputs, max_new_tokens=512)
            logger.info(f"Model response: {response_text[:100]}...")

            # Check whether the response contains a function call
            function_call = extract_function_call(response_text)

            # Additional validation to prevent hallucination
            if function_call and function_call["name"] == "book_appointment":
                # Verify all required fields are present
                args = function_call["arguments"]
                required_fields = ["name", "date", "time"]
                missing_fields = [field for field in required_fields if field not in args or not args[field]]

                # Basic check for arguments that look made up:
                # reject generic placeholder names
                looks_made_up = False
                if "name" in args and args["name"].lower() in ["john", "john doe", "jane", "jane doe", "test", "user"]:
                    logger.warning(f"Detected likely hallucinated name: {args['name']}")
                    looks_made_up = True

                # Don't proceed if fields are missing or the data is suspicious
                if missing_fields or looks_made_up:
                    logger.warning(f"Detected hallucination attempt. Missing fields: {missing_fields}, Suspicious data: {looks_made_up}")
                    # Skip the function call and let the model's own reply ask for the missing information
                    new_chat_history = chat_history + [(message, response_text)]
                    return new_chat_history, new_chat_history

                # Execute the booking function
                function_result = book_appointment(function_call["arguments"])
                logger.info(f"Function result: {json.dumps(function_result)[:200]}...")

                # Add the function result to the messages
                messages.append({
                    "role": "assistant",
                    "content": response_text,
                })
                messages.append({
                    "role": "function",
                    "name": "book_appointment",
                    "content": json.dumps(function_result)
                })

                # Format messages for the second call
                formatted_messages = format_prompt_with_functions(messages, system_prompt)

                # Generate the second response
                inputs = tokenizer.apply_chat_template(
                    formatted_messages,
                    tokenize=True,
                    add_generation_prompt=True,
                    return_tensors="pt"
                ).to(model.device)
                second_response = safe_generate(inputs, max_new_tokens=512)
                logger.info(f"Second model response: {second_response[:100]}...")

                # Update chat history
                new_chat_history = chat_history + [(message, second_response)]
                return new_chat_history, new_chat_history
            else:
                # No function call, just return the response
                new_chat_history = chat_history + [(message, response_text)]
                return new_chat_history, new_chat_history
        except Exception as e:
            logger.error(f"Error in generation: {e}")
            error_msg = "Sorry, I couldn't generate a response. Please try a simpler question or try again later."
            new_chat_history = chat_history + [(message, error_msg)]
            return new_chat_history, new_chat_history
    except Exception as e:
        logger.error(f"Error in process_chat: {e}")
        error_msg = "Sorry, I encountered an error. Please try again."
        new_chat_history = chat_history + [(message, error_msg)]
        return new_chat_history, new_chat_history
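# Flow of the function-calling path above (two generate passes):
#   1. Generate once; if the reply contains a {"function_call": ...} block,
#      validate the arguments (missing fields, obvious placeholder names).
#   2. If validation fails, surface the model's own reply so it can ask for
#      the missing details; otherwise run book_appointment() and append its
#      JSON result to the conversation.
#   3. Generate a second time so the model can phrase the booking result
#      conversationally; that second reply is what the user sees.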
# ===== GRADIO INTERFACE =====
def create_gradio_interface():
    """Create the Gradio interface for the chatbot"""
    logger.info("Creating Gradio interface...")

    with gr.Blocks(css="""
        .gradio-container {max-width: 800px !important}
        .chat-window {height: 600px !important; overflow-y: auto}
    """) as demo:
        gr.Markdown("# Simple Appointment Booking Assistant")
        gr.Markdown("### Tell me your name, date and time to book an appointment")

        # Model status indicator
        with gr.Row():
            model_status = gr.Textbox(
                label="Model Status",
                value="Loading model...",
                interactive=False
            )

        # Calendar integration status
        with gr.Row():
            calendar_status = gr.Textbox(
                label="Calendar Integration Status",
                value="Checking Google Calendar integration...",
                interactive=False
            )

        # Check Google Calendar connectivity
        def check_calendar_integration():
            try:
                service = get_calendar_service()
                if service:
                    return "Google Calendar integration is active. Appointments will be saved to the calendar."
                else:
                    return "Google Calendar integration is not available. Appointments will only be stored in memory."
            except Exception as e:
                logger.error(f"Error checking calendar integration: {str(e)}")
                return f"Error checking calendar integration: {str(e)}"

        # Chatbot interface
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            label="Chat with Appointment Assistant",
            height=500
        )

        with gr.Row():
            msg = gr.Textbox(
                show_label=False,
                placeholder="Type your message here...",
                container=False
            )
            submit = gr.Button("Send")

        with gr.Row():
            clear = gr.Button("Clear Conversation")
            reload_model = gr.Button("Reload Model")

        # Instructions
        with gr.Accordion("Instructions", open=False):
            gr.Markdown("""
            ## How to use this appointment booking assistant:

            Simply tell the assistant you want to book an appointment and provide:
            1. Your name
            2. The date you want (in YYYY-MM-DD format)
            3. The time you want (like "10:00 AM")

            ### Example messages:
            - "I'd like to book an appointment"
            - "Book an appointment for John Smith on 2025-05-20 at 2:30 PM"
            - "Can I schedule a meeting tomorrow at 10 AM?"
            """)

        chat_history = gr.State([])

        def initialize_model():
            """Initialize the model on app startup"""
            success = load_llama_model()
            status = "Model loaded successfully!" if success else "Error loading model. Try clicking 'Reload Model'."
            cal_status = check_calendar_integration()
            return status, [], cal_status

        def reload_model_click():
            """Force-reload the model and free memory"""
            global model, tokenizer

            # Clear the globals and free memory before reloading
            model = None
            tokenizer = None
            free_memory()

            success = load_llama_model()
            status = "Model reloaded successfully!" if success else "Error reloading model. Check logs for details."
            cal_status = check_calendar_integration()
            return status, [], cal_status

        # Set up event handlers
        submit.click(
            process_chat,
            inputs=[msg, chat_history],
            outputs=[chatbot, chat_history]
        ).then(
            lambda: "",
            None,
            msg
        )

        msg.submit(
            process_chat,
            inputs=[msg, chat_history],
            outputs=[chatbot, chat_history]
        ).then(
            lambda: "",
            None,
            msg
        )

        clear.click(
            lambda: [],
            inputs=None,
            outputs=[chat_history]
        ).then(
            lambda: [],
            inputs=None,
            outputs=[chatbot]
        )

        reload_model.click(
            reload_model_click,
            inputs=None,
            outputs=[model_status, chat_history, calendar_status]
        ).then(
            lambda: [],
            inputs=None,
            outputs=[chatbot]
        )

        # Load the model on startup, then show a welcome message
        demo.load(
            initialize_model,
            inputs=None,
            outputs=[model_status, chat_history, calendar_status]
        ).then(
            lambda: [("", "Hello! I'm your appointment booking assistant. I can help you schedule an appointment. Please provide your name, preferred date (YYYY-MM-DD format), and time (like 10:00 AM) when you want to book an appointment.")],
            inputs=None,
            outputs=[chatbot]
        )

    return demo
# ===== MAIN EXECUTION =====
if __name__ == "__main__":
    logger.info("===== Simple Appointment Booking Assistant =====")
    logger.info("Using Llama 3.1-8B-Instruct")

    # Set PyTorch environment variables for memory efficiency
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128,garbage_collection_threshold:0.8"

    try:
        # Create and launch the Gradio interface
        logger.info("Creating demo...")
        demo = create_gradio_interface()
        logger.info("Demo created, launching...")
        demo.launch(share=False, debug=True)
        logger.info("Gradio interface launched successfully")
    except Exception as e:
        logger.error(f"Error launching Gradio interface: {e}")
        import traceback
        logger.error(traceback.format_exc())
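# Likely Space dependencies (a sketch inferred from the imports above; the
# exact file and versions are an assumption, not verified):
#
#   # requirements.txt
#   gradio
#   torch
#   transformers
#   accelerate          # needed for device_map="auto"
#   bitsandbytes        # needed for 4-bit quantization
#   pytz
#   google-auth
#   google-api-python-client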