vBot-2.1 / app /customer_db.py
Ajit Panday
Update dependencies and Hugging Face configuration
7d72bcf
from sqlalchemy import create_engine, text
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class CustomerDBManager:
    """Read and write ``call_records`` rows in a customer's own MySQL database.

    The SQLAlchemy engine is created lazily on first use from the
    ``db_user``, ``db_password``, ``db_host``, ``db_port`` and ``db_name``
    attributes of ``customer`` and is then reused by every method. Every
    query is scoped to ``customer.id``.
    """

    # Optional search filters as (filter key, SQL condition) pairs, listed in
    # the order their conditions are appended to the WHERE clause. Each filter
    # key doubles as the bind-parameter name.
    _SEARCH_FILTERS = (
        ("start_date", "created_at >= :start_date"),
        ("end_date", "created_at <= :end_date"),
        ("caller_number", "caller_number = :caller_number"),
        ("called_number", "called_number = :called_number"),
    )

    def __init__(self, customer):
        """Store ``customer``; no connection is opened until it is needed."""
        self.customer = customer
        self.engine = None

    def _build_db_url(self):
        """Return the ``mysql+pymysql`` URL for this customer's database.

        User name and password are percent-encoded so that characters such
        as ``@``, ``/`` or ``:`` cannot be mistaken for URL delimiters.
        """
        # Local import keeps this class self-contained with respect to the
        # module's import block.
        from urllib.parse import quote

        customer = self.customer
        user = quote(str(customer.db_user), safe="")
        password = quote(str(customer.db_password), safe="")
        return (
            f"mysql+pymysql://{user}:{password}"
            f"@{customer.db_host}:{customer.db_port}/{customer.db_name}"
        )

    def _build_search_query(self, filters=None):
        """Return ``(sql, params)`` for a call-record search.

        Only filters with a truthy value contribute a condition; the result
        is always restricted to this customer and ordered newest first.
        """
        conditions = ["customer_id = :customer_id"]
        params = {'customer_id': self.customer.id}
        requested = filters or {}
        for key, condition in self._SEARCH_FILTERS:
            value = requested.get(key)
            if value:
                conditions.append(condition)
                params[key] = value
        sql = (
            "SELECT * FROM call_records WHERE "
            + " AND ".join(conditions)
            + " ORDER BY created_at DESC"
        )
        return sql, params

    @staticmethod
    def _row_to_dict(row):
        """Convert a result row into a plain ``dict`` keyed by column name."""
        # SQLAlchemy 1.4+/2.0 rows are tuple-like and expose their columns
        # through ``_mapping``; older rows are mappings themselves.
        return dict(getattr(row, "_mapping", row))

    def get_connection(self):
        """Return the customer's engine, creating it on first call.

        Raises:
            Exception: whatever ``create_engine`` raises; the failure is
                logged before being re-raised.
        """
        if self.engine is None:
            try:
                self.engine = create_engine(self._build_db_url())
            except Exception as e:
                logger.error(
                    "Failed to create database connection for customer %s: %s",
                    self.customer.id, e,
                )
                raise
        return self.engine

    def save_call_record(self, call_data):
        """Insert one row into ``call_records`` and return ``True``.

        ``call_data`` is read with ``.get`` for the keys ``id``,
        ``caller_number``, ``called_number``, ``transcription``, ``summary``,
        ``sentiment`` and ``keywords``; missing keys are stored as ``None``.
        ``customer_id`` always comes from ``self.customer`` and both
        timestamps are set to the current UTC time.

        Raises:
            Exception: any connection or database error, logged and
                re-raised.
        """
        try:
            engine = self.get_connection()
            query = text("""
                INSERT INTO call_records (
                    id, customer_id, caller_number, called_number,
                    transcription, summary, sentiment, keywords,
                    created_at, updated_at
                ) VALUES (
                    :id, :customer_id, :caller_number, :called_number,
                    :transcription, :summary, :sentiment, :keywords,
                    :created_at, :updated_at
                )
            """)
            now = datetime.utcnow()
            record_data = {
                'id': call_data.get('id'),
                'customer_id': self.customer.id,
                'caller_number': call_data.get('caller_number'),
                'called_number': call_data.get('called_number'),
                'transcription': call_data.get('transcription'),
                'summary': call_data.get('summary'),
                'sentiment': call_data.get('sentiment'),
                'keywords': call_data.get('keywords'),
                'created_at': now,
                'updated_at': now,
            }
            # engine.begin() commits when the block exits normally and rolls
            # back if it raises, so no explicit commit() call is needed.
            with engine.begin() as connection:
                connection.execute(query, record_data)
            logger.info(
                "Successfully saved call record %s for customer %s",
                call_data.get('id'), self.customer.id,
            )
            return True
        except Exception as e:
            logger.error(
                "Failed to save call record for customer %s: %s",
                self.customer.id, e,
            )
            raise

    def get_call_record(self, call_id):
        """Return the call record ``call_id`` as a dict, or ``None``.

        The lookup is restricted to rows belonging to this customer.

        Raises:
            Exception: any connection or database error, logged and
                re-raised.
        """
        try:
            engine = self.get_connection()
            query = text(
                "SELECT * FROM call_records "
                "WHERE id = :call_id AND customer_id = :customer_id"
            )
            lookup = {'call_id': call_id, 'customer_id': self.customer.id}
            with engine.connect() as connection:
                row = connection.execute(query, lookup).fetchone()
            return self._row_to_dict(row) if row is not None else None
        except Exception as e:
            logger.error(
                "Failed to retrieve call record %s for customer %s: %s",
                call_id, self.customer.id, e,
            )
            raise

    def search_call_records(self, filters=None):
        """Return this customer's call records as dicts, newest first.

        Args:
            filters: optional mapping; truthy values under ``start_date``,
                ``end_date``, ``caller_number`` and ``called_number`` narrow
                the search. Other keys are ignored.

        Raises:
            Exception: any connection or database error, logged and
                re-raised.
        """
        try:
            engine = self.get_connection()
            # Assemble the full SQL string first and wrap it in text() once,
            # so execute() always receives an executable clause.
            sql, params = self._build_search_query(filters)
            with engine.connect() as connection:
                rows = connection.execute(text(sql), params).fetchall()
            return [self._row_to_dict(row) for row in rows]
        except Exception as e:
            logger.error(
                "Failed to search call records for customer %s: %s",
                self.customer.id, e,
            )
            raise