SYNRG / src /ai /services /intelligence_extractor.py
cryogenic22's picture
Update src/ai/services/intelligence_extractor.py
1340b5b verified
"""
Enhanced interaction analysis using LangGraph for agent orchestration
"""
from typing import Dict, List, Any, Optional, Annotated
from datetime import datetime
import uuid
import logging
from langgraph.graph import Graph, MessageGraph
from langgraph.prebuilt import ToolMessage
from langgraph.graph.message import MessageState
import json
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class InteractionAnalysisGraph:
"""
Orchestrates interaction analysis using LangGraph
"""
def __init__(self, db_service, llm_service):
    """Keep the injected services, then register tools and compile the graph.

    Args:
        db_service: Stored as ``self.db``.
        llm_service: Stored as ``self.llm``.
    """
    self.db, self.llm = db_service, llm_service

    # Tools are registered first, then the workflow graph is built.
    self.setup_tools()
    self.build_graph()
def setup_tools(self):
    """Populate ``self.tools`` with every tool the agents may call.

    Each entry is the result of ``self._create_tool(handler, description,
    parameter_spec)`` stored under the tool's lookup name. Entries are
    created in the order listed below.
    """
    make_tool = self._create_tool
    registry = {}

    # --- Contact management ---
    registry['find_contact'] = make_tool(
        self._find_contact,
        "Find existing contact in database",
        {"name": str, "company": str},
    )
    registry['create_contact'] = make_tool(
        self._create_contact,
        "Create new contact record",
        {"name": str, "title": str, "company": str},
    )
    registry['update_contact'] = make_tool(
        self._update_contact,
        "Update existing contact",
        {"id": str, "updates": dict},
    )

    # --- Opportunities ---
    registry['find_opportunity'] = make_tool(
        self._find_opportunity,
        "Find existing opportunity",
        {"name": str, "account_id": str},
    )
    registry['create_opportunity'] = make_tool(
        self._create_opportunity,
        "Create new opportunity",
        {"name": str, "account_id": str, "value": float},
    )
    registry['update_opportunity'] = make_tool(
        self._update_opportunity,
        "Update opportunity details",
        {"id": str, "updates": dict},
    )

    # --- Follow-ups ---
    registry['create_follow_up'] = make_tool(
        self._create_follow_up,
        "Create follow-up action",
        {"title": str, "due_date": str, "assignee": str},
    )
    registry['schedule_calendar'] = make_tool(
        self._schedule_calendar,
        "Schedule calendar event",
        {"title": str, "date": str, "duration": int},
    )

    self.tools = registry
def build_graph(self):
    """Build and compile the LangGraph processing pipeline.

    The nodes run strictly in sequence:
    extract_intelligence -> process_contacts -> process_opportunities
    -> process_follow_ups -> generate_summary.

    The compiled graph is stored on ``self.workflow``.
    """
    workflow = Graph()

    # (node name, handler) pairs, listed in execution order.
    pipeline = [
        ("extract_intelligence", self.extract_intelligence_node),
        ("process_contacts", self.process_contacts_node),
        ("process_opportunities", self.process_opportunities_node),
        ("process_follow_ups", self.process_follow_ups_node),
        ("generate_summary", self.generate_summary_node),
    ]
    for node_name, handler in pipeline:
        workflow.add_node(node_name, handler)

    # Chain every node to its successor.
    for (source, _), (target, _) in zip(pipeline, pipeline[1:]):
        workflow.add_edge(source, target)

    workflow.set_entry_point(pipeline[0][0])
    # The last node must lead to END: without a finish point it is a
    # dead end and the run has no defined output.
    workflow.set_finish_point(pipeline[-1][0])

    self.workflow = workflow.compile()
async def process_interaction(self, interaction_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run one interaction through the compiled workflow.

    The interaction is placed in the initial state's metadata together
    with a ``processed_at`` ISO timestamp and an empty ``results`` dict.

    Args:
        interaction_data: The interaction payload handed to the graph.

    Returns:
        The ``results`` entry from the final state's metadata.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        run_metadata = {
            "interaction": interaction_data,
            "processed_at": datetime.now().isoformat(),
            "results": {},
        }
        initial_state = MessageState(messages=[], metadata=run_metadata)

        finished = await self.workflow.ainvoke(initial_state)
        return finished.metadata["results"]
    except Exception as exc:
        logger.error(f"Graph processing failed: {exc!s}")
        raise
async def extract_intelligence_node(self, state: MessageState) -> MessageState:
    """Graph node: ask the LLM service for structured data from the transcript.

    Sends ``interaction["transcript"]`` and ``self.intelligence_schema`` to
    ``self.llm.analyze_interaction``, stores the answer under
    ``state.metadata["extracted"]`` and appends a ``ToolMessage`` recording
    the extraction to ``state.messages``.

    Returns:
        The same ``state`` object, mutated in place.

    Raises:
        Exception: Extraction failures are logged and re-raised.
    """
    source = state.metadata["interaction"]

    try:
        findings = await self.llm.analyze_interaction(
            source["transcript"],
            self.intelligence_schema,
        )
        state.metadata["extracted"] = findings

        note = ToolMessage(
            content="Intelligence extracted successfully",
            tool_name="extract_intelligence",
            tool_output=findings,
        )
        state.messages.append(note)
        return state
    except Exception as exc:
        logger.error(f"Intelligence extraction failed: {exc!s}")
        raise
