File size: 13,804 Bytes
51ce190
1340b5b
51ce190
1340b5b
51ce190
1340b5b
 
 
 
 
 
51ce190
 
 
1340b5b
 
 
 
 
 
 
 
 
51ce190
1340b5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51ce190
1340b5b
 
 
 
51ce190
1340b5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51ce190
1340b5b
 
 
 
 
51ce190
 
1340b5b
 
 
 
 
 
 
 
 
51ce190
1340b5b
 
51ce190
1340b5b
51ce190
 
1340b5b
51ce190
 
1340b5b
 
 
51ce190
 
1340b5b
 
 
 
 
51ce190
1340b5b
 
 
 
 
 
 
 
 
51ce190
1340b5b
51ce190
 
1340b5b
51ce190
 
1340b5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51ce190
1340b5b
 
 
 
51ce190
1340b5b
 
 
 
 
51ce190
1340b5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""
Enhanced interaction analysis using LangGraph for agent orchestration
"""
from typing import Dict, List, Any, Optional, Annotated
from datetime import datetime
import uuid
import logging
from langgraph.graph import Graph, MessageGraph
from langgraph.prebuilt import ToolMessage
from langgraph.graph.message import MessageState
import json

# Module-level logger named after this module; the graph nodes below use it
# to report failures.
logger = logging.getLogger(__name__)

class InteractionAnalysisGraph:
    """
    Orchestrates interaction analysis using LangGraph.

    On construction the instance registers its tools (``setup_tools``) and
    compiles a linear workflow (``build_graph``) whose stages run in this
    order: extract_intelligence -> process_contacts -> process_opportunities
    -> process_follow_ups -> generate_summary. ``process_interaction`` is the
    entry point that sends one interaction through that workflow and returns
    the accumulated results.
    """
    def __init__(self, db_service, llm_service):
        self.db = db_service
        self.llm = llm_service
        self.setup_tools()
        self.build_graph()
    
    def setup_tools(self):
        """Setup tools available to agents"""
        self.tools = {
            # Contact Management Tools
            'find_contact': self._create_tool(
                self._find_contact,
                "Find existing contact in database",
                {"name": str, "company": str}
            ),
            'create_contact': self._create_tool(
                self._create_contact,
                "Create new contact record",
                {"name": str, "title": str, "company": str}
            ),
            'update_contact': self._create_tool(
                self._update_contact,
                "Update existing contact",
                {"id": str, "updates": dict}
            ),
            
            # Opportunity Tools
            'find_opportunity': self._create_tool(
                self._find_opportunity,
                "Find existing opportunity",
                {"name": str, "account_id": str}
            ),
            'create_opportunity': self._create_tool(
                self._create_opportunity,
                "Create new opportunity",
                {"name": str, "account_id": str, "value": float}
            ),
            'update_opportunity': self._create_tool(
                self._update_opportunity,
                "Update opportunity details",
                {"id": str, "updates": dict}
            ),
            
            # Follow-up Tools
            'create_follow_up': self._create_tool(
                self._create_follow_up,
                "Create follow-up action",
                {"title": str, "due_date": str, "assignee": str}
            ),
            'schedule_calendar': self._create_tool(
                self._schedule_calendar,
                "Schedule calendar event",
                {"title": str, "date": str, "duration": int}
            )
        }
    
    def build_graph(self):
        """Build the LangGraph processing graph.

        Wires the five node methods into a strictly linear pipeline, marks
        the first stage as the entry point and the last stage as the finish
        point, then stores the compiled graph on ``self.workflow``.

        The explicit finish point is the fix here: without it the final
        ``generate_summary`` node has no outgoing edge, leaving the graph
        without a declared terminal node.
        """
        # Stages in execution order; each one feeds the next.
        stages = (
            ("extract_intelligence", self.extract_intelligence_node),
            ("process_contacts", self.process_contacts_node),
            ("process_opportunities", self.process_opportunities_node),
            ("process_follow_ups", self.process_follow_ups_node),
            ("generate_summary", self.generate_summary_node),
        )
        stage_names = [name for name, _ in stages]

        workflow = Graph()

        # Define nodes
        for name, handler in stages:
            workflow.add_node(name, handler)

        # Define edges: chain every stage to its successor.
        for source, target in zip(stage_names, stage_names[1:]):
            workflow.add_edge(source, target)

        # Declare where execution starts and where it ends.
        workflow.set_entry_point(stage_names[0])
        workflow.set_finish_point(stage_names[-1])

        self.workflow = workflow.compile()
    
    async def process_interaction(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process interaction through the graph

        Args:
            interaction_data: Raw interaction payload. It is stored under
                ``metadata["interaction"]``; the nodes read its
                ``"transcript"`` and ``"account_id"`` entries.

        Returns:
            The ``"results"`` mapping taken from the final state's metadata.

        Raises:
            Exception: Any error raised while building the initial state or
                running the workflow is logged with its traceback and then
                re-raised unchanged.
        """
        initial_metadata = {
            "interaction": interaction_data,
            "processed_at": datetime.now().isoformat(),
            # Each processing node merges its own section into this mapping.
            "results": {},
        }

        try:
            # Initialize state
            state = MessageState(messages=[], metadata=initial_metadata)

            # Run workflow
            final_state = await self.workflow.ainvoke(state)

            return final_state.metadata["results"]
        except Exception as exc:
            # logger.exception keeps the traceback, which logger.error with
            # an f-string silently dropped.
            logger.exception("Graph processing failed: %s", exc)
            raise

    async def extract_intelligence_node(self, state: MessageState) -> MessageState:
        """Extract structured intelligence from interaction

        Sends the interaction transcript and ``self.intelligence_schema`` to
        ``self.llm.analyze_interaction`` and stores the answer under
        ``state.metadata["extracted"]`` for the downstream nodes. A message
        recording the extraction is appended to ``state.messages``.

        Args:
            state: Workflow state; ``metadata["interaction"]["transcript"]``
                must be present.

        Returns:
            The same ``state`` object, updated in place.

        Raises:
            Exception: Any failure during extraction or state update is
                logged with its traceback and re-raised unchanged.
        """
        interaction = state.metadata["interaction"]

        try:
            # Extract using LLM
            extracted = await self.llm.analyze_interaction(
                interaction["transcript"],
                self.intelligence_schema,
            )

            # Update state
            state.metadata["extracted"] = extracted
            # NOTE(review): confirm that the imported ToolMessage class
            # accepts the ``tool_name`` / ``tool_output`` keyword arguments.
            state.messages.append(
                ToolMessage(
                    content="Intelligence extracted successfully",
                    tool_name="extract_intelligence",
                    tool_output=extracted,
                )
            )

            return state
        except Exception as exc:
            # Keep the traceback in the log; the caller still sees the error.
            logger.exception("Intelligence extraction failed: %s", exc)
            raise

    async def process_contacts_node(self, state: MessageState) -> MessageState:
        """Process and update contacts"""
        extracted = state.metadata["extracted"]
        contacts = extracted.get("contacts", [])
        results = {"contacts": {"new": [], "updated": []}}
        
        for contact in contacts:
            try:
                # Try to find existing contact
                existing = await self.tools["find_contact"](
                    name=contact["name"],
                    company=contact["company"]
                )
                
                if existing:
                    # Update existing
                    if self._should_update_contact(contact, existing):
                        updated = await self.tools["update_contact"](
                            id=existing["id"],
                            updates=contact
                        )
                        results["contacts"]["updated"].append(updated)
                else:
                    # Create new
                    if self._should_create_contact(contact):
                        new_contact = await self.tools["create_contact"](
                            name=contact["name"],
                            title=contact["title"],
                            company=contact["company"]
                        )
                        results["contacts"]["new"].append(new_contact)
                
            except Exception as e:
                logger.error(f"Contact processing failed: {str(e)}")
                continue
        
        state.metadata["results"].update(results)
        return state

    async def process_opportunities_node(self, state: MessageState) -> MessageState:
        """Process and update opportunities"""
        extracted = state.metadata["extracted"]
        opportunities = extracted.get("opportunities", [])
        results = {"opportunities": {"new": [], "updated": []}}
        
        for opp in opportunities:
            try:
                # Try to find existing opportunity
                existing = await self.tools["find_opportunity"](
                    name=opp["name"],
                    account_id=state.metadata["interaction"]["account_id"]
                )
                
                if existing:
                    # Update existing
                    if self._should_update_opportunity(opp, existing):
                        updated = await self.tools["update_opportunity"](
                            id=existing["id"],
                            updates=opp
                        )
                        results["opportunities"]["updated"].append(updated)
                else:
                    # Create new
                    if self._should_create_opportunity(opp):
                        new_opp = await self.tools["create_opportunity"](
                            name=opp["name"],
                            account_id=state.metadata["interaction"]["account_id"],
                            value=opp.get("value", 0)
                        )
                        results["opportunities"]["new"].append(new_opp)
                
            except Exception as e:
                logger.error(f"Opportunity processing failed: {str(e)}")
                continue
        
        state.metadata["results"].update(results)
        return state

    async def process_follow_ups_node(self, state: MessageState) -> MessageState:
        """Process follow-ups and calendar events"""
        extracted = state.metadata["extracted"]
        follow_ups = extracted.get("follow_ups", [])
        results = {"follow_ups": [], "calendar_events": []}
        
        for follow_up in follow_ups:
            try:
                # Create follow-up
                new_follow_up = await self.tools["create_follow_up"](
                    title=follow_up["title"],
                    due_date=follow_up["due_date"],
                    assignee=follow_up["assignee"]
                )
                results["follow_ups"].append(new_follow_up)
                
                # Schedule calendar event if needed
                if follow_up.get("needs_calendar", False):
                    calendar_event = await self.tools["schedule_calendar"](
                        title=follow_up["title"],
                        date=follow_up["due_date"],
                        duration=follow_up.get("duration", 30)
                    )
                    results["calendar_events"].append(calendar_event)
                
            except Exception as e:
                logger.error(f"Follow-up processing failed: {str(e)}")
                continue
        
        state.metadata["results"].update(results)
        return state

    async def generate_summary_node(self, state: MessageState) -> MessageState:
        """Generate final summary of all updates"""
        results = state.metadata["results"]
        
        summary = {
            "changes_made": {
                "contacts": len(results["contacts"]["new"]) + len(results["contacts"]["updated"]),
                "opportunities": len(results["opportunities"]["new"]) + len(results["opportunities"]["updated"]),
                "follow_ups": len(results["follow_ups"])
            },
            "needs_attention": self._identify_attention_items(results),
            "next_steps": self._generate_next_steps(results)
        }
        
        state.metadata["results"]["summary"] = summary
        return state

    def _should_update_contact(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if contact should be updated"""
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_contact(self, contact_data: Dict) -> bool:
        """Determine if new contact should be created"""
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _should_update_opportunity(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if opportunity should be updated"""
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_opportunity(self, opp_data: Dict) -> bool:
        """Determine if new opportunity should be created"""
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _identify_attention_items(self, results: Dict) -> List[Dict]:
        """Identify items needing user attention"""
        attention_items = []
        
        # Add logic to identify items needing review/confirmation
        
        return attention_items

    def _generate_next_steps(self, results: Dict) -> List[Dict]:
        """Generate recommended next steps"""
        next_steps = []
        
        # Add logic to generate recommended actions
        
        return next_steps

    @property
    def intelligence_schema(self) -> Dict:
        """Schema for intelligence extraction"""
        return {
            "contacts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "title": {"type": "string"},
                        "company": {"type": "string"},
                        "department": {"type": "string"},
                        "influence_level": {"type": "string"}
                    }
                }
            },
            "opportunities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "type": {"type": "string"},
                        "value": {"type": "number"},
                        "stage": {"type": "string"},
                        "next_steps": {"type": "string"}
                    }
                }
            },
            "follow_ups": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string"},
                        "type": {"type": "string"},
                        "due_date": {"type": "string"},
                        "assignee": {"type": "string"},
                        "needs_calendar": {"type": "boolean"}
                    }
                }
            }
        }