|
|
|
|
|
|
|
|
import os |
|
|
import chromadb |
|
|
from dotenv import load_dotenv |
|
|
import json |
|
|
|
|
|
|
|
|
from langchain_core.documents import Document |
|
|
from langchain_core.runnables import RunnablePassthrough |
|
|
from langchain_core.output_parsers import StrOutputParser |
|
|
from langchain.prompts import ChatPromptTemplate |
|
|
from langchain.chains.query_constructor.base import AttributeInfo |
|
|
from langchain.retrievers.self_query.base import SelfQueryRetriever |
|
|
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
|
|
from langchain.retrievers import ContextualCompressionRetriever |
|
|
|
|
|
|
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
|
|
from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
|
|
from langchain_experimental.text_splitter import SemanticChunker |
|
|
from langchain.text_splitter import ( |
|
|
CharacterTextSplitter, |
|
|
RecursiveCharacterTextSplitter |
|
|
) |
|
|
from langchain_core.tools import tool |
|
|
from langchain.agents import create_tool_calling_agent, AgentExecutor |
|
|
from langchain_core.prompts import ChatPromptTemplate |
|
|
|
|
|
|
|
|
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI |
|
|
from langchain.embeddings.openai import OpenAIEmbeddings |
|
|
|
|
|
|
|
|
from llama_parse import LlamaParse |
|
|
from llama_index.core import Settings, SimpleDirectoryReader |
|
|
|
|
|
|
|
|
from langgraph.graph import StateGraph, END, START |
|
|
|
|
|
|
|
|
from pydantic import BaseModel |
|
|
|
|
|
|
|
|
from typing import Dict, List, Tuple, Any, TypedDict |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
from groq import Groq |
|
|
from mem0 import MemoryClient |
|
|
import streamlit as st |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
|
|
|
api_key = config.get("API_KEY") |
|
|
endpoint = config.get("OPENAI_API_BASE") |
|
|
llama_api_key = os.environ['GROQ_API_KEY'] |
|
|
MEM0_api_key = os.environ['mem0'] |
|
|
|
|
|
|
|
|
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction( |
|
|
api_base=endpoint, |
|
|
api_key=api_key, |
|
|
model_name='text-embedding-ada-002' |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
embedding_model = OpenAIEmbeddings( |
|
|
openai_api_base=endpoint, |
|
|
openai_api_key=api_key, |
|
|
model='text-embedding-ada-002' |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
llm = ChatOpenAI( |
|
|
openai_api_base=endpoint, |
|
|
openai_api_key=api_key, |
|
|
model="gpt-4o-mini", |
|
|
streaming=False |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
Settings.llm = llm |
|
|
Settings.embedding = embedding_model |
|
|
|
|
|
|
|
|
|
|
|
class AgentState(TypedDict): |
|
|
query: str |
|
|
expanded_query: str |
|
|
context: List[Dict[str, Any]] |
|
|
response: str |
|
|
precision_score: float |
|
|
groundedness_score: float |
|
|
groundedness_loop_count: int |
|
|
precision_loop_count: int |
|
|
feedback: str |
|
|
query_feedback: str |
|
|
groundedness_check: bool |
|
|
loop_max_iter: int |
|
|
|
|
|
def expand_query(state): |
|
|
""" |
|
|
Expands the user query to improve retrieval of nutrition disorder-related information. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the user query. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with the expanded query. |
|
|
""" |
|
|
print("---------Expanding Query---------") |
|
|
system_message = '''You are an AI assistant whose task is to expand a user's query to make it more effective for searching a knowledge base about nutritional disorders. |
|
|
The expanded query should be more detailed and may include synonyms or related terms. |
|
|
Focus on aspects relevant to nutritional disorders, such as: |
|
|
- Symptoms |
|
|
- Causes |
|
|
- Diagnosis |
|
|
- Treatment options |
|
|
- Management strategies |
|
|
- Specific vitamins, minerals, or nutrient imbalances |
|
|
- Related health conditions |
|
|
|
|
|
Ensure the expanded query remains true to the original intent of the user's query. |
|
|
Use any provided feedback to guide the expansion. If the feedback is 'NA', empty, or not relevant, expand the query based on the original query alone. |
|
|
The output should be only the expanded query, ready for a semantic search.''' |
|
|
|
|
|
|
|
|
expand_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Expand this query: {query} using the feedback: {query_feedback}") |
|
|
|
|
|
]) |
|
|
|
|
|
chain = expand_prompt | llm | StrOutputParser() |
|
|
expanded_query = chain.invoke({"query": state['query'], "query_feedback":state["query_feedback"]}) |
|
|
print("expanded_query", expanded_query) |
|
|
state["expanded_query"] = expanded_query |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
vector_store = Chroma( |
|
|
collection_name="nutritional_hypotheticals", |
|
|
persist_directory="./nutritional_db", |
|
|
embedding_function=embedding_model |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
retriever = vector_store.as_retriever( |
|
|
search_type='similarity', |
|
|
search_kwargs={'k': 3} |
|
|
) |
|
|
|
|
|
def retrieve_context(state): |
|
|
""" |
|
|
Retrieves context from the vector store using the expanded or original query. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the query and expanded query. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with the retrieved context. |
|
|
""" |
|
|
print("---------retrieve_context---------") |
|
|
query = state['query'] |
|
|
|
|
|
|
|
|
|
|
|
docs = retriever.invoke(query) |
|
|
print("Retrieved documents:", docs) |
|
|
|
|
|
|
|
|
context= [ |
|
|
{ |
|
|
"content": doc.page_content, |
|
|
"metadata": doc.metadata |
|
|
} |
|
|
for doc in docs |
|
|
] |
|
|
state['context'] = context |
|
|
print("Extracted context with metadata:", context) |
|
|
|
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def craft_response(state: Dict) -> Dict: |
|
|
""" |
|
|
Generates a response using the retrieved context, focusing on nutrition disorders. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the query and retrieved context. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with the generated response. |
|
|
""" |
|
|
print("---------craft_response---------") |
|
|
system_message = '''You are a Smart Nutrition Disorder Specialist Bot. Your primary task is to answer the user's 'Query' with a focus on nutrition disorders, using *only* the provided 'Context'. |
|
|
Follow these instructions carefully: |
|
|
1. Base your answer strictly on the information found in the 'Context'. Do not add any information from outside the 'Context'. |
|
|
2. If the 'Context' directly addresses the 'Query', synthesize the relevant information into a clear, concise, and helpful answer. |
|
|
3. The 'feedback' field will tell you if the provided 'Context' was deemed relevant ('Relevant') or not ('Not Relevant') to the query by a previous step. |
|
|
- If the feedback is 'Not Relevant', or if the 'Context' (even if deemed 'Relevant') does not contain information to answer the 'Query', you MUST explicitly state that you cannot answer the query based on the provided information. For example: "I cannot answer this query based on the provided documents." |
|
|
- If the feedback is 'Relevant' and the context contains the answer, proceed to answer. |
|
|
4. Do not make assumptions or infer information beyond what is explicitly stated in the 'Context'. |
|
|
5. If you are stating you cannot answer, do not apologize or use conversational fluff beyond the direct statement. |
|
|
6. Your response should be directly to the user, addressing their 'Query'.''' |
|
|
|
|
|
response_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}") |
|
|
]) |
|
|
|
|
|
chain = response_prompt | llm |
|
|
response = chain.invoke({ |
|
|
"query": state['query'], |
|
|
"context": "\n".join([doc["content"] for doc in state['context']]), |
|
|
"feedback": state['feedback'] |
|
|
}) |
|
|
state['response'] = response |
|
|
print("intermediate response: ", response) |
|
|
|
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def score_groundedness(state: Dict) -> Dict: |
|
|
""" |
|
|
Checks whether the response is grounded in the retrieved context. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the response and context. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with the groundedness score. |
|
|
""" |
|
|
print("---------check_groundedness---------") |
|
|
system_message = '''Your task is to evaluate the groundedness of a 'Response' based *solely* on the provided 'Context'. |
|
|
Groundedness means that all factual claims, statements, and information within the 'Response' must be directly supported by, or be a logical deduction from, the information present in the 'Context'. The 'Response' must not introduce any external information, new entities, or claims not found in the 'Context'. |
|
|
|
|
|
You need to provide a numerical score between 0.0 and 1.0, where: |
|
|
- **1.0**: The 'Response' is perfectly and entirely grounded in the 'Context'. Every piece of information in the 'Response' is explicitly stated or directly and undeniably implied by the 'Context'. |
|
|
- **0.75**: The 'Response' is mostly grounded, but might contain very minor, trivial information not explicitly in the context, or makes very safe, direct inferences. |
|
|
- **0.5**: The 'Response' is partially grounded. Some significant parts of the 'Response' are supported by the 'Context', but other significant parts are not, or external information is introduced. |
|
|
- **0.25**: The 'Response' is poorly grounded. Most of the 'Response' is not supported by the 'Context' or relies heavily on external information. |
|
|
- **0.0**: The 'Response' is not at all grounded in the 'Context' or completely contradicts the 'Context'. It seems to be based on external knowledge or is a hallucination. |
|
|
|
|
|
Analyze the 'Response' carefully against the 'Context'. Your output must be ONLY the numerical score (e.g., 0.0, 0.5, 1.0). Do not include any other text, explanation, or labels.''' |
|
|
|
|
|
groundedness_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:") |
|
|
]) |
|
|
|
|
|
chain = groundedness_prompt | llm | StrOutputParser() |
|
|
groundedness_score = float(chain.invoke({ |
|
|
"context": "\n".join([doc["content"] for doc in state['context']]), |
|
|
"response": state['response'] |
|
|
})) |
|
|
print("groundedness_score: ", groundedness_score) |
|
|
state['groundedness_loop_count'] += 1 |
|
|
print("#########Groundedness Incremented###########") |
|
|
state['groundedness_score'] = groundedness_score |
|
|
|
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def check_precision(state: Dict) -> Dict: |
|
|
""" |
|
|
Checks whether the response precisely addresses the user’s query. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the query and response. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with the precision score. |
|
|
""" |
|
|
print("---------check_precision---------") |
|
|
system_message = '''You are an expert evaluator. Your specific task is to assess the precision of a 'Response' in relation to a given 'Query'. |
|
|
Precision refers to how well the 'Response' directly, accurately, and comprehensively answers all aspects of the 'Query'. The 'Response' should be focused on the question asked and should not evade the question or provide irrelevant information in place of a direct answer. |
|
|
|
|
|
Please provide a numerical precision score between 0.0 and 1.0, based on the following scale: |
|
|
- **1.0 (Perfect Precision)**: The 'Response' directly, accurately, and completely answers all aspects of the 'Query'. There is no ambiguity, and all parts of the 'Query' are addressed. |
|
|
- **0.75 (High Precision)**: The 'Response' addresses the main aspects of the 'Query' well and directly, but might miss very minor details, or could be slightly more concise or direct in one aspect. |
|
|
- **0.5 (Moderate Precision)**: The 'Response' addresses some significant parts of the 'Query' but fails to answer other important aspects, or the answer is somewhat indirect or incomplete for key parts of the 'Query'. |
|
|
- **0.25 (Low Precision)**: The 'Response' only superficially addresses the 'Query', misses most key aspects, or is largely indirect. |
|
|
- **0.0 (No Precision)**: The 'Response' does not answer the 'Query' at all, is completely off-topic, or clearly misunderstands the 'Query'. |
|
|
|
|
|
Focus solely on whether the 'Response' answers the 'Query'. Do not evaluate for factual accuracy or groundedness here (assume that's handled elsewhere). |
|
|
Your output must be ONLY the numerical score (e.g., 0.0, 0.5, 1.0). Do not include any other text, explanation, or labels.''' |
|
|
|
|
|
precision_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Query: {query}\nResponse: {response}\n\nPrecision score:") |
|
|
]) |
|
|
|
|
|
chain = precision_prompt | llm | StrOutputParser() |
|
|
precision_score = float(chain.invoke({ |
|
|
"query": state['query'], |
|
|
"response":state['response'] |
|
|
})) |
|
|
state['precision_score'] = precision_score |
|
|
print("precision_score:", precision_score) |
|
|
state['precision_loop_count'] +=1 |
|
|
print("#########Precision Incremented###########") |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def refine_response(state: Dict) -> Dict: |
|
|
""" |
|
|
Suggests improvements for the generated response. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the query and response. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with response refinement suggestions. |
|
|
""" |
|
|
print("---------refine_response---------") |
|
|
|
|
|
system_message = '''You are an expert AI assistant tasked with providing constructive feedback to refine a given 'Response' based on an original 'Query'. Your objective is to help make the 'Response' more accurate, complete, and clear, without rewriting it yourself. |
|
|
Focus on the following when generating your suggestions: |
|
|
1. **Identify Gaps:** Does the 'Response' miss any important aspects or details requested by or implied in the 'Query'? |
|
|
2. **Spot Ambiguities:** Are there parts of the 'Response' that are unclear, vague, or could be misinterpreted? |
|
|
3. **Completeness:** Could the 'Response' be more thorough or provide more comprehensive information relevant to the 'Query'? |
|
|
4. **Accuracy of Addressal:** While not judging factual correctness of the content itself (assume content is from a trusted source if provided as context in other steps), does the 'Response' accurately address all parts of the 'Query'? For example, if the Query has multiple parts, are all parts answered? |
|
|
5. **Actionable Suggestions:** Your feedback should be specific and actionable. Instead of saying "This is unclear," try "Consider clarifying [specific part] by adding details about [specific aspect]." |
|
|
|
|
|
**Important Constraints:** |
|
|
* **Do NOT rewrite the response.** Your role is to provide suggestions for improvement. |
|
|
* The output should be a list of bullet points or a concise paragraph outlining these suggestions. |
|
|
* The tone should be constructive and helpful. |
|
|
|
|
|
You will be given the 'Query' and the 'Response'. Your feedback should guide the enhancement of the 'Response' to better serve the 'Query'.''' |
|
|
|
|
|
refine_response_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Query: {query}\nResponse: {response}\n\n" |
|
|
"What improvements can be made to enhance accuracy and completeness?") |
|
|
]) |
|
|
|
|
|
chain = refine_response_prompt | llm| StrOutputParser() |
|
|
|
|
|
|
|
|
feedback = f"Previous Response: {state['response']}\nSuggestions: {chain.invoke({'query': state['query'], 'response': state['response']})}" |
|
|
print("feedback: ", feedback) |
|
|
print(f"State: {state}") |
|
|
state['feedback'] = feedback |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def refine_query(state: Dict) -> Dict: |
|
|
""" |
|
|
Suggests improvements for the expanded query. |
|
|
|
|
|
Args: |
|
|
state (Dict): The current state of the workflow, containing the query and expanded query. |
|
|
|
|
|
Returns: |
|
|
Dict: The updated state with query refinement suggestions. |
|
|
""" |
|
|
print("---------refine_query---------") |
|
|
system_message = '''You are an AI expert in search query optimization and refinement. Your task is to analyze an 'Expanded Query', which was generated from an 'Original Query', and provide specific, actionable suggestions to improve its effectiveness for retrieving precise information, particularly from a knowledge base focused on nutrition disorders. |
|
|
You will receive both the 'Original Query' and the current 'Expanded Query'. |
|
|
Based on these, your goal is to offer suggestions that could enhance the 'Expanded Query'. Consider these aspects: |
|
|
|
|
|
1. **Keyword Enhancement**: |
|
|
* Are there any missing critical keywords, synonyms, or related terms (e.g., specific nutrient names, medical terminology for symptoms or diagnoses, alternative names for conditions) that, if added, would likely improve search results? |
|
|
* Are there any overly generic terms that could be replaced with more specific ones relevant to nutrition disorders? |
|
|
|
|
|
2. **Scope Refinement**: |
|
|
* Is the 'Expanded Query' too broad, potentially leading to many irrelevant results? If so, suggest how to narrow its focus (e.g., by adding terms related to specific aspects like "treatment options," "diagnostic criteria," "pediatric impact," "dietary management"). |
|
|
* Is the 'Expanded Query' too narrow, potentially missing relevant broader information? If so, suggest how to carefully broaden it. |
|
|
|
|
|
3. **Clarity and Precision**: |
|
|
* Are there any ambiguous terms or phrases in the 'Expanded Query' that could be clarified? |
|
|
* Could the structure of the query be improved for better interpretation by a search system (e.g., using boolean-like operators if appropriate, though usually just natural language is fine, focus on phrasing)? |
|
|
|
|
|
4. **Conciseness**: |
|
|
* Can any redundant terms or phrases be removed from the 'Expanded Query' without losing essential meaning or search effectiveness? |
|
|
|
|
|
**Crucial Instructions:** |
|
|
* **Do NOT provide a new, rewritten 'Expanded Query'.** Your output must be a set of *suggestions* on how the *existing* 'Expanded Query' could be improved. For example: "Consider adding 'vitamin B12 deficiency symptoms'," or "Try specifying 'for adults' to narrow the scope," or "The term 'food issues' is vague; consider replacing with 'specific dietary intolerances'." |
|
|
* Your suggestions should be presented clearly, perhaps as a bulleted list or a concise paragraph. |
|
|
* The ultimate aim is to help craft an 'Expanded Query' that will yield highly relevant and precise search results from a nutrition-focused database. |
|
|
|
|
|
Focus your suggestions on improving the 'Expanded Query' for its intended purpose of information retrieval.''' |
|
|
|
|
|
refine_query_prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_message), |
|
|
("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n" |
|
|
"What improvements can be made for a better search?") |
|
|
]) |
|
|
|
|
|
chain = refine_query_prompt | llm | StrOutputParser() |
|
|
|
|
|
|
|
|
query_feedback = f"Previous Expanded Query: {state['expanded_query']}\nSuggestions: {chain.invoke({'query': state['query'], 'expanded_query': state['expanded_query']})}" |
|
|
print("query_feedback: ", query_feedback) |
|
|
print(f"Groundedness loop count: {state['groundedness_loop_count']}") |
|
|
state['query_feedback'] = query_feedback |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def should_continue_groundedness(state): |
|
|
"""Decides if groundedness is sufficient or needs improvement.""" |
|
|
print("---------should_continue_groundedness---------") |
|
|
print("groundedness loop count: ", state['groundedness_loop_count']) |
|
|
if state['groundedness_score'] >= 0.75: |
|
|
print("Moving to precision") |
|
|
return "check_precision" |
|
|
else: |
|
|
if state["groundedness_loop_count"] > state['loop_max_iter']: |
|
|
return "max_iterations_reached" |
|
|
else: |
|
|
print(f"---------Groundedness Score Threshold Not met. Refining Response-----------") |
|
|
return "refine_response" |
|
|
|
|
|
|
|
|
def should_continue_precision(state: Dict) -> str: |
|
|
"""Decides if precision is sufficient or needs improvement.""" |
|
|
print("---------should_continue_precision---------") |
|
|
print("precision loop count: ", state['precision_loop_count']) |
|
|
if state['precision_score'] >= 0.75: |
|
|
return "pass" |
|
|
else: |
|
|
if state['loop_max_iter']: |
|
|
return "max_iterations_reached" |
|
|
else: |
|
|
print(f"---------Precision Score Threshold Not met. Refining Query-----------") |
|
|
return "refine_query" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def max_iterations_reached(state: Dict) -> Dict: |
|
|
"""Handles the case when the maximum number of iterations is reached.""" |
|
|
print("---------max_iterations_reached---------") |
|
|
"""Handles the case when the maximum number of iterations is reached.""" |
|
|
response = "I'm unable to refine the response further. Please provide more context or clarify your question." |
|
|
state['response'] = response |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
from langgraph.graph import END, StateGraph, START |
|
|
|
|
|
def create_workflow() -> StateGraph: |
|
|
"""Creates the updated workflow for the AI nutrition agent.""" |
|
|
workflow = StateGraph(AgentState) |
|
|
|
|
|
|
|
|
workflow.add_node("expand_query", expand_query ) |
|
|
workflow.add_node("retrieve_context", retrieve_context ) |
|
|
workflow.add_node("craft_response", craft_response ) |
|
|
workflow.add_node("score_groundedness", score_groundedness ) |
|
|
workflow.add_node("refine_response", refine_response ) |
|
|
workflow.add_node("check_precision", check_precision ) |
|
|
workflow.add_node("refine_query", refine_query ) |
|
|
workflow.add_node("max_iterations_reached", max_iterations_reached ) |
|
|
|
|
|
|
|
|
workflow.add_edge(START, "expand_query") |
|
|
workflow.add_edge("expand_query", "retrieve_context") |
|
|
workflow.add_edge("retrieve_context", "craft_response") |
|
|
workflow.add_edge("craft_response", "score_groundedness") |
|
|
|
|
|
|
|
|
workflow.add_conditional_edges( |
|
|
"score_groundedness", |
|
|
should_continue_groundedness, |
|
|
{ |
|
|
"check_precision": "check_precision", |
|
|
"refine_response": "refine_response", |
|
|
"max_iterations_reached": "max_iterations_reached" |
|
|
} |
|
|
) |
|
|
|
|
|
workflow.add_edge("refine_response", "craft_response") |
|
|
|
|
|
|
|
|
workflow.add_conditional_edges( |
|
|
"check_precision", |
|
|
should_continue_precision, |
|
|
{ |
|
|
"pass": END, |
|
|
"refine_query": "refine_query", |
|
|
"max_iterations_reached": "max_iterations_reached" |
|
|
} |
|
|
) |
|
|
|
|
|
workflow.add_edge("refine_query", "expand_query") |
|
|
|
|
|
workflow.add_edge("max_iterations_reached", END) |
|
|
|
|
|
return workflow |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WORKFLOW_APP = create_workflow().compile() |
|
|
@tool |
|
|
def agentic_rag(query: str): |
|
|
""" |
|
|
Runs the RAG-based agent with conversation history for context-aware responses. |
|
|
|
|
|
Args: |
|
|
query (str): The current user query. |
|
|
|
|
|
Returns: |
|
|
Dict[str, Any]: The updated state with the generated response and conversation history. |
|
|
""" |
|
|
|
|
|
inputs = { |
|
|
"query": query, |
|
|
"expanded_query": "", |
|
|
"context": [], |
|
|
"response": "", |
|
|
"precision_score": 0.0, |
|
|
"groundedness_score": 0.0, |
|
|
"groundedness_loop_count": 0, |
|
|
"precision_loop_count": 0, |
|
|
"feedback": "", |
|
|
"query_feedback": "", |
|
|
"loop_max_iter": 3 |
|
|
} |
|
|
|
|
|
output = WORKFLOW_APP.invoke(inputs) |
|
|
|
|
|
return output |
|
|
|
|
|
|
|
|
|
|
|
llama_guard_client = Groq(api_key=llama_api_key) |
|
|
|
|
|
def filter_input_with_llama_guard(user_input, model="llama-guard-3-8b"): |
|
|
""" |
|
|
Filters user input using Llama Guard to ensure it is safe. |
|
|
|
|
|
Parameters: |
|
|
- user_input: The input provided by the user. |
|
|
- model: The Llama Guard model to be used for filtering (default is "llama-guard-3-8b"). |
|
|
|
|
|
Returns: |
|
|
- The filtered and safe input. |
|
|
""" |
|
|
try: |
|
|
|
|
|
response = llama_guard_client.chat.completions.create( |
|
|
messages=[{"role": "user", "content": user_input}], |
|
|
model=model, |
|
|
) |
|
|
|
|
|
return response.choices[0].message.content.strip() |
|
|
except Exception as e: |
|
|
print(f"Error with Llama Guard: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NutritionBot: |
|
|
def __init__(self): |
|
|
""" |
|
|
Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor. |
|
|
""" |
|
|
|
|
|
|
|
|
self.memory = MemoryClient(api_key=userdata.get("mem0")) |
|
|
|
|
|
|
|
|
self.client = ChatOpenAI( |
|
|
model_name="gpt-4o-mini", |
|
|
api_key=config.get("API_KEY"), |
|
|
endpoint = config.get("OPENAI_API_BASE"), |
|
|
temperature=0 |
|
|
) |
|
|
|
|
|
|
|
|
tools = [agentic_rag] |
|
|
|
|
|
|
|
|
system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience. |
|
|
Guidelines for Interaction: |
|
|
Maintain a polite, professional, and reassuring tone. |
|
|
Show genuine empathy for customer concerns and health challenges. |
|
|
Reference past interactions to provide personalized and consistent advice. |
|
|
Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations. |
|
|
Ensure consistent and accurate information across conversations. |
|
|
If any detail is unclear or missing, proactively ask for clarification. |
|
|
Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights. |
|
|
Keep track of ongoing issues and follow-ups to ensure continuity in support. |
|
|
Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences. |
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
prompt = ChatPromptTemplate.from_messages([ |
|
|
("system", system_prompt), |
|
|
("human", "{input}"), |
|
|
("placeholder", "{agent_scratchpad}") |
|
|
]) |
|
|
|
|
|
|
|
|
agent = create_tool_calling_agent(self.client, tools, prompt) |
|
|
|
|
|
|
|
|
self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) |
|
|
|
|
|
|
|
|
def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None): |
|
|
""" |
|
|
Store customer interaction in memory for future reference. |
|
|
|
|
|
Args: |
|
|
user_id (str): Unique identifier for the customer. |
|
|
message (str): Customer's query or message. |
|
|
response (str): Chatbot's response. |
|
|
metadata (Dict, optional): Additional metadata for the interaction. |
|
|
""" |
|
|
if metadata is None: |
|
|
metadata = {} |
|
|
|
|
|
|
|
|
metadata["timestamp"] = datetime.now().isoformat() |
|
|
|
|
|
|
|
|
conversation = [ |
|
|
{"role": "user", "content": message}, |
|
|
{"role": "assistant", "content": response} |
|
|
] |
|
|
|
|
|
|
|
|
self.memory.add( |
|
|
conversation, |
|
|
user_id=user_id, |
|
|
output_format="v1.1", |
|
|
metadata=metadata |
|
|
) |
|
|
|
|
|
|
|
|
def get_relevant_history(self, user_id: str, query: str) -> List[Dict]: |
|
|
""" |
|
|
Retrieve past interactions relevant to the current query. |
|
|
|
|
|
Args: |
|
|
user_id (str): Unique identifier for the customer. |
|
|
query (str): The customer's current query. |
|
|
|
|
|
Returns: |
|
|
List[Dict]: A list of relevant past interactions. |
|
|
""" |
|
|
return self.memory.search( |
|
|
query=query, |
|
|
user_id=user_id, |
|
|
limit= 5 |
|
|
) |
|
|
|
|
|
|
|
|
def handle_customer_query(self, user_id: str, query: str) -> str: |
|
|
""" |
|
|
Process a customer's query and provide a response, taking into account past interactions. |
|
|
|
|
|
Args: |
|
|
user_id (str): Unique identifier for the customer. |
|
|
query (str): Customer's query. |
|
|
|
|
|
Returns: |
|
|
str: Chatbot's response. |
|
|
""" |
|
|
|
|
|
|
|
|
relevant_history = self.get_relevant_history(user_id, query) |
|
|
|
|
|
|
|
|
context = "Previous relevant interactions:\n" |
|
|
for memory in relevant_history: |
|
|
context += f"Customer: {memory['memory']}\n" |
|
|
context += f"Support: {memory['memory']}\n" |
|
|
context += "---\n" |
|
|
|
|
|
|
|
|
print("Context: ", context) |
|
|
|
|
|
|
|
|
prompt = f""" |
|
|
Context: |
|
|
{context} |
|
|
|
|
|
Current customer query: {query} |
|
|
|
|
|
Provide a helpful response that takes into account any relevant past interactions. |
|
|
""" |
|
|
|
|
|
|
|
|
response = self.agent_executor.invoke({"input": prompt}) |
|
|
|
|
|
|
|
|
self.store_customer_interaction( |
|
|
user_id=user_id, |
|
|
message=query, |
|
|
response=response["output"], |
|
|
metadata={"type": "support_query"} |
|
|
) |
|
|
|
|
|
|
|
|
return response['output'] |
|
|
|
|
|
|
|
|
|
|
|
def nutrition_disorder_streamlit(): |
|
|
""" |
|
|
A Streamlit-based UI for the Nutrition Disorder Specialist Agent. |
|
|
""" |
|
|
st.title("Nutrition Disorder Specialist") |
|
|
st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.") |
|
|
st.write("Type 'exit' to end the conversation.") |
|
|
|
|
|
|
|
|
if 'chat_history' not in st.session_state: |
|
|
st.session_state.chat_history = [] |
|
|
if 'user_id' not in st.session_state: |
|
|
st.session_state.user_id = None |
|
|
|
|
|
|
|
|
if st.session_state.user_id is None: |
|
|
with st.form("login_form", clear_on_submit=True): |
|
|
user_id = st.text_input("Please enter your name to begin:") |
|
|
submit_button = st.form_submit_button("Login") |
|
|
if submit_button and user_id: |
|
|
st.session_state.user_id = user_id |
|
|
st.session_state.chat_history.append({ |
|
|
"role": "assistant", |
|
|
"content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?" |
|
|
}) |
|
|
st.session_state.login_submitted = True |
|
|
if st.session_state.get("login_submitted", False): |
|
|
st.session_state.pop("login_submitted") |
|
|
st.rerun() |
|
|
else: |
|
|
|
|
|
for message in st.session_state.chat_history: |
|
|
with st.chat_message(message["role"]): |
|
|
st.write(message["content"]) |
|
|
|
|
|
|
|
|
user_query = st.chat_input("Ask about nutrition disorders or type 'exit' to end...") |
|
|
if user_query: |
|
|
if user_query.lower() == "exit": |
|
|
st.session_state.chat_history.append({"role": "user", "content": "exit"}) |
|
|
with st.chat_message("user"): |
|
|
st.write("exit") |
|
|
goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders." |
|
|
st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg}) |
|
|
with st.chat_message("assistant"): |
|
|
st.write(goodbye_msg) |
|
|
st.session_state.user_id = None |
|
|
st.rerun() |
|
|
return |
|
|
|
|
|
st.session_state.chat_history.append({"role": "user", "content": user_query}) |
|
|
with st.chat_message("user"): |
|
|
st.write(user_query) |
|
|
|
|
|
|
|
|
filtered_result = filter_input_with_llama_guard(user_query) |
|
|
filtered_result = filtered_result.replace("\n", " ") |
|
|
|
|
|
|
|
|
if filtered_result in ["safe", "unsafe S7", "unsafe S6"]: |
|
|
try: |
|
|
if 'chatbot' not in st.session_state: |
|
|
st.session_state.chatbot = NutritionBot() |
|
|
response = st.session_state.chatbot.handle_customer_query(st.session_state.user_id, user_query) |
|
|
|
|
|
st.write(response) |
|
|
st.session_state.chat_history.append({"role": "assistant", "content": response}) |
|
|
except Exception as e: |
|
|
error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}" |
|
|
st.write(error_msg) |
|
|
st.session_state.chat_history.append({"role": "assistant", "content": error_msg}) |
|
|
else: |
|
|
inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again." |
|
|
st.write(inappropriate_msg) |
|
|
st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg}) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
nutrition_disorder_streamlit() |
|
|
|