"""Enhanced database service with complete functionality.

SQLite-backed storage for users, accounts, contacts and interactions, with
Faker-based synthetic data seeding when the database is empty.
"""
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json
from datetime import datetime, timedelta
import random
import uuid
import pandas as pd
import os
import logging
from faker import Faker

logger = logging.getLogger(__name__)


class DatabaseService:
    """Data-access layer over a single SQLite file.

    Every public method opens its own short-lived connection through
    :meth:`get_db`, so an instance keeps no connection open between calls.
    """

    def __init__(self, db_path: str = "/data/robata.db", llm_service=None):
        """
        Initialize database service

        Args:
            db_path: Path to SQLite database; missing parent directories are
                created.
            llm_service: LLM service for synthetic data generation (stored on
                ``self.llm``).
        """
        self.db_path = db_path
        self.fake = Faker()
        self.llm = llm_service
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.setup_database()

        # Only seed synthetic data on first use so existing data is preserved.
        if not self._has_data():
            self.generate_synthetic_data()

    # ------------------------------------------------------------------
    # Connection, schema and internal helpers
    # ------------------------------------------------------------------

    def _has_data(self) -> bool:
        """Return True when the ``users`` table has at least one row."""
        with self.get_db() as conn:
            user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
            return user_count > 0

    @contextmanager
    def get_db(self):
        """Yield a new connection whose rows are ``sqlite3.Row``; always closed.

        Nothing is committed automatically: writers must call
        ``conn.commit()`` themselves, otherwise the pending transaction is
        lost when the connection is closed.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Quote ``name`` as an SQLite identifier (embedded ``"`` doubled).

        Column names coming from caller-supplied dict keys are interpolated
        into SQL text, so they must be quoted to rule out SQL injection.
        An unknown column still fails with ``sqlite3.OperationalError``.
        """
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def _parse_json_field(raw: Any, default: Any) -> Any:
        """Decode a JSON text column, returning ``default`` if empty or invalid."""
        if not raw:
            return default
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return default

    def _insert_row(self, table: str, row: Dict[str, Any]) -> None:
        """Insert ``row`` (column -> value) into the internal table ``table``."""
        columns = ', '.join(self._quote_identifier(col) for col in row)
        placeholders = ', '.join('?' for _ in row)
        sql = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
        with self.get_db() as conn:
            conn.execute(sql, list(row.values()))
            conn.commit()

    def _update_row(self, table: str, row_id: str, changes: Dict[str, Any]) -> bool:
        """Apply ``changes`` to the row with ``id = row_id``; True if a row changed."""
        set_clause = ', '.join(f"{self._quote_identifier(col)} = ?" for col in changes)
        sql = f'UPDATE {table} SET {set_clause} WHERE id = ?'
        with self.get_db() as conn:
            cursor = conn.execute(sql, [*changes.values(), row_id])
            conn.commit()
            return cursor.rowcount > 0

    def setup_database(self):
        """Create tables and indexes if they do not already exist."""
        with self.get_db() as conn:
            c = conn.cursor()
            c.executescript('''
                CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    email TEXT UNIQUE,
                    name TEXT,
                    role TEXT,
                    department TEXT,
                    title TEXT,
                    region TEXT,
                    quota REAL,
                    created_at TEXT,
                    last_login TEXT
                );

                CREATE TABLE IF NOT EXISTS accounts (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    parent_account_id TEXT,
                    industry TEXT,
                    status TEXT,
                    website TEXT,
                    annual_revenue REAL,
                    employee_count INTEGER,
                    technology_stack TEXT,
                    region TEXT,
                    address TEXT,
                    account_owner_id TEXT,
                    engagement_score REAL,
                    created_at TEXT,
                    last_activity_at TEXT,
                    FOREIGN KEY (parent_account_id) REFERENCES accounts (id),
                    FOREIGN KEY (account_owner_id) REFERENCES users (id)
                );

                CREATE TABLE IF NOT EXISTS contacts (
                    id TEXT PRIMARY KEY,
                    account_id TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    email TEXT,
                    phone TEXT,
                    title TEXT,
                    department TEXT,
                    reports_to_id TEXT,
                    influence_level TEXT,
                    engagement_score REAL,
                    preferences TEXT,
                    created_at TEXT,
                    last_contacted TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (reports_to_id) REFERENCES contacts (id)
                );

                CREATE TABLE IF NOT EXISTS interactions (
                    id TEXT PRIMARY KEY,
                    type TEXT,
                    account_id TEXT,
                    owner_id TEXT,
                    transcript TEXT,
                    summary TEXT,
                    sentiment_score REAL,
                    metadata TEXT,
                    created_at TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (owner_id) REFERENCES users (id)
                );

                -- Create indexes for better query performance
                CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
                CREATE INDEX IF NOT EXISTS idx_accounts_owner ON accounts(account_owner_id);
                CREATE INDEX IF NOT EXISTS idx_contacts_account ON contacts(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_account ON interactions(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_owner ON interactions(owner_id);
            ''')

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def get_user_by_email(self, email: str) -> Optional[Dict]:
        """
        Get user details by email address and record the login time.

        Args:
            email: User's email address

        Returns:
            Dict containing user details if found (``last_login`` reflects the
            timestamp just written), None if not found or if a database error
            occurred (the error is logged).
        """
        try:
            with self.get_db() as conn:
                row = conn.execute("""
                    SELECT id, email, name, role, department, title, region, quota, created_at, last_login
                    FROM users
                    WHERE email = ?
                """, (email,)).fetchone()
                if row is None:
                    return None

                login_time = datetime.now().isoformat()
                conn.execute("""
                    UPDATE users
                    SET last_login = ?
                    WHERE id = ?
                """, (login_time, row['id']))
                conn.commit()

                user = dict(row)
                # The SELECT ran before the UPDATE; return the fresh value.
                user['last_login'] = login_time
                return user
        except sqlite3.Error:
            logger.exception("Error getting user by email")
            return None

    # ------------------------------------------------------------------
    # Synthetic data
    # ------------------------------------------------------------------

    def _fake_account(self, owner_id: str, region: str, industries: List[str]) -> Dict[str, Any]:
        """Build one synthetic ``accounts`` row owned by ``owner_id``."""
        now = datetime.now().isoformat()
        return {
            'id': str(uuid.uuid4()),
            'name': self.fake.company(),
            'parent_account_id': None,
            'industry': random.choice(industries),
            'status': 'active',
            'website': self.fake.url(),
            'annual_revenue': random.uniform(1000000, 100000000),
            'employee_count': random.randint(50, 10000),
            'technology_stack': json.dumps(['Python', 'React', 'AWS']),
            'region': region,
            'address': self.fake.address(),
            'account_owner_id': owner_id,
            'engagement_score': random.uniform(0, 100),
            'created_at': now,
            'last_activity_at': now,
        }

    def generate_synthetic_data(self):
        """Generate synthetic test data for every table.

        Creates a fixed ``test@example.com`` sales rep, ten random users,
        three guaranteed accounts for the test user plus 2-5 accounts per
        user, 3-8 contacts and 5-12 interactions per account. All rows are
        written with ``INSERT OR REPLACE`` and committed together.
        """
        regions = ['North', 'South', 'East', 'West']
        industries = ['Technology', 'Healthcare', 'Financial Services', 'Manufacturing', 'Retail']

        # --- Users -------------------------------------------------------
        now = datetime.now().isoformat()
        test_user_id = str(uuid.uuid4())
        users = [{
            'id': test_user_id,
            'email': 'test@example.com',  # Default test login
            'name': 'Test User',
            'role': 'sales_rep',
            'department': 'Sales',
            'title': 'Senior Sales Representative',
            'region': 'North',
            'quota': 1000000.0,
            'created_at': now,
            'last_login': now,
        }]

        # users.email is UNIQUE and rows are written with INSERT OR REPLACE,
        # so a repeated Faker email would silently replace an earlier user
        # and orphan the accounts assigned to it. Re-draw until unique.
        seen_emails = {'test@example.com'}
        for _ in range(10):
            email = self.fake.company_email()
            while email in seen_emails:
                email = self.fake.company_email()
            seen_emails.add(email)
            now = datetime.now().isoformat()
            users.append({
                'id': str(uuid.uuid4()),
                'email': email,
                'name': self.fake.name(),
                'role': random.choice(['sales_rep', 'regional_lead', 'head_of_sales']),
                'department': random.choice(['Sales', 'Consulting', 'Technology']),
                'title': 'Senior Sales Representative',
                'region': random.choice(regions),
                'quota': random.uniform(500000, 2000000),
                'created_at': now,
                'last_login': now,
            })
        user_ids = [user['id'] for user in users]

        # --- Accounts ----------------------------------------------------
        # Ensure the test user always has some accounts to look at.
        accounts = [self._fake_account(test_user_id, 'North', industries) for _ in range(3)]
        for user_id in user_ids:
            for _ in range(random.randint(2, 5)):
                accounts.append(self._fake_account(user_id, random.choice(regions), industries))

        # --- Contacts ----------------------------------------------------
        contacts = []
        for account in accounts:
            for _ in range(random.randint(3, 8)):  # 3-8 contacts per account
                now = datetime.now().isoformat()
                contacts.append({
                    'id': str(uuid.uuid4()),
                    'account_id': account['id'],
                    'first_name': self.fake.first_name(),
                    'last_name': self.fake.last_name(),
                    'email': self.fake.email(),
                    'phone': self.fake.phone_number(),
                    'title': random.choice(['CEO', 'CTO', 'CFO', 'VP Sales', 'Director']),
                    'department': random.choice(['Executive', 'Sales', 'IT', 'Finance']),
                    'reports_to_id': None,
                    'influence_level': random.choice(['High', 'Medium', 'Low']),
                    'engagement_score': random.uniform(0, 100),
                    'preferences': json.dumps({}),
                    'created_at': now,
                    'last_contacted': now,
                })

        # --- Interactions ------------------------------------------------
        interactions = []
        for account in accounts:
            for _ in range(random.randint(5, 12)):  # 5-12 interactions per account
                interactions.append({
                    'id': str(uuid.uuid4()),
                    'type': random.choice(['call', 'meeting', 'email', 'presentation']),
                    'account_id': account['id'],
                    'owner_id': account['account_owner_id'],
                    'transcript': self.fake.paragraph(),
                    'summary': self.fake.sentence(),
                    'sentiment_score': random.uniform(0, 1),
                    'metadata': json.dumps({
                        'duration': random.randint(15, 120),
                        'location': random.choice(['virtual', 'in-person']),
                        'attendees': random.randint(1, 5),
                        'key_points': [
                            self.fake.sentence() for _ in range(random.randint(2, 5))
                        ],
                        'action_items': [
                            {'description': self.fake.sentence(), 'owner': self.fake.name()}
                            for _ in range(random.randint(1, 3))
                        ],
                    }),
                    'created_at': datetime.now().isoformat(),
                })

        # --- Persist everything in one transaction -----------------------
        with self.get_db() as conn:
            conn.executemany('''
                INSERT OR REPLACE INTO users VALUES (
                    :id, :email, :name, :role, :department, :title,
                    :region, :quota, :created_at, :last_login
                )
            ''', users)
            conn.executemany('''
                INSERT OR REPLACE INTO accounts VALUES (
                    :id, :name, :parent_account_id, :industry, :status,
                    :website, :annual_revenue, :employee_count, :technology_stack,
                    :region, :address, :account_owner_id, :engagement_score,
                    :created_at, :last_activity_at
                )
            ''', accounts)
            conn.executemany('''
                INSERT OR REPLACE INTO contacts VALUES (
                    :id, :account_id, :first_name, :last_name, :email,
                    :phone, :title, :department, :reports_to_id,
                    :influence_level, :engagement_score, :preferences,
                    :created_at, :last_contacted
                )
            ''', contacts)
            conn.executemany('''
                INSERT OR REPLACE INTO interactions VALUES (
                    :id, :type, :account_id, :owner_id, :transcript,
                    :summary, :sentiment_score, :metadata, :created_at
                )
            ''', interactions)
            conn.commit()

    # ------------------------------------------------------------------
    # Read queries
    # ------------------------------------------------------------------

    def get_user_accounts(self, user_id: str) -> List[Dict]:
        """Return all accounts owned by ``user_id``, ordered by name."""
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT * FROM accounts
                WHERE account_owner_id = ?
                ORDER BY name
            """, (user_id,))
            return [dict(row) for row in cursor.fetchall()]

    def get_account_metrics(self, account_id: str) -> Dict:
        """Return contact count, interaction count and average sentiment.

        ``avg_sentiment`` is 0.0 when the account has no scored interactions.
        """
        with self.get_db() as conn:
            contact_count = conn.execute("""
                SELECT COUNT(*) as contact_count
                FROM contacts
                WHERE account_id = ?
            """, (account_id,)).fetchone()['contact_count']

            interaction_stats = conn.execute("""
                SELECT
                    COUNT(*) as interaction_count,
                    AVG(sentiment_score) as avg_sentiment
                FROM interactions
                WHERE account_id = ?
            """, (account_id,)).fetchone()

            return {
                'contact_count': contact_count,
                'interaction_count': interaction_stats['interaction_count'],
                'avg_sentiment': interaction_stats['avg_sentiment'] or 0.0,
            }

    def get_recent_interactions(self, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Return the newest interactions joined with account and owner names.

        Args:
            user_id: If given, only interactions owned by this user.
            limit: Maximum number of rows.

        Returns:
            Dicts with ``metadata`` decoded from JSON ({} if empty or invalid).
        """
        with self.get_db() as conn:
            query = """
                SELECT
                    i.*,
                    a.name as account_name,
                    u.name as owner_name,
                    a.industry as account_industry
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
            """
            params: List[Any] = []
            if user_id:
                query += " WHERE i.owner_id = ?"
                params.append(user_id)
            query += " ORDER BY i.created_at DESC LIMIT ?"
            params.append(limit)

            interactions = []
            for row in conn.execute(query, params):
                interaction = dict(row)
                interaction['metadata'] = self._parse_json_field(interaction.get('metadata'), {})
                interactions.append(interaction)
            return interactions

    def get_contacts(self, account_id: str) -> List[Dict]:
        """Return an account's contacts with their manager's name.

        Contacts are ordered by influence (High, Medium, Low, then any other
        value), then by first and last name. ``preferences`` is decoded from
        JSON ({} if empty or invalid).
        """
        with self.get_db() as conn:
            # influence_level is free text: a plain "ORDER BY ... DESC" would
            # sort it alphabetically (Medium > Low > High), so rank explicitly.
            cursor = conn.execute("""
                SELECT
                    c.*,
                    c2.first_name as reports_to_first_name,
                    c2.last_name as reports_to_last_name
                FROM contacts c
                LEFT JOIN contacts c2 ON c.reports_to_id = c2.id
                WHERE c.account_id = ?
                ORDER BY
                    CASE c.influence_level
                        WHEN 'High' THEN 0
                        WHEN 'Medium' THEN 1
                        WHEN 'Low' THEN 2
                        ELSE 3
                    END,
                    c.first_name, c.last_name
            """, (account_id,))

            contacts = []
            for row in cursor:
                contact = dict(row)
                contact['preferences'] = self._parse_json_field(contact.get('preferences'), {})
                contacts.append(contact)
            return contacts

    def get_dashboard_data(self) -> Tuple[Dict, pd.DataFrame, pd.DataFrame]:
        """Return (table counts, 10 newest interactions, accounts per industry)."""
        with self.get_db() as conn:
            counts = {
                'accounts': conn.execute('SELECT COUNT(*) FROM accounts').fetchone()[0],
                'contacts': conn.execute('SELECT COUNT(*) FROM contacts').fetchone()[0],
                'interactions': conn.execute('SELECT COUNT(*) FROM interactions').fetchone()[0],
            }

            recent_interactions = pd.read_sql("""
                SELECT
                    i.created_at,
                    i.type,
                    a.name as account_name,
                    u.name as owner_name,
                    i.sentiment_score
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
                ORDER BY i.created_at DESC
                LIMIT 10
            """, conn)

            account_distribution = pd.read_sql("""
                SELECT industry, COUNT(*) as count
                FROM accounts
                GROUP BY industry
            """, conn)

            return counts, recent_interactions, account_distribution

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    def save_interaction(self, interaction_data: Dict[str, Any]) -> str:
        """Insert a new interaction and return its id.

        Args:
            interaction_data: Column -> value mapping; must contain ``id``,
                ``type``, ``account_id``, ``owner_id`` and ``created_at``.
                A dict/list ``metadata`` is stored as JSON. The caller's dict
                is not modified.

        Raises:
            ValueError: If a required field is missing.
            sqlite3.OperationalError: If a key is not a column of the table.
        """
        required_fields = ['id', 'type', 'account_id', 'owner_id', 'created_at']
        for field in required_fields:
            if field not in interaction_data:
                raise ValueError(f"Missing required field: {field}")

        row = dict(interaction_data)
        if isinstance(row.get('metadata'), (dict, list)):
            row['metadata'] = json.dumps(row['metadata'])

        self._insert_row('interactions', row)
        return row['id']

    def add_account(self, account_data: Dict[str, Any]) -> str:
        """Insert a new account with a fresh UUID and timestamps; return its id.

        The caller's dict is not modified.
        """
        now = datetime.now().isoformat()
        account_id = str(uuid.uuid4())
        row = {**account_data, 'id': account_id, 'created_at': now, 'last_activity_at': now}
        self._insert_row('accounts', row)
        return account_id

    def add_contact(self, contact_data: Dict[str, Any]) -> str:
        """Insert a new contact with a fresh UUID and timestamps; return its id.

        The caller's dict is not modified.
        """
        now = datetime.now().isoformat()
        contact_id = str(uuid.uuid4())
        row = {**contact_data, 'id': contact_id, 'created_at': now, 'last_contacted': now}
        self._insert_row('contacts', row)
        return contact_id

    def update_account(self, account_id: str, update_data: Dict[str, Any]) -> bool:
        """Update an account (bumping ``last_activity_at``); True if it existed."""
        changes = {**update_data, 'last_activity_at': datetime.now().isoformat()}
        return self._update_row('accounts', account_id, changes)

    def update_contact(self, contact_id: str, update_data: Dict[str, Any]) -> bool:
        """Update a contact (bumping ``last_contacted``); True if it existed."""
        changes = {**update_data, 'last_contacted': datetime.now().isoformat()}
        return self._update_row('contacts', contact_id, changes)

    # ------------------------------------------------------------------
    # Detail / search queries
    # ------------------------------------------------------------------

    def get_account_timeline(self, account_id: str, days: int = 90) -> List[Dict]:
        """Return the account's interactions from the last ``days`` days, newest first."""
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    'interaction' as event_type,
                    i.id,
                    i.type as subtype,
                    i.created_at,
                    i.summary as description,
                    i.sentiment_score,
                    u.name as actor_name
                FROM interactions i
                JOIN users u ON i.owner_id = u.id
                WHERE i.account_id = ? AND i.created_at > ?
                ORDER BY i.created_at DESC
            """, (account_id, cutoff_date))
            return [dict(row) for row in cursor]

    def get_account_details(self, account_id: str) -> Optional[Dict]:
        """Return one account with owner name and aggregate stats, or None.

        ``technology_stack`` is decoded from JSON ([] if empty or invalid).
        """
        with self.get_db() as conn:
            row = conn.execute("""
                SELECT
                    a.*,
                    u.name as owner_name,
                    COUNT(DISTINCT c.id) as contact_count,
                    COUNT(DISTINCT i.id) as interaction_count,
                    AVG(i.sentiment_score) as avg_sentiment
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                LEFT JOIN contacts c ON a.id = c.account_id
                LEFT JOIN interactions i ON a.id = i.account_id
                WHERE a.id = ?
                GROUP BY a.id
            """, (account_id,)).fetchone()

            if row is None:
                return None
            account = dict(row)
            account['technology_stack'] = self._parse_json_field(account.get('technology_stack'), [])
            return account

    def search_accounts(self, query: str, limit: int = 10) -> List[Dict]:
        """Search accounts whose name, industry or website contains ``query``."""
        search_term = f"%{query}%"
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT a.*, u.name as owner_name
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                WHERE a.name LIKE ? OR a.industry LIKE ? OR a.website LIKE ?
                LIMIT ?
            """, (search_term, search_term, search_term, limit))
            return [dict(row) for row in cursor]

    def search_contacts(self, query: str, account_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Search contacts by first name, last name or email, optionally per account."""
        search_term = f"%{query}%"
        with self.get_db() as conn:
            sql = """
                SELECT c.*, a.name as account_name
                FROM contacts c
                JOIN accounts a ON c.account_id = a.id
                WHERE (c.first_name LIKE ? OR c.last_name LIKE ? OR c.email LIKE ?)
            """
            params: List[Any] = [search_term, search_term, search_term]
            if account_id:
                sql += " AND c.account_id = ?"
                params.append(account_id)
            sql += " LIMIT ?"
            params.append(limit)
            return [dict(row) for row in conn.execute(sql, params)]

    def search_interactions(self, query: str, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Search interactions whose transcript or summary contains ``query``.

        Results are newest first; ``metadata`` is decoded from JSON ({} if
        empty or invalid), matching :meth:`get_recent_interactions`.
        """
        search_term = f"%{query}%"
        with self.get_db() as conn:
            sql = """
                SELECT i.*, a.name as account_name, u.name as owner_name
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
                WHERE (i.transcript LIKE ? OR i.summary LIKE ?)
            """
            params: List[Any] = [search_term, search_term]
            if user_id:
                sql += " AND i.owner_id = ?"
                params.append(user_id)
            sql += " ORDER BY i.created_at DESC LIMIT ?"
            params.append(limit)

            interactions = []
            for row in conn.execute(sql, params):
                interaction = dict(row)
                interaction['metadata'] = self._parse_json_field(interaction.get('metadata'), {})
                interactions.append(interaction)
            return interactions