Spaces:
Sleeping
Sleeping
| """ | |
| Enhanced interaction analysis using LangGraph for agent orchestration | |
| """ | |
| from typing import Dict, List, Any, Optional, Annotated | |
| from datetime import datetime | |
| import uuid | |
| import logging | |
| from langgraph.graph import Graph, MessageGraph | |
| from langgraph.prebuilt import ToolMessage | |
| from langgraph.graph.message import MessageState | |
| import json | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class InteractionAnalysisGraph:
    """
    Orchestrates interaction analysis using LangGraph.

    A linear five-node graph is compiled at construction time::

        extract_intelligence -> process_contacts -> process_opportunities
            -> process_follow_ups -> generate_summary

    Every node receives and returns the same state object and communicates
    through ``state.metadata``:

    * ``"interaction"`` - the raw input passed to ``process_interaction``
    * ``"extracted"``   - the LLM output written by ``extract_intelligence_node``
    * ``"results"``     - accumulated per-node results plus the final summary

    NOTE(review): ``_create_tool`` and the tool handlers referenced in
    ``setup_tools`` (``_find_contact``, ``_create_contact``, ...) are not
    defined in this class body; they must be supplied (e.g. by a subclass or
    mixin) before the class can be instantiated.
    """

    def __init__(self, db_service, llm_service):
        """Store the service handles, then register tools and build the graph.

        Args:
            db_service: Database service handle, stored as ``self.db``.
            llm_service: LLM service; must expose an awaitable
                ``analyze_interaction(transcript, schema)``.
        """
        self.db = db_service
        self.llm = llm_service
        self.setup_tools()
        self.build_graph()

    def setup_tools(self):
        """Setup tools available to agents.

        Populates ``self.tools``, a mapping from tool name to whatever
        ``self._create_tool(handler, description, params)`` returns. The node
        methods await these entries with keyword arguments named after the
        declared parameters.
        """
        self.tools = {
            # Contact Management Tools
            'find_contact': self._create_tool(
                self._find_contact,
                "Find existing contact in database",
                {"name": str, "company": str},
            ),
            'create_contact': self._create_tool(
                self._create_contact,
                "Create new contact record",
                {"name": str, "title": str, "company": str},
            ),
            'update_contact': self._create_tool(
                self._update_contact,
                "Update existing contact",
                {"id": str, "updates": dict},
            ),
            # Opportunity Tools
            'find_opportunity': self._create_tool(
                self._find_opportunity,
                "Find existing opportunity",
                {"name": str, "account_id": str},
            ),
            'create_opportunity': self._create_tool(
                self._create_opportunity,
                "Create new opportunity",
                {"name": str, "account_id": str, "value": float},
            ),
            'update_opportunity': self._create_tool(
                self._update_opportunity,
                "Update opportunity details",
                {"id": str, "updates": dict},
            ),
            # Follow-up Tools
            'create_follow_up': self._create_tool(
                self._create_follow_up,
                "Create follow-up action",
                {"title": str, "due_date": str, "assignee": str},
            ),
            'schedule_calendar': self._create_tool(
                self._schedule_calendar,
                "Schedule calendar event",
                {"title": str, "date": str, "duration": int},
            ),
        }

    def build_graph(self):
        """Build the LangGraph processing graph.

        Registers the five node coroutines, chains them linearly, and stores
        the compiled graph on ``self.workflow``.
        """
        workflow = Graph()

        # Define nodes
        workflow.add_node("extract_intelligence", self.extract_intelligence_node)
        workflow.add_node("process_contacts", self.process_contacts_node)
        workflow.add_node("process_opportunities", self.process_opportunities_node)
        workflow.add_node("process_follow_ups", self.process_follow_ups_node)
        workflow.add_node("generate_summary", self.generate_summary_node)

        # Define edges
        workflow.add_edge("extract_intelligence", "process_contacts")
        workflow.add_edge("process_contacts", "process_opportunities")
        workflow.add_edge("process_opportunities", "process_follow_ups")
        workflow.add_edge("process_follow_ups", "generate_summary")

        # Set entry point
        workflow.set_entry_point("extract_intelligence")

        # Mark the terminal node explicitly: without a finish point no node
        # feeds the graph's output, so ainvoke() would have no final state to
        # hand back to process_interaction().
        # NOTE(review): confirm against the installed langgraph version.
        workflow.set_finish_point("generate_summary")

        self.workflow = workflow.compile()

    async def process_interaction(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process interaction through the graph.

        Args:
            interaction_data: Raw interaction payload. The nodes read its
                ``"transcript"`` and ``"account_id"`` entries.

        Returns:
            The ``"results"`` dict accumulated in the state metadata.

        Raises:
            Exception: Any error raised while running the graph is logged and
                re-raised unchanged.
        """
        try:
            # Initialize state
            # NOTE(review): assumes MessageState accepts ``messages`` and
            # ``metadata`` keywords and exposes them as attributes - confirm
            # against the installed langgraph version.
            state = MessageState(
                messages=[],
                metadata={
                    "interaction": interaction_data,
                    "processed_at": datetime.now().isoformat(),
                    "results": {},
                },
            )

            # Run workflow
            final_state = await self.workflow.ainvoke(state)
            return final_state.metadata["results"]
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Graph processing failed: %s", e)
            raise

    async def extract_intelligence_node(self, state: MessageState) -> MessageState:
        """Extract structured intelligence from interaction.

        Sends the interaction transcript and the extraction schema to the LLM
        service, stores the answer under ``state.metadata["extracted"]`` and
        appends a message recording the step.

        Raises:
            Exception: Extraction errors are logged and re-raised; later nodes
                cannot run without the extracted data.
        """
        interaction = state.metadata["interaction"]
        try:
            # Extract using LLM. intelligence_schema is a regular method, so
            # it must be called - passing the bound method itself would hand
            # the LLM service a callable instead of the schema dict.
            extracted = await self.llm.analyze_interaction(
                interaction["transcript"],
                self.intelligence_schema(),
            )

            # Update state
            state.metadata["extracted"] = extracted
            # NOTE(review): confirm that the ToolMessage class imported at
            # file level accepts ``tool_name`` / ``tool_output`` keywords.
            state.messages.append(
                ToolMessage(
                    content="Intelligence extracted successfully",
                    tool_name="extract_intelligence",
                    tool_output=extracted,
                )
            )
            return state
        except Exception as e:
            logger.exception("Intelligence extraction failed: %s", e)
            raise

    async def process_contacts_node(self, state: MessageState) -> MessageState:
        """Process and update contacts.

        For each extracted contact, looks it up by name and company, then
        updates the match or creates a new record. A failure on one contact is
        logged and skipped so the remaining contacts are still processed.
        Results are merged into ``state.metadata["results"]["contacts"]``.
        """
        extracted = state.metadata["extracted"]
        # ``or []`` also covers an explicit null from the LLM, not just a
        # missing key.
        contacts = extracted.get("contacts") or []
        results = {"contacts": {"new": [], "updated": []}}

        for contact in contacts:
            try:
                # Try to find existing contact
                existing = await self.tools["find_contact"](
                    name=contact["name"],
                    company=contact["company"],
                )
                if existing:
                    # Update existing
                    if self._should_update_contact(contact, existing):
                        updated = await self.tools["update_contact"](
                            id=existing["id"],
                            updates=contact,
                        )
                        results["contacts"]["updated"].append(updated)
                else:
                    # Create new
                    if self._should_create_contact(contact):
                        new_contact = await self.tools["create_contact"](
                            name=contact["name"],
                            title=contact["title"],
                            company=contact["company"],
                        )
                        results["contacts"]["new"].append(new_contact)
            except Exception as e:
                # Best effort: one bad record must not abort the batch.
                logger.exception("Contact processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def process_opportunities_node(self, state: MessageState) -> MessageState:
        """Process and update opportunities.

        For each extracted opportunity, looks it up by name within the
        interaction's account, then updates the match or creates a new record
        (``value`` defaults to 0 when absent). A failure on one opportunity is
        logged and skipped. Results are merged into
        ``state.metadata["results"]["opportunities"]``.
        """
        extracted = state.metadata["extracted"]
        opportunities = extracted.get("opportunities") or []
        results = {"opportunities": {"new": [], "updated": []}}

        for opp in opportunities:
            try:
                # Try to find existing opportunity
                existing = await self.tools["find_opportunity"](
                    name=opp["name"],
                    account_id=state.metadata["interaction"]["account_id"],
                )
                if existing:
                    # Update existing
                    if self._should_update_opportunity(opp, existing):
                        updated = await self.tools["update_opportunity"](
                            id=existing["id"],
                            updates=opp,
                        )
                        results["opportunities"]["updated"].append(updated)
                else:
                    # Create new
                    if self._should_create_opportunity(opp):
                        new_opp = await self.tools["create_opportunity"](
                            name=opp["name"],
                            account_id=state.metadata["interaction"]["account_id"],
                            value=opp.get("value", 0),
                        )
                        results["opportunities"]["new"].append(new_opp)
            except Exception as e:
                # Best effort: one bad record must not abort the batch.
                logger.exception("Opportunity processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def process_follow_ups_node(self, state: MessageState) -> MessageState:
        """Process follow-ups and calendar events.

        Creates a follow-up for every extracted item and, when the item sets
        ``needs_calendar``, also schedules a calendar event on the due date
        (``duration`` defaults to 30). A failure on one item is logged and
        skipped; a follow-up created before a failed calendar call stays in
        the results. Results are merged into ``state.metadata["results"]``
        under ``"follow_ups"`` and ``"calendar_events"``.
        """
        extracted = state.metadata["extracted"]
        follow_ups = extracted.get("follow_ups") or []
        results = {"follow_ups": [], "calendar_events": []}

        for follow_up in follow_ups:
            try:
                # Create follow-up
                new_follow_up = await self.tools["create_follow_up"](
                    title=follow_up["title"],
                    due_date=follow_up["due_date"],
                    assignee=follow_up["assignee"],
                )
                results["follow_ups"].append(new_follow_up)

                # Schedule calendar event if needed
                if follow_up.get("needs_calendar", False):
                    calendar_event = await self.tools["schedule_calendar"](
                        title=follow_up["title"],
                        date=follow_up["due_date"],
                        duration=follow_up.get("duration", 30),
                    )
                    results["calendar_events"].append(calendar_event)
            except Exception as e:
                # Best effort: one bad record must not abort the batch.
                logger.exception("Follow-up processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def generate_summary_node(self, state: MessageState) -> MessageState:
        """Generate final summary of all updates.

        Counts the records touched by the previous nodes and stores the
        summary under ``state.metadata["results"]["summary"]``. Expects the
        ``"contacts"``, ``"opportunities"`` and ``"follow_ups"`` entries
        written by the earlier nodes to be present.
        """
        results = state.metadata["results"]
        summary = {
            "changes_made": {
                "contacts": len(results["contacts"]["new"]) + len(results["contacts"]["updated"]),
                "opportunities": len(results["opportunities"]["new"]) + len(results["opportunities"]["updated"]),
                "follow_ups": len(results["follow_ups"]),
            },
            "needs_attention": self._identify_attention_items(results),
            "next_steps": self._generate_next_steps(results),
        }
        state.metadata["results"]["summary"] = summary
        return state

    def _should_update_contact(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if contact should be updated.

        Placeholder: currently always returns True.
        """
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_contact(self, contact_data: Dict) -> bool:
        """Determine if new contact should be created.

        Placeholder: currently always returns True.
        """
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _should_update_opportunity(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if opportunity should be updated.

        Placeholder: currently always returns True.
        """
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_opportunity(self, opp_data: Dict) -> bool:
        """Determine if new opportunity should be created.

        Placeholder: currently always returns True.
        """
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _identify_attention_items(self, results: Dict) -> List[Dict]:
        """Identify items needing user attention.

        Placeholder: currently always returns an empty list.
        """
        attention_items = []
        # Add logic to identify items needing review/confirmation
        return attention_items

    def _generate_next_steps(self, results: Dict) -> List[Dict]:
        """Generate recommended next steps.

        Placeholder: currently always returns an empty list.
        """
        next_steps = []
        # Add logic to generate recommended actions
        return next_steps

    def intelligence_schema(self) -> Dict:
        """Schema for intelligence extraction.

        Returns:
            A fresh dict with three array-typed entries (``contacts``,
            ``opportunities``, ``follow_ups``), each describing the object
            properties the node methods read from the LLM output.
        """
        return {
            "contacts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "title": {"type": "string"},
                        "company": {"type": "string"},
                        "department": {"type": "string"},
                        "influence_level": {"type": "string"},
                    },
                },
            },
            "opportunities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "type": {"type": "string"},
                        "value": {"type": "number"},
                        "stage": {"type": "string"},
                        "next_steps": {"type": "string"},
                    },
                },
            },
            "follow_ups": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string"},
                        "type": {"type": "string"},
                        "due_date": {"type": "string"},
                        "assignee": {"type": "string"},
                        "needs_calendar": {"type": "boolean"},
                    },
                },
            },
        }