|
|
import os |
|
|
import numpy as np |
|
|
import streamlit as st |
|
|
from datetime import datetime |
|
|
from typing import Any, Dict, List, Tuple, TypedDict |
|
|
|
|
|
|
|
|
from langchain_core.output_parsers import StrOutputParser |
|
|
from langchain_core.prompts import ChatPromptTemplate |
|
|
from langchain_core.runnables import RunnablePassthrough |
|
|
from langchain_core.tools import tool |
|
|
from langchain_core.documents import Document |
|
|
|
|
|
from langchain.agents import ( |
|
|
AgentExecutor, |
|
|
create_tool_calling_agent, |
|
|
create_openai_tools_agent, |
|
|
initialize_agent, |
|
|
AgentType |
|
|
) |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain.retrievers import ContextualCompressionRetriever |
|
|
from langchain.retrievers.self_query.base import SelfQueryRetriever |
|
|
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
|
|
from langchain.chains.query_constructor.base import AttributeInfo |
|
|
|
|
|
|
|
|
from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
|
|
|
|
|
|
|
|
from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
|
|
|
|
|
|
|
|
from langgraph.graph import END, StateGraph, START |
|
|
from pydantic import BaseModel, Field, validator |
|
|
import chromadb |
|
|
from tqdm import tqdm |
|
|
from llama_index.core.settings import Settings |
|
|
from groq import Groq |
|
|
from mem0 import MemoryClient |
|
|
|
|
|
|
|
|
# Compatibility shim: NumPy 2.0 removed the `np.float_` alias; presumably a
# dependency here (e.g. ChromaDB) still references it — TODO confirm which.
np.float_ = np.float64
|
|
|
|
|
|
|
|
# OpenAI-compatible endpoint credentials, read from the environment.
api_key = os.getenv("OPENAI_API_KEY")

endpoint = os.getenv("OPENAI_API_BASE")

# NOTE(review): read here but never used below — NutritionBot.__init__ reads
# os.getenv("Mem0") (different casing) directly; confirm which env var the
# deployment actually sets.
memo_api_key = os.getenv('mem0')

# Embedding model used to vectorize queries for retrieval.
# assumes the persisted Chroma collection was built with this same model —
# TODO confirm
embedding_model = OpenAIEmbeddings(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model='text-embedding-ada-002'
)

# Chat model shared by all workflow nodes (expansion, drafting, scoring,
# refinement).
llm = ChatOpenAI(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model="gpt-4o-mini",
    streaming=False)

# Register the models as llama-index global defaults as well.
# NOTE(review): llama-index's embedding setting is conventionally
# `Settings.embed_model`; `Settings.embedding` may just set an unused
# attribute — confirm this is intentional.
Settings.llm = llm
Settings.embedding = embedding_model
|
|
|
|
|
class AgentState(TypedDict):
    """Shared state threaded through every node of the LangGraph workflow."""
    query: str  # original user query
    expanded_query: str  # LLM-expanded version used for retrieval
    context: List[Dict[str, Any]]  # retrieved chunks: {"content", "metadata"}
    response: str  # current draft answer (craft_response actually stores the LLM message object)
    precision_score: float  # 1.0 / 0.0 verdict from check_precision
    groundedness_score: float  # 1.0 / 0.0 verdict from score_groundedness
    groundedness_loop_count: int  # response-refinement iterations so far
    precision_loop_count: int  # query-refinement iterations so far
    feedback: str  # suggestions for improving the response
    query_feedback: str  # suggestions for improving the expanded query
    groundedness_check: bool  # NOTE(review): never read or written by any node — confirm still needed
    loop_max_iter: int  # retry budget shared by both refinement loops
