Spaces:
Runtime error
Runtime error
| import fitz # PyMuPDF | |
| import json | |
| import faiss | |
| import numpy as np | |
| import re | |
| import datetime | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
# Connect to Firebase Firestore.
# `db` is the module-level Firestore client used by the doctor and appointment
# helpers below.  It is bound to None before the attempt so the name always
# exists: when the connection fails, later code sees `db is None` instead of
# hitting a NameError on an undefined global (the FAISS loader in this file
# uses the same "fall back to None" convention for `index`).
db = None
try:
    # The service-account key file must exist in the project root.
    cred = credentials.Certificate("account_key.json")
    firebase_admin.initialize_app(cred)
    db = firestore.client()
    print("β Connected to Firebase Firestore!")
except Exception as e:  # best-effort startup: report and keep running without Firestore
    print(f"β οΈ Firebase connection error: {e}")
# Load tokenizer & model
MODEL_PATH = "microsoft/phi-2"
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

if device == "cuda":
    # 4-bit bitsandbytes quantization with float16 compute and double
    # quantization; device_map="auto" lets the loader place the weights.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_PATH,
        quantization_config=bnb_config,
        device_map="auto",
    )
else:
    # Without a CUDA device, requesting bitsandbytes quantization makes
    # from_pretrained fail (at least with bitsandbytes builds that lack a CPU
    # backend), so load the unquantized weights on the CPU instead.
    # `bnb_config` stays defined so the module still exposes the same names.
    bnb_config = None
    model = AutoModelForCausalLM.from_pretrained(MODEL_PATH)
print("β Model loaded successfully!")
# Load FAISS index
INDEX_PATH = "gale_index.faiss"
CHUNKS_PATH = "gale_chunks.json"
try:
    index = faiss.read_index(INDEX_PATH)
    # Read the chunk file explicitly as UTF-8: without `encoding=` Python uses
    # the platform default, which can fail or garble non-ASCII text on systems
    # whose locale encoding is not UTF-8.
    with open(CHUNKS_PATH, "r", encoding="utf-8") as f:
        book_chunks = json.load(f)
    print("β FAISS index and text chunks loaded successfully!")
except Exception as e:
    # Retrieval is optional: fall back to "no index" so retrieve_context can
    # answer with its fallback sentence instead of crashing at import time.
    print(f"β οΈ Error loading FAISS index or text chunks: {e}")
    index = None
    book_chunks = []

# Load embedding model
# Used by retrieve_context to embed the user's query before searching `index`.
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
def retrieve_context(query, k=1):
    """Retrieve the most relevant medical information.

    Args:
        query: Free-text user question; it is embedded and searched for in the
            module-level FAISS `index`.
        k: Number of nearest chunks to request from the index.

    Returns:
        The matching entries of `book_chunks` joined by single spaces, or the
        sentence "No relevant medical information found." when the index or
        the chunks are unavailable, or the search produced no usable hit.
    """
    fallback = "No relevant medical information found."
    if index is None or not book_chunks:
        return fallback

    query_embedding = embedding_model.encode([query], convert_to_numpy=True).astype(np.float32)
    faiss.normalize_L2(query_embedding)  # in-place L2 normalisation of the query vector
    _, indices = index.search(query_embedding, k)

    # FAISS fills the result with -1 when it finds fewer than k neighbours.
    # Checking only `i < len(book_chunks)` lets -1 through, and Python would
    # then silently return the *last* chunk, so the lower bound is checked too.
    hits = [book_chunks[i] for i in indices[0] if 0 <= i < len(book_chunks)]
    return " ".join(hits) if hits else fallback
def chat_with_rag(query):
    """Generate a chatbot response using retrieved context.

    The retrieved context and the user's question are placed into a fixed
    prompt template, the language model generates up to 50 new tokens, and a
    standard "consult a doctor" sentence is appended.

    Args:
        query: The user's free-text question.

    Returns:
        The model's reply (the prompt itself is not included) followed by the
        consultation reminder.
    """
    context = retrieve_context(query)
    prompt = f"""### System:
You are a medical chatbot. Provide helpful information but do not give direct diagnoses.
Use phrases like "It could be" or "You might have" and suggest multiple possibilities.
### Context:
{context}
### User:
{query}
### Assistant:
"""
    # Send the inputs to wherever the model's weights actually live; with
    # device_map="auto" that is decided by the loader, not by `device`.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=50,
        eos_token_id=tokenizer.eos_token_id,
        # Passed explicitly so generate() does not have to guess a padding id
        # when the tokenizer defines none.
        pad_token_id=tokenizer.eos_token_id,
    )
    # For a causal LM, generate() returns prompt tokens + continuation.  Decode
    # only the continuation; otherwise the reply would repeat the whole system
    # prompt, context and question back to the user.
    prompt_length = inputs["input_ids"].shape[1]
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()
    return f"{response} Consider consulting a doctor for an accurate diagnosis."
