# SYNRG / src/core/services/database_service.py
# Last change by cryogenic22: "Update src/core/services/database_service.py" (commit 1956fa7, verified)
"""Enhanced database service with complete functionality"""
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json
from datetime import datetime, timedelta
import random
import uuid
import pandas as pd
import os
import logging
from faker import Faker
# Module-level logger named after this module; DatabaseService uses it to report errors.
logger = logging.getLogger(__name__)
class DatabaseService:
    """SQLite-backed data service for users, accounts, contacts and interactions.

    Every public method opens its own short-lived connection through
    :meth:`get_db`, so no connection is held between calls. Construction
    ensures the schema exists and seeds Faker-generated demo data when the
    ``users`` table is empty.
    """

    # Regions assigned to synthetic users and accounts.
    _REGIONS = ['North', 'South', 'East', 'West']
    # Industries assigned to synthetic accounts.
    _INDUSTRIES = ['Technology', 'Healthcare', 'Financial Services', 'Manufacturing', 'Retail']

    def __init__(self, db_path: str = "/data/robata.db", llm_service=None):
        """
        Initialize database service

        Creates the parent directory of ``db_path`` if needed, creates any
        missing tables/indexes and seeds synthetic data when there are no users.

        Args:
            db_path: Path to SQLite database
            llm_service: LLM service for synthetic data generation (stored as
                ``self.llm``; not referenced elsewhere in this class)
        """
        self.db_path = db_path
        self.fake = Faker()
        self.llm = llm_service
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.setup_database()
        # Only generate data if database is empty
        if not self._has_data():
            self.generate_synthetic_data()

    def _has_data(self) -> bool:
        """Return True if the ``users`` table contains at least one row."""
        with self.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM users")
            user_count = cursor.fetchone()[0]
            return user_count > 0

    @contextmanager
    def get_db(self):
        """Context manager for database connections.

        Yields a fresh ``sqlite3`` connection whose rows are ``sqlite3.Row``
        (indexable by column name). The connection is always closed on exit.
        It is never committed here, so writers must call ``conn.commit()``.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def setup_database(self):
        """Create the users/accounts/contacts/interactions tables and their indexes if missing."""
        with self.get_db() as conn:
            c = conn.cursor()
            c.executescript('''
                CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    email TEXT UNIQUE,
                    name TEXT,
                    role TEXT,
                    department TEXT,
                    title TEXT,
                    region TEXT,
                    quota REAL,
                    created_at TEXT,
                    last_login TEXT
                );
                CREATE TABLE IF NOT EXISTS accounts (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    parent_account_id TEXT,
                    industry TEXT,
                    status TEXT,
                    website TEXT,
                    annual_revenue REAL,
                    employee_count INTEGER,
                    technology_stack TEXT,
                    region TEXT,
                    address TEXT,
                    account_owner_id TEXT,
                    engagement_score REAL,
                    created_at TEXT,
                    last_activity_at TEXT,
                    FOREIGN KEY (parent_account_id) REFERENCES accounts (id),
                    FOREIGN KEY (account_owner_id) REFERENCES users (id)
                );
                CREATE TABLE IF NOT EXISTS contacts (
                    id TEXT PRIMARY KEY,
                    account_id TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    email TEXT,
                    phone TEXT,
                    title TEXT,
                    department TEXT,
                    reports_to_id TEXT,
                    influence_level TEXT,
                    engagement_score REAL,
                    preferences TEXT,
                    created_at TEXT,
                    last_contacted TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (reports_to_id) REFERENCES contacts (id)
                );
                CREATE TABLE IF NOT EXISTS interactions (
                    id TEXT PRIMARY KEY,
                    type TEXT,
                    account_id TEXT,
                    owner_id TEXT,
                    transcript TEXT,
                    summary TEXT,
                    sentiment_score REAL,
                    metadata TEXT,
                    created_at TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (owner_id) REFERENCES users (id)
                );
                -- Create indexes for better query performance
                CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
                CREATE INDEX IF NOT EXISTS idx_accounts_owner ON accounts(account_owner_id);
                CREATE INDEX IF NOT EXISTS idx_contacts_account ON contacts(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_account ON interactions(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_owner ON interactions(owner_id);
            ''')

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _quote_identifier(name: Any) -> str:
        """Return ``name`` as a double-quoted SQLite identifier.

        Column names in the dynamic INSERT/UPDATE helpers come from caller
        dict keys. Quoting them (and doubling embedded quotes) stops a key from
        injecting SQL, while an unknown column still raises
        ``sqlite3.OperationalError`` as the unquoted form did.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _load_json(raw: Any, default: Any) -> Any:
        """Decode a JSON text column, returning ``default`` when it is empty or not decodable."""
        if not raw:
            return default
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return default

    def _insert_row(self, table: str, row: Dict[str, Any]) -> None:
        """Insert ``row`` (column -> value) into ``table`` and commit.

        ``table`` is always a literal from this class; only the column names
        (dict keys) originate from callers, and those are quoted.
        """
        columns = ', '.join(self._quote_identifier(col) for col in row)
        placeholders = ', '.join('?' for _ in row)
        with self.get_db() as conn:
            conn.execute(
                f'INSERT INTO {table} ({columns}) VALUES ({placeholders})',
                list(row.values()),
            )
            conn.commit()

    def _update_row(self, table: str, row_id: str, changes: Dict[str, Any]) -> bool:
        """Apply ``changes`` (column -> value) to the ``table`` row whose id is ``row_id``.

        Returns True if a row was updated. ``table`` is always a literal from
        this class; column names (dict keys) are quoted.
        """
        set_clause = ', '.join(f"{self._quote_identifier(col)} = ?" for col in changes)
        with self.get_db() as conn:
            cursor = conn.execute(
                f'UPDATE {table} SET {set_clause} WHERE id = ?',
                [*changes.values(), row_id],
            )
            conn.commit()
            return cursor.rowcount > 0

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def get_user_by_email(self, email: str) -> Optional[Dict]:
        """
        Get user details by email address

        On a match, ``last_login`` is set to now in the database; the returned
        dict holds the values read *before* that update.

        Args:
            email: User's email address
        Returns:
            Dict containing user details if found, None otherwise (including
            when a database error occurs; the error is logged)
        """
        try:
            with self.get_db() as conn:
                cursor = conn.execute("""
                    SELECT
                        id,
                        email,
                        name,
                        role,
                        department,
                        title,
                        region,
                        quota,
                        created_at,
                        last_login
                    FROM users
                    WHERE email = ?
                """, (email,))
                row = cursor.fetchone()
                if row is None:
                    return None
                # Update last login
                conn.execute("""
                    UPDATE users
                    SET last_login = ?
                    WHERE id = ?
                """, (datetime.now().isoformat(), row['id']))
                conn.commit()
                return dict(row)
        except Exception as e:
            # Deliberate best-effort lookup: log with traceback and report "not found".
            logger.exception("Error getting user by email: %s", e)
            return None

    # ------------------------------------------------------------------
    # Synthetic data
    # ------------------------------------------------------------------

    def generate_synthetic_data(self):
        """Generate synthetic test data.

        The generated data is:

        * 11 users: a fixed ``test@example.com`` sales rep plus 10 random users.
        * 3 accounts for the test user, plus 2-5 accounts for every user.
        * 3-8 contacts and 5-12 interactions per account.

        All rows are written with ``INSERT OR REPLACE`` and committed by a
        single ``commit()``.
        """
        users = self._synthetic_users()
        accounts = self._synthetic_accounts(users)
        contacts = self._synthetic_contacts(accounts)
        interactions = self._synthetic_interactions(accounts)

        with self.get_db() as conn:
            c = conn.cursor()
            c.executemany('''
                INSERT OR REPLACE INTO users VALUES (
                    :id, :email, :name, :role, :department, :title, :region,
                    :quota, :created_at, :last_login
                )
            ''', users)
            c.executemany('''
                INSERT OR REPLACE INTO accounts VALUES (
                    :id, :name, :parent_account_id, :industry, :status, :website,
                    :annual_revenue, :employee_count, :technology_stack, :region,
                    :address, :account_owner_id, :engagement_score, :created_at,
                    :last_activity_at
                )
            ''', accounts)
            c.executemany('''
                INSERT OR REPLACE INTO contacts VALUES (
                    :id, :account_id, :first_name, :last_name, :email, :phone,
                    :title, :department, :reports_to_id, :influence_level,
                    :engagement_score, :preferences, :created_at, :last_contacted
                )
            ''', contacts)
            c.executemany('''
                INSERT OR REPLACE INTO interactions VALUES (
                    :id, :type, :account_id, :owner_id, :transcript, :summary,
                    :sentiment_score, :metadata, :created_at
                )
            ''', interactions)
            conn.commit()

    def _synthetic_users(self) -> List[Dict[str, Any]]:
        """Build demo user rows; the first entry is always the fixed test login."""
        users = [{
            'id': str(uuid.uuid4()),
            'email': 'test@example.com',  # Default test login
            'name': 'Test User',
            'role': 'sales_rep',
            'department': 'Sales',
            'title': 'Senior Sales Representative',
            'region': 'North',
            'quota': 1000000.0,
            'created_at': datetime.now().isoformat(),
            'last_login': datetime.now().isoformat()
        }]
        for _ in range(10):
            users.append({
                'id': str(uuid.uuid4()),
                # users.email is UNIQUE and rows go in via INSERT OR REPLACE, so a
                # duplicate would silently delete an earlier user (orphaning their
                # accounts); Faker's unique proxy prevents repeats.
                'email': self.fake.unique.company_email(),
                'name': self.fake.name(),
                'role': random.choice(['sales_rep', 'regional_lead', 'head_of_sales']),
                'department': random.choice(['Sales', 'Consulting', 'Technology']),
                'title': 'Senior Sales Representative',
                'region': random.choice(self._REGIONS),
                'quota': random.uniform(500000, 2000000),
                'created_at': datetime.now().isoformat(),
                'last_login': datetime.now().isoformat()
            })
        return users

    def _synthetic_account(self, owner_id: str, region: Optional[str] = None) -> Dict[str, Any]:
        """Build one active fake account owned by ``owner_id``; region is random when not given."""
        return {
            'id': str(uuid.uuid4()),
            'name': self.fake.company(),
            'parent_account_id': None,
            'industry': random.choice(self._INDUSTRIES),
            'status': 'active',
            'website': self.fake.url(),
            'annual_revenue': random.uniform(1000000, 100000000),
            'employee_count': random.randint(50, 10000),
            'technology_stack': json.dumps(['Python', 'React', 'AWS']),
            'region': region if region is not None else random.choice(self._REGIONS),
            'address': self.fake.address(),
            'account_owner_id': owner_id,
            'engagement_score': random.uniform(0, 100),
            'created_at': datetime.now().isoformat(),
            'last_activity_at': datetime.now().isoformat()
        }

    def _synthetic_accounts(self, users: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build account rows: 3 'North' accounts for the test user (``users[0]``), then 2-5 per user."""
        test_user_id = users[0]['id']
        # Ensure test user has accounts
        accounts = [self._synthetic_account(test_user_id, region='North') for _ in range(3)]
        # Additional accounts for every user (the test user included)
        for user in users:
            for _ in range(random.randint(2, 5)):
                accounts.append(self._synthetic_account(user['id']))
        return accounts

    def _synthetic_contacts(self, accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build 3-8 fake contact rows for each account."""
        contacts = []
        for account in accounts:
            for _ in range(random.randint(3, 8)):
                contacts.append({
                    'id': str(uuid.uuid4()),
                    'account_id': account['id'],
                    'first_name': self.fake.first_name(),
                    'last_name': self.fake.last_name(),
                    'email': self.fake.email(),
                    'phone': self.fake.phone_number(),
                    'title': random.choice(['CEO', 'CTO', 'CFO', 'VP Sales', 'Director']),
                    'department': random.choice(['Executive', 'Sales', 'IT', 'Finance']),
                    'reports_to_id': None,
                    'influence_level': random.choice(['High', 'Medium', 'Low']),
                    'engagement_score': random.uniform(0, 100),
                    'preferences': json.dumps({}),
                    'created_at': datetime.now().isoformat(),
                    'last_contacted': datetime.now().isoformat()
                })
        return contacts

    def _synthetic_interactions(self, accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build 5-12 fake interaction rows per account, owned by the account's owner."""
        interactions = []
        for account in accounts:
            for _ in range(random.randint(5, 12)):
                interactions.append({
                    'id': str(uuid.uuid4()),
                    'type': random.choice(['call', 'meeting', 'email', 'presentation']),
                    'account_id': account['id'],
                    'owner_id': account['account_owner_id'],
                    'transcript': self.fake.paragraph(),
                    'summary': self.fake.sentence(),
                    'sentiment_score': random.uniform(0, 1),
                    'metadata': json.dumps({
                        'duration': random.randint(15, 120),
                        'location': random.choice(['virtual', 'in-person']),
                        'attendees': random.randint(1, 5),
                        'key_points': [
                            self.fake.sentence() for _ in range(random.randint(2, 5))
                        ],
                        'action_items': [
                            {'description': self.fake.sentence(), 'owner': self.fake.name()}
                            for _ in range(random.randint(1, 3))
                        ]
                    }),
                    'created_at': datetime.now().isoformat()
                })
        return interactions

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    def get_user_accounts(self, user_id: str) -> List[Dict]:
        """Return all accounts owned by ``user_id``, ordered by name."""
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT * FROM accounts
                WHERE account_owner_id = ?
                ORDER BY name
            """, (user_id,))
            return [dict(row) for row in cursor.fetchall()]

    def get_account_metrics(self, account_id: str) -> Dict:
        """Return contact count, interaction count and mean sentiment (0.0 if none) for an account."""
        with self.get_db() as conn:
            contact_count = conn.execute("""
                SELECT COUNT(*) as contact_count
                FROM contacts
                WHERE account_id = ?
            """, (account_id,)).fetchone()['contact_count']
            interaction_stats = conn.execute("""
                SELECT
                    COUNT(*) as interaction_count,
                    AVG(sentiment_score) as avg_sentiment
                FROM interactions
                WHERE account_id = ?
            """, (account_id,)).fetchone()
            return {
                'contact_count': contact_count,
                'interaction_count': interaction_stats['interaction_count'],
                'avg_sentiment': interaction_stats['avg_sentiment'] or 0.0
            }

    def get_recent_interactions(self, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Return the newest interactions with account/owner names and decoded ``metadata``.

        Args:
            user_id: When truthy, only interactions owned by this user.
            limit: Maximum number of rows.
        Returns:
            Interaction dicts newest first; ``metadata`` is ``{}`` when empty or invalid.
        """
        query = """
            SELECT
                i.*,
                a.name as account_name,
                u.name as owner_name,
                a.industry as account_industry
            FROM interactions i
            JOIN accounts a ON i.account_id = a.id
            JOIN users u ON i.owner_id = u.id
        """
        params: List[Any] = []
        if user_id:
            query += " WHERE i.owner_id = ?"
            params.append(user_id)
        query += " ORDER BY i.created_at DESC LIMIT ?"
        params.append(limit)

        with self.get_db() as conn:
            interactions = []
            for row in conn.execute(query, params):
                interaction = dict(row)
                interaction['metadata'] = self._load_json(interaction.get('metadata'), {})
                interactions.append(interaction)
            return interactions

    def get_contacts(self, account_id: str) -> List[Dict]:
        """Return an account's contacts with their manager's name and decoded ``preferences``.

        Contacts are ordered High > Medium > Low influence (any other value
        last), then by first and last name. ``preferences`` is ``{}`` when
        empty or invalid.
        """
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    c.*,
                    c2.first_name as reports_to_first_name,
                    c2.last_name as reports_to_last_name
                FROM contacts c
                LEFT JOIN contacts c2 ON c.reports_to_id = c2.id
                WHERE c.account_id = ?
                ORDER BY
                    -- Rank explicitly: sorting the text DESC would give Medium > Low > High.
                    CASE c.influence_level
                        WHEN 'High' THEN 3
                        WHEN 'Medium' THEN 2
                        WHEN 'Low' THEN 1
                        ELSE 0
                    END DESC,
                    c.first_name, c.last_name
            """, (account_id,))
            contacts = []
            for row in cursor:
                contact = dict(row)
                contact['preferences'] = self._load_json(contact.get('preferences'), {})
                contacts.append(contact)
            return contacts

    def get_dashboard_data(self) -> Tuple[Dict, pd.DataFrame, pd.DataFrame]:
        """Return dashboard aggregates.

        Returns:
            (row counts per table, the 10 most recent interactions with account and
            owner names, account counts grouped by industry)
        """
        with self.get_db() as conn:
            counts = {
                'accounts': conn.execute('SELECT COUNT(*) FROM accounts').fetchone()[0],
                'contacts': conn.execute('SELECT COUNT(*) FROM contacts').fetchone()[0],
                'interactions': conn.execute('SELECT COUNT(*) FROM interactions').fetchone()[0]
            }
            recent_interactions = pd.read_sql("""
                SELECT
                    i.created_at, i.type,
                    a.name as account_name,
                    u.name as owner_name,
                    i.sentiment_score
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
                ORDER BY i.created_at DESC
                LIMIT 10
            """, conn)
            account_distribution = pd.read_sql("""
                SELECT industry, COUNT(*) as count
                FROM accounts
                GROUP BY industry
            """, conn)
            return counts, recent_interactions, account_distribution

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    def save_interaction(self, interaction_data: Dict[str, Any]) -> str:
        """Insert a new interaction and return its id.

        Args:
            interaction_data: Column -> value mapping; must contain ``id``, ``type``,
                ``account_id``, ``owner_id`` and ``created_at``. A dict/list
                ``metadata`` is stored as JSON. The caller's dict is not modified.
        Raises:
            ValueError: If a required field is missing.
            sqlite3.Error: On database errors (e.g. unknown column, duplicate id).
        """
        required_fields = ['id', 'type', 'account_id', 'owner_id', 'created_at']
        for field in required_fields:
            if field not in interaction_data:
                raise ValueError(f"Missing required field: {field}")

        # Work on a copy so JSON-encoding metadata does not leak into the caller's dict.
        row = dict(interaction_data)
        if isinstance(row.get('metadata'), (dict, list)):
            row['metadata'] = json.dumps(row['metadata'])
        self._insert_row('interactions', row)
        return row['id']

    def add_account(self, account_data: Dict[str, Any]) -> str:
        """Insert a new account with a generated id and current timestamps; return the id.

        ``id``, ``created_at`` and ``last_activity_at`` override any values in
        ``account_data``; the caller's dict is not modified.
        """
        account_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        row = {**account_data, 'id': account_id, 'created_at': now, 'last_activity_at': now}
        self._insert_row('accounts', row)
        return account_id

    def add_contact(self, contact_data: Dict[str, Any]) -> str:
        """Insert a new contact with a generated id and current timestamps; return the id.

        ``id``, ``created_at`` and ``last_contacted`` override any values in
        ``contact_data``; the caller's dict is not modified.
        """
        contact_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        row = {**contact_data, 'id': contact_id, 'created_at': now, 'last_contacted': now}
        self._insert_row('contacts', row)
        return contact_id

    def update_account(self, account_id: str, update_data: Dict[str, Any]) -> bool:
        """Update columns of an account, refreshing ``last_activity_at``.

        Returns True if the account existed. The caller's dict is not modified.
        """
        changes = {**update_data, 'last_activity_at': datetime.now().isoformat()}
        return self._update_row('accounts', account_id, changes)

    def update_contact(self, contact_id: str, update_data: Dict[str, Any]) -> bool:
        """Update columns of a contact, refreshing ``last_contacted``.

        Returns True if the contact existed. The caller's dict is not modified.
        """
        changes = {**update_data, 'last_contacted': datetime.now().isoformat()}
        return self._update_row('contacts', contact_id, changes)

    # ------------------------------------------------------------------
    # Account views and search
    # ------------------------------------------------------------------

    def get_account_timeline(self, account_id: str, days: int = 90) -> List[Dict]:
        """Return the account's interactions from the last ``days`` days, newest first.

        ``created_at`` is compared as an ISO-8601 string against the cutoff.
        """
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    'interaction' as event_type,
                    i.id,
                    i.type as subtype,
                    i.created_at,
                    i.summary as description,
                    i.sentiment_score,
                    u.name as actor_name
                FROM interactions i
                JOIN users u ON i.owner_id = u.id
                WHERE i.account_id = ? AND i.created_at > ?
                ORDER BY i.created_at DESC
            """, (account_id, cutoff_date))
            return [dict(row) for row in cursor]

    def get_account_details(self, account_id: str) -> Optional[Dict]:
        """Return one account with owner name, contact/interaction counts and mean sentiment.

        ``technology_stack`` is decoded from JSON (``[]`` when missing or invalid);
        ``avg_sentiment`` is None when there are no interactions. Returns None
        if the account does not exist.
        """
        with self.get_db() as conn:
            # Correlated subqueries avoid joining contacts x interactions, which
            # multiplies rows per account.
            cursor = conn.execute("""
                SELECT
                    a.*,
                    u.name as owner_name,
                    (SELECT COUNT(c.id) FROM contacts c WHERE c.account_id = a.id) as contact_count,
                    (SELECT COUNT(i.id) FROM interactions i WHERE i.account_id = a.id) as interaction_count,
                    (SELECT AVG(i.sentiment_score) FROM interactions i WHERE i.account_id = a.id) as avg_sentiment
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                WHERE a.id = ?
            """, (account_id,))
            row = cursor.fetchone()
            if row is None:
                return None
            account = dict(row)
            account['technology_stack'] = self._load_json(account.get('technology_stack'), [])
            return account

    def search_accounts(self, query: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` accounts whose name, industry or website contains ``query``.

        ``%`` and ``_`` in ``query`` keep their SQL LIKE wildcard meaning.
        """
        search_term = f"%{query}%"
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    a.*,
                    u.name as owner_name
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                WHERE
                    a.name LIKE ? OR
                    a.industry LIKE ? OR
                    a.website LIKE ?
                LIMIT ?
            """, (search_term, search_term, search_term, limit))
            return [dict(row) for row in cursor]

    def search_contacts(self, query: str, account_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` contacts whose first/last name or email contains ``query``.

        When ``account_id`` is truthy, results are restricted to that account.
        """
        search_term = f"%{query}%"
        sql = """
            SELECT
                c.*,
                a.name as account_name
            FROM contacts c
            JOIN accounts a ON c.account_id = a.id
            WHERE
                (c.first_name LIKE ? OR
                 c.last_name LIKE ? OR
                 c.email LIKE ?)
        """
        params: List[Any] = [search_term, search_term, search_term]
        if account_id:
            sql += " AND c.account_id = ?"
            params.append(account_id)
        sql += " LIMIT ?"
        params.append(limit)
        with self.get_db() as conn:
            return [dict(row) for row in conn.execute(sql, params)]

    def search_interactions(self, query: str, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` interactions whose transcript or summary contains ``query``.

        Results are newest first and include account and owner names. When
        ``user_id`` is truthy, only that user's interactions are returned.
        ``metadata`` is decoded from JSON and is ``{}`` when empty or invalid,
        matching :meth:`get_recent_interactions`.
        """
        search_term = f"%{query}%"
        sql = """
            SELECT
                i.*,
                a.name as account_name,
                u.name as owner_name
            FROM interactions i
            JOIN accounts a ON i.account_id = a.id
            JOIN users u ON i.owner_id = u.id
            WHERE
                (i.transcript LIKE ? OR
                 i.summary LIKE ?)
        """
        params: List[Any] = [search_term, search_term]
        if user_id:
            sql += " AND i.owner_id = ?"
            params.append(user_id)
        sql += " ORDER BY i.created_at DESC LIMIT ?"
        params.append(limit)
        with self.get_db() as conn:
            interactions = []
            for row in conn.execute(sql, params):
                interaction = dict(row)
                interaction['metadata'] = self._load_json(interaction.get('metadata'), {})
                interactions.append(interaction)
            return interactions