|
|
|
|
|
def expand_query(state):
    """Rewrite the user's query into up to three domain-specific phrasings.

    Feeds the original query plus any accumulated query feedback to the LLM
    and stores the result in ``state["expanded_query"]``.

    Args:
        state: Workflow state containing ``query`` and ``query_feedback``.

    Returns:
        The same state dict, updated in place with ``expanded_query``.
    """
    print("---------Expanding Query---------")

    system_message = """
    You are a domain expert assisting in answering questions related to medical reference documentation.
    Convert the user query into more specific and domain-related phrasing that a Nutrition Disorder Specialist would understand.
    Expand the query by considering the use of appropriate medical terminology, synonyms, and various common ways to phrase the query.

    Guidelines:
    If the query has multiple distinct parts, break them into separate, simpler queries.
    If there are common synonyms or alternative phrasing for key terms, provide multiple versions of the query.
    Do not generate more than three queries, except when the query involves multiple separate parts (in which case, you can generate more than three).
    Do not attempt to rephrase unfamiliar acronyms or terms. Leave them as is.
    Return only a list of up to three queries. Do not include anything before or after the list.
    """

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Expand this query: {query} using the feedback: {query_feedback}")
    ])

    pipeline = prompt | llm | StrOutputParser()
    result = pipeline.invoke({
        "query": state['query'],
        "query_feedback": state["query_feedback"],
    })

    print("expanded_query", result)
    state["expanded_query"] = result
    return state
|
|
|
|
|
|
|
|
|
|
|
# Persistent Chroma collection of nutrition reference documents.
# assumes ./nutritional_db was built with the same embedding model used for
# queries — TODO confirm
vector_store = Chroma(
    collection_name='Nutrition',
    persist_directory='./nutritional_db',
    embedding_function=embedding_model
)

# Plain cosine-similarity search returning the top 5 chunks per query.
retriever = vector_store.as_retriever(
    search_type='similarity',
    search_kwargs={'k': 5}
)
|
|
|
|
|
def retrieve_context(state):
    """
    Retrieves context from the vector store using the expanded or original query.

    Args:
        state (Dict): The current state of the workflow, containing the query and expanded query.

    Returns:
        Dict: The updated state with the retrieved context.
    """
    print("---------Retrieve_Context---------")
    search_query = state['expanded_query']
    print("Query used for retrieval:", search_query)

    documents = retriever.invoke(search_query)
    print("Retrieved documents:", documents)

    # Keep both page text and metadata so downstream grounding checks and
    # responses can cite the source chunks.
    state['context'] = [
        {"content": doc.page_content, "metadata": doc.metadata}
        for doc in documents
    ]

    print("Extracted context with metadata:", state['context'])
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    return state
|
|
|
|
|
def craft_response(state: Dict) -> Dict:
    """
    Generates a response using the retrieved context, focusing on nutrition disorders.

    Args:
        state (Dict): The current state of the workflow, containing the query and retrieved context.

    Returns:
        Dict: The updated state with the generated response.
    """
    print("---------Craft_Response---------")
    system_message = """
    Ensure the information is grounded in the context, avoid speculation, and prioritize clarity.
    You are a knowledgeable and empathetic medical assistant specializing in nutritional disorders.
    Given the retrieved context, generate a precise, informative, and concise response that directly addresses the user's query.
    Ensure the information is fully grounded in the provided context, and avoid introducing speculative or unsupported content.
    Focus on clarity and accuracy, ensuring the user receives helpful and relevant advice regarding nutritional disorders.
    """

    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}")
    ])

    # Flatten the retrieved chunks into a single context string.
    context_text = "\n".join(doc["content"] for doc in state['context'])

    # No StrOutputParser here: the raw LLM message object is stored so the
    # caller can unwrap `.content` later.
    pipeline = response_prompt | llm
    answer = pipeline.invoke({
        "query": state['query'],
        "context": context_text,
        "feedback": state['feedback'],
    })

    state['response'] = answer
    print("intermediate response: ", answer)

    return state
