# SparrowAgent/src/nodes/queryNode.py
# Commit 1ee6a1a (verified): "Create queryNode.py" -- nivakaran
from datetime import datetime
from typing_extensions import Literal
from src.llms.groqllm import GroqLLM
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, get_buffer_string
from src.utils.prompts import clarification_with_user_instructions, transform_messages_into_customer_query_brief_prompt
from src.states.queryState import SparrowAgentState, ClarifyWithUser, CustomerQuestion # Import from correct module
from src.utils.utils import get_today_str
class QueryNode:
    """Graph nodes that scope a customer query before downstream processing.

    Both public methods take the current agent state, call the configured
    chat model with a structured-output schema, and return a *new* state
    dict (the incoming ``state`` is never mutated). Failures are reported
    through the returned state (``"error"`` and ``"notes"`` keys) rather
    than by raising.
    """

    def __init__(self, llm) -> None:
        """Store the chat model used by the node methods.

        Args:
            llm: Object exposing ``with_structured_output(schema)``; the
                runnable it returns must expose ``invoke(messages)``.
        """
        self.llm = llm

    @staticmethod
    def _failed_brief(state: SparrowAgentState, note: str, error: str) -> SparrowAgentState:
        """Return a copy of ``state`` flagged as having no usable query brief."""
        return {
            **state,
            "query_brief": "",
            "notes": state.get("notes", []) + [note],
            "error": error,
        }

    def clarify_with_user(self, state: SparrowAgentState) -> SparrowAgentState:
        """Decide whether the conversation so far needs a clarifying question.

        The model is asked to fill a ``ClarifyWithUser`` object. When its
        ``need_clarification`` field equals ``'yes'``, the model's
        ``question`` is appended to the messages as an ``AIMessage``;
        otherwise its ``verification`` text is appended instead.

        Args:
            state: Current agent state; ``"messages"`` and ``"notes"`` are
                read and default to empty lists when absent.

        Returns:
            A copy of ``state`` with ``"messages"``,
            ``"clarification_complete"``, ``"needs_clarification"`` and
            ``"notes"`` updated. If anything raises, the copy instead has
            ``"clarification_complete": False``,
            ``"needs_clarification": True``, an explanatory note and an
            ``"error"`` string.
        """
        try:
            # Built inside the try (as write_query_brief does) so that a model
            # that cannot produce structured output yields an error state
            # instead of an exception escaping the node.
            structured_output_model = self.llm.with_structured_output(ClarifyWithUser)
            response = structured_output_model.invoke([
                SystemMessage(
                    content="Route the input to yes or no based on the need of clarification of the query"
                ),
                HumanMessage(
                    content=clarification_with_user_instructions.format(
                        messages=get_buffer_string(messages=state.get("messages", [])),
                        date=get_today_str(),
                    )
                ),
            ])
            print("CLARIFICATION RESPONSE:", response)

            # NOTE(review): presumably ClarifyWithUser.need_clarification is a
            # "yes"/"no" literal -- confirm against src.states.queryState.
            needs_more = response.need_clarification == 'yes'
            if needs_more:
                reply = response.question
                note = "Clarification requested from user"
            else:
                reply = response.verification
                note = "Clarification complete, sufficient information provided"

            return {
                **state,
                "messages": state.get("messages", []) + [AIMessage(content=reply)],
                "clarification_complete": not needs_more,
                "needs_clarification": needs_more,
                # The note is recorded for routing logic.
                "notes": state.get("notes", []) + [note],
            }
        except Exception as e:
            print(f"Error in clarify_with_user: {e}")
            return {
                **state,
                "clarification_complete": False,
                "needs_clarification": True,
                "notes": state.get("notes", []) + [f"Error in clarification: {str(e)}"],
                "error": str(e),
            }

    def write_query_brief(self, state: SparrowAgentState) -> SparrowAgentState:
        """Turn the conversation history into a customer query brief.

        The model is asked to fill a ``CustomerQuestion`` object; its
        ``query_brief`` attribute is accepted only when it contains at least
        10 non-whitespace-trimmed characters.

        Args:
            state: Current agent state; ``"messages"`` and ``"notes"`` are
                read and default to empty lists when absent.

        Returns:
            On success, a copy of ``state`` with ``"query_brief"``,
            ``"master_messages"`` (a single ``HumanMessage`` holding the
            brief), ``"query_brief_complete": True`` and an extra note.
            On any failure (no messages, no structured response, brief too
            short, or an exception), a copy with ``"query_brief": ""``, an
            explanatory note and an ``"error"`` string.
        """
        try:
            structured_output_model = self.llm.with_structured_output(CustomerQuestion)

            messages = state.get("messages", [])
            print("STATE MESSAGES:", messages)
            if not messages:
                print("ERROR: No messages in state")
                return self._failed_brief(
                    state,
                    "No messages available for query brief creation",
                    "No messages available for query brief creation",
                )

            # The prompt template needs both the transcript and today's date.
            prompt = transform_messages_into_customer_query_brief_prompt.format(
                messages=get_buffer_string(messages),
                date=get_today_str(),
            )
            print("PROMPT:", (prompt[:500] + "...") if len(prompt) > 500 else prompt)

            # Only the structured call is made: a second, unstructured
            # invocation used purely for debug printing would double the
            # model calls per brief without affecting the result.
            response = structured_output_model.invoke([
                SystemMessage(content="You are a helpful assistant that creates detailed query briefs based on conversation history."),
                HumanMessage(content=prompt),
            ])
            print("STRUCTURED RESPONSE:", response)

            if response is None:
                print("ERROR: Structured response is None")
                return self._failed_brief(
                    state,
                    "Failed to generate structured response",
                    "Failed to generate structured response",
                )

            # A missing or None attribute is treated the same as an empty brief.
            query_brief = getattr(response, 'query_brief', '') or ''
            if not query_brief or len(query_brief.strip()) < 10:
                print(f"ERROR: Query brief too short or empty: '{query_brief}'")
                return self._failed_brief(
                    state,
                    "Generated query brief was too short or empty",
                    "Generated query brief was insufficient",
                )

            print(f"SUCCESS: Generated query brief: {query_brief}")
            return {
                **state,
                "query_brief": query_brief,
                "master_messages": [HumanMessage(content=query_brief)],
                "query_brief_complete": True,
                "notes": state.get("notes", []) + ["Query brief successfully created"],
            }
        except Exception as e:
            print(f"Error in write_query_brief: {e}")
            import traceback
            traceback.print_exc()
            return self._failed_brief(
                state,
                f"Query brief creation failed: {str(e)}",
                str(e),
            )