""" Enhanced interaction analysis using LangGraph for agent orchestration """ from typing import Dict, List, Any, Optional, Annotated from datetime import datetime import uuid import logging from langgraph.graph import Graph, MessageGraph from langgraph.prebuilt import ToolMessage from langgraph.graph.message import MessageState import json logger = logging.getLogger(__name__) class InteractionAnalysisGraph: """ Orchestrates interaction analysis using LangGraph """ def __init__(self, db_service, llm_service): self.db = db_service self.llm = llm_service self.setup_tools() self.build_graph() def setup_tools(self): """Setup tools available to agents""" self.tools = { # Contact Management Tools 'find_contact': self._create_tool( self._find_contact, "Find existing contact in database", {"name": str, "company": str} ), 'create_contact': self._create_tool( self._create_contact, "Create new contact record", {"name": str, "title": str, "company": str} ), 'update_contact': self._create_tool( self._update_contact, "Update existing contact", {"id": str, "updates": dict} ), # Opportunity Tools 'find_opportunity': self._create_tool( self._find_opportunity, "Find existing opportunity", {"name": str, "account_id": str} ), 'create_opportunity': self._create_tool( self._create_opportunity, "Create new opportunity", {"name": str, "account_id": str, "value": float} ), 'update_opportunity': self._create_tool( self._update_opportunity, "Update opportunity details", {"id": str, "updates": dict} ), # Follow-up Tools 'create_follow_up': self._create_tool( self._create_follow_up, "Create follow-up action", {"title": str, "due_date": str, "assignee": str} ), 'schedule_calendar': self._create_tool( self._schedule_calendar, "Schedule calendar event", {"title": str, "date": str, "duration": int} ) } def build_graph(self): """Build the LangGraph processing graph""" workflow = Graph() # Define nodes workflow.add_node("extract_intelligence", self.extract_intelligence_node) workflow.add_node("process_contacts", self.process_contacts_node) workflow.add_node("process_opportunities", self.process_opportunities_node) workflow.add_node("process_follow_ups", self.process_follow_ups_node) workflow.add_node("generate_summary", self.generate_summary_node) # Define edges workflow.add_edge("extract_intelligence", "process_contacts") workflow.add_edge("process_contacts", "process_opportunities") workflow.add_edge("process_opportunities", "process_follow_ups") workflow.add_edge("process_follow_ups", "generate_summary") # Set entry point workflow.set_entry_point("extract_intelligence") self.workflow = workflow.compile() async def process_interaction(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]: """ Process interaction through the graph """ try: # Initialize state state = MessageState( messages=[], metadata={ "interaction": interaction_data, "processed_at": datetime.now().isoformat(), "results": {} } ) # Run workflow final_state = await self.workflow.ainvoke(state) return final_state.metadata["results"] except Exception as e: logger.error(f"Graph processing failed: {str(e)}") raise async def extract_intelligence_node(self, state: MessageState) -> MessageState: """Extract structured intelligence from interaction""" interaction = state.metadata["interaction"] try: # Extract using LLM extracted = await self.llm.analyze_interaction( interaction["transcript"], self.intelligence_schema ) # Update state state.metadata["extracted"] = extracted state.messages.append( ToolMessage( content="Intelligence extracted successfully", tool_name="extract_intelligence", tool_output=extracted ) ) return state except Exception as e: logger.error(f"Intelligence extraction failed: {str(e)}") raise async def process_contacts_node(self, state: MessageState) -> MessageState: """Process and update contacts""" extracted = state.metadata["extracted"] contacts = extracted.get("contacts", []) results = {"contacts": {"new": [], "updated": []}} for contact in contacts: try: # Try to find existing contact existing = await self.tools["find_contact"]( name=contact["name"], company=contact["company"] ) if existing: # Update existing if self._should_update_contact(contact, existing): updated = await self.tools["update_contact"]( id=existing["id"], updates=contact ) results["contacts"]["updated"].append(updated) else: # Create new if self._should_create_contact(contact): new_contact = await self.tools["create_contact"]( name=contact["name"], title=contact["title"], company=contact["company"] ) results["contacts"]["new"].append(new_contact) except Exception as e: logger.error(f"Contact processing failed: {str(e)}") continue state.metadata["results"].update(results) return state async def process_opportunities_node(self, state: MessageState) -> MessageState: """Process and update opportunities""" extracted = state.metadata["extracted"] opportunities = extracted.get("opportunities", []) results = {"opportunities": {"new": [], "updated": []}} for opp in opportunities: try: # Try to find existing opportunity existing = await self.tools["find_opportunity"]( name=opp["name"], account_id=state.metadata["interaction"]["account_id"] ) if existing: # Update existing if self._should_update_opportunity(opp, existing): updated = await self.tools["update_opportunity"]( id=existing["id"], updates=opp ) results["opportunities"]["updated"].append(updated) else: # Create new if self._should_create_opportunity(opp): new_opp = await self.tools["create_opportunity"]( name=opp["name"], account_id=state.metadata["interaction"]["account_id"], value=opp.get("value", 0) ) results["opportunities"]["new"].append(new_opp) except Exception as e: logger.error(f"Opportunity processing failed: {str(e)}") continue state.metadata["results"].update(results) return state async def process_follow_ups_node(self, state: MessageState) -> MessageState: """Process follow-ups and calendar events""" extracted = state.metadata["extracted"] follow_ups = extracted.get("follow_ups", []) results = {"follow_ups": [], "calendar_events": []} for follow_up in follow_ups: try: # Create follow-up new_follow_up = await self.tools["create_follow_up"]( title=follow_up["title"], due_date=follow_up["due_date"], assignee=follow_up["assignee"] ) results["follow_ups"].append(new_follow_up) # Schedule calendar event if needed if follow_up.get("needs_calendar", False): calendar_event = await self.tools["schedule_calendar"]( title=follow_up["title"], date=follow_up["due_date"], duration=follow_up.get("duration", 30) ) results["calendar_events"].append(calendar_event) except Exception as e: logger.error(f"Follow-up processing failed: {str(e)}") continue state.metadata["results"].update(results) return state async def generate_summary_node(self, state: MessageState) -> MessageState: """Generate final summary of all updates""" results = state.metadata["results"] summary = { "changes_made": { "contacts": len(results["contacts"]["new"]) + len(results["contacts"]["updated"]), "opportunities": len(results["opportunities"]["new"]) + len(results["opportunities"]["updated"]), "follow_ups": len(results["follow_ups"]) }, "needs_attention": self._identify_attention_items(results), "next_steps": self._generate_next_steps(results) } state.metadata["results"]["summary"] = summary return state def _should_update_contact(self, new_data: Dict, existing: Dict) -> bool: """Determine if contact should be updated""" # Compare relevant fields and return True if update needed # Add user confirmation logic here return True # Placeholder def _should_create_contact(self, contact_data: Dict) -> bool: """Determine if new contact should be created""" # Add validation and user confirmation logic here return True # Placeholder def _should_update_opportunity(self, new_data: Dict, existing: Dict) -> bool: """Determine if opportunity should be updated""" # Compare relevant fields and return True if update needed # Add user confirmation logic here return True # Placeholder def _should_create_opportunity(self, opp_data: Dict) -> bool: """Determine if new opportunity should be created""" # Add validation and user confirmation logic here return True # Placeholder def _identify_attention_items(self, results: Dict) -> List[Dict]: """Identify items needing user attention""" attention_items = [] # Add logic to identify items needing review/confirmation return attention_items def _generate_next_steps(self, results: Dict) -> List[Dict]: """Generate recommended next steps""" next_steps = [] # Add logic to generate recommended actions return next_steps @property def intelligence_schema(self) -> Dict: """Schema for intelligence extraction""" return { "contacts": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "title": {"type": "string"}, "company": {"type": "string"}, "department": {"type": "string"}, "influence_level": {"type": "string"} } } }, "opportunities": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "type": {"type": "string"}, "value": {"type": "number"}, "stage": {"type": "string"}, "next_steps": {"type": "string"} } } }, "follow_ups": { "type": "array", "items": { "type": "object", "properties": { "title": {"type": "string"}, "type": {"type": "string"}, "due_date": {"type": "string"}, "assignee": {"type": "string"}, "needs_calendar": {"type": "boolean"} } } } }