|
|
|
|
|
def score_groundedness(state: Dict) -> Dict:
    """
    Checks whether the response is grounded in the retrieved context.

    Args:
        state (Dict): The current state of the workflow, containing the response and context.

    Returns:
        Dict: The updated state with the groundedness score (1.0 or 0.0) and
        an incremented ``groundedness_loop_count``.
    """
    print("---------Check_Groundedness---------")
    system_message = """
    You are an expert evaluator for Retrieval-Augmented Generation (RAG) systems. Your task is to assess the GROUNDEDNESS of a response.
    Given an answer and its retrieved context, determine whether the response is based on or supported by the provided context.
    Respond with:
    1.0 if the answer is grounded in the context
    0.0 if the answer is not grounded in the context
    The output must be a float: either 1.0 or 0.0. Do not include any explanation.
    """

    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:")
    ])

    chain = groundedness_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "context": "\n".join([doc["content"] for doc in state['context']]),
        "response": state['response']
    })
    # The prompt demands a bare float, but LLM output is not guaranteed to
    # comply. Treat unparseable output as "not grounded" (0.0) so a single
    # malformed completion triggers the refinement loop instead of crashing
    # the whole workflow with an unhandled ValueError.
    try:
        groundedness_score = float(raw_score.strip())
    except ValueError:
        print(f"Could not parse groundedness score from {raw_score!r}; defaulting to 0.0")
        groundedness_score = 0.0

    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score

    return state
|
|
|
|
|
def check_precision(state: Dict) -> Dict:
    """
    Checks whether the response precisely addresses the user’s query.

    Args:
        state (Dict): The current state of the workflow, containing the query and response.

    Returns:
        Dict: The updated state with the precision score (1.0 or 0.0) and an
        incremented ``precision_loop_count``.
    """
    print("---------Check_Precision---------")
    system_message = """
    You are an expert evaluator for Retrieval-Augmented Generation (RAG) systems. Your task is to assess the PRECISION of a response.
    Given a query and a response, determine whether the response precisely addresses the user's query without including unrelated or irrelevant information.
    Respond with:
    1.0 if the response is precise
    0.0 if the response is not precise
    Your answer must be a float: either 1.0 or 0.0. Do not include any explanation.
    """

    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:")
    ])

    chain = precision_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "query": state['query'],
        "response": state['response']
    })
    # The prompt demands a bare float, but LLM output is not guaranteed to
    # comply. Treat unparseable output as "not precise" (0.0) so a malformed
    # completion triggers query refinement instead of raising ValueError.
    try:
        precision_score = float(raw_score.strip())
    except ValueError:
        print(f"Could not parse precision score from {raw_score!r}; defaulting to 0.0")
        precision_score = 0.0

    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
|
|
|
|
|
def refine_response(state: Dict) -> Dict:
    """
    Suggests improvements for the generated response.

    Args:
        state (Dict): The current state of the workflow, containing the query and response.

    Returns:
        Dict: The updated state with response refinement suggestions.
    """
    print("---------Refine_Response---------")

    system_message = """
    You are an expert evaluator for medical responses. Given the generated response, your task is to identify areas for improvement.
    Provide feedback on any gaps, ambiguities, or missing details in the response.
    Ensure that the suggestions focus on:
    Factual grounding: Ensure the information is scientifically accurate and well-supported by evidence.
    Logic and coherence: Assess the clarity and flow of the response. Ensure the response makes sense and is easy to follow.
    Completeness: Identify any missing details or important context that should be included.
    Empathy and tone: If the response is intended for a medical or patient-facing audience, ensure the tone is supportive, clear, and empathetic.

    After reviewing the response, provide detailed suggestions for improvement in the following format:
    1. [Description of Issue] - [Suggested Improvement].
    2. [Description of Issue] - [Suggested Improvement].
    (Continue providing numbered suggestions as needed.)

    Make sure to provide constructive, actionable suggestions that can enhance the response.
    """

    refine_response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\n"
                 "What improvements can be made to enhance accuracy and completeness?")
    ])

    pipeline = refine_response_prompt | llm | StrOutputParser()
    suggestions = pipeline.invoke({'query': state['query'], 'response': state['response']})

    # Bundle the previous response with the critique so craft_response can
    # see both on the next iteration.
    feedback = f"Previous Response: {state['response']}\nSuggestions: {suggestions}"
    print("feedback: ", feedback)
    print(f"State: {state}")
    state['feedback'] = feedback
    return state
