| |
|
| | |
| | import os |
| | import chromadb |
| | from dotenv import load_dotenv |
| | import json |
| | import numpy as np |
| | from groq import Groq |
| | from mem0 import MemoryClient |
| | import streamlit as st |
| | from datetime import datetime |
| | from typing import Dict, List, Tuple, Any, TypedDict |
| |
|
| | |
| | from langchain_core.documents import Document |
| | from langchain_core.runnables import RunnablePassthrough |
| | from langchain_core.output_parsers import StrOutputParser |
| | from langchain.prompts import ChatPromptTemplate |
| | from langchain.chains.query_constructor.base import AttributeInfo |
| | from langchain.retrievers.self_query.base import SelfQueryRetriever |
| | from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
| | from langchain.retrievers import ContextualCompressionRetriever |
| | from langchain_community.vectorstores import Chroma |
| | from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
| | from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
| | from langchain_experimental.text_splitter import SemanticChunker |
| | from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter |
| | from langchain_core.tools import tool |
| | from langchain.agents import create_tool_calling_agent, AgentExecutor |
| | from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
| | from llama_index.core import Settings |
| | from langgraph.graph import StateGraph, END, START |
| | from pydantic import BaseModel |
| |
|
| | |
| | |
| | api_key = os.environ.get("API_KEY") |
| | endpoint = os.environ.get("OPENAI_API_BASE") |
| | llama_api_key = os.environ.get('GROQ_API_KEY') |
| | MEM0_api_key = os.environ.get('mem0') |
| |
|
| | |
| | embedding_model = OpenAIEmbeddings( |
| | openai_api_base=endpoint, |
| | openai_api_key=api_key, |
| | model='text-embedding-ada-002' |
| | ) |
| |
|
| | |
| | llm = ChatOpenAI( |
| | base_url=endpoint, |
| | openai_api_key=api_key, |
| | model="gpt-4o-mini", |
| | streaming=False |
| | ) |
| |
|
| | |
| | Settings.llm = llm |
| | Settings.embedding = embedding_model |
| |
|
| | |
| |
|
| | class AgentState(TypedDict): |
| | query: str |
| | expanded_query: str |
| | context: List[Dict[str, Any]] |
| | response: str |
| | precision_score: float |
| | groundedness_score: float |
| | groundedness_loop_count: int |
| | precision_loop_count: int |
| | feedback: str |
| | query_feedback: str |
| | groundedness_check: bool |
| | loop_max_iter: int |
| |
|
| | def expand_query(state): |
| | print("---------Expanding Query---------") |
| | system_message = '''You are a query expansion expert for nutrition and health topics. |
| | Expand the given query to improve information retrieval by adding relevant terms, synonyms, and related concepts. |
| | Focus on nutrition disorders, dietary conditions, and health topics. Return only the expanded query.''' |
| | expand_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Expand this query: {query} using the feedback: {query_feedback}") |
| | ]) |
| | chain = expand_prompt | llm | StrOutputParser() |
| | expanded_query = chain.invoke({"query": state['query'], "query_feedback": state.get("query_feedback", "Improve retrieval effectiveness")}) |
| | print("expanded_query", expanded_query) |
| | state["expanded_query"] = expanded_query |
| | return state |
| |
|
| | |
| | vector_store = Chroma( |
| | collection_name="nutritional_hypotheticals", |
| | persist_directory="./nutritional_db", |
| | embedding_function=embedding_model |
| | ) |
| | retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': 3}) |
| |
|
| | def retrieve_context(state): |
| | print("---------retrieve_context---------") |
| | query = state['expanded_query'] |
| | docs = retriever.invoke(query) |
| | context = [{"content": doc.page_content, "metadata": doc.metadata} for doc in docs] |
| | state['context'] = context |
| | return state |
| |
|
| | def craft_response(state: Dict) -> Dict: |
| | print("---------craft_response---------") |
| | system_message = '''You are an expert nutrition and health advisor. Provide accurate, evidence-based responses about nutrition disorders and dietary conditions. |
| | |
| | Guidelines: |
| | - Use only information from the provided context |
| | - Give clear, actionable advice when appropriate |
| | - Maintain a professional yet accessible tone |
| | - If context is insufficient, acknowledge limitations |
| | - Recommend professional consultation when appropriate |
| | |
| | Generate a comprehensive response based strictly on the provided context.''' |
| | response_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}") |
| | ]) |
| | chain = response_prompt | llm | StrOutputParser() |
| | response = chain.invoke({ |
| | "query": state['query'], |
| | "context": "\n".join([doc["content"] for doc in state['context']]), |
| | "feedback": state.get('feedback', "Provide a helpful and accurate response") |
| | }) |
| | state['response'] = response |
| | return state |
| |
|
| | def score_groundedness(state: Dict) -> Dict: |
| | print("---------check_groundedness---------") |
| | system_message = '''You are an expert evaluator. Rate how well the response is grounded in the provided context. |
| | |
| | Scale: |
| | - 1.0 = Fully grounded (all information comes from context) |
| | - 0.8 = Mostly grounded (minor reasonable inferences) |
| | - 0.6 = Partially grounded (some claims supported) |
| | - 0.4 = Weakly grounded (few claims supported) |
| | - 0.2 = Poorly grounded (mostly unsupported) |
| | - 0.0 = Not grounded (contradicts or ignores context) |
| | |
| | Return ONLY a decimal number between 0.0 and 1.0.''' |
| | groundedness_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:") |
| | ]) |
| | chain = groundedness_prompt | llm | StrOutputParser() |
| | try: |
| | score_str = chain.invoke({ |
| | "context": "\n".join([doc["content"] for doc in state['context']]), |
| | "response": state['response'] |
| | }) |
| | import re |
| | match = re.search(r"\d+(\.\d+)?", score_str) |
| | groundedness_score = float(match.group(0)) if match else 0.0 |
| | except: |
| | groundedness_score = 0.0 |
| | state['groundedness_loop_count'] += 1 |
| | state['groundedness_score'] = groundedness_score |
| | return state |
| |
|
| | def check_precision(state: Dict) -> Dict: |
| | print("---------check_precision---------") |
| | system_message = '''You are an expert evaluator. Rate how precisely the response addresses the user's query on a scale of 0.0 to 1.0. |
| | |
| | Consider: |
| | - Does the response directly answer what was asked? |
| | - Are all parts of the query addressed? |
| | - Is there unnecessary or irrelevant information? |
| | - Is the response focused and on-topic? |
| | |
| | Return ONLY a decimal number between 0.0 and 1.0.''' |
| | precision_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:") |
| | ]) |
| | chain = precision_prompt | llm | StrOutputParser() |
| | try: |
| | score_str = chain.invoke({ |
| | "query": state['query'], |
| | "response": state['response'] |
| | }) |
| | import re |
| | match = re.search(r"\d+(\.\d+)?", score_str) |
| | precision_score = float(match.group(0)) if match else 0.0 |
| | except: |
| | precision_score = 0.0 |
| | state['precision_score'] = precision_score |
| | state['precision_loop_count'] += 1 |
| | return state |
| |
|
| | def refine_response(state: Dict) -> Dict: |
| | print("---------refine_response---------") |
| | system_message = '''You are an expert reviewer. Analyze the response and suggest specific improvements for better accuracy, completeness, and clarity. Focus on actionable recommendations.''' |
| | refine_response_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nResponse: {response}\n\nWhat improvements can be made?") |
| | ]) |
| | chain = refine_response_prompt | llm | StrOutputParser() |
| | feedback = chain.invoke({'query': state['query'], 'response': state['response']}) |
| | state['feedback'] = feedback |
| | return state |
| |
|
| | def refine_query(state: Dict) -> Dict: |
| | print("---------refine_query---------") |
| | system_message = '''You are a query optimization expert. Analyze the expanded query and suggest specific improvements to enhance information retrieval effectiveness. Focus on terminology, specificity, and comprehensiveness.''' |
| | refine_query_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\nWhat improvements can be made?") |
| | ]) |
| | chain = refine_query_prompt | llm | StrOutputParser() |
| | query_feedback = chain.invoke({'query': state['query'], 'expanded_query': state['expanded_query']}) |
| | state['query_feedback'] = query_feedback |
| | return state |
| |
|
| | def should_continue_groundedness(state): |
| | if state['groundedness_score'] >= 0.8: |
| | return "check_precision" |
| | elif state["groundedness_loop_count"] > state['loop_max_iter']: |
| | return "max_iterations_reached" |
| | else: |
| | return "refine_response" |
| |
|
| | def should_continue_precision(state: Dict) -> str: |
| | if state['precision_score'] >= 0.8: |
| | return "pass" |
| | elif state["precision_loop_count"] > state['loop_max_iter']: |
| | return "max_iterations_reached" |
| | else: |
| | return "refine_query" |
| |
|
| | def max_iterations_reached(state: Dict) -> Dict: |
| | state['response'] = "I'm unable to refine the response further. Please provide more context or clarify your question." |
| | return state |
| |
|
| | def create_workflow() -> StateGraph: |
| | workflow = StateGraph(AgentState) |
| | workflow.add_node("expand_query", expand_query) |
| | workflow.add_node("retrieve_context", retrieve_context) |
| | workflow.add_node("craft_response", craft_response) |
| | workflow.add_node("score_groundedness", score_groundedness) |
| | workflow.add_node("refine_response", refine_response) |
| | workflow.add_node("check_precision", check_precision) |
| | workflow.add_node("refine_query", refine_query) |
| | workflow.add_node("max_iterations_reached", max_iterations_reached) |
| |
|
| | workflow.add_edge(START, "expand_query") |
| | workflow.add_edge("expand_query", "retrieve_context") |
| | workflow.add_edge("retrieve_context", "craft_response") |
| | workflow.add_edge("craft_response", "score_groundedness") |
| |
|
| | workflow.add_conditional_edges( |
| | "score_groundedness", |
| | should_continue_groundedness, |
| | { |
| | "check_precision": "check_precision", |
| | "refine_response": "refine_response", |
| | "max_iterations_reached": "max_iterations_reached" |
| | } |
| | ) |
| | workflow.add_edge("refine_response", "score_groundedness") |
| |
|
| | workflow.add_conditional_edges( |
| | "check_precision", |
| | should_continue_precision, |
| | { |
| | "pass": END, |
| | "refine_query": "refine_query", |
| | "max_iterations_reached": "max_iterations_reached" |
| | } |
| | ) |
| | workflow.add_edge("refine_query", "expand_query") |
| | workflow.add_edge("max_iterations_reached", END) |
| | return workflow |
| |
|
| | WORKFLOW_APP = create_workflow().compile() |
| |
|
| | @tool |
| | def agentic_rag(query: str): |
| | """Runs the RAG-based agent.""" |
| | inputs = { |
| | "query": query, |
| | "expanded_query": "", |
| | "context": [], |
| | "response": "", |
| | "precision_score": 0.0, |
| | "groundedness_score": 0.0, |
| | "groundedness_loop_count": 0, |
| | "precision_loop_count": 0, |
| | "feedback": "", |
| | "query_feedback": "", |
| | "loop_max_iter": 3 |
| | } |
| | output = WORKFLOW_APP.invoke(inputs) |
| | return output['response'] |
| |
|
| | |
| | llama_guard_client = Groq(api_key=llama_api_key) |
| | def filter_input_with_llama_guard(user_input, model="meta-llama/llama-guard-4-12b"): |
| | try: |
| | response = llama_guard_client.chat.completions.create( |
| | messages=[{"role": "user", "content": user_input}], |
| | model=model, |
| | ) |
| | return response.choices[0].message.content.strip() |
| | except Exception as e: |
| | print(f"Error with Llama Guard: {e}") |
| | return "safe" |
| |
|
| | |
| | class NutritionBot: |
| | def __init__(self): |
| | self.memory = MemoryClient(api_key=MEM0_api_key) |
| | self.client = ChatOpenAI( |
| | model_name="gpt-4o-mini", |
| | api_key=api_key, |
| | base_url=endpoint, |
| | temperature=0 |
| | ) |
| | tools = [agentic_rag] |
| | system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience. |
| | Guidelines for Interaction: |
| | Maintain a polite, professional, and reassuring tone. |
| | Show genuine empathy for customer concerns and health challenges. |
| | Reference past interactions to provide personalized and consistent advice. |
| | Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations. |
| | Ensure consistent and accurate information across conversations. |
| | If any detail is unclear or missing, proactively ask for clarification. |
| | Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights. |
| | Keep track of ongoing issues and follow-ups to ensure continuity in support. |
| | Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences. |
| | |
| | """ |
| | prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_prompt), |
| | ("human", "{input}"), |
| | ("placeholder", "{agent_scratchpad}") |
| | ]) |
| | agent = create_tool_calling_agent(self.client, tools, prompt) |
| | self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) |
| |
|
| | def store_customer_interaction(self, user_id, message, response, metadata=None): |
| | if metadata is None: metadata = {} |
| | metadata["timestamp"] = datetime.now().isoformat() |
| | conversation = [{"role": "user", "content": message}, {"role": "assistant", "content": response}] |
| | self.memory.add(conversation, user_id=user_id, output_format="v1.1", metadata=metadata) |
| |
|
| | def get_relevant_history(self, user_id, query): |
| | return self.memory.search(query=query, user_id=user_id, limit=3) |
| |
|
| | def handle_customer_query(self, user_id, query): |
| | relevant_history = self.get_relevant_history(user_id, query) |
| | context = "Previous interactions:\n" |
| | for memory in relevant_history: |
| | context += f"Memory: {memory['memory']}\n---\n" |
| | |
| | prompt = f"Context:\n{context}\nQuery: {query}" |
| | response = self.agent_executor.invoke({"input": prompt}) |
| | self.store_customer_interaction(user_id, query, response["output"], metadata={"type": "query"}) |
| | return response['output'] |
| |
|
| | |
| | def nutrition_disorder_streamlit(): |
| | st.title("Nutrition Disorder Specialist") |
| | st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.") |
| | st.write("Type 'exit' to end the conversation.") |
| | |
| | if 'chat_history' not in st.session_state: |
| | st.session_state.chat_history = [] |
| | if 'user_id' not in st.session_state: |
| | st.session_state.user_id = None |
| |
|
| | if st.session_state.user_id is None: |
| | with st.form("login_form", clear_on_submit=True): |
| | user_id = st.text_input("Enter your name to begin:") |
| | submit_button = st.form_submit_button("Login") |
| | if submit_button and user_id: |
| | st.session_state.user_id = user_id |
| | st.session_state.chat_history.append({"role": "assistant", "content": f"Welcome {user_id}! How can I help you?"}) |
| | st.session_state.login_submitted = True |
| | if st.session_state.get("login_submitted", False): |
| | st.session_state.pop("login_submitted") |
| | st.rerun() |
| | else: |
| | for message in st.session_state.chat_history: |
| | with st.chat_message(message["role"]): |
| | st.write(message["content"]) |
| |
|
| | user_query = st.chat_input("Type your question here (or 'exit' to end)...") |
| | if user_query: |
| | if user_query.lower() == "exit": |
| | st.session_state.chat_history.append({"role": "user", "content": "exit"}) |
| | with st.chat_message("user"): |
| | st.write("exit") |
| | goodbye_msg = "Goodbye! Feel free to return if you have more questions." |
| | st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg}) |
| | with st.chat_message("assistant"): |
| | st.write(goodbye_msg) |
| | st.session_state.user_id = None |
| | st.rerun() |
| | return |
| |
|
| | st.session_state.chat_history.append({"role": "user", "content": user_query}) |
| | with st.chat_message("user"): |
| | st.write(user_query) |
| |
|
| | filtered_result = filter_input_with_llama_guard(user_query).replace("\n", " ") |
| | if filtered_result in ["safe", "unsafe S6", "unsafe S7"]: |
| | try: |
| | if 'chatbot' not in st.session_state: |
| | st.session_state.chatbot = NutritionBot() |
| | response = st.session_state.chatbot.handle_customer_query(st.session_state.user_id, user_query) |
| | st.write(response) |
| | st.session_state.chat_history.append({"role": "assistant", "content": response}) |
| | except Exception as e: |
| | error_msg = f"Error: {str(e)}" |
| | st.write(error_msg) |
| | st.session_state.chat_history.append({"role": "assistant", "content": error_msg}) |
| | else: |
| | msg = "I apologize, but I cannot process that input as it may be inappropriate." |
| | st.write(msg) |
| | st.session_state.chat_history.append({"role": "assistant", "content": msg}) |
| |
|
| | if __name__ == "__main__": |
| | nutrition_disorder_streamlit() |
| |
|