|
|
import os
|
|
|
import gc
|
|
|
import torch
|
|
|
import gradio as gr
|
|
|
from transformers import LlamaTokenizer, LlamaForCausalLM, StoppingCriteria, StoppingCriteriaList
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MODEL_PATH = r"C:\Users\JAY\Downloads\Chatdoc\ChatDoctor\pretrained"
|
|
|
MAX_NEW_TOKENS = 200
|
|
|
TEMPERATURE = 0.5
|
|
|
TOP_K = 50
|
|
|
REPETITION_PENALTY = 1.1
|
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
|
print(f"Loading model from {MODEL_PATH} on {device}...")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tokenizer = LlamaTokenizer.from_pretrained(MODEL_PATH)
|
|
|
model = LlamaForCausalLM.from_pretrained(
|
|
|
MODEL_PATH,
|
|
|
device_map="auto",
|
|
|
torch_dtype=torch.float16,
|
|
|
low_cpu_mem_usage=True
|
|
|
)
|
|
|
|
|
|
generator = model.generate
|
|
|
print("✅ ChatDoctor model loaded successfully!\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StopOnTokens(StoppingCriteria):
|
|
|
def __init__(self, stop_ids):
|
|
|
self.stop_ids = stop_ids
|
|
|
|
|
|
def __call__(self, input_ids, scores, **kwargs):
|
|
|
for stop_id_seq in self.stop_ids:
|
|
|
if len(stop_id_seq) == 1:
|
|
|
if input_ids[0][-1] == stop_id_seq[0]:
|
|
|
return True
|
|
|
else:
|
|
|
if len(input_ids[0]) >= len(stop_id_seq):
|
|
|
if input_ids[0][-len(stop_id_seq):].tolist() == stop_id_seq:
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_response(user_input, history_context):
|
|
|
"""Generate response from ChatDoctor model"""
|
|
|
human_invitation = "Patient: "
|
|
|
doctor_invitation = "ChatDoctor: "
|
|
|
|
|
|
|
|
|
history_text = []
|
|
|
for human, assistant in history_context:
|
|
|
if human:
|
|
|
history_text.append(human_invitation + human)
|
|
|
if assistant:
|
|
|
history_text.append(doctor_invitation + assistant)
|
|
|
|
|
|
|
|
|
history_text.append(human_invitation + user_input)
|
|
|
|
|
|
|
|
|
prompt = "\n".join(history_text) + "\n" + doctor_invitation
|
|
|
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
|
|
|
|
|
|
|
|
|
stop_words = ["Patient:", "\nPatient:", "Patient :", "\n\nPatient"]
|
|
|
stop_ids = [tokenizer.encode(word, add_special_tokens=False) for word in stop_words]
|
|
|
stopping_criteria = StoppingCriteriaList([StopOnTokens(stop_ids)])
|
|
|
|
|
|
|
|
|
with torch.no_grad():
|
|
|
output_ids = generator(
|
|
|
input_ids,
|
|
|
max_new_tokens=MAX_NEW_TOKENS,
|
|
|
do_sample=True,
|
|
|
temperature=TEMPERATURE,
|
|
|
top_k=TOP_K,
|
|
|
repetition_penalty=REPETITION_PENALTY,
|
|
|
stopping_criteria=stopping_criteria,
|
|
|
pad_token_id=tokenizer.eos_token_id,
|
|
|
eos_token_id=tokenizer.eos_token_id
|
|
|
)
|
|
|
|
|
|
|
|
|
full_output = tokenizer.decode(output_ids[0], skip_special_tokens=True)
|
|
|
response = full_output[len(prompt):].strip()
|
|
|
|
|
|
|
|
|
for stop_word in ["Patient:", "Patient :", "\nPatient:", "\nPatient", "Patient"]:
|
|
|
if stop_word in response:
|
|
|
response = response.split(stop_word)[0].strip()
|
|
|
break
|
|
|
|
|
|
response = response.strip()
|
|
|
|
|
|
|
|
|
del input_ids, output_ids
|
|
|
gc.collect()
|
|
|
torch.cuda.empty_cache()
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chat_function(message, history):
|
|
|
"""Gradio chat interface function"""
|
|
|
if not message.strip():
|
|
|
return ""
|
|
|
|
|
|
try:
|
|
|
response = get_response(message, history)
|
|
|
return response
|
|
|
except Exception as e:
|
|
|
return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def text_to_speech(text):
|
|
|
"""Convert text response to speech"""
|
|
|
try:
|
|
|
from gtts import gTTS
|
|
|
import tempfile
|
|
|
|
|
|
if not text or text.startswith("Error:"):
|
|
|
return None
|
|
|
|
|
|
|
|
|
tts = gTTS(text=text, lang='en', slow=False)
|
|
|
|
|
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
|
|
|
tts.save(temp_file.name)
|
|
|
|
|
|
return temp_file.name
|
|
|
except Exception as e:
|
|
|
print(f"TTS Error: {e}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
custom_css = """
|
|
|
#header {
|
|
|
text-align: center;
|
|
|
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
|
|
color: white;
|
|
|
padding: 20px;
|
|
|
border-radius: 10px;
|
|
|
margin-bottom: 20px;
|
|
|
}
|
|
|
|
|
|
#header h1 {
|
|
|
margin: 0;
|
|
|
font-size: 2.5em;
|
|
|
}
|
|
|
|
|
|
#header p {
|
|
|
margin: 10px 0 0 0;
|
|
|
font-size: 1.1em;
|
|
|
opacity: 0.9;
|
|
|
}
|
|
|
|
|
|
.disclaimer {
|
|
|
background-color: #fff3cd;
|
|
|
border: 1px solid #ffc107;
|
|
|
border-radius: 8px;
|
|
|
padding: 15px;
|
|
|
margin: 20px 0;
|
|
|
color: #856404;
|
|
|
}
|
|
|
|
|
|
.disclaimer h3 {
|
|
|
margin-top: 0;
|
|
|
color: #856404;
|
|
|
}
|
|
|
|
|
|
.voice-section {
|
|
|
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
|
|
|
padding: 20px;
|
|
|
border-radius: 10px;
|
|
|
margin: 20px 0;
|
|
|
}
|
|
|
|
|
|
footer {
|
|
|
text-align: center;
|
|
|
margin-top: 30px;
|
|
|
color: #666;
|
|
|
font-size: 0.9em;
|
|
|
}
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
|
|
|
|
|
|
gr.HTML("""
|
|
|
<div id="header">
|
|
|
<h1>🩺 ChatDoctor AI Assistant</h1>
|
|
|
<p>Your AI-powered medical conversation partner with Voice Support</p>
|
|
|
</div>
|
|
|
""")
|
|
|
|
|
|
|
|
|
gr.HTML("""
|
|
|
<div class="disclaimer">
|
|
|
<h3>⚠️ Medical Disclaimer</h3>
|
|
|
<p><strong>Important:</strong> This AI assistant is for informational and educational purposes only.
|
|
|
It is NOT a substitute for professional medical advice, diagnosis, or treatment.
|
|
|
Always seek the advice of your physician or other qualified health provider with any questions
|
|
|
you may have regarding a medical condition. Never disregard professional medical advice or
|
|
|
delay in seeking it because of something you have read here.</p>
|
|
|
</div>
|
|
|
""")
|
|
|
|
|
|
with gr.Row():
|
|
|
with gr.Column(scale=7):
|
|
|
|
|
|
chatbot = gr.Chatbot(
|
|
|
height=500,
|
|
|
placeholder="<div style='text-align: center; padding: 40px;'><h3>👋 Welcome to ChatDoctor!</h3><p>I'm here to discuss your health concerns. Type or speak your question!</p></div>",
|
|
|
show_label=False,
|
|
|
avatar_images=(None, "🤖"),
|
|
|
)
|
|
|
|
|
|
with gr.Row():
|
|
|
msg = gr.Textbox(
|
|
|
placeholder="Type your message here... (e.g., 'I have a headache')",
|
|
|
show_label=False,
|
|
|
scale=9,
|
|
|
container=False
|
|
|
)
|
|
|
submit_btn = gr.Button("Send 📤", scale=1, variant="primary")
|
|
|
|
|
|
with gr.Row():
|
|
|
clear_btn = gr.Button("🗑️ Clear Chat", scale=1)
|
|
|
retry_btn = gr.Button("🔄 Retry", scale=1)
|
|
|
|
|
|
with gr.Column(scale=3):
|
|
|
|
|
|
gr.HTML("<div class='voice-section'><h3 style='color: white; text-align: center; margin-top: 0;'>🎤 Voice Features</h3></div>")
|
|
|
|
|
|
audio_input = gr.Audio(
|
|
|
sources=["microphone"],
|
|
|
type="filepath",
|
|
|
label="🎙️ Speak Your Question",
|
|
|
show_download_button=False
|
|
|
)
|
|
|
|
|
|
transcribed_text = gr.Textbox(
|
|
|
label="📝 Transcribed Text",
|
|
|
placeholder="Your speech will appear here...",
|
|
|
interactive=False,
|
|
|
lines=3
|
|
|
)
|
|
|
|
|
|
send_voice_btn = gr.Button("Send Voice Message 🔊", variant="primary")
|
|
|
|
|
|
gr.Markdown("---")
|
|
|
|
|
|
|
|
|
tts_enabled = gr.Checkbox(
|
|
|
label="🔊 Enable Text-to-Speech for responses",
|
|
|
value=True,
|
|
|
info="Hear the doctor's response"
|
|
|
)
|
|
|
|
|
|
audio_output = gr.Audio(
|
|
|
label="🔈 AI Response Audio",
|
|
|
autoplay=False,
|
|
|
visible=True
|
|
|
)
|
|
|
|
|
|
|
|
|
gr.Examples(
|
|
|
examples=[
|
|
|
"I have a persistent headache for 3 days. What should I do?",
|
|
|
"What are the symptoms of diabetes?",
|
|
|
"How can I improve my sleep quality?",
|
|
|
"I have a fever and sore throat. Should I be concerned?",
|
|
|
"What are some natural ways to reduce stress?",
|
|
|
],
|
|
|
inputs=msg,
|
|
|
label="💡 Example Questions"
|
|
|
)
|
|
|
|
|
|
|
|
|
with gr.Accordion("⚙️ Advanced Settings", open=False):
|
|
|
temperature_slider = gr.Slider(
|
|
|
minimum=0.1,
|
|
|
maximum=1.0,
|
|
|
value=TEMPERATURE,
|
|
|
step=0.1,
|
|
|
label="Temperature (Creativity)",
|
|
|
info="Higher values make responses more creative but less focused"
|
|
|
)
|
|
|
max_tokens_slider = gr.Slider(
|
|
|
minimum=50,
|
|
|
maximum=500,
|
|
|
value=MAX_NEW_TOKENS,
|
|
|
step=50,
|
|
|
label="Max Response Length",
|
|
|
info="Maximum number of tokens in response"
|
|
|
)
|
|
|
top_k_slider = gr.Slider(
|
|
|
minimum=1,
|
|
|
maximum=100,
|
|
|
value=TOP_K,
|
|
|
step=1,
|
|
|
label="Top K",
|
|
|
info="Limits vocabulary selection"
|
|
|
)
|
|
|
|
|
|
|
|
|
gr.HTML("""
|
|
|
<footer>
|
|
|
<p>Powered by ChatDoctor Model | Built with Gradio | Voice-Enabled 🎤</p>
|
|
|
<p>Device: """ + device.upper() + """ | Model: LLaMA-based Medical AI</p>
|
|
|
</footer>
|
|
|
""")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def user_message(user_msg, history):
|
|
|
return "", history + [[user_msg, None]], None
|
|
|
|
|
|
def bot_response(history, temp, max_tok, top_k_val, tts_enabled_val):
|
|
|
global TEMPERATURE, MAX_NEW_TOKENS, TOP_K
|
|
|
TEMPERATURE = temp
|
|
|
MAX_NEW_TOKENS = int(max_tok)
|
|
|
TOP_K = int(top_k_val)
|
|
|
|
|
|
user_msg = history[-1][0]
|
|
|
bot_msg = chat_function(user_msg, history[:-1])
|
|
|
history[-1][1] = bot_msg
|
|
|
|
|
|
|
|
|
audio_file = None
|
|
|
if tts_enabled_val and bot_msg and not bot_msg.startswith("Error:"):
|
|
|
audio_file = text_to_speech(bot_msg)
|
|
|
|
|
|
return history, audio_file
|
|
|
|
|
|
def transcribe_audio(audio_file):
|
|
|
"""Transcribe audio to text using Whisper"""
|
|
|
if audio_file is None:
|
|
|
return ""
|
|
|
|
|
|
try:
|
|
|
import whisper
|
|
|
model = whisper.load_model("base")
|
|
|
result = model.transcribe(audio_file)
|
|
|
return result["text"]
|
|
|
except ImportError:
|
|
|
return "Error: Please install whisper: pip install openai-whisper"
|
|
|
except Exception as e:
|
|
|
return f"Transcription error: {str(e)}"
|
|
|
|
|
|
def process_voice_input(audio_file, history, temp, max_tok, top_k_val, tts_enabled_val):
|
|
|
"""Process voice input: transcribe -> send -> get response"""
|
|
|
if audio_file is None:
|
|
|
return history, "", None, None
|
|
|
|
|
|
|
|
|
transcribed = transcribe_audio(audio_file)
|
|
|
|
|
|
if transcribed.startswith("Error:"):
|
|
|
return history, transcribed, None, None
|
|
|
|
|
|
|
|
|
history = history + [[transcribed, None]]
|
|
|
|
|
|
|
|
|
global TEMPERATURE, MAX_NEW_TOKENS, TOP_K
|
|
|
TEMPERATURE = temp
|
|
|
MAX_NEW_TOKENS = int(max_tok)
|
|
|
TOP_K = int(top_k_val)
|
|
|
|
|
|
bot_msg = chat_function(transcribed, history[:-1])
|
|
|
history[-1][1] = bot_msg
|
|
|
|
|
|
|
|
|
audio_file = None
|
|
|
if tts_enabled_val and bot_msg and not bot_msg.startswith("Error:"):
|
|
|
audio_file = text_to_speech(bot_msg)
|
|
|
|
|
|
return history, transcribed, None, audio_file
|
|
|
|
|
|
|
|
|
msg.submit(
|
|
|
user_message,
|
|
|
[msg, chatbot],
|
|
|
[msg, chatbot, audio_output],
|
|
|
queue=False
|
|
|
).then(
|
|
|
bot_response,
|
|
|
[chatbot, temperature_slider, max_tokens_slider, top_k_slider, tts_enabled],
|
|
|
[chatbot, audio_output]
|
|
|
)
|
|
|
|
|
|
submit_btn.click(
|
|
|
user_message,
|
|
|
[msg, chatbot],
|
|
|
[msg, chatbot, audio_output],
|
|
|
queue=False
|
|
|
).then(
|
|
|
bot_response,
|
|
|
[chatbot, temperature_slider, max_tokens_slider, top_k_slider, tts_enabled],
|
|
|
[chatbot, audio_output]
|
|
|
)
|
|
|
|
|
|
|
|
|
audio_input.change(
|
|
|
transcribe_audio,
|
|
|
[audio_input],
|
|
|
[transcribed_text]
|
|
|
)
|
|
|
|
|
|
send_voice_btn.click(
|
|
|
process_voice_input,
|
|
|
[audio_input, chatbot, temperature_slider, max_tokens_slider, top_k_slider, tts_enabled],
|
|
|
[chatbot, transcribed_text, audio_input, audio_output]
|
|
|
)
|
|
|
|
|
|
|
|
|
clear_btn.click(lambda: (None, None, None), None, [chatbot, audio_output, transcribed_text], queue=False)
|
|
|
|
|
|
retry_btn.click(lambda: None, None, chatbot, queue=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
print("\n🚀 Launching ChatDoctor Gradio Interface with Voice Support...")
|
|
|
print("\n📦 Required packages:")
|
|
|
print(" pip install gradio gTTS openai-whisper")
|
|
|
print("\nNote: Whisper will download models on first use (~100MB for base model)\n")
|
|
|
|
|
|
demo.queue()
|
|
|
demo.launch(
|
|
|
server_name="0.0.0.0",
|
|
|
server_port=7860,
|
|
|
share=False,
|
|
|
show_error=True
|
|
|
) |