Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from typing_extensions import Literal | |
| from src.llms.groqllm import GroqLLM | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, get_buffer_string | |
| from src.utils.prompts import clarification_with_user_instructions, transform_messages_into_customer_query_brief_prompt | |
| from src.states.queryState import SparrowAgentState, ClarifyWithUser, CustomerQuestion # Import from correct module | |
| from src.utils.utils import get_today_str | |
class QueryNode:
    """Conversation steps that clarify a user's request and condense it into a brief.

    Both public methods read the agent state, call the configured chat model
    through a structured-output schema, and return a *new* state mapping built
    from the incoming one; the incoming state object is never mutated.
    """

    # A generated brief whose stripped length is below this is treated as unusable.
    _MIN_BRIEF_CHARS = 10

    def __init__(self, llm):
        """Store the chat model used by every step of this node.

        Args:
            llm: Chat model object. It must provide
                ``with_structured_output(schema)``, whose result is called
                via ``invoke(messages)``.
        """
        self.llm = llm

    @staticmethod
    def _clarification_error(state, reason):
        """Build the state returned when the clarification step cannot complete.

        The flags are set so that clarification is still considered pending,
        and the reason is recorded both in ``notes`` and under ``error``.
        """
        return {
            **state,
            "clarification_complete": False,
            "needs_clarification": True,
            "notes": state.get("notes", []) + [f"Error in clarification: {reason}"],
            "error": reason,
        }

    @staticmethod
    def _brief_failure(state, note, error):
        """Build the state returned when no usable query brief was produced.

        ``query_brief`` is reset to an empty string, ``note`` is appended to
        ``notes`` and ``error`` is stored under the ``error`` key.
        """
        return {
            **state,
            "query_brief": "",
            "notes": state.get("notes", []) + [note],
            "error": error,
        }

    def clarify_with_user(self, state: SparrowAgentState) -> SparrowAgentState:
        """Decide whether the conversation so far needs a clarifying question.

        The message history in ``state["messages"]`` is rendered to text and
        sent to the model, which answers using the ``ClarifyWithUser`` schema.
        Its ``need_clarification``, ``question`` and ``verification`` fields
        are read here.

        Args:
            state: Current agent state. ``messages`` and ``notes`` are read and
                default to empty lists when absent.

        Returns:
            A copy of ``state`` with:

            * ``messages`` extended by one ``AIMessage`` (the clarifying
              question, or the verification text when nothing is missing);
            * ``clarification_complete`` / ``needs_clarification`` set
              accordingly;
            * ``notes`` extended by a short status line.

            If the model call fails or yields no structured response, the
            copy instead has ``clarification_complete=False``,
            ``needs_clarification=True``, an ``error`` entry and a note, and
            ``messages`` is left as it was.
        """
        structured_output_model = self.llm.with_structured_output(ClarifyWithUser)
        history = state.get("messages", [])

        try:
            response = structured_output_model.invoke([
                SystemMessage(
                    content="Route the input to yes or no based on the need of clarification of the query"
                ),
                HumanMessage(
                    content=clarification_with_user_instructions.format(
                        messages=get_buffer_string(messages=history),
                        date=get_today_str(),
                    )
                ),
            ])
            print("CLARIFICATION RESPONSE:", response)

            # Report a missing structured response explicitly rather than
            # letting it surface as an attribute error on None.
            if response is None:
                reason = "Failed to generate structured response"
                print(f"Error in clarify_with_user: {reason}")
                return self._clarification_error(state, reason)

            if response.need_clarification == 'yes':
                reply = response.question
                complete = False
                note = "Clarification requested from user"
            else:
                reply = response.verification
                complete = True
                note = "Clarification complete, sufficient information provided"

            return {
                **state,
                "messages": history + [AIMessage(content=reply)],
                "clarification_complete": complete,
                "needs_clarification": not complete,
                # The note is recorded for downstream routing logic.
                "notes": state.get("notes", []) + [note],
            }
        except Exception as e:
            print(f"Error in clarify_with_user: {e}")
            return self._clarification_error(state, str(e))

    def write_query_brief(self, state: SparrowAgentState) -> SparrowAgentState:
        """Turn the conversation history into a customer query brief.

        The messages in ``state["messages"]`` are rendered to text, inserted
        into the brief prompt together with today's date, and sent to the
        model, which answers using the ``CustomerQuestion`` schema. Only its
        ``query_brief`` attribute is read.

        Args:
            state: Current agent state. ``messages`` and ``notes`` are read and
                default to empty lists when absent.

        Returns:
            On success, a copy of ``state`` with ``query_brief`` set,
            ``master_messages`` set to a single ``HumanMessage`` carrying the
            brief, ``query_brief_complete=True`` and a note appended.

            On failure (no messages, no structured response, a brief shorter
            than ``_MIN_BRIEF_CHARS`` after stripping, or any exception), a
            copy of ``state`` with ``query_brief=""``, an ``error`` entry and
            a note appended.
        """
        try:
            structured_output_model = self.llm.with_structured_output(CustomerQuestion)

            messages = state.get("messages", [])
            print("STATE MESSAGES:", messages)
            if not messages:
                print("ERROR: No messages in state")
                return self._brief_failure(
                    state,
                    note="No messages available for query brief creation",
                    error="No messages available for query brief creation",
                )

            # The prompt template expects both the rendered history and the date.
            prompt = transform_messages_into_customer_query_brief_prompt.format(
                messages=get_buffer_string(messages),
                date=get_today_str(),
            )
            print("PROMPT:", (prompt[:500] + "...") if len(prompt) > 500 else prompt)

            # Only the structured call is issued: a separate unstructured call
            # used purely for logging would double latency and token cost
            # without influencing the returned state.
            response = structured_output_model.invoke([
                SystemMessage(content="You are a helpful assistant that creates detailed query briefs based on conversation history."),
                HumanMessage(content=prompt),
            ])
            print("STRUCTURED RESPONSE:", response)

            if response is None:
                print("ERROR: Structured response is None")
                return self._brief_failure(
                    state,
                    note="Failed to generate structured response",
                    error="Failed to generate structured response",
                )

            # A missing or None attribute is normalised to an empty string.
            query_brief = getattr(response, 'query_brief', '') or ''
            if len(query_brief.strip()) < self._MIN_BRIEF_CHARS:
                print(f"ERROR: Query brief too short or empty: '{query_brief}'")
                return self._brief_failure(
                    state,
                    note="Generated query brief was too short or empty",
                    error="Generated query brief was insufficient",
                )

            print(f"SUCCESS: Generated query brief: {query_brief}")
            return {
                **state,
                "query_brief": query_brief,
                "master_messages": [HumanMessage(content=query_brief)],
                "query_brief_complete": True,
                "notes": state.get("notes", []) + ["Query brief successfully created"],
            }
        except Exception as e:
            print(f"Error in write_query_brief: {e}")
            import traceback
            traceback.print_exc()
            return self._brief_failure(
                state,
                note=f"Query brief creation failed: {str(e)}",
                error=str(e),
            )