|
|
|
|
|
def refine_query(state: Dict) -> Dict:
    """
    Suggests improvements for the expanded query.

    Args:
        state (Dict): The current state of the workflow, containing the query and expanded query.

    Returns:
        Dict: The updated state with query refinement suggestions.
    """
    print("---------Refine_Query---------")
    system_message = """
    You are an expert in query refinement, helping to improve search precision for medical topics.
    Given the original and expanded queries, your task is to suggest improvements that can enhance search effectiveness.

    Areas to focus on:
    1. **Missing Details**: Identify any essential details or context that could improve the specificity of the query.
    2. **Keywords**: Recommend more relevant or precise keywords that are specific to the topic (e.g., nutrition disorders, medical terminology).
    3. **Scope Refinement**: Suggest ways to narrow or broaden the query to improve the relevance of the results (e.g., focus on specific conditions, age groups, etc.).
    4. **Clarity**: Ensure the query is clear, concise, and free from ambiguity to help improve search engine interpretation.
    5. **Synonym Usage**: Suggest any common synonyms or alternate phrases for key terms that could improve search recall.

    After reviewing the original and expanded queries, provide your suggestions in a clear and actionable format, such as:
    1. [Description of Issue] - [Suggested Improvement].
    2. [Description of Issue] - [Suggested Improvement].
    (Continue providing numbered suggestions as needed.)

    Your goal is to make the query more precise, comprehensive, and search-friendly to enhance the retrieval of relevant information.
    """

    refine_query_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
                 "What improvements can be made for a better search?")
    ])

    pipeline = refine_query_prompt | llm | StrOutputParser()
    suggestions = pipeline.invoke({
        'query': state['query'],
        'expanded_query': state['expanded_query'],
    })

    # Carry the previous expansion plus the critique into the next
    # expand_query pass.
    query_feedback = f"Previous Expanded Query: {state['expanded_query']}\nSuggestions: {suggestions}"
    print("query_feedback: ", query_feedback)
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    state['query_feedback'] = query_feedback
    return state
|
|
|
|
|
def should_continue_groundedness(state):
    """Decides if groundedness is sufficient or needs improvement.

    Returns the name of the next node: "check_precision" when the response
    is grounded, "max_iterations_reached" when the retry budget is spent,
    otherwise "refine_response".
    """
    print("---------Should_Continue_Groundedness---------")
    print("Groundedness Loop Count: ", state['groundedness_loop_count'])

    # A positive score means the response is grounded; advance to the
    # precision check.
    if state['groundedness_score'] > 0:
        print("Moving to Precision")
        return "check_precision"

    # Not grounded: stop looping once the budget is exhausted.
    if state["groundedness_loop_count"] > state['loop_max_iter']:
        return "max_iterations_reached"

    print(f"---------Groundedness Score Threshold Not Met. Refining Response-----------")
    return "refine_response"
|
|
|
|
|
def should_continue_precision(state: Dict) -> str:
    """Decides if precision is sufficient or needs improvement.

    Returns "pass" when the response is precise, "max_iterations_reached"
    when the retry budget is spent, otherwise "refine_query".
    """
    print("---------Should_Continue_Precision---------")
    print("precision loop count: ", state['precision_loop_count'])

    # A positive score means the response is precise enough to finish.
    if state['precision_score'] > 0:
        return "pass"

    # Not precise: stop looping once the budget is exhausted.
    if state['precision_loop_count'] > state['loop_max_iter']:
        return "max_iterations_reached"

    print(f"---------Precision Score Threshold Not Met. Refining Query-----------")
    return "refine_query"
