# USMLEMedPrepAI / app.py
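"""Streamlit front end for a RAG-based medical assistant chatbot.

The app embeds user questions, retrieves matching chunks from a Pinecone
index ("medprep"), and asks an OpenAI completion model to answer from that
context. A simple file-based rate limiter caps each session at
MAX_REQUESTS_PER_DAY queries.
"""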
import os
import json
import uuid
from datetime import datetime

import streamlit as st
from src.helper import download_hugging_face_embeddings
from langchain_community.vectorstores import Pinecone
from langchain_openai import OpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from dotenv import load_dotenv
from src.prompt import system_prompt
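# Note: `download_hugging_face_embeddings` lives in src/helper.py. A minimal
# sketch of what it is assumed to return (the exact model is a guess; projects
# in this lineage commonly use all-MiniLM-L6-v2, whose 384-dim vectors must
# match the Pinecone index):
#
#     from langchain_community.embeddings import HuggingFaceEmbeddings
#
#     def download_hugging_face_embeddings():
#         return HuggingFaceEmbeddings(
#             model_name="sentence-transformers/all-MiniLM-L6-v2"
#         )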
# Set up cache directories
os.environ['TRANSFORMERS_CACHE'] = '/tmp/model_cache'
os.environ['HF_HOME'] = '/tmp/model_cache'
os.makedirs('/tmp/model_cache', exist_ok=True)
# Load environment variables
load_dotenv()
# Rate limiting configuration
RATE_LIMIT_FILE = "/tmp/rate_limits.json"
MAX_REQUESTS_PER_DAY = 5
# Initialize rate limiting storage
def init_rate_limiting():
    if not os.path.exists(RATE_LIMIT_FILE):
        with open(RATE_LIMIT_FILE, 'w') as f:
            json.dump({}, f)
# Check if a user has exceeded their daily limit
def check_rate_limit(user_id):
    today = datetime.now().strftime('%Y-%m-%d')
    try:
        with open(RATE_LIMIT_FILE, 'r') as f:
            rate_limits = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        rate_limits = {}

    # Clean up entries from previous days so the file does not grow unbounded.
    # (Dropping every day except today also covers entries older than yesterday,
    # which a "delete yesterday only" cleanup would miss.)
    users_to_remove = []
    for uid in rate_limits:
        for day in [d for d in rate_limits[uid] if d != today]:
            del rate_limits[uid][day]
        if not rate_limits[uid]:  # user has no entries left; drop them entirely
            users_to_remove.append(uid)
    for uid in users_to_remove:
        del rate_limits[uid]

    # Ensure the current user has an entry for today
    if user_id not in rate_limits:
        rate_limits[user_id] = {}
    if today not in rate_limits[user_id]:
        rate_limits[user_id][today] = 0

    # Refuse the request if the daily limit has been reached
    if rate_limits[user_id][today] >= MAX_REQUESTS_PER_DAY:
        return False, rate_limits[user_id][today]

    # Otherwise count this request and persist the updated totals
    rate_limits[user_id][today] += 1
    with open(RATE_LIMIT_FILE, 'w') as f:
        json.dump(rate_limits, f)
    return True, rate_limits[user_id][today]
def get_user_id():
    # Streamlit has no built-in user identity, so tie the limit to the session:
    # a fresh UUID per browser session. (A hash of the current timestamp would
    # collide for users who connect in the same second.)
    if 'user_id' not in st.session_state:
        st.session_state.user_id = str(uuid.uuid4())
    return st.session_state.user_id
def get_remaining_queries(user_id):
    today = datetime.now().strftime('%Y-%m-%d')
    try:
        with open(RATE_LIMIT_FILE, 'r') as f:
            rate_limits = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        return MAX_REQUESTS_PER_DAY
    count = rate_limits.get(user_id, {}).get(today, 0)
    return MAX_REQUESTS_PER_DAY - count
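# Usage sketch (hypothetical walkthrough of the helpers above, assuming
# MAX_REQUESTS_PER_DAY = 5):
#
#     allowed, count = check_rate_limit("user-a")   # 1st call  -> (True, 1)
#     ...                                           # calls 2-5 -> (True, 2..5)
#     allowed, count = check_rate_limit("user-a")   # 6th call  -> (False, 5)
#     get_remaining_queries("user-a")               # -> 0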
# Set up page configuration
st.set_page_config(
    page_title="Medical Assistant RAG Chatbot",
    page_icon="🩺",
    layout="centered"
)
# Initialize session state for chat history
if 'messages' not in st.session_state:
    st.session_state.messages = []
# Initialize rate limiting
init_rate_limiting()
# Display remaining queries
user_id = get_user_id()
remaining_queries = get_remaining_queries(user_id)
st.sidebar.write(f"Remaining queries today: {remaining_queries}/{MAX_REQUESTS_PER_DAY}")
# Check for API keys
PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY')
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if not PINECONE_API_KEY or not OPENAI_API_KEY:
    st.error("Missing API keys. Please set the PINECONE_API_KEY and OPENAI_API_KEY environment variables.")
    st.stop()
os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
# Cache the RAG chain initialization
@st.cache_resource
def initialize_rag_chain():
    try:
        st.sidebar.write("Loading embeddings model...")
        embeddings = download_hugging_face_embeddings()

        st.sidebar.write("Connecting to Pinecone...")
        index_name = "medprep"
        docsearch = Pinecone.from_existing_index(
            index_name=index_name,
            embedding=embeddings
        )
        retriever = docsearch.as_retriever(search_type="similarity", search_kwargs={"k": 3})

        st.sidebar.write("Initializing OpenAI...")
        llm = OpenAI(temperature=0.4, max_tokens=500)
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}")
        ])
        question_answer_chain = create_stuff_documents_chain(llm, prompt)
        rag_chain = create_retrieval_chain(retriever, question_answer_chain)

        st.sidebar.success("✅ System initialized successfully!")
        return rag_chain
    except Exception as e:
        st.sidebar.error(f"Error initializing system: {e}")
        import traceback
        st.sidebar.text(traceback.format_exc())
        return None
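# Chains built with create_retrieval_chain expose a dict interface:
# rag_chain.invoke({"input": question}) returns a dict containing "input",
# "context" (here, the k=3 retrieved documents), and "answer"; only "answer"
# is read by the chat handler below.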
# Main app title
st.title("Medical Assistant Chatbot")
st.write("Ask me any medical question, and I'll try to help!")
# Initialize the RAG chain
rag_chain = initialize_rag_chain()
if rag_chain is None:
    st.error("Failed to initialize the system. Please check the sidebar for error details.")
    st.stop()
# Display chat history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
# Get user input
if prompt := st.chat_input("Ask a question..."):
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": prompt})
# Display user message
with st.chat_message("user"):
st.markdown(prompt)
# Check rate limit
user_id = get_user_id()
allowed, count = check_rate_limit(user_id)
if not allowed:
response = f"⚠️ Daily limit reached. You've used {count} queries today. Please try again tomorrow."
else:
# Process the query with the RAG chain
with st.chat_message("assistant"):
with st.spinner("Thinking..."):
try:
result = rag_chain.invoke({"input": prompt})
response = result.get("answer", "Sorry, I couldn't find an answer to that.")
remaining = MAX_REQUESTS_PER_DAY - count
response += f"\n\n\n_You have {remaining} queries remaining today._"
except Exception as e:
response = f"Error processing your request: {str(e)}"
st.markdown(response)
# Add assistant response to chat history
st.session_state.messages.append({"role": "assistant", "content": response})
# Footer
st.markdown("---")
st.markdown("*This is a RAG-based medical assistant chatbot. It retrieves information from a medical knowledge base to answer your questions.*")