# Firebase Firestore Functions
def get_doctors_from_firestore():
    """Fetch every document of the "doctors" collection as a plain dict.

    Returns:
        A list of dicts with the keys "name", "specialty", "availableDates"
        and "availableTimes".  Missing text fields default to "Unknown" and
        missing availability fields to an empty list.  If anything goes wrong
        the error is printed and an empty list is returned.
    """
    try:
        # Lazily convert each streamed snapshot; the generator is consumed
        # inside this try block, so streaming errors are handled below as well.
        records = (snapshot.to_dict() for snapshot in db.collection("doctors").stream())
        return [
            {
                "name": record.get("Doctor's Name", "Unknown"),
                "specialty": record.get("Specialty", "Unknown"),
                "availableDates": record.get("availableDates", []),
                "availableTimes": record.get("availableTimes", []),
            }
            for record in records
        ]
    except Exception as e:
        print(f"β οΈ Error fetching doctors from Firestore: {e}")
        return []
def _normalize_doctor_name(name):
    """Return the canonical comparison key for a doctor's name.

    The key is case-folded, has surrounding and repeated whitespace collapsed,
    and drops one optional leading "Dr" / "Dr." title.
    """
    collapsed = " ".join(str(name).split())
    return re.sub(r"^dr\.?\s+", "", collapsed, flags=re.IGNORECASE).casefold()


def get_doctor_from_firestore(doctor_name):
    """Retrieve a specific doctor's data from Firestore.

    Both the requested name and the stored names are reduced to the same
    canonical key before comparison, so "Dr. Ahmed Younes", "dr ahmed younes"
    and " Ahmed  Younes " all refer to the same doctor.

    Args:
        doctor_name: Name to look up, with or without a "Dr." title.

    Returns:
        The first matching doctor dict as produced by
        get_doctors_from_firestore, or None when `doctor_name` is not a
        non-blank string or no doctor matches.
    """
    if not isinstance(doctor_name, str):
        return None
    # Normalise the query once instead of on every loop iteration.
    wanted = _normalize_doctor_name(doctor_name)
    if not wanted:
        return None  # blank query: nothing to look up, skip the Firestore read
    for doctor in get_doctors_from_firestore():
        if _normalize_doctor_name(doctor["name"]) == wanted:
            return doctor
    return None
def book_appointment(doctor_name, date, time, patient_name="Anonymous"):
    """Book an appointment if the doctor is available.

    Args:
        doctor_name: Doctor to book, with or without a leading "Dr." title.
        date: Requested date; must be an exact member of the doctor's
            "availableDates".
        time: Requested time; must be an exact member of the doctor's
            "availableTimes".
        patient_name: Name stored with the appointment.

    Returns:
        A human-readable status message: doctor not found, not available on
        the date, not available at the time, or booking confirmed.

    Raises:
        Any exception raised by the Firestore write is not caught here.
    """
    # process_chat_query passes the name with its "Dr." title already attached;
    # drop the title for display so the messages never read "Dr. Dr. ...".
    display_name = re.sub(r"^\s*dr\.?\s+", "", str(doctor_name), flags=re.IGNORECASE).strip()

    doctor_data = get_doctor_from_firestore(doctor_name)
    if not doctor_data:
        return f"β Doctor {display_name} not found."
    if date not in doctor_data.get("availableDates", []):
        return f"β Dr. {display_name} is not available on {date}."
    if time not in doctor_data.get("availableTimes", []):
        return f"β Dr. {display_name} is not available at {time}."

    appointment_data = {
        "doctorName": doctor_data["name"],
        "specialty": doctor_data["specialty"],
        "date": date,
        "time": time,
        "patient_name": patient_name,
        "status": "Booked",
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # returns a naive value that carries no timezone information.
        "timestamp": datetime.datetime.now(datetime.timezone.utc),
    }
    db.collection("appointments").add(appointment_data)
    return f"β Appointment booked with Dr. {display_name} on {date} at {time}."