|
|
|
|
|
def max_iterations_reached(state: Dict) -> Dict:
    """Handles the case when the maximum number of iterations is reached.

    Replaces the draft response with a fixed apology asking the user for
    more context, then ends the workflow.

    Args:
        state (Dict): The current workflow state.

    Returns:
        Dict: The state with ``response`` overwritten by the fallback message.
    """
    print("---------Max_Iterations_Reached---------")
    # (The original body repeated the docstring as a dead string statement
    # here; it has been removed.)
    response = "I'm unable to refine the response further. Please provide more context or clarify your question."
    state['response'] = response
    return state
|
|
|
|
|
def create_workflow() -> StateGraph:
    """Creates the updated workflow for the AI nutrition agent.

    Wires the expand -> retrieve -> respond -> score pipeline plus the two
    feedback loops (response refinement and query refinement), both capped
    by ``loop_max_iter``.
    """
    workflow = StateGraph(AgentState)

    # Register every node under the name referenced by the edges below.
    node_table = (
        ("expand_query", expand_query),
        ("retrieve_context", retrieve_context),
        ("craft_response", craft_response),
        ("score_groundedness", score_groundedness),
        ("refine_response", refine_response),
        ("check_precision", check_precision),
        ("refine_query", refine_query),
        ("max_iterations_reached", max_iterations_reached),
    )
    for name, fn in node_table:
        workflow.add_node(name, fn)

    # Linear happy path from entry to the groundedness check.
    for src, dst in (
        (START, "expand_query"),
        ("expand_query", "retrieve_context"),
        ("retrieve_context", "craft_response"),
        ("craft_response", "score_groundedness"),
    ):
        workflow.add_edge(src, dst)

    # Branch on groundedness: proceed, refine the response, or give up.
    workflow.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,
        {
            "check_precision": 'check_precision',
            "refine_response": 'refine_response',
            "max_iterations_reached": 'max_iterations_reached',
        },
    )
    workflow.add_edge('refine_response', 'craft_response')

    # Branch on precision: finish, refine the query, or give up.
    workflow.add_conditional_edges(
        "check_precision",
        should_continue_precision,
        {
            "pass": END,
            "refine_query": 'refine_query',
            "max_iterations_reached": 'max_iterations_reached',
        },
    )
    workflow.add_edge('refine_query', 'expand_query')
    workflow.add_edge("max_iterations_reached", END)

    return workflow
