|
|
import os |
|
|
import gc |
|
|
import torch |
|
|
import gradio as gr |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, StoppingCriteria, StoppingCriteriaList |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MODEL_PATH = r"Muhammadidrees/JayConverstionalModel" |
|
|
MAX_NEW_TOKENS = 200 |
|
|
TEMPERATURE = 0.5 |
|
|
TOP_K = 50 |
|
|
REPETITION_PENALTY = 1.1 |
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
print(f"π Loading model from {MODEL_PATH} on {device}...") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) |
|
|
model = AutoModelForCausalLM.from_pretrained( |
|
|
MODEL_PATH, |
|
|
device_map="auto", |
|
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
|
low_cpu_mem_usage=True |
|
|
) |
|
|
|
|
|
print("β
ChatDoctor model loaded successfully!\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StopOnTokens(StoppingCriteria): |
|
|
def __init__(self, stop_ids): |
|
|
self.stop_ids = stop_ids |
|
|
|
|
|
def __call__(self, input_ids, scores, **kwargs): |
|
|
for stop_id_seq in self.stop_ids: |
|
|
if len(stop_id_seq) == 1: |
|
|
if input_ids[0][-1] == stop_id_seq[0]: |
|
|
return True |
|
|
else: |
|
|
if len(input_ids[0]) >= len(stop_id_seq): |
|
|
if input_ids[0][-len(stop_id_seq):].tolist() == stop_id_seq: |
|
|
return True |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MEDICAL_KEYWORDS = [ |
|
|
"pain", "ache", "symptom", "hurt", "sore", "discomfort", "fever", "cough", "flu", |
|
|
"infection", "allergy", "diabetes", "pressure", "asthma", "migraine", "vomit", |
|
|
"stomach", "head", "chest", "throat", "heart", "lung", "liver", "kidney", "brain", |
|
|
"doctor", "hospital", "medicine", "treatment", "therapy", "surgery", "disease", |
|
|
"illness", "blood", "test", "scan", "health", "diet", "nutrition", "stress", "sleep", |
|
|
"weight", "vitamin", "fatigue", "anxiety", "depression" |
|
|
] |
|
|
|
|
|
CASUAL_ONLY_PATTERNS = [ |
|
|
"hey", "hi", "hello", "sup", "yo", "good morning", "good evening", |
|
|
"how are you", "wassup", "hiya" |
|
|
] |
|
|
|
|
|
|
|
|
def is_medical_query(message): |
|
|
message_lower = message.lower() |
|
|
for keyword in MEDICAL_KEYWORDS: |
|
|
if keyword in message_lower: |
|
|
return True |
|
|
question_words = ["what", "how", "why", "when", "where", "can", "should", "is", "are", "do", "does"] |
|
|
has_question = any(q in message_lower.split()[:3] for q in question_words) |
|
|
if has_question and len(message.split()) > 5: |
|
|
return True |
|
|
return False |
|
|
|
|
|
|
|
|
def is_only_greeting(message): |
|
|
message_lower = message.lower().strip().replace("!", "").replace("?", "").replace(".", "") |
|
|
if len(message_lower.split()) <= 3: |
|
|
for pattern in CASUAL_ONLY_PATTERNS: |
|
|
if message_lower == pattern or message_lower.startswith(pattern): |
|
|
return True |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_response(user_input, history_context): |
|
|
if is_only_greeting(user_input): |
|
|
return "π Hello! I'm ChatDoctor β your AI medical assistant. Please tell me about any health symptoms or medical concerns you'd like to discuss." |
|
|
|
|
|
if not is_medical_query(user_input): |
|
|
return ( |
|
|
"Hello! I'm ChatDoctor, an AI medical assistant specialized in health and wellness.\n\n" |
|
|
"I can help you with:\n" |
|
|
"β’ Symptoms and medical conditions\n" |
|
|
"β’ Treatment and prevention advice\n" |
|
|
"β’ Fitness, diet, and mental health tips\n\n" |
|
|
"Please describe your health concern in detail to get started." |
|
|
) |
|
|
|
|
|
human_prefix = "Patient:" |
|
|
doctor_prefix = "ChatDoctor:" |
|
|
system_instruction = ( |
|
|
"You are ChatDoctor, a professional medical AI assistant. " |
|
|
"You provide accurate, concise, and empathetic responses to health-related questions only.\n\n" |
|
|
"If the question is non-medical, politely redirect back to medical topics.\n" |
|
|
) |
|
|
|
|
|
|
|
|
history_text = [system_instruction] |
|
|
for human, assistant in history_context: |
|
|
if human: |
|
|
history_text.append(f"{human_prefix} {human}") |
|
|
if assistant: |
|
|
history_text.append(f"{doctor_prefix} {assistant}") |
|
|
history_text.append(f"{human_prefix} {user_input}") |
|
|
|
|
|
prompt = "\n".join(history_text) + f"\n{doctor_prefix} " |
|
|
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device) |
|
|
|
|
|
stop_words = ["Patient:", "\nPatient:", "Patient :", "\n\nPatient"] |
|
|
stop_ids = [tokenizer.encode(word, add_special_tokens=False) for word in stop_words] |
|
|
stopping_criteria = StoppingCriteriaList([StopOnTokens(stop_ids)]) |
|
|
|
|
|
with torch.no_grad(): |
|
|
output_ids = model.generate( |
|
|
input_ids, |
|
|
max_new_tokens=MAX_NEW_TOKENS, |
|
|
do_sample=True, |
|
|
temperature=TEMPERATURE, |
|
|
top_k=TOP_K, |
|
|
repetition_penalty=REPETITION_PENALTY, |
|
|
stopping_criteria=stopping_criteria, |
|
|
pad_token_id=tokenizer.eos_token_id, |
|
|
eos_token_id=tokenizer.eos_token_id |
|
|
) |
|
|
|
|
|
response = tokenizer.decode(output_ids[0], skip_special_tokens=True)[len(prompt):].strip() |
|
|
|
|
|
for stop_word in ["Patient:", "Patient :", "\nPatient", "Patient"]: |
|
|
if stop_word in response: |
|
|
response = response.split(stop_word)[0].strip() |
|
|
break |
|
|
|
|
|
response = response.strip() |
|
|
if any(x in response.lower() for x in ["chatbot", "api key", "error", "cloud"]): |
|
|
response = ( |
|
|
"I apologize for the confusion β I'm ChatDoctor, trained to assist with medical and health-related topics only. " |
|
|
"Please tell me about your symptoms or health concerns." |
|
|
) |
|
|
|
|
|
del input_ids, output_ids |
|
|
gc.collect() |
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
custom_css = """ |
|
|
#header { |
|
|
text-align: center; |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
|
|
color: white; |
|
|
padding: 20px; |
|
|
border-radius: 10px; |
|
|
margin-bottom: 20px; |
|
|
} |
|
|
#header h1 { margin: 0; font-size: 2.3em; } |
|
|
#header p { margin: 5px 0 0; font-size: 1em; opacity: 0.9; } |
|
|
.disclaimer { |
|
|
background-color: #fff3cd; |
|
|
border: 1px solid #ffc107; |
|
|
border-radius: 8px; |
|
|
padding: 15px; |
|
|
margin: 20px 0; |
|
|
color: #856404; |
|
|
} |
|
|
""" |
|
|
|
|
|
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo: |
|
|
gr.HTML(""" |
|
|
<div id="header"> |
|
|
<h1>π©Ί ChatDoctor AI Assistant</h1> |
|
|
<p>Your AI-powered medical consultation partner</p> |
|
|
</div> |
|
|
""") |
|
|
gr.HTML(""" |
|
|
<div class="disclaimer"> |
|
|
<h3>β οΈ Medical Disclaimer</h3> |
|
|
<p>This AI assistant is for informational purposes only. |
|
|
It is NOT a substitute for professional medical advice, diagnosis, or treatment.</p> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
chatbot = gr.Chatbot( |
|
|
height=480, |
|
|
placeholder="<div style='text-align:center;padding:40px;'><h3>π Welcome to ChatDoctor!</h3><p>Describe your symptoms or ask a health-related question to begin.</p></div>", |
|
|
show_label=False, |
|
|
avatar_images=(None, "π€"), |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
msg = gr.Textbox(placeholder="Type your medical concern here...", show_label=False, scale=9, container=False) |
|
|
send_btn = gr.Button("Send π€", scale=1, variant="primary") |
|
|
|
|
|
with gr.Row(): |
|
|
clear_btn = gr.Button("ποΈ Clear Chat", scale=1) |
|
|
retry_btn = gr.Button("π Retry", scale=1) |
|
|
|
|
|
with gr.Accordion("βοΈ Advanced Settings", open=False): |
|
|
temp_slider = gr.Slider(0.1, 1.0, TEMPERATURE, 0.1, label="Temperature") |
|
|
max_tok_slider = gr.Slider(50, 500, MAX_NEW_TOKENS, 50, label="Max Tokens") |
|
|
top_k_slider = gr.Slider(1, 100, TOP_K, 1, label="Top-K") |
|
|
|
|
|
def user_message(user_msg, history): |
|
|
return "", history + [[user_msg, None]] |
|
|
|
|
|
def bot_response(history, temp, max_tok, topk): |
|
|
global TEMPERATURE, MAX_NEW_TOKENS, TOP_K |
|
|
TEMPERATURE, MAX_NEW_TOKENS, TOP_K = temp, int(max_tok), int(topk) |
|
|
user_msg = history[-1][0] |
|
|
bot_msg = get_response(user_msg, history[:-1]) |
|
|
history[-1][1] = bot_msg |
|
|
return history |
|
|
|
|
|
def retry_last(history, temp, max_tok, topk): |
|
|
if not history: |
|
|
return history |
|
|
user_msg = history[-1][0] |
|
|
bot_msg = get_response(user_msg, history[:-1]) |
|
|
history[-1][1] = bot_msg |
|
|
return history |
|
|
|
|
|
msg.submit(user_message, [msg, chatbot], [msg, chatbot], queue=False).then( |
|
|
bot_response, [chatbot, temp_slider, max_tok_slider, top_k_slider], chatbot |
|
|
) |
|
|
send_btn.click(user_message, [msg, chatbot], [msg, chatbot], queue=False).then( |
|
|
bot_response, [chatbot, temp_slider, max_tok_slider, top_k_slider], chatbot |
|
|
) |
|
|
clear_btn.click(lambda: None, None, chatbot, queue=False) |
|
|
retry_btn.click(retry_last, [chatbot, temp_slider, max_tok_slider, top_k_slider], chatbot) |
|
|
|
|
|
gr.HTML(f"<footer><center><p>π§ Powered by LLaMA-based ChatDoctor | Device: {device.upper()}</p></center></footer>") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("\nπ‘ Launching ChatDoctor Gradio Interface...") |
|
|
demo.queue() |
|
|
demo.launch(server_name="0.0.0.0", server_port=7860, share=False) |
|
|
|