# Chatbot Logic
# The extraction patterns are compiled once at import time rather than being
# rebuilt on every incoming query.
_DOCTOR_PATTERN = re.compile(r"(Dr\.?\s+[A-Za-z]+\s+[A-Za-z]+)")
_DATE_PATTERN = re.compile(
    r"(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2}"
)
_TIME_PATTERN = re.compile(r"\d{1,2}:\d{2}\s?(AM|PM)", re.IGNORECASE)


def process_chat_query(query):
    """Determine the type of user request and handle it accordingly.

    Routing, in priority order:
      1. the phrase "available doctors"          -> list every doctor;
      2. a "Dr. First Last" name and "available" -> that doctor's availability;
      3. the phrase "book an appointment"        -> booking, which needs a
         doctor name, a "<Month> <day>" date and an "H:MM AM/PM" time;
      4. anything else                            -> retrieval-augmented answer.

    Keyword checks are case-insensitive; the doctor and month patterns are
    case-sensitive.

    Args:
        query: The raw user message.

    Returns:
        The response text produced by the selected handler.
    """
    lowered = query.lower()  # computed once for all keyword checks

    if "available doctors" in lowered:
        return list_available_doctors()

    doctor_match = _DOCTOR_PATTERN.search(query)
    if doctor_match and "available" in lowered:
        return get_doctor_availability(doctor_match.group(0).strip())

    if "book an appointment" in lowered:
        # Date and time only matter for bookings, so they are parsed here.
        date_match = _DATE_PATTERN.search(query)
        time_match = _TIME_PATTERN.search(query)
        if not (doctor_match and date_match and time_match):
            return "Please specify the doctor's name, date, and time."
        return book_appointment(
            doctor_match.group(0).strip(),
            date_match.group(0).strip(),
            time_match.group(0).strip(),
        )

    return chat_with_rag(query)
def list_available_doctors():
    """Build a text overview of every doctor with their dates and times.

    Returns:
        A formatted listing with one section per doctor, or a "no doctors"
        message when get_doctors_from_firestore returns nothing.  Doctors
        without dates or times get an explicit "No available ..." placeholder.
    """
    roster = get_doctors_from_firestore()
    if not roster:
        return "β No doctors available at the moment."

    # Collect the pieces and join once at the end.
    pieces = ["π©Ί **Available Doctors:**\n"]
    for entry in roster:
        pieces.append(f"\nπ¨ββοΈ **{entry['name']}** ({entry['specialty']})\n")
        if entry["availableDates"]:
            dates_text = ", ".join(entry["availableDates"])
        else:
            dates_text = "No available dates"
        pieces.append(f"π Dates: {dates_text}\n")
        if entry["availableTimes"]:
            times_text = ", ".join(entry["availableTimes"])
        else:
            times_text = "No available times"
        pieces.append(f"π Times: {times_text}\n")
    return "".join(pieces)
def get_doctor_availability(doctor_name):
    """Return availability for a specific doctor.

    Args:
        doctor_name: Doctor to look up, with or without a leading "Dr." title.

    Returns:
        A formatted summary of the doctor's specialty, dates and times, or a
        "not found" message.  Empty date/time lists are shown with the same
        "No available ..." placeholders that list_available_doctors uses.
    """
    def _without_title(name):
        # Drop one leading "Dr"/"Dr." so the "Dr. " prefix added below is
        # never doubled (process_chat_query passes names that include it).
        return re.sub(r"^\s*dr\.?\s+", "", str(name), flags=re.IGNORECASE).strip()

    doctor = get_doctor_from_firestore(doctor_name)
    if not doctor:
        return f"β Dr. {_without_title(doctor_name)} not found."

    dates_text = ", ".join(doctor["availableDates"]) if doctor["availableDates"] else "No available dates"
    times_text = ", ".join(doctor["availableTimes"]) if doctor["availableTimes"] else "No available times"
    return (
        f"π©Ί **Dr. {_without_title(doctor['name'])}** ({doctor['specialty']})\n"
        f"π Dates: {dates_text}\n"
        f"π Times: {times_text}"
    )
# Test the chatbot
if __name__ == "__main__":
    # Manual smoke test: one sample query for each routing branch of
    # process_chat_query (booking, medical question, doctor list, availability).
    sample_queries = (
        "I want to book an appointment with Dr. Ahmed Younes on March 21 at 6:00 PM.",
        "I have a cough what disease do I have?",
        "Show me available doctors.",
        "When is Dr. Ahmed Younes available?",
    )
    for sample in sample_queries:
        print(process_chat_query(sample))