Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| DocAssistant - LangGraph agent that routes between responding and editing | |
| Single node agent that decides whether to respond to user questions or edit the document | |
| """ | |
| import json | |
| import logging | |
| import traceback | |
| from typing import Dict, Any, List, Optional | |
| from langgraph.graph import StateGraph, END | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage | |
| from agent_states.doc_assistant_state import DocAssistantState | |
| from prompts.doc_assistant import get_router_system_prompt, get_router_user_prompt | |
| logger = logging.getLogger(__name__) | |
class DocAssistant:
    """
    Router agent that decides whether to respond to user questions or edit the document.

    Workflow:
    - "agent" node calls the LLM with optional tool calling
    - Document is in user prompt (no inspect needed)
    - Agent decides: respond directly OR call a tool
    - "tools" node executes the requested tools; the workflow ends as soon as
      edit_document has been called, otherwise control returns to the agent
      node with the tool results appended to the conversation
    """

    # Iteration budget handed to the edit_document tool.
    _EDIT_MAX_ITERATIONS = 10

    def __init__(self, llm, tools: Optional[List[Any]] = None, tools_facade: Optional[List[Any]] = None):
        """
        Initialize the router agent.

        Args:
            llm: LLM for routing decisions (with tool calling capability)
            tools: Real tool implementations for lookup. They are looked up by
                the facade tool name prefixed with an underscore.
            tools_facade: Facade tools for LLM (minimal parameters). Falls back
                to ``tools`` when omitted or empty.
        """
        # Normalise None to an empty list so later iteration never fails.
        self.tools = tools if tools is not None else []
        self.tools_facade = tools_facade if tools_facade else self.tools
        self.llm = llm

        # Bind facade tools to LLM - optional tool calling. With no tools at
        # all there is nothing to bind, so the plain LLM is used.
        if self.tools_facade:
            self.llm_with_tools = self.llm.bind_tools(self.tools_facade)
        else:
            self.llm_with_tools = self.llm

        logger.info("π§ DocAssistant initialized")
        logger.info(f"π€ Using {type(llm).__name__} for routing decisions")
        logger.info(f"π οΈ Tools available: {[t.name for t in self.tools_facade]}")

        self.workflow = self._build_workflow()

    def _build_workflow(self) -> StateGraph:
        """Build and compile the LangGraph workflow for the router agent."""
        workflow = StateGraph(DocAssistantState)
        workflow.add_node("agent", self._agent_node)
        workflow.add_node("tools", self._tools_node)
        workflow.set_entry_point("agent")

        # Conditional edge after agent: go to tools if tool calls, else END
        workflow.add_conditional_edges(
            "agent",
            self._should_call_tools,
            {"tools": "tools", "end": END},
        )

        # Conditional edge after tools: END if edit_document was called, else continue
        workflow.add_conditional_edges(
            "tools",
            self._after_tools,
            {"end": END, "continue": "agent"},
        )

        return workflow.compile()

    def _should_call_tools(self, state: DocAssistantState) -> str:
        """
        Decide whether to call tools after agent node.

        Returns:
            "tools" if the last message carries tool calls, "end" otherwise
            (including when there are no intermediate steps at all).
        """
        intermediate_steps = state.get("intermediate_steps", [])
        if not intermediate_steps:
            return "end"

        tool_calls = getattr(intermediate_steps[-1], "tool_calls", None)
        if tool_calls:
            tool_names = [tc['name'] for tc in tool_calls]
            logger.info(f"π§ Agent calling tools: {tool_names}")
            return "tools"
        return "end"

    async def _agent_node(self, state: DocAssistantState) -> DocAssistantState:
        """
        Agent node: Generate response or tool call based on user request.

        The prompt is rebuilt from the state on every pass. Messages gathered
        in previous passes (the model's tool-call message and the resulting
        tool messages) are appended so the model can see tool results when the
        workflow loops back here.
        """
        intermediate_steps = state.get("intermediate_steps", [])
        logger.info("π― Router agent processing request...")

        # Build messages
        messages = [
            SystemMessage(content=get_router_system_prompt()),
            HumanMessage(content=get_router_user_prompt(
                doc_text=state["doc_text"],
                instruction=state["user_instruction"],
                doc_summaries=state.get("doc_summaries", []),
                conversation_history=state.get("conversation_history", []),
            )),
        ]
        # Replay earlier tool calls and their results; empty on the first pass.
        messages.extend(intermediate_steps)

        logger.info(f"π Document size: {len(state['doc_text'])} bytes")
        logger.info(f"π Instruction: {state['user_instruction'][:100]}{'...' if len(state['user_instruction']) > 100 else ''}")

        # Call LLM
        response = await self.llm_with_tools.ainvoke(messages)
        intermediate_steps.append(response)

        logger.info(f"π Response type: {type(response).__name__}")
        if getattr(response, "tool_calls", None):
            logger.info(f"π§ Tool calls: {[tc['name'] for tc in response.tool_calls]}")
        else:
            logger.info(f"π¬ Direct response (no tool calls):{response.content}")
            state['message'] = response.content

        state["intermediate_steps"] = intermediate_steps
        return state

    def _find_tool(self, tool_name: str) -> Optional[Any]:
        """Return the real tool registered as ``"_" + tool_name``, or None."""
        return next((t for t in (self.tools or []) if t.name == "_" + tool_name), None)

    def _build_tool_args(self, tool_name: str, raw_args: Dict[str, Any], state: DocAssistantState) -> Dict[str, Any]:
        """Copy the model-supplied args and add the state-derived ones the real tool needs."""
        args = raw_args.copy()
        # Logged before enrichment so the full document text is not dumped to the log.
        logger.info(f"Launching tool: {tool_name} with args {json.dumps(args, default=str)}")

        if tool_name == "edit_document":
            logger.info("π edit_document tool called - invoking doc_editor_agent")
            args["doc_text"] = state["doc_text"]
            args["user_instruction"] = state["user_instruction"]
            args["doc_summaries"] = state.get("doc_summaries", [])
            args["conversation_history"] = state.get("conversation_history", [])
            args["max_iterations"] = self._EDIT_MAX_ITERATIONS
            args["document_id"] = state.get("document_id")
            args["user_id"] = state.get("user_id")
        elif tool_name == "retrieve_lawyer_document":
            logger.info(f"π retrieve_lawyer_document tool called: {args.get('file_path')}")
            if "user_id" not in args and state.get("user_id"):
                args["user_id"] = state["user_id"]
        return args

    async def _tools_node(self, state: DocAssistantState) -> DocAssistantState:
        """
        Tools node: Execute tool calls (edit_document or retrieve_lawyer_document).

        Every tool call in the last message gets exactly one ToolMessage
        appended to ``intermediate_steps``: the tool result on success, or an
        error text when the tool raises or cannot be found. On a successful
        edit_document call, ``modified_document`` and ``message`` are stored in
        the state. When edit_document fails, ``message`` is set to the error
        text so the caller is not left with an empty message.
        """
        intermediate_steps = state.get("intermediate_steps", [])
        last_message = intermediate_steps[-1] if intermediate_steps else None
        tool_calls = getattr(last_message, "tool_calls", None)
        if not tool_calls:
            return state

        for tool_call in tool_calls:
            tool_name = tool_call['name']

            # Get the tool function directly from self.tools (add underscore prefix)
            tool_func = self._find_tool(tool_name)
            if tool_func is None:
                logger.warning(f"β οΈ Tool function not found for {tool_name}")
                error_text = f"Error: Tool function not found for {tool_name}"
                # Answer the call anyway so no tool call is left without a reply.
                intermediate_steps.append(
                    ToolMessage(
                        content=error_text,
                        tool_call_id=tool_call['id'],
                        name=tool_name,
                    )
                )
                if tool_name == "edit_document":
                    state['message'] = error_text
                continue

            try:
                args = self._build_tool_args(tool_name, tool_call['args'], state)
                result = await tool_func.ainvoke(args)

                if tool_name == "edit_document":
                    tool_result = f"Document was edited with this summary: {result['final_summary']}"
                    state['modified_document'] = result['doc_text']
                    state['message'] = tool_result
                    logger.info("β edit_document completed - ending router workflow")
                else:
                    tool_result = result

                intermediate_steps.append(
                    ToolMessage(
                        content=tool_result,
                        tool_call_id=tool_call['id'],
                        name=tool_name,
                    )
                )
            except Exception as e:
                logger.error(f"β Error executing {tool_name}: {str(e)}")
                error_text = f"Error: {str(e)}"
                intermediate_steps.append(
                    ToolMessage(
                        content=error_text,
                        tool_call_id=tool_call['id'],
                        name=tool_name,
                    )
                )
                # The workflow ends after edit_document, so surface the failure.
                if tool_name == "edit_document":
                    state['message'] = error_text

        state["intermediate_steps"] = intermediate_steps
        return state

    def _after_tools(self, state: DocAssistantState) -> str:
        """
        Decide whether to continue after tools node.

        Returns:
            "end" if any ToolMessage named edit_document is present (stops workflow),
            "continue" otherwise (allows more tool calls or a direct response)
        """
        intermediate_steps = state.get("intermediate_steps", [])

        # Check if edit_document was called
        edit_called = any(
            isinstance(msg, ToolMessage) and msg.name == "edit_document"
            for msg in intermediate_steps
        )
        if edit_called:
            logger.info("β edit_document called - ending router workflow")
            return "end"

        # If edit_document wasn't called, continue (allows agent to make more tool calls or respond)
        logger.info("π Continuing router workflow (edit_document not yet called)")
        return "continue"

    async def process_request(
        self,
        doc_text: str,
        user_instruction: str,
        doc_summaries: Optional[List[str]] = None,
        conversation_history: Optional[List[Dict[str, str]]] = None,
        document_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Process the user's request and decide whether to respond or edit.

        Args:
            doc_text: The HTML document content
            user_instruction: User's instruction or question
            doc_summaries: Optional list of document summaries for context
            conversation_history: Optional conversation history
            document_id: Optional UUID of the document for live updates
            user_id: Optional user ID for authentication

        Returns:
            Dict with:
            - message: Response message to the user
            - modified_document: Modified document (if editing was done) or None
            - success: Boolean indicating success

            Exceptions raised by the workflow are logged and reported through
            ``success: False`` instead of being propagated.
        """
        # Fresh lists per call; a shared mutable default would leak between calls.
        if doc_summaries is None:
            doc_summaries = []
        if conversation_history is None:
            conversation_history = []

        logger.info("=" * 80)
        logger.info("π― DOC ASSISTANT STARTING")
        logger.info("=" * 80)
        logger.info(f"π Document size: {len(doc_text)} bytes")
        logger.info(f"π Instruction: {user_instruction[:100]}{'...' if len(user_instruction) > 100 else ''}")
        logger.info(f"π Document summaries: {len(doc_summaries)}")
        logger.info(f"π¬ Conversation history: {len(conversation_history)} messages")

        # Bound before the try so the error handler can always inspect it.
        final_state = None
        try:
            # Initialize state
            initial_state = {
                "doc_text": doc_text,
                "doc_summaries": doc_summaries,
                "conversation_history": conversation_history,
                "user_instruction": user_instruction,
                "intermediate_steps": [],
                "document_id": document_id,
                "user_id": user_id,
                "modified_document": None,
            }

            # Run workflow
            logger.info("π Invoking router workflow...")
            final_state = await self.workflow.ainvoke(initial_state)

            return {
                "message": final_state.get("message"),
                "modified_document": final_state.get("modified_document"),
                "success": True,
            }
        except Exception as e:
            logger.error("=" * 80)
            logger.error("β DOC ASSISTANT FAILED")
            logger.error("=" * 80)
            logger.error(f"π Location: subagents/doc_assistant.py:{traceback.extract_tb(e.__traceback__)[-1].lineno} (process_request)")
            logger.error("")
            logger.error("π Input Parameters:")
            logger.error(f" - User Instruction: {user_instruction[:100]}")
            logger.error(f" - Document Size: {len(doc_text):,} bytes")
            if document_id:
                logger.error(f" - Document ID: {document_id}")
            if user_id:
                logger.error(f" - User ID: {user_id}")
            logger.error(f" - Document Summaries: {len(doc_summaries)}")
            logger.error(f" - Conversation History: {len(conversation_history)} messages")
            logger.error("")
            logger.error("π€ LLM Configuration:")
            logger.error(f" - LLM: {type(self.llm).__name__}")
            logger.error(f" - Tools Available: {len(self.tools)}")
            logger.error(f" - Tool Names: {', '.join([t.name for t in self.tools])}")
            logger.error(f" - Facade Tools: {len(self.tools_facade)}")
            logger.error("")
            logger.error("π Workflow State:")
            # final_state stays None when the workflow itself raised.
            if final_state and "intermediate_steps" in final_state:
                intermediate_steps = final_state["intermediate_steps"]
                logger.error(f" - Intermediate Steps: {len(intermediate_steps)} messages")
                if intermediate_steps:
                    last_msg = intermediate_steps[-1]
                    logger.error(f" - Last Message Type: {type(last_msg).__name__}")
                    if hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
                        logger.error(f" - Last Tool Calls: {', '.join([tc['name'] for tc in last_msg.tool_calls])}")
            logger.error("")
            logger.error("π Exception Details:")
            logger.error(f" Type: {type(e).__name__}")
            logger.error(f" Message: {str(e)}")
            logger.error("")
            logger.error("π Full Traceback:")
            logger.error(traceback.format_exc())
            logger.error("")
            logger.error("πΎ Document Preview (first 200 chars):")
            logger.error(f" {doc_text[:200]}")
            logger.error("=" * 80)

            return {
                "message": f"Error processing request: {str(e)}",
                "modified_document": None,
                "success": False,
            }