|
|
import os |
|
|
import gc |
|
|
import torch |
|
|
import gradio as gr |
|
|
from transformers import LlamaTokenizer, LlamaForCausalLM, StoppingCriteria, StoppingCriteriaList |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MODEL_PATH = r"Muhammadidrees/JayConverstionalModel" |
|
|
MAX_NEW_TOKENS = 200 |
|
|
TEMPERATURE = 0.5 |
|
|
TOP_K = 50 |
|
|
REPETITION_PENALTY = 1.1 |
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
print(f"Loading model from {MODEL_PATH} on {device}...") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tokenizer = LlamaTokenizer.from_pretrained(MODEL_PATH) |
|
|
model = LlamaForCausalLM.from_pretrained( |
|
|
MODEL_PATH, |
|
|
device_map="auto", |
|
|
torch_dtype=torch.float16, |
|
|
low_cpu_mem_usage=True |
|
|
) |
|
|
|
|
|
generator = model.generate |
|
|
print("β
ChatDoctor model loaded successfully!\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SYSTEM_PROMPT = """You are ChatDoctor, a professional medical AI assistant. Your role is to: |
|
|
- Provide helpful medical information and guidance |
|
|
- Ask relevant follow-up questions about symptoms |
|
|
- Offer general health advice |
|
|
- Remind patients to consult healthcare professionals for serious concerns |
|
|
|
|
|
You should ONLY engage in health and medical-related conversations. If someone greets you casually or asks non-medical questions, politely redirect them to ask about their health concerns.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StopOnTokens(StoppingCriteria): |
|
|
def __init__(self, stop_ids): |
|
|
self.stop_ids = stop_ids |
|
|
|
|
|
def __call__(self, input_ids, scores, **kwargs): |
|
|
for stop_id_seq in self.stop_ids: |
|
|
if len(stop_id_seq) == 1: |
|
|
if input_ids[0][-1] == stop_id_seq[0]: |
|
|
return True |
|
|
else: |
|
|
if len(input_ids[0]) >= len(stop_id_seq): |
|
|
if input_ids[0][-len(stop_id_seq):].tolist() == stop_id_seq: |
|
|
return True |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
conversation_history = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_greeting_or_casual(message): |
|
|
"""Check if message is just a casual greeting""" |
|
|
casual_patterns = [ |
|
|
"hey", "hi", "hello", "sup", "what's up", "whats up", |
|
|
"how are you", "good morning", "good evening", "good afternoon" |
|
|
] |
|
|
|
|
|
message_lower = message.lower().strip() |
|
|
|
|
|
|
|
|
if len(message_lower.split()) <= 3: |
|
|
for pattern in casual_patterns: |
|
|
if message_lower == pattern or message_lower.startswith(pattern): |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_response(user_input, history_context): |
|
|
"""Generate response from ChatDoctor model""" |
|
|
|
|
|
|
|
|
if is_greeting_or_casual(user_input): |
|
|
return "Hello! I'm ChatDoctor, your AI medical assistant. I'm here to help with your health concerns and medical questions. What health-related issue can I assist you with today?" |
|
|
|
|
|
human_invitation = "Patient: " |
|
|
doctor_invitation = "ChatDoctor: " |
|
|
|
|
|
|
|
|
history_text = [SYSTEM_PROMPT] |
|
|
for human, assistant in history_context: |
|
|
if human: |
|
|
history_text.append(human_invitation + human) |
|
|
if assistant: |
|
|
history_text.append(doctor_invitation + assistant) |
|
|
|
|
|
|
|
|
history_text.append(human_invitation + user_input) |
|
|
|
|
|
|
|
|
prompt = "\n".join(history_text) + "\n" + doctor_invitation |
|
|
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device) |
|
|
|
|
|
|
|
|
stop_words = ["Patient:", "\nPatient:", "Patient :", "\n\nPatient"] |
|
|
stop_ids = [tokenizer.encode(word, add_special_tokens=False) for word in stop_words] |
|
|
stopping_criteria = StoppingCriteriaList([StopOnTokens(stop_ids)]) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
output_ids = generator( |
|
|
input_ids, |
|
|
max_new_tokens=MAX_NEW_TOKENS, |
|
|
do_sample=True, |
|
|
temperature=TEMPERATURE, |
|
|
top_k=TOP_K, |
|
|
repetition_penalty=REPETITION_PENALTY, |
|
|
stopping_criteria=stopping_criteria, |
|
|
pad_token_id=tokenizer.eos_token_id, |
|
|
eos_token_id=tokenizer.eos_token_id |
|
|
) |
|
|
|
|
|
|
|
|
full_output = tokenizer.decode(output_ids[0], skip_special_tokens=True) |
|
|
response = full_output[len(prompt):].strip() |
|
|
|
|
|
|
|
|
for stop_word in ["Patient:", "Patient :", "\nPatient:", "\nPatient", "Patient"]: |
|
|
if stop_word in response: |
|
|
response = response.split(stop_word)[0].strip() |
|
|
break |
|
|
|
|
|
response = response.strip() |
|
|
|
|
|
|
|
|
del input_ids, output_ids |
|
|
gc.collect() |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chat_function(message, history): |
|
|
"""Gradio chat interface function""" |
|
|
if not message.strip(): |
|
|
return "" |
|
|
|
|
|
try: |
|
|
response = get_response(message, history) |
|
|
return response |
|
|
except Exception as e: |
|
|
return f"Error: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
custom_css = """ |
|
|
#header { |
|
|
text-align: center; |
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
|
|
color: white; |
|
|
padding: 20px; |
|
|
border-radius: 10px; |
|
|
margin-bottom: 20px; |
|
|
} |
|
|
|
|
|
#header h1 { |
|
|
margin: 0; |
|
|
font-size: 2.5em; |
|
|
} |
|
|
|
|
|
#header p { |
|
|
margin: 10px 0 0 0; |
|
|
font-size: 1.1em; |
|
|
opacity: 0.9; |
|
|
} |
|
|
|
|
|
.disclaimer { |
|
|
background-color: #fff3cd; |
|
|
border: 1px solid #ffc107; |
|
|
border-radius: 8px; |
|
|
padding: 15px; |
|
|
margin: 20px 0; |
|
|
color: #856404; |
|
|
} |
|
|
|
|
|
.disclaimer h3 { |
|
|
margin-top: 0; |
|
|
color: #856404; |
|
|
} |
|
|
|
|
|
footer { |
|
|
text-align: center; |
|
|
margin-top: 30px; |
|
|
color: #666; |
|
|
font-size: 0.9em; |
|
|
} |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo: |
|
|
|
|
|
gr.HTML(""" |
|
|
<div id="header"> |
|
|
<h1>π©Ί ChatDoctor AI Assistant</h1> |
|
|
<p>Your AI-powered medical conversation partner</p> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
|
|
|
gr.HTML(""" |
|
|
<div class="disclaimer"> |
|
|
<h3>β οΈ Medical Disclaimer</h3> |
|
|
<p><strong>Important:</strong> This AI assistant is for informational and educational purposes only. |
|
|
It is NOT a substitute for professional medical advice, diagnosis, or treatment. |
|
|
Always seek the advice of your physician or other qualified health provider with any questions |
|
|
you may have regarding a medical condition. Never disregard professional medical advice or |
|
|
delay in seeking it because of something you have read here.</p> |
|
|
</div> |
|
|
""") |
|
|
|
|
|
|
|
|
chatbot = gr.Chatbot( |
|
|
height=500, |
|
|
placeholder="<div style='text-align: center; padding: 40px;'><h3>π Welcome to ChatDoctor!</h3><p>I'm here to discuss your health concerns. Please describe your symptoms or health question.</p></div>", |
|
|
show_label=False, |
|
|
avatar_images=(None, "π€"), |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
msg = gr.Textbox( |
|
|
placeholder="Describe your symptoms or health concern here...", |
|
|
show_label=False, |
|
|
scale=9, |
|
|
container=False |
|
|
) |
|
|
submit_btn = gr.Button("Send π€", scale=1, variant="primary") |
|
|
|
|
|
with gr.Row(): |
|
|
clear_btn = gr.Button("ποΈ Clear Chat", scale=1) |
|
|
retry_btn = gr.Button("π Retry", scale=1) |
|
|
|
|
|
|
|
|
gr.Examples( |
|
|
examples=[ |
|
|
"I have a persistent headache for 3 days. What should I do?", |
|
|
"What are the symptoms of diabetes?", |
|
|
"How can I improve my sleep quality?", |
|
|
"I have a fever and sore throat. Should I be concerned?", |
|
|
"What are some natural ways to reduce stress?", |
|
|
], |
|
|
inputs=msg, |
|
|
label="π‘ Example Questions" |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Accordion("βοΈ Advanced Settings", open=False): |
|
|
temperature_slider = gr.Slider( |
|
|
minimum=0.1, |
|
|
maximum=1.0, |
|
|
value=TEMPERATURE, |
|
|
step=0.1, |
|
|
label="Temperature (Creativity)", |
|
|
info="Higher values make responses more creative but less focused" |
|
|
) |
|
|
max_tokens_slider = gr.Slider( |
|
|
minimum=50, |
|
|
maximum=500, |
|
|
value=MAX_NEW_TOKENS, |
|
|
step=50, |
|
|
label="Max Response Length", |
|
|
info="Maximum number of tokens in response" |
|
|
) |
|
|
top_k_slider = gr.Slider( |
|
|
minimum=1, |
|
|
maximum=100, |
|
|
value=TOP_K, |
|
|
step=1, |
|
|
label="Top K", |
|
|
info="Limits vocabulary selection" |
|
|
) |
|
|
|
|
|
|
|
|
gr.HTML(""" |
|
|
<footer> |
|
|
<p>Powered by ChatDoctor Model | Built with Gradio</p> |
|
|
<p>Device: """ + device.upper() + """ | Model: LLaMA-based Medical AI</p> |
|
|
</footer> |
|
|
""") |
|
|
|
|
|
|
|
|
def user_message(user_msg, history): |
|
|
return "", history + [[user_msg, None]] |
|
|
|
|
|
def bot_response(history, temp, max_tok, top_k_val): |
|
|
global TEMPERATURE, MAX_NEW_TOKENS, TOP_K |
|
|
TEMPERATURE = temp |
|
|
MAX_NEW_TOKENS = int(max_tok) |
|
|
TOP_K = int(top_k_val) |
|
|
|
|
|
user_msg = history[-1][0] |
|
|
bot_msg = chat_function(user_msg, history[:-1]) |
|
|
history[-1][1] = bot_msg |
|
|
return history |
|
|
|
|
|
|
|
|
msg.submit(user_message, [msg, chatbot], [msg, chatbot], queue=False).then( |
|
|
bot_response, [chatbot, temperature_slider, max_tokens_slider, top_k_slider], chatbot |
|
|
) |
|
|
|
|
|
submit_btn.click(user_message, [msg, chatbot], [msg, chatbot], queue=False).then( |
|
|
bot_response, [chatbot, temperature_slider, max_tokens_slider, top_k_slider], chatbot |
|
|
) |
|
|
|
|
|
clear_btn.click(lambda: None, None, chatbot, queue=False) |
|
|
|
|
|
def retry_last(): |
|
|
return None |
|
|
|
|
|
retry_btn.click(retry_last, None, chatbot, queue=False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("\nπ Launching ChatDoctor Gradio Interface...") |
|
|
demo.queue() |
|
|
demo.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
share=False, |
|
|
show_error=True |
|
|
) |