# SYNRG / src/core/services/interaction_service.py
# Author avatar: cryogenic22
# Commit message: Create interaction_service.py
# Commit: 4fa4599 (verified)
"""Service for managing interactions"""
import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional
class InteractionService:
    """Stateless helper that coordinates interaction storage and analysis.

    Every method receives its collaborators (database service, LLM service)
    as arguments; the instance itself holds no state.
    """

    def __init__(self):
        """Initialize interaction service (no state to set up)."""

    def create_interaction(
        self,
        db_service,
        llm_service,
        interaction_data: Dict[str, Any]
    ) -> str:
        """
        Create a new interaction, enriching it with LLM analysis when a
        transcript is present.

        Args:
            db_service: Object exposing ``save_interaction(record)``.
            llm_service: Object exposing ``analyze_interaction(transcript)``;
                only called when ``interaction_data['transcript']`` is truthy.
            interaction_data: Interaction details. This dict is NOT modified;
                a shallow copy is enriched and saved instead.

        Returns:
            Whatever ``db_service.save_interaction`` returns (annotated as the
            interaction ID string).
        """
        interaction_id = str(uuid.uuid4())

        # Work on a copy so the caller's dict is never mutated as a side effect.
        record = dict(interaction_data)

        transcript = record.get('transcript')
        if transcript:
            # Tolerate an LLM service that yields nothing usable.
            analysis = llm_service.analyze_interaction(transcript) or {}
            # ``metadata`` may be present but None; treat that like "absent".
            existing_metadata = record.get('metadata') or {}
            record.update({
                'summary': analysis.get('summary'),
                'sentiment_score': analysis.get('sentiment_score'),
                'metadata': {
                    **existing_metadata,
                    'key_points': analysis.get('key_points', []),
                    'action_items': analysis.get('action_items', []),
                },
            })

        # NOTE: the record is unpacked after 'id', so a caller-supplied 'id'
        # key takes precedence over the freshly generated one.
        return db_service.save_interaction({
            'id': interaction_id,
            **record,
        })

    def get_interaction_stats(
        self,
        db_service,
        user_id: str
    ) -> Dict[str, Any]:
        """
        Get interaction statistics for a user.

        Args:
            db_service: Object exposing ``get_recent_interactions(user_id)``.
            user_id: Identifier passed through to the database service.

        Returns:
            Dict with ``total_count``, ``avg_sentiment`` and
            ``type_distribution`` (interaction type -> occurrence count).
            Falsy sentiment scores (e.g. None) count as 0 in the average, and
            ``avg_sentiment`` is 0 when there are no interactions.
        """
        # A database layer returning None is treated as "no interactions".
        recent = db_service.get_recent_interactions(user_id) or []
        total = len(recent)

        if total:
            sentiment_sum = sum(
                float(item['sentiment_score'] or 0) for item in recent
            )
            avg_sentiment = sentiment_sum / total
        else:
            avg_sentiment = 0

        # Subscript access is kept (rather than .get) because the rows are
        # only known to support item lookup.
        type_distribution: Dict[Any, int] = {}
        for item in recent:
            kind = item['type']
            type_distribution[kind] = type_distribution.get(kind, 0) + 1

        return {
            'total_count': total,
            'avg_sentiment': avg_sentiment,
            'type_distribution': type_distribution,
        }

    def search_interactions(
        self,
        db_service,
        query: str,
        user_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict]:
        """
        Search interactions by delegating to the database service.

        Args:
            db_service: Object exposing
                ``search_interactions(query, user_id, limit)``.
            query: Search text, forwarded unchanged.
            user_id: Optional user filter, forwarded unchanged.
            limit: Maximum result count, forwarded unchanged.

        Returns:
            The database service's result, unmodified.
        """
        return db_service.search_interactions(query, user_id, limit)