Spaces:
Sleeping
Sleeping
| """Enhanced database service with complete functionality""" | |
| import sqlite3 | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import json | |
| from datetime import datetime, timedelta | |
| import random | |
| import uuid | |
| import pandas as pd | |
| import os | |
| import logging | |
| from faker import Faker | |
| logger = logging.getLogger(__name__) | |
class DatabaseService:
    """SQLite-backed CRM data layer.

    Owns schema creation, one-time synthetic-data seeding (via Faker), and
    query/CRUD helpers for users, accounts, contacts and interactions.
    All timestamps are stored as ISO-8601 strings; JSON-valued columns
    (technology_stack, preferences, metadata) are stored as serialized text.
    """

    def __init__(self, db_path: str = "/data/robata.db", llm_service=None):
        """
        Initialize database service

        Args:
            db_path: Path to SQLite database
            llm_service: LLM service for synthetic data generation
        """
        self.db_path = db_path
        self.fake = Faker()
        self.llm = llm_service
        # Ensure the containing directory exists before sqlite opens the file.
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.setup_database()
        # Only generate data if database is empty
        if not self._has_data():
            self.generate_synthetic_data()

    def _has_data(self) -> bool:
        """Return True if the users table already contains at least one row."""
        with self.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM users")
            user_count = cursor.fetchone()[0]
            return user_count > 0

    @contextmanager
    def get_db(self):
        """Context manager for database connections.

        Yields a sqlite3 connection with ``row_factory = sqlite3.Row`` so
        rows support dict-style access; the connection is always closed on
        exit, even on error.

        BUG FIX: the ``@contextmanager`` decorator was missing. Without it,
        this generator function does not implement the context-manager
        protocol and every ``with self.get_db() as conn:`` call in this
        class raised TypeError. ``contextmanager`` was already imported.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # enables row['column'] access
        try:
            yield conn
        finally:
            conn.close()

    def setup_database(self):
        """Create tables and indexes if they do not already exist (idempotent)."""
        with self.get_db() as conn:
            c = conn.cursor()
            c.executescript('''
                CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    email TEXT UNIQUE,
                    name TEXT,
                    role TEXT,
                    department TEXT,
                    title TEXT,
                    region TEXT,
                    quota REAL,
                    created_at TEXT,
                    last_login TEXT
                );
                CREATE TABLE IF NOT EXISTS accounts (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    parent_account_id TEXT,
                    industry TEXT,
                    status TEXT,
                    website TEXT,
                    annual_revenue REAL,
                    employee_count INTEGER,
                    technology_stack TEXT,
                    region TEXT,
                    address TEXT,
                    account_owner_id TEXT,
                    engagement_score REAL,
                    created_at TEXT,
                    last_activity_at TEXT,
                    FOREIGN KEY (parent_account_id) REFERENCES accounts (id),
                    FOREIGN KEY (account_owner_id) REFERENCES users (id)
                );
                CREATE TABLE IF NOT EXISTS contacts (
                    id TEXT PRIMARY KEY,
                    account_id TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    email TEXT,
                    phone TEXT,
                    title TEXT,
                    department TEXT,
                    reports_to_id TEXT,
                    influence_level TEXT,
                    engagement_score REAL,
                    preferences TEXT,
                    created_at TEXT,
                    last_contacted TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (reports_to_id) REFERENCES contacts (id)
                );
                CREATE TABLE IF NOT EXISTS interactions (
                    id TEXT PRIMARY KEY,
                    type TEXT,
                    account_id TEXT,
                    owner_id TEXT,
                    transcript TEXT,
                    summary TEXT,
                    sentiment_score REAL,
                    metadata TEXT,
                    created_at TEXT,
                    FOREIGN KEY (account_id) REFERENCES accounts (id),
                    FOREIGN KEY (owner_id) REFERENCES users (id)
                );
                -- Create indexes for better query performance
                CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
                CREATE INDEX IF NOT EXISTS idx_accounts_owner ON accounts(account_owner_id);
                CREATE INDEX IF NOT EXISTS idx_contacts_account ON contacts(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_account ON interactions(account_id);
                CREATE INDEX IF NOT EXISTS idx_interactions_owner ON interactions(owner_id);
            ''')

    def get_user_by_email(self, email: str) -> Optional[Dict]:
        """
        Get user details by email address

        Also touches the user's ``last_login`` timestamp as a side effect.
        Note: the returned dict reflects the row as read, i.e. it carries the
        *previous* last_login value, not the one just written.

        Args:
            email: User's email address
        Returns:
            Dict containing user details if found, None otherwise
        """
        try:
            with self.get_db() as conn:
                cursor = conn.execute("""
                    SELECT
                        id,
                        email,
                        name,
                        role,
                        department,
                        title,
                        region,
                        quota,
                        created_at,
                        last_login
                    FROM users
                    WHERE email = ?
                """, (email,))
                row = cursor.fetchone()
                if row:
                    # Update last login
                    conn.execute("""
                        UPDATE users
                        SET last_login = ?
                        WHERE id = ?
                    """, (datetime.now().isoformat(), row['id']))
                    conn.commit()
                    # Convert row to dict
                    return dict(row)
                return None
        except Exception as e:
            # Best-effort lookup: log and signal "not found" rather than crash.
            logger.error(f"Error getting user by email: {str(e)}")
            return None

    def generate_synthetic_data(self):
        """Generate and insert synthetic test data (users, accounts, contacts,
        interactions). Called once from __init__ when the database is empty."""
        with self.get_db() as conn:
            c = conn.cursor()

            # --- Users -------------------------------------------------
            users = []
            user_ids = []  # Keep track of user IDs for account assignment

            # Predefined test user with a stable, known login email.
            test_user_id = str(uuid.uuid4())
            users.append({
                'id': test_user_id,
                'email': 'test@example.com',  # Default test login
                'name': 'Test User',
                'role': 'sales_rep',
                'department': 'Sales',
                'title': 'Senior Sales Representative',
                'region': 'North',
                'quota': 1000000.0,
                'created_at': datetime.now().isoformat(),
                'last_login': datetime.now().isoformat()
            })
            user_ids.append(test_user_id)

            # Generate additional users
            for _ in range(10):
                user_id = str(uuid.uuid4())
                user_ids.append(user_id)
                users.append({
                    'id': user_id,
                    'email': self.fake.company_email(),
                    'name': self.fake.name(),
                    'role': random.choice(['sales_rep', 'regional_lead', 'head_of_sales']),
                    'department': random.choice(['Sales', 'Consulting', 'Technology']),
                    'title': 'Senior Sales Representative',
                    'region': random.choice(['North', 'South', 'East', 'West']),
                    'quota': random.uniform(500000, 2000000),
                    'created_at': datetime.now().isoformat(),
                    'last_login': datetime.now().isoformat()
                })

            c.executemany('''
                INSERT OR REPLACE INTO users VALUES (
                    :id, :email, :name, :role, :department, :title, :region,
                    :quota, :created_at, :last_login
                )
            ''', users)

            # --- Accounts ----------------------------------------------
            accounts = []
            industries = ['Technology', 'Healthcare', 'Financial Services', 'Manufacturing', 'Retail']

            # Guarantee the test user owns some accounts (the loop below adds
            # more for every user, including the test user).
            for _ in range(3):
                accounts.append({
                    'id': str(uuid.uuid4()),
                    'name': self.fake.company(),
                    'parent_account_id': None,
                    'industry': random.choice(industries),
                    'status': 'active',
                    'website': self.fake.url(),
                    'annual_revenue': random.uniform(1000000, 100000000),
                    'employee_count': random.randint(50, 10000),
                    'technology_stack': json.dumps(['Python', 'React', 'AWS']),
                    'region': 'North',
                    'address': self.fake.address(),
                    'account_owner_id': test_user_id,  # Assign to test user
                    'engagement_score': random.uniform(0, 100),
                    'created_at': datetime.now().isoformat(),
                    'last_activity_at': datetime.now().isoformat()
                })

            # Generate additional accounts for every user
            for user_id in user_ids:
                for _ in range(random.randint(2, 5)):
                    accounts.append({
                        'id': str(uuid.uuid4()),
                        'name': self.fake.company(),
                        'parent_account_id': None,
                        'industry': random.choice(industries),
                        'status': 'active',
                        'website': self.fake.url(),
                        'annual_revenue': random.uniform(1000000, 100000000),
                        'employee_count': random.randint(50, 10000),
                        'technology_stack': json.dumps(['Python', 'React', 'AWS']),
                        'region': random.choice(['North', 'South', 'East', 'West']),
                        'address': self.fake.address(),
                        'account_owner_id': user_id,
                        'engagement_score': random.uniform(0, 100),
                        'created_at': datetime.now().isoformat(),
                        'last_activity_at': datetime.now().isoformat()
                    })

            c.executemany('''
                INSERT OR REPLACE INTO accounts VALUES (
                    :id, :name, :parent_account_id, :industry, :status, :website,
                    :annual_revenue, :employee_count, :technology_stack, :region,
                    :address, :account_owner_id, :engagement_score, :created_at,
                    :last_activity_at
                )
            ''', accounts)

            # --- Contacts ----------------------------------------------
            contacts = []
            for account in accounts:
                for _ in range(random.randint(3, 8)):  # 3-8 contacts per account
                    contacts.append({
                        'id': str(uuid.uuid4()),
                        'account_id': account['id'],
                        'first_name': self.fake.first_name(),
                        'last_name': self.fake.last_name(),
                        'email': self.fake.email(),
                        'phone': self.fake.phone_number(),
                        'title': random.choice(['CEO', 'CTO', 'CFO', 'VP Sales', 'Director']),
                        'department': random.choice(['Executive', 'Sales', 'IT', 'Finance']),
                        'reports_to_id': None,
                        'influence_level': random.choice(['High', 'Medium', 'Low']),
                        'engagement_score': random.uniform(0, 100),
                        'preferences': json.dumps({}),
                        'created_at': datetime.now().isoformat(),
                        'last_contacted': datetime.now().isoformat()
                    })

            c.executemany('''
                INSERT OR REPLACE INTO contacts VALUES (
                    :id, :account_id, :first_name, :last_name, :email, :phone,
                    :title, :department, :reports_to_id, :influence_level,
                    :engagement_score, :preferences, :created_at, :last_contacted
                )
            ''', contacts)

            # --- Interactions -------------------------------------------
            interactions = []
            for account in accounts:
                for _ in range(random.randint(5, 12)):  # 5-12 interactions per account
                    interactions.append({
                        'id': str(uuid.uuid4()),
                        'type': random.choice(['call', 'meeting', 'email', 'presentation']),
                        'account_id': account['id'],
                        'owner_id': account['account_owner_id'],
                        'transcript': self.fake.paragraph(),
                        'summary': self.fake.sentence(),
                        'sentiment_score': random.uniform(0, 1),
                        'metadata': json.dumps({
                            'duration': random.randint(15, 120),
                            'location': random.choice(['virtual', 'in-person']),
                            'attendees': random.randint(1, 5),
                            'key_points': [
                                self.fake.sentence() for _ in range(random.randint(2, 5))
                            ],
                            'action_items': [
                                {'description': self.fake.sentence(), 'owner': self.fake.name()}
                                for _ in range(random.randint(1, 3))
                            ]
                        }),
                        'created_at': datetime.now().isoformat()
                    })

            c.executemany('''
                INSERT OR REPLACE INTO interactions VALUES (
                    :id, :type, :account_id, :owner_id, :transcript, :summary,
                    :sentiment_score, :metadata, :created_at
                )
            ''', interactions)
            conn.commit()

    def get_user_accounts(self, user_id: str) -> List[Dict]:
        """Get accounts owned by the given user, ordered by account name."""
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT * FROM accounts
                WHERE account_owner_id = ?
                ORDER BY name
            """, (user_id,))
            return [dict(row) for row in cursor.fetchall()]

    def get_account_metrics(self, account_id: str) -> Dict:
        """Get contact count, interaction count and average sentiment for an account."""
        with self.get_db() as conn:
            # Get contact count
            cursor = conn.execute("""
                SELECT COUNT(*) as contact_count
                FROM contacts
                WHERE account_id = ?
            """, (account_id,))
            contact_count = cursor.fetchone()['contact_count']

            # Get interaction count and average sentiment
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as interaction_count,
                    AVG(sentiment_score) as avg_sentiment
                FROM interactions
                WHERE account_id = ?
            """, (account_id,))
            interaction_stats = cursor.fetchone()

            return {
                'contact_count': contact_count,
                'interaction_count': interaction_stats['interaction_count'],
                # AVG over zero rows is NULL; normalize to 0.0
                'avg_sentiment': interaction_stats['avg_sentiment'] or 0.0
            }

    def get_recent_interactions(self, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Get recent interactions with account and user details.

        Args:
            user_id: If given, restrict to interactions owned by this user.
            limit: Maximum number of rows returned (newest first).
        """
        with self.get_db() as conn:
            query = """
                SELECT
                    i.*,
                    a.name as account_name,
                    u.name as owner_name,
                    a.industry as account_industry
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
            """
            params = []
            if user_id:
                query += " WHERE i.owner_id = ?"
                params.append(user_id)
            query += " ORDER BY i.created_at DESC LIMIT ?"
            params.append(limit)

            cursor = conn.execute(query, params)
            interactions = []
            for row in cursor:
                interaction = dict(row)
                # Parse JSON metadata; fall back to {} on missing/invalid JSON
                try:
                    if interaction.get('metadata'):
                        interaction['metadata'] = json.loads(interaction['metadata'])
                    else:
                        interaction['metadata'] = {}
                except json.JSONDecodeError:
                    interaction['metadata'] = {}
                interactions.append(interaction)
            return interactions

    def get_contacts(self, account_id: str) -> List[Dict]:
        """Get contacts for an account, including the name of who each reports to."""
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    c.*,
                    c2.first_name as reports_to_first_name,
                    c2.last_name as reports_to_last_name
                FROM contacts c
                LEFT JOIN contacts c2 ON c.reports_to_id = c2.id
                WHERE c.account_id = ?
                ORDER BY c.influence_level DESC, c.first_name, c.last_name
            """, (account_id,))
            contacts = []
            for row in cursor:
                contact = dict(row)
                # Parse JSON preferences; fall back to {} on missing/invalid JSON
                try:
                    if contact.get('preferences'):
                        contact['preferences'] = json.loads(contact['preferences'])
                    else:
                        contact['preferences'] = {}
                except json.JSONDecodeError:
                    contact['preferences'] = {}
                contacts.append(contact)
            return contacts

    def get_dashboard_data(self) -> Tuple[Dict, pd.DataFrame, pd.DataFrame]:
        """Get aggregated data for the dashboard.

        Returns:
            (entity counts dict, recent-interactions DataFrame,
             accounts-by-industry DataFrame)
        """
        with self.get_db() as conn:
            # Entity counts
            counts = {
                'accounts': conn.execute('SELECT COUNT(*) FROM accounts').fetchone()[0],
                'contacts': conn.execute('SELECT COUNT(*) FROM contacts').fetchone()[0],
                'interactions': conn.execute('SELECT COUNT(*) FROM interactions').fetchone()[0]
            }
            # Ten most recent interactions with owner/account names
            recent_interactions = pd.read_sql("""
                SELECT
                    i.created_at, i.type,
                    a.name as account_name,
                    u.name as owner_name,
                    i.sentiment_score
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
                ORDER BY i.created_at DESC
                LIMIT 10
            """, conn)
            # Account distribution by industry
            account_distribution = pd.read_sql("""
                SELECT industry, COUNT(*) as count
                FROM accounts
                GROUP BY industry
            """, conn)
            return counts, recent_interactions, account_distribution

    def save_interaction(self, interaction_data: Dict[str, Any]) -> str:
        """Save a new interaction to the database.

        Args:
            interaction_data: Column/value mapping; must contain id, type,
                account_id, owner_id and created_at. A dict/list 'metadata'
                value is serialized to JSON in place.
        Returns:
            The interaction id.
        Raises:
            ValueError: If a required field is missing.
        """
        with self.get_db() as conn:
            cursor = conn.cursor()
            # Ensure required fields
            required_fields = ['id', 'type', 'account_id', 'owner_id', 'created_at']
            for field in required_fields:
                if field not in interaction_data:
                    raise ValueError(f"Missing required field: {field}")
            # Convert any dict/list fields to JSON
            if 'metadata' in interaction_data and isinstance(interaction_data['metadata'], (dict, list)):
                interaction_data['metadata'] = json.dumps(interaction_data['metadata'])
            # NOTE(review): column names are interpolated from dict keys —
            # safe only for trusted callers. Values are parameterized.
            fields = interaction_data.keys()
            placeholders = ','.join(['?' for _ in fields])
            query = f"INSERT INTO interactions ({','.join(fields)}) VALUES ({placeholders})"
            cursor.execute(query, list(interaction_data.values()))
            conn.commit()
            return interaction_data['id']

    def add_account(self, account_data: Dict[str, Any]) -> str:
        """Add a new account; generates id and timestamps (mutates the input dict).

        Returns:
            The generated account id.
        """
        account_id = str(uuid.uuid4())
        account_data['id'] = account_id
        account_data['created_at'] = datetime.now().isoformat()
        account_data['last_activity_at'] = datetime.now().isoformat()
        with self.get_db() as conn:
            c = conn.cursor()
            # NOTE(review): column names come from dict keys — trusted callers only.
            placeholders = ', '.join(['?' for _ in account_data])
            columns = ', '.join(account_data.keys())
            sql = f'INSERT INTO accounts ({columns}) VALUES ({placeholders})'
            c.execute(sql, list(account_data.values()))
            conn.commit()
            return account_id

    def add_contact(self, contact_data: Dict[str, Any]) -> str:
        """Add a new contact; generates id and timestamps (mutates the input dict).

        Returns:
            The generated contact id.
        """
        contact_id = str(uuid.uuid4())
        contact_data['id'] = contact_id
        contact_data['created_at'] = datetime.now().isoformat()
        contact_data['last_contacted'] = datetime.now().isoformat()
        with self.get_db() as conn:
            c = conn.cursor()
            # NOTE(review): column names come from dict keys — trusted callers only.
            placeholders = ', '.join(['?' for _ in contact_data])
            columns = ', '.join(contact_data.keys())
            sql = f'INSERT INTO contacts ({columns}) VALUES ({placeholders})'
            c.execute(sql, list(contact_data.values()))
            conn.commit()
            return contact_id

    def update_account(self, account_id: str, update_data: Dict[str, Any]) -> bool:
        """Update an existing account; also refreshes last_activity_at.

        Returns:
            True if a row was updated, False if the id was not found.
        """
        update_data['last_activity_at'] = datetime.now().isoformat()
        with self.get_db() as conn:
            c = conn.cursor()
            # NOTE(review): column names come from dict keys — trusted callers only.
            set_clause = ', '.join([f"{k} = ?" for k in update_data.keys()])
            sql = f'UPDATE accounts SET {set_clause} WHERE id = ?'
            values = list(update_data.values()) + [account_id]
            c.execute(sql, values)
            conn.commit()
            return c.rowcount > 0

    def update_contact(self, contact_id: str, update_data: Dict[str, Any]) -> bool:
        """Update an existing contact; also refreshes last_contacted.

        Returns:
            True if a row was updated, False if the id was not found.
        """
        update_data['last_contacted'] = datetime.now().isoformat()
        with self.get_db() as conn:
            c = conn.cursor()
            # NOTE(review): column names come from dict keys — trusted callers only.
            set_clause = ', '.join([f"{k} = ?" for k in update_data.keys()])
            sql = f'UPDATE contacts SET {set_clause} WHERE id = ?'
            values = list(update_data.values()) + [contact_id]
            c.execute(sql, values)
            conn.commit()
            return c.rowcount > 0

    def get_account_timeline(self, account_id: str, days: int = 90) -> List[Dict]:
        """Get the account's interaction events within the last `days` days,
        newest first."""
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    'interaction' as event_type,
                    i.id,
                    i.type as subtype,
                    i.created_at,
                    i.summary as description,
                    i.sentiment_score,
                    u.name as actor_name
                FROM interactions i
                JOIN users u ON i.owner_id = u.id
                WHERE i.account_id = ? AND i.created_at > ?
                ORDER BY i.created_at DESC
            """, (account_id, cutoff_date))
            timeline = []
            for row in cursor:
                event = dict(row)
                timeline.append(event)
            return timeline

    def get_account_details(self, account_id: str) -> Optional[Dict]:
        """Get an account with owner name, contact/interaction counts and
        average sentiment; None if the account does not exist."""
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    a.*,
                    u.name as owner_name,
                    COUNT(DISTINCT c.id) as contact_count,
                    COUNT(DISTINCT i.id) as interaction_count,
                    AVG(i.sentiment_score) as avg_sentiment
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                LEFT JOIN contacts c ON a.id = c.account_id
                LEFT JOIN interactions i ON a.id = i.account_id
                WHERE a.id = ?
                GROUP BY a.id
            """, (account_id,))
            row = cursor.fetchone()
            if row:
                account = dict(row)
                # technology_stack may be NULL or malformed JSON; normalize to []
                try:
                    account['technology_stack'] = json.loads(account['technology_stack'])
                except (json.JSONDecodeError, TypeError):
                    account['technology_stack'] = []
                return account
            return None

    def search_accounts(self, query: str, limit: int = 10) -> List[Dict]:
        """Search accounts by name, industry or website substring."""
        search_term = f"%{query}%"
        with self.get_db() as conn:
            cursor = conn.execute("""
                SELECT
                    a.*,
                    u.name as owner_name
                FROM accounts a
                LEFT JOIN users u ON a.account_owner_id = u.id
                WHERE
                    a.name LIKE ? OR
                    a.industry LIKE ? OR
                    a.website LIKE ?
                LIMIT ?
            """, (search_term, search_term, search_term, limit))
            return [dict(row) for row in cursor]

    def search_contacts(self, query: str, account_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Search contacts by first name, last name or email, optionally
        scoped to a single account."""
        search_term = f"%{query}%"
        with self.get_db() as conn:
            sql = """
                SELECT
                    c.*,
                    a.name as account_name
                FROM contacts c
                JOIN accounts a ON c.account_id = a.id
                WHERE
                    (c.first_name LIKE ? OR
                     c.last_name LIKE ? OR
                     c.email LIKE ?)
            """
            params = [search_term, search_term, search_term]
            if account_id:
                sql += " AND c.account_id = ?"
                params.append(account_id)
            sql += " LIMIT ?"
            params.append(limit)
            cursor = conn.execute(sql, params)
            return [dict(row) for row in cursor]

    def search_interactions(self, query: str, user_id: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Search interactions by transcript or summary content, optionally
        restricted to one owner; newest first."""
        search_term = f"%{query}%"
        with self.get_db() as conn:
            sql = """
                SELECT
                    i.*,
                    a.name as account_name,
                    u.name as owner_name
                FROM interactions i
                JOIN accounts a ON i.account_id = a.id
                JOIN users u ON i.owner_id = u.id
                WHERE
                    (i.transcript LIKE ? OR
                     i.summary LIKE ?)
            """
            params = [search_term, search_term]
            if user_id:
                sql += " AND i.owner_id = ?"
                params.append(user_id)
            sql += " ORDER BY i.created_at DESC LIMIT ?"
            params.append(limit)
            cursor = conn.execute(sql, params)
            interactions = []
            for row in cursor:
                interaction = dict(row)
                # Parse JSON metadata; fall back to {} on invalid JSON
                try:
                    if interaction.get('metadata'):
                        interaction['metadata'] = json.loads(interaction['metadata'])
                except json.JSONDecodeError:
                    interaction['metadata'] = {}
                interactions.append(interaction)
            return interactions