async def process_contacts_node(self, state: MessageState) -> MessageState:
    """Graph node: create or update a record for each extracted contact.

    For every entry in ``state.metadata["extracted"]["contacts"]`` the
    ``find_contact`` tool is queried by name and company. A match is
    updated when ``_should_update_contact`` agrees; otherwise a new
    contact is created when ``_should_create_contact`` agrees.

    Processing is best effort: a failure on one contact is logged (with
    traceback) and the remaining contacts are still processed.

    Returns:
        The same ``state`` object. ``state.metadata["results"]`` gains a
        ``"contacts"`` entry shaped ``{"new": [...], "updated": [...]}``.
    """
    extracted = state.metadata["extracted"]
    # ``or []`` also covers an explicit null from the LLM, which the
    # default argument of ``.get`` would let through as None.
    contacts = extracted.get("contacts") or []
    results = {"contacts": {"new": [], "updated": []}}

    for contact in contacts:
        try:
            existing = await self.tools["find_contact"](
                name=contact["name"],
                company=contact["company"],
            )

            if existing:
                if self._should_update_contact(contact, existing):
                    updated = await self.tools["update_contact"](
                        id=existing["id"],
                        updates=contact,
                    )
                    results["contacts"]["updated"].append(updated)
            elif self._should_create_contact(contact):
                new_contact = await self.tools["create_contact"](
                    name=contact["name"],
                    title=contact["title"],
                    company=contact["company"],
                )
                results["contacts"]["new"].append(new_contact)
        except Exception as exc:
            # Deliberately swallowed so one bad entry cannot abort the batch.
            logger.exception("Contact processing failed: %s", exc)
            continue

    state.metadata["results"].update(results)
    return state
async def process_opportunities_node(self, state: MessageState) -> MessageState:
    """Graph node: create or update a record for each extracted opportunity.

    For every entry in ``state.metadata["extracted"]["opportunities"]`` the
    ``find_opportunity`` tool is queried by name within the interaction's
    ``account_id``. A match is updated when ``_should_update_opportunity``
    agrees; otherwise a new opportunity is created when
    ``_should_create_opportunity`` agrees. A missing or null ``value``
    is sent as 0.

    Processing is best effort: a failure on one opportunity is logged
    (with traceback) and the remaining ones are still processed.

    Returns:
        The same ``state`` object. ``state.metadata["results"]`` gains an
        ``"opportunities"`` entry shaped ``{"new": [...], "updated": [...]}``.
    """
    extracted = state.metadata["extracted"]
    # ``or []`` also covers an explicit null from the LLM, which the
    # default argument of ``.get`` would let through as None.
    opportunities = extracted.get("opportunities") or []
    results = {"opportunities": {"new": [], "updated": []}}

    for opp in opportunities:
        try:
            # Looked up inside the try so a missing account id is handled
            # like any other per-item failure instead of aborting the node.
            account_id = state.metadata["interaction"]["account_id"]

            existing = await self.tools["find_opportunity"](
                name=opp["name"],
                account_id=account_id,
            )

            if existing:
                if self._should_update_opportunity(opp, existing):
                    updated = await self.tools["update_opportunity"](
                        id=existing["id"],
                        updates=opp,
                    )
                    results["opportunities"]["updated"].append(updated)
            elif self._should_create_opportunity(opp):
                new_opp = await self.tools["create_opportunity"](
                    name=opp["name"],
                    account_id=account_id,
                    # Treat an explicit null the same as an absent value.
                    value=opp.get("value") or 0,
                )
                results["opportunities"]["new"].append(new_opp)
        except Exception as exc:
            # Deliberately swallowed so one bad entry cannot abort the batch.
            logger.exception("Opportunity processing failed: %s", exc)
            continue

    state.metadata["results"].update(results)
    return state
