cryogenic22 committed on
Commit
1340b5b
·
verified ·
1 Parent(s): 789b5e7

Update src/ai/services/intelligence_extractor.py

Browse files
src/ai/services/intelligence_extractor.py CHANGED
@@ -1,171 +1,356 @@
1
  """
2
- src/ai/services/intelligence_extractor.py
3
- Comprehensive business intelligence extraction from call transcripts
4
  """
5
- import logging
6
- from typing import Dict, Any, List
7
  from datetime import datetime
 
 
 
 
 
 
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
class IntelligenceExtractor:
    """Extract comprehensive business intelligence from interaction transcripts.

    The extractor builds a prompt from the transcript and account context,
    hands it to ``llm_service.analyze_with_context`` together with
    ``extraction_schema``, and then runs the post-processing hooks on the
    returned analysis.
    """

    def __init__(self, llm_service):
        """Store the LLM service and build the extraction schema.

        Args:
            llm_service: Object exposing an awaitable
                ``analyze_with_context(prompt, schema)`` method.
        """
        self.llm_service = llm_service
        self.setup_extraction_schema()

    def setup_extraction_schema(self):
        """Setup the comprehensive extraction schema.

        The schema maps category -> field -> expected Python/typing type.
        NOTE(review): the leaf values are ``typing`` constructs, not JSON
        schema fragments -- confirm the LLM service can consume them.
        """
        self.extraction_schema = {
            "business_relationships": {
                "stakeholders": {
                    "mentioned_contacts": List[Dict],    # Names, titles, influence levels
                    "new_contacts": List[Dict],          # Previously unknown contacts
                    "relationship_changes": List[Dict],  # Changes in relationships/roles
                    "decision_makers": List[Dict],       # Identified decision makers
                    "influence_network": Dict,           # Relationship mapping
                },
                "team_intelligence": {
                    "internal_stakeholders": List[Dict],  # Our team members mentioned
                    "role_changes": List[Dict],           # Team role updates
                    "expertise_needed": List[str],        # Required expertise identified
                },
            },
            "opportunities": {
                "new_opportunities": List[Dict],  # New business opportunities
                "existing_updates": List[Dict],   # Updates to existing opportunities
                "cross_sell": List[Dict],         # Cross-sell opportunities
                "upsell": List[Dict],             # Upsell opportunities
                "risk_factors": List[Dict],       # Identified risks
                "competition_intel": List[Dict],  # Competitive intelligence
            },
            "project_intelligence": {
                "active_projects": List[Dict],   # Current project updates
                "project_health": List[Dict],    # Project status/health
                "resource_needs": List[Dict],    # Resource requirements
                "timeline_updates": List[Dict],  # Schedule changes
                "success_metrics": List[Dict],   # Performance indicators
            },
            "client_intelligence": {
                "pain_points": List[Dict],             # Client challenges
                "strategic_initiatives": List[Dict],   # Client's strategic plans
                "budget_cycles": Dict,                 # Budget timing/constraints
                "technology_stack": List[str],         # Tech infrastructure
                "organizational_changes": List[Dict],  # Org changes at client
            },
            "market_intelligence": {
                "industry_trends": List[Dict],      # Market trends mentioned
                "competitor_mentions": List[Dict],  # Competitor information
                "regulatory_updates": List[Dict],   # Compliance/regulatory info
                "market_challenges": List[Dict],    # Industry challenges
            },
            "follow_up_actions": {
                "meetings": List[Dict],            # Scheduled/requested meetings
                "action_items": List[Dict],        # Specific tasks
                "deliverables": List[Dict],        # Expected deliverables
                "proposals_needed": List[Dict],    # Required proposals
                "approvals_required": List[Dict],  # Needed approvals
            },
            "sentiment_analysis": {
                "overall_sentiment": float,         # Call sentiment score
                "topic_sentiment": Dict,            # Sentiment by topic
                "risk_signals": List[Dict],         # Potential issues
                "opportunity_signals": List[Dict],  # Positive indicators
            },
            "technical_requirements": {
                "integration_needs": List[Dict],       # Integration requirements
                "technical_challenges": List[Dict],    # Technical issues
                "infrastructure_updates": List[Dict],  # Infrastructure needs
                "security_requirements": List[Dict],   # Security considerations
            },
        }

    async def extract_intelligence(self,
                                   transcript: str,
                                   context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract comprehensive intelligence from transcript

        Args:
            transcript: Call transcript text
            context: Additional context (account history, etc.)

        Returns:
            Dict containing extracted intelligence

        Raises:
            Exception: Any error from the LLM call or post-processing is
                logged once (with traceback) and re-raised unchanged.
        """
        try:
            # Extract intelligence using LLM
            prompt = self._generate_extraction_prompt(transcript, context)
            analysis = await self.llm_service.analyze_with_context(prompt, self.extraction_schema)

            # Post-process and link entities
            return self._process_extracted_intelligence(analysis, context)

        except Exception as e:
            # logger.exception keeps the traceback, which shows whether the
            # LLM call or a post-processing hook failed.
            logger.exception("Intelligence extraction failed: %s", e)
            raise

    def _generate_extraction_prompt(self, transcript: str, context: Dict) -> str:
        """Generate context-aware extraction prompt.

        Missing or empty account fields are rendered as ``Unknown`` rather
        than the literal text ``None``; a ``None`` context is treated as empty.
        """
        context = context or {}
        account_name = context.get('account_name') or 'Unknown'
        industry = context.get('industry') or 'Unknown'
        relationship_status = context.get('relationship_status') or 'Unknown'

        return f"""
        Analyze this sales interaction transcript and extract comprehensive business intelligence.
        Consider all aspects of account management, relationships, opportunities, and market dynamics.

        Account Context:
        - Client: {account_name}
        - Industry: {industry}
        - Relationship Status: {relationship_status}

        Focus on:
        1. Relationship mapping and stakeholder influence
        2. Opportunity identification and updates
        3. Project intelligence and health
        4. Market and competitive insights
        5. Follow-up actions and next steps
        6. Technical and integration requirements

        Transcript:
        {transcript}
        """

    def _process_extracted_intelligence(self,
                                        analysis: Dict[str, Any],
                                        context: Dict[str, Any]) -> Dict[str, Any]:
        """Process and link extracted intelligence.

        Runs each post-processing hook on ``analysis`` and returns the same
        object.  Errors propagate to the caller, which logs them once.
        """
        # Enhance with relationship links
        self._link_stakeholders(analysis, context)

        # Map to existing opportunities
        self._map_opportunities(analysis, context)

        # Update project intelligence
        self._update_project_status(analysis, context)

        # Generate action recommendations
        self._generate_recommendations(analysis)

        return analysis

    def _link_stakeholders(self, analysis: Dict, context: Dict) -> None:
        """Link mentioned stakeholders with CRM data"""
        # Implementation for stakeholder linking (currently a no-op)

    def _map_opportunities(self, analysis: Dict, context: Dict) -> None:
        """Map extracted opportunities to existing pipeline"""
        # Implementation for opportunity mapping (currently a no-op)

    def _update_project_status(self, analysis: Dict, context: Dict) -> None:
        """Update project status based on new intelligence"""
        # Implementation for project updates (currently a no-op)

    def _generate_recommendations(self, analysis: Dict) -> None:
        """Generate action recommendations based on intelligence"""
        # Implementation for recommendation generation (currently a no-op)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Enhanced interaction analysis using LangGraph for agent orchestration
 
3
  """
4
+ from typing import Dict, List, Any, Optional, Annotated
 
5
  from datetime import datetime
6
+ import uuid
7
+ import logging
8
+ from langgraph.graph import Graph, MessageGraph
9
+ from langgraph.prebuilt import ToolMessage
10
+ from langgraph.graph.message import MessageState
11
+ import json
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
class InteractionAnalysisGraph:
    """
    Orchestrates interaction analysis using LangGraph.

    The workflow is a linear chain of five nodes: intelligence extraction,
    contact processing, opportunity processing, follow-up processing and
    summary generation.  Each node takes the state object, reads and writes
    ``state.metadata`` and returns the state.

    NOTE(review): ``Graph``, ``MessageState`` and ``ToolMessage`` come from
    the module-level langgraph imports -- confirm those names exist in the
    installed langgraph version.
    """

    def __init__(self, db_service, llm_service):
        """Store the services, register the tools and compile the graph.

        Args:
            db_service: Persistence service the tools delegate to.
            llm_service: Object exposing an awaitable
                ``analyze_interaction(transcript, schema)`` method.
        """
        self.db = db_service
        self.llm = llm_service
        self.setup_tools()
        self.build_graph()

    def setup_tools(self):
        """Setup tools available to agents.

        Populates ``self.tools`` with name -> awaitable tool callables.
        """
        self.tools = {
            # Contact Management Tools
            'find_contact': self._create_tool(
                self._find_contact,
                "Find existing contact in database",
                {"name": str, "company": str},
            ),
            'create_contact': self._create_tool(
                self._create_contact,
                "Create new contact record",
                {"name": str, "title": str, "company": str},
            ),
            'update_contact': self._create_tool(
                self._update_contact,
                "Update existing contact",
                {"id": str, "updates": dict},
            ),

            # Opportunity Tools
            'find_opportunity': self._create_tool(
                self._find_opportunity,
                "Find existing opportunity",
                {"name": str, "account_id": str},
            ),
            'create_opportunity': self._create_tool(
                self._create_opportunity,
                "Create new opportunity",
                {"name": str, "account_id": str, "value": float},
            ),
            'update_opportunity': self._create_tool(
                self._update_opportunity,
                "Update opportunity details",
                {"id": str, "updates": dict},
            ),

            # Follow-up Tools
            'create_follow_up': self._create_tool(
                self._create_follow_up,
                "Create follow-up action",
                {"title": str, "due_date": str, "assignee": str},
            ),
            'schedule_calendar': self._create_tool(
                self._schedule_calendar,
                "Schedule calendar event",
                {"title": str, "date": str, "duration": int},
            ),
        }

    def _create_tool(self, handler, description: str, parameters: Dict[str, type]):
        """Wrap an async handler as a keyword-only tool callable.

        The returned coroutine function rejects calls whose keyword names do
        not match ``parameters`` exactly, then awaits ``handler``.  Values are
        not type-checked because the nodes pass optional fields as ``None``.
        The description and parameter map are attached as attributes.

        Raises (from the returned tool):
            TypeError: If a declared parameter is missing or an undeclared
                one is supplied.
        """
        async def tool(**kwargs):
            missing = [name for name in parameters if name not in kwargs]
            unexpected = [name for name in kwargs if name not in parameters]
            if missing or unexpected:
                raise TypeError(
                    f"Invalid tool arguments (missing={missing}, unexpected={unexpected})"
                )
            return await handler(**kwargs)

        tool.description = description
        tool.parameters = dict(parameters)
        return tool

    async def _call_db(self, operation: str, **kwargs):
        """Invoke ``self.db.<operation>(**kwargs)``, awaiting it if needed.

        NOTE(review): assumes the db service exposes methods named after the
        tools (``find_contact``, ``create_contact``, ...) -- confirm against
        the real service.  A missing method raises ``AttributeError``.
        """
        import inspect  # local import: only this helper needs it

        result = getattr(self.db, operation)(**kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def _find_contact(self, name, company):
        """Look up an existing contact via the db service."""
        return await self._call_db("find_contact", name=name, company=company)

    async def _create_contact(self, name, title, company):
        """Create a contact record via the db service."""
        return await self._call_db("create_contact", name=name, title=title, company=company)

    async def _update_contact(self, id, updates):  # noqa: A002 - key fixed by the tool schema
        """Update a contact record via the db service."""
        return await self._call_db("update_contact", id=id, updates=updates)

    async def _find_opportunity(self, name, account_id):
        """Look up an existing opportunity via the db service."""
        return await self._call_db("find_opportunity", name=name, account_id=account_id)

    async def _create_opportunity(self, name, account_id, value):
        """Create an opportunity via the db service."""
        return await self._call_db(
            "create_opportunity", name=name, account_id=account_id, value=value
        )

    async def _update_opportunity(self, id, updates):  # noqa: A002 - key fixed by the tool schema
        """Update an opportunity via the db service."""
        return await self._call_db("update_opportunity", id=id, updates=updates)

    async def _create_follow_up(self, title, due_date, assignee):
        """Create a follow-up action via the db service."""
        return await self._call_db(
            "create_follow_up", title=title, due_date=due_date, assignee=assignee
        )

    async def _schedule_calendar(self, title, date, duration):
        """Schedule a calendar event via the db service."""
        return await self._call_db(
            "schedule_calendar", title=title, date=date, duration=duration
        )

    def build_graph(self):
        """Build the LangGraph processing graph and store it in ``self.workflow``."""
        workflow = Graph()

        # Define nodes
        workflow.add_node("extract_intelligence", self.extract_intelligence_node)
        workflow.add_node("process_contacts", self.process_contacts_node)
        workflow.add_node("process_opportunities", self.process_opportunities_node)
        workflow.add_node("process_follow_ups", self.process_follow_ups_node)
        workflow.add_node("generate_summary", self.generate_summary_node)

        # Define edges
        workflow.add_edge("extract_intelligence", "process_contacts")
        workflow.add_edge("process_contacts", "process_opportunities")
        workflow.add_edge("process_opportunities", "process_follow_ups")
        workflow.add_edge("process_follow_ups", "generate_summary")

        # Set entry point and terminate after the last node; without a
        # finish point "generate_summary" has no outgoing edge.
        workflow.set_entry_point("extract_intelligence")
        workflow.set_finish_point("generate_summary")

        self.workflow = workflow.compile()

    async def process_interaction(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process interaction through the graph

        Args:
            interaction_data: Interaction payload; the nodes read its
                ``"transcript"`` and ``"account_id"`` keys.

        Returns:
            The ``results`` mapping accumulated by the nodes.

        Raises:
            Exception: Any workflow error is logged and re-raised unchanged.
        """
        try:
            # Initialize state
            state = MessageState(
                messages=[],
                metadata={
                    "interaction": interaction_data,
                    "processed_at": datetime.now().isoformat(),
                    "results": {},
                },
            )

            # Run workflow
            final_state = await self.workflow.ainvoke(state)

            return final_state.metadata["results"]

        except Exception as e:
            logger.exception("Graph processing failed: %s", e)
            raise

    async def extract_intelligence_node(self, state: "MessageState") -> "MessageState":
        """Extract structured intelligence from interaction.

        Stores the LLM output under ``state.metadata["extracted"]`` and
        appends a ``ToolMessage`` to ``state.messages``.
        """
        interaction = state.metadata["interaction"]

        try:
            # Extract using LLM
            extracted = await self.llm.analyze_interaction(
                interaction["transcript"],
                self.intelligence_schema,
            )

            # Update state
            state.metadata["extracted"] = extracted
            state.messages.append(
                ToolMessage(
                    content="Intelligence extracted successfully",
                    tool_name="extract_intelligence",
                    tool_output=extracted,
                )
            )

            return state

        except Exception as e:
            logger.exception("Intelligence extraction failed: %s", e)
            raise

    async def process_contacts_node(self, state: "MessageState") -> "MessageState":
        """Process and update contacts.

        For each extracted contact: update it when a match is found, create
        it otherwise.  A failure on one contact is logged and skipped so the
        remaining contacts are still processed.
        """
        extracted = state.metadata["extracted"] or {}
        contacts = extracted.get("contacts") or []
        results = {"contacts": {"new": [], "updated": []}}

        for contact in contacts:
            try:
                # The schema marks no field as required, so only the name is
                # mandatory here; title/company may legitimately be absent.
                name = contact["name"]
                company = contact.get("company")

                # Try to find existing contact
                existing = await self.tools["find_contact"](name=name, company=company)

                if existing:
                    # Update existing
                    if self._should_update_contact(contact, existing):
                        updated = await self.tools["update_contact"](
                            id=existing["id"],
                            updates=contact,
                        )
                        results["contacts"]["updated"].append(updated)
                elif self._should_create_contact(contact):
                    # Create new
                    new_contact = await self.tools["create_contact"](
                        name=name,
                        title=contact.get("title"),
                        company=company,
                    )
                    results["contacts"]["new"].append(new_contact)

            except Exception as e:
                # Deliberate best-effort: one bad record must not abort the batch.
                logger.exception("Contact processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def process_opportunities_node(self, state: "MessageState") -> "MessageState":
        """Process and update opportunities.

        Mirrors contact processing: update on match, create otherwise, and
        log-and-skip individual failures.
        """
        extracted = state.metadata["extracted"] or {}
        opportunities = extracted.get("opportunities") or []
        results = {"opportunities": {"new": [], "updated": []}}

        for opp in opportunities:
            try:
                account_id = state.metadata["interaction"]["account_id"]

                # Try to find existing opportunity
                existing = await self.tools["find_opportunity"](
                    name=opp["name"],
                    account_id=account_id,
                )

                if existing:
                    # Update existing
                    if self._should_update_opportunity(opp, existing):
                        updated = await self.tools["update_opportunity"](
                            id=existing["id"],
                            updates=opp,
                        )
                        results["opportunities"]["updated"].append(updated)
                elif self._should_create_opportunity(opp):
                    # Create new
                    new_opp = await self.tools["create_opportunity"](
                        name=opp["name"],
                        account_id=account_id,
                        value=opp.get("value", 0),
                    )
                    results["opportunities"]["new"].append(new_opp)

            except Exception as e:
                # Deliberate best-effort: one bad record must not abort the batch.
                logger.exception("Opportunity processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def process_follow_ups_node(self, state: "MessageState") -> "MessageState":
        """Process follow-ups and calendar events.

        Creates a follow-up per extracted item and, when the item sets
        ``needs_calendar``, also schedules a calendar event (default
        duration 30).  Individual failures are logged and skipped.
        """
        extracted = state.metadata["extracted"] or {}
        follow_ups = extracted.get("follow_ups") or []
        results = {"follow_ups": [], "calendar_events": []}

        for follow_up in follow_ups:
            try:
                # Create follow-up
                new_follow_up = await self.tools["create_follow_up"](
                    title=follow_up["title"],
                    due_date=follow_up["due_date"],
                    assignee=follow_up["assignee"],
                )
                results["follow_ups"].append(new_follow_up)

                # Schedule calendar event if needed
                if follow_up.get("needs_calendar", False):
                    calendar_event = await self.tools["schedule_calendar"](
                        title=follow_up["title"],
                        date=follow_up["due_date"],
                        duration=follow_up.get("duration", 30),
                    )
                    results["calendar_events"].append(calendar_event)

            except Exception as e:
                # Deliberate best-effort: one bad record must not abort the batch.
                logger.exception("Follow-up processing failed: %s", e)
                continue

        state.metadata["results"].update(results)
        return state

    async def generate_summary_node(self, state: "MessageState") -> "MessageState":
        """Generate final summary of all updates.

        Writes change counts, attention items and next steps to
        ``state.metadata["results"]["summary"]``.  Result sections that are
        absent count as zero changes.
        """
        results = state.metadata["results"]
        contacts = results.get("contacts") or {}
        opportunities = results.get("opportunities") or {}

        summary = {
            "changes_made": {
                "contacts": len(contacts.get("new", [])) + len(contacts.get("updated", [])),
                "opportunities": (
                    len(opportunities.get("new", [])) + len(opportunities.get("updated", []))
                ),
                "follow_ups": len(results.get("follow_ups") or []),
            },
            "needs_attention": self._identify_attention_items(results),
            "next_steps": self._generate_next_steps(results),
        }

        state.metadata["results"]["summary"] = summary
        return state

    def _should_update_contact(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if contact should be updated"""
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_contact(self, contact_data: Dict) -> bool:
        """Determine if new contact should be created"""
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _should_update_opportunity(self, new_data: Dict, existing: Dict) -> bool:
        """Determine if opportunity should be updated"""
        # Compare relevant fields and return True if update needed
        # Add user confirmation logic here
        return True  # Placeholder

    def _should_create_opportunity(self, opp_data: Dict) -> bool:
        """Determine if new opportunity should be created"""
        # Add validation and user confirmation logic here
        return True  # Placeholder

    def _identify_attention_items(self, results: Dict) -> List[Dict]:
        """Identify items needing user attention (currently always empty)."""
        attention_items = []

        # Add logic to identify items needing review/confirmation

        return attention_items

    def _generate_next_steps(self, results: Dict) -> List[Dict]:
        """Generate recommended next steps (currently always empty)."""
        next_steps = []

        # Add logic to generate recommended actions

        return next_steps

    @property
    def intelligence_schema(self) -> Dict:
        """Schema for intelligence extraction.

        JSON-schema-style description of the three arrays the nodes consume:
        ``contacts``, ``opportunities`` and ``follow_ups``.
        """
        return {
            "contacts": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "title": {"type": "string"},
                        "company": {"type": "string"},
                        "department": {"type": "string"},
                        "influence_level": {"type": "string"},
                    },
                },
            },
            "opportunities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "type": {"type": "string"},
                        "value": {"type": "number"},
                        "stage": {"type": "string"},
                        "next_steps": {"type": "string"},
                    },
                },
            },
            "follow_ups": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "title": {"type": "string"},
                        "type": {"type": "string"},
                        "due_date": {"type": "string"},
                        "assignee": {"type": "string"},
                        "needs_calendar": {"type": "boolean"},
                    },
                },
            },
        }