|
|
|
|
|
# Compile the LangGraph state machine once at import time so the agentic_rag
# tool below can reuse it for every query.
WORKFLOW_APP = create_workflow().compile()
|
|
|
|
|
@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent with conversation history for context-aware responses.

    Args:
        query (str): The current user query.

    Returns:
        Dict[str, Any]: The updated state with the generated response and conversation history.
    """
    # Fresh state per invocation: counters start at zero and both refinement
    # loops share a budget of three iterations.
    initial_state = {
        "query": query,
        "expanded_query": "",
        "context": [],
        "response": "",
        "precision_score": 0,
        "groundedness_score": 0,
        "groundedness_loop_count": 0,
        "precision_loop_count": 0,
        "feedback": "",
        "query_feedback": "",
        "loop_max_iter": 3
    }

    final_state = WORKFLOW_APP.invoke(initial_state)
    answer = final_state.get("response", "Error: No response generated.")

    # The graph may hand back either a plain string (fallback path) or an
    # LLM message object (craft_response path); normalize to stripped text.
    if isinstance(answer, str):
        return answer.strip()
    if hasattr(answer, "content"):
        return answer.content.strip()
    return str(answer).strip()
|
|
|
|
|
|
|
|
# API key for Groq, which hosts the Llama Guard moderation model.
# NOTE(review): the env var is named 'Groq' — confirm the deployment sets it
# (other keys in this file use different naming/casing conventions).
groq_api_key = os.getenv('Groq')

# Client used solely for input moderation via Llama Guard.
llama_guard_client = Groq(api_key=groq_api_key)
|
|
|
|
|
|
|
|
def filter_input_with_llama_guard(user_input, model="llama-guard-3-8b"):
    """
    Filters user input using Llama Guard to ensure it is safe.

    Parameters:
    - user_input: The input provided by the user.
    - model: The Llama Guard model to be used for filtering (default is "llama-guard-3-8b").

    Returns:
    - The moderation verdict string (e.g. "safe" or "unsafe S6"), or None if
      the Groq call failed.
    """
    try:
        # Send the raw user text to the hosted Llama Guard model.
        llama_response = llama_guard_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": user_input}],
        )
        return llama_response.choices[0].message.content.strip()
    except Exception as e:
        # Network/auth failures are reported but not raised; callers must
        # handle the None result.
        print(f"Error with Llama Guard: {e}")
        return None
|
|
|
|
|
class NutritionBot:
    # Customer-facing chat agent: wraps the agentic_rag tool in a LangChain
    # agent and persists per-user conversation history in mem0.

    def __init__(self):
        """
        Initialize the NutritionBot class with memory, LLM client, tools, and the agent executor.
        """

        # Long-term conversation memory (mem0 cloud service).
        # NOTE(review): reads env var "Mem0", but module level reads 'mem0'
        # into memo_api_key — confirm which name the deployment sets.
        self.memory = MemoryClient(api_key=os.getenv("Mem0"))

        # Dedicated deterministic (temperature=0) chat model for the agent.
        # NOTE(review): uses os.getenv("API_KEY") while the module-level llm
        # uses "OPENAI_API_KEY" — confirm this inconsistency is intentional.
        self.llm = ChatOpenAI(
            model="gpt-4o-mini",
            openai_api_key=os.getenv("API_KEY"),
            openai_api_base=os.getenv("OPENAI_API_BASE"),
            temperature=0,
            streaming=False
        )

        # Persona and behavioral guidelines for the support agent.
        system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.
        Guidelines for Interaction:
        Maintain a polite, professional, and reassuring tone.
        Show genuine empathy for customer concerns and health challenges.
        Reference past interactions to provide personalized and consistent advice.
        Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.
        Ensure consistent and accurate information across conversations.
        If any detail is unclear or missing, proactively ask for clarification.
        Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.
        Keep track of ongoing issues and follow-ups to ensure continuity in support.
        Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])

        # The RAG workflow is the agent's only tool.
        self.tools = [agentic_rag]

        # NOTE(review): initialize_agent is a legacy LangChain constructor;
        # confirm it accepts a `prompt=` keyword in the pinned version — it
        # may be ignored or rejected depending on the release.
        self.agent_executor = initialize_agent(
            tools=self.tools,
            llm=self.llm,
            prompt=prompt,
            agent=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True
        )

    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store customer interaction in memory for future reference.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Additional metadata for the interaction.
        """
        if metadata is None:
            metadata = {}

        # Timestamp each interaction so history can be ordered later.
        metadata["timestamp"] = datetime.now().isoformat()

        # mem0 expects a chat-style message list.
        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response}
        ]

        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=metadata
        )

    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Retrieve past interactions relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.

        Returns:
            List[Dict]: A list of relevant past interactions (at most 5).
        """
        return self.memory.search(
            query=query,
            user_id=user_id,
            limit=5
        )

    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Process a customer's query and provide a response, taking into account past interactions.
        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.

        Returns:
            str: Chatbot's response.
        """

        # Pull semantically relevant past turns from mem0.
        relevant_history = self.get_relevant_history(user_id, query)

        # Inline the raw memory records into the prompt context.
        context = "Previous relevant interactions:\n"
        for memory in relevant_history:
            context += f"{memory}\n---\n"
        print(f"Context:{context}")

        prompt = f"""
        Context: {context}
        Current customer query: {query}
        Provide a helpful response that takes into account any relevant past interactions.
        """
        try:
            response = self.agent_executor.invoke({"input": prompt})
        except Exception as e:
            # Fail soft: return an apology rather than surfacing the error.
            print(f"An error occurred while invoking the agent executor: {e}")
            return "I'm sorry, something went wrong while processing your request."

        # Persist the successful exchange for future personalization.
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=response["output"],
            metadata={"type": "support_query"}
        )

        return response["output"]