async def process_follow_ups_node(self, state: MessageState) -> MessageState:
    """Graph node: create follow-up actions and optional calendar events.

    For every entry in ``state.metadata["extracted"]["follow_ups"]`` the
    ``create_follow_up`` tool is called. When the entry has a truthy
    ``needs_calendar`` flag, ``schedule_calendar`` is called as well, using
    the follow-up's due date and a duration that defaults to 30 when it is
    missing or null.

    Processing is best effort: a failure on one follow-up is logged (with
    traceback) and the remaining ones are still processed. A follow-up that
    was created before its calendar call failed stays in the results.

    Returns:
        The same ``state`` object. ``state.metadata["results"]`` gains
        ``"follow_ups"`` and ``"calendar_events"`` lists.
    """
    extracted = state.metadata["extracted"]
    # ``or []`` also covers an explicit null from the LLM, which the
    # default argument of ``.get`` would let through as None.
    follow_ups = extracted.get("follow_ups") or []
    results = {"follow_ups": [], "calendar_events": []}

    for follow_up in follow_ups:
        try:
            new_follow_up = await self.tools["create_follow_up"](
                title=follow_up["title"],
                due_date=follow_up["due_date"],
                assignee=follow_up["assignee"],
            )
            results["follow_ups"].append(new_follow_up)

            if follow_up.get("needs_calendar", False):
                duration = follow_up.get("duration")
                if duration is None:
                    # Absent and explicit-null durations both use the default.
                    duration = 30
                calendar_event = await self.tools["schedule_calendar"](
                    title=follow_up["title"],
                    date=follow_up["due_date"],
                    duration=duration,
                )
                results["calendar_events"].append(calendar_event)
        except Exception as exc:
            # Deliberately swallowed so one bad entry cannot abort the batch.
            logger.exception("Follow-up processing failed: %s", exc)
            continue

    state.metadata["results"].update(results)
    return state
async def generate_summary_node(self, state: MessageState) -> MessageState:
    """Graph node: attach a summary of everything recorded in the results.

    The summary counts new plus updated contacts, new plus updated
    opportunities, and follow-ups, and adds the outputs of
    ``_identify_attention_items`` and ``_generate_next_steps``. It is
    written to ``state.metadata["results"]["summary"]``.

    Returns:
        The same ``state`` object, mutated in place.
    """
    outcome = state.metadata["results"]

    def touched(kind):
        # Records created plus records modified for one result category.
        bucket = outcome[kind]
        return len(bucket["new"]) + len(bucket["updated"])

    change_counts = {
        "contacts": touched("contacts"),
        "opportunities": touched("opportunities"),
        "follow_ups": len(outcome["follow_ups"]),
    }

    state.metadata["results"]["summary"] = {
        "changes_made": change_counts,
        "needs_attention": self._identify_attention_items(outcome),
        "next_steps": self._generate_next_steps(outcome),
    }
    return state
def _should_update_contact(self, new_data: Dict, existing: Dict) -> bool:
    """Decide whether an existing contact should be overwritten.

    Placeholder policy: every update is approved and neither argument is
    inspected. Field comparison and user confirmation are still to be added.
    """
    approve_update = True  # Placeholder
    return approve_update
def _should_create_contact(self, contact_data: Dict) -> bool:
    """Decide whether a new contact record should be created.

    Placeholder policy: creation is always approved and the argument is not
    inspected. Validation and user confirmation are still to be added.
    """
    approve_creation = True  # Placeholder
    return approve_creation
def _should_update_opportunity(self, new_data: Dict, existing: Dict) -> bool:
    """Decide whether an existing opportunity should be overwritten.

    Placeholder policy: every update is approved and neither argument is
    inspected. Field comparison and user confirmation are still to be added.
    """
    approve_update = True  # Placeholder
    return approve_update
def _should_create_opportunity(self, opp_data: Dict) -> bool:
    """Decide whether a new opportunity record should be created.

    Placeholder policy: creation is always approved and the argument is not
    inspected. Validation and user confirmation are still to be added.
    """
    approve_creation = True  # Placeholder
    return approve_creation
def _identify_attention_items(self, results: Dict) -> List[Dict]:
    """Return the items that need user review or confirmation.

    No review rules exist yet, so a fresh empty list is returned and
    ``results`` is not inspected.
    """
    # Add logic to identify items needing review/confirmation
    return []
def _generate_next_steps(self, results: Dict) -> List[Dict]:
    """Return the recommended next actions.

    No recommendation rules exist yet, so a fresh empty list is returned
    and ``results`` is not inspected.
    """
    # Add logic to generate recommended actions
    return []
@property
def intelligence_schema(self) -> Dict:
    """Schema describing the structure requested from the LLM.

    It has three top-level keys: ``contacts``, ``opportunities`` and
    ``follow_ups``. Each is an array of objects whose properties carry a
    JSON-schema style ``type``. A new dict is built on every access.
    """

    def object_array(field_types):
        # Wrap a {field name: type name} mapping as an array-of-objects spec.
        return {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    field: {"type": kind} for field, kind in field_types.items()
                },
            },
        }

    contact_fields = {
        "name": "string",
        "title": "string",
        "company": "string",
        "department": "string",
        "influence_level": "string",
    }
    opportunity_fields = {
        "name": "string",
        "type": "string",
        "value": "number",
        "stage": "string",
        "next_steps": "string",
    }
    follow_up_fields = {
        "title": "string",
        "type": "string",
        "due_date": "string",
        "assignee": "string",
        "needs_calendar": "boolean",
    }

    return {
        "contacts": object_array(contact_fields),
        "opportunities": object_array(opportunity_fields),
        "follow_ups": object_array(follow_up_fields),
    }