|
|
|
|
|
|
|
|
def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.

    Handles login, per-user chat history, Llama Guard input moderation, and
    delegates answered queries to a session-cached NutritionBot.
    """
    st.title("Nutrition Disorder Specialist")
    st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write("Type 'exit' to end the conversation.")

    # Track the logged-in user across Streamlit reruns.
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    # Per-user chat-history key so histories don't leak between users.
    user_chat_key = f"chat_history_{st.session_state.user_id}" if st.session_state.user_id else "chat_history_temp"

    if user_chat_key not in st.session_state:
        st.session_state[user_chat_key] = []

    if st.session_state.user_id is None:
        # Simple name-based login form.
        with st.form("login_form", clear_on_submit=True):
            user_id = st.text_input("Please enter your name to begin:")
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                user_chat_key = f"chat_history_{user_id}"
                st.session_state[user_chat_key] = [{
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                }]
                st.session_state.login_submitted = True
        # Rerun once after a successful login so the chat view renders.
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
    else:
        # Replay the stored conversation.
        for message in st.session_state[user_chat_key]:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        user_query = st.chat_input("Type your question here or 'exit' to end")
        if user_query:
            if user_query.lower() == "exit":
                # Log the user out and clear their session history.
                st.session_state[user_chat_key].append({"role": "user", "content": "exit"})
                with st.chat_message("user"):
                    st.write("exit")
                goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders."
                st.session_state[user_chat_key].append({"role": "assistant", "content": goodbye_msg})
                with st.chat_message("assistant"):
                    st.write(goodbye_msg)
                st.session_state.pop(user_chat_key, None)
                st.session_state.user_id = None
                st.rerun()
                return

            st.session_state[user_chat_key].append({"role": "user", "content": user_query})
            with st.chat_message("user"):
                st.write(user_query)

            # Moderate the input before answering.
            # BUG FIX: filter_input_with_llama_guard returns None when the
            # Groq call fails; calling .replace() on None raised an
            # AttributeError here. Guard for None — an unmoderated input now
            # falls through to the "cannot process" branch below.
            filtered_result = filter_input_with_llama_guard(user_query)
            if filtered_result is not None:
                filtered_result = filtered_result.replace("\n", " ")

            # 'unsafe S6' (specialized advice) and 'unsafe S7' (privacy) are
            # deliberately allowed through for this medical-advice use case.
            if filtered_result in ['safe', 'unsafe S6', 'unsafe S7']:
                try:
                    # Lazily construct the bot once per session.
                    if 'chatbot' not in st.session_state:
                        st.session_state.chatbot = NutritionBot()
                    response = st.session_state.chatbot.handle_customer_query(
                        st.session_state.user_id,
                        user_query
                    )
                    st.write(response)
                    st.session_state[user_chat_key].append({"role": "assistant", "content": response})
                except Exception as e:
                    error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
                    st.write(error_msg)
                    st.session_state[user_chat_key].append({"role": "assistant", "content": error_msg})
            else:
                inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again."
                st.write(inappropriate_msg)
                st.session_state[user_chat_key].append({"role": "assistant", "content": inappropriate_msg})
|
|
|
|
|
# Script entry point: launch the Streamlit UI (run via `streamlit run`).
if __name__ == "__main__":
    nutrition_disorder_streamlit()
|
|
|