# ARCollectionAgent / database.py
# Source: Hugging Face Space "ARCollectionAgent" by faerazo, commit 3bb6958 (verified)
# database.py - Updated for new schema with view option
from supabase import create_client
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime
from config import SUPABASE_URL, SUPABASE_KEY
# Initialize Supabase client
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
def ensure_days_past_due_current():
    """Refresh the days_past_due column via the server-side RPC.

    Best-effort: call this at startup or periodically to keep the overdue-day
    counts fresh. If the 'refresh_overdue_days' function does not exist in the
    database, the stale values in the table are used as-is.
    """
    try:
        # Runs the server-side UPDATE that recomputes days_past_due.
        supabase.rpc('refresh_overdue_days', {}).execute()
    except Exception:
        # Bug fix: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. Keep the deliberate best-effort behavior, but only
        # for ordinary exceptions (e.g. the RPC not being defined).
        pass
def query_database(query: str) -> Dict:
    """Execute natural-language AR queries against the single ar_data table.

    Routes the query by keyword matching to one of several canned Supabase
    queries.

    Args:
        query: Free-text question (e.g. "show top 5 customers at risk").

    Returns:
        Dict with keys: success (bool), data (list of records),
        row_count (int), and error (str, present only on failure).
    """
    import re  # used to pull numbers out of phrases like "30 days" / "top 5"

    def _empty() -> Dict:
        # Consistent "no matching rows" result; previously several branches
        # fell through and implicitly returned None when result.data was empty.
        return {"success": True, "data": [], "row_count": 0}

    try:
        query_lower = query.lower()

        # 1. All customers with late (overdue, unpaid) invoices, grouped per customer.
        if "late" in query_lower and "customer" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .execute()
            if not result.data:
                return _empty()  # bug fix: used to return None here
            df = pd.DataFrame(result.data)
            # Group by customer (simple grouping)
            summary = df.groupby(['customer_id', 'company_name', 'customer_email']).agg({
                'amount': 'sum',
                'days_past_due': 'max',
                'invoice_id': 'count'
            }).reset_index()
            summary.columns = ['customer_id', 'company_name', 'email',
                               'total_overdue', 'max_days_overdue', 'invoice_count']
            return {
                "success": True,
                "data": summary.to_dict('records'),
                "row_count": len(summary)
            }

        # 2. Invoices overdue by more than X days (default 30).
        elif "invoice" in query_lower and "days" in query_lower:
            days_match = re.search(r'(\d+)\s*days?', query_lower)
            days = int(days_match.group(1)) if days_match else 30
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', days)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 3. VIP customers with unpaid invoices.
        elif "vip" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .eq('vip_flag', True)\
                .is_('paid_date', 'null')\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 4. Repeat late payers (more than 2 late payments in the last 12 months).
        elif "repeat" in query_lower or "12" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('num_late_12m', 2)\
                .is_('paid_date', 'null')\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 5. Total outstanding amount across all unpaid invoices.
        elif "total" in query_lower and ("outstanding" in query_lower or "money" in query_lower):
            result = supabase.table('ar_data')\
                .select('amount, customer_id')\
                .is_('paid_date', 'null')\
                .execute()
            if not result.data:
                # Bug fix: previously returned None when nothing was outstanding.
                return {
                    "success": True,
                    "data": [{
                        "total_outstanding": 0.0,
                        "invoice_count": 0,
                        "customer_count": 0,
                        "average_per_invoice": 0.0
                    }],
                    "row_count": 1
                }
            df = pd.DataFrame(result.data)
            total = df['amount'].sum()
            customer_count = df['customer_id'].nunique()
            return {
                "success": True,
                "data": [{
                    "total_outstanding": float(total),
                    "invoice_count": len(result.data),
                    "customer_count": customer_count,
                    "average_per_invoice": float(total / len(result.data))
                }],
                "row_count": 1
            }

        # 6. Top N customers at risk of default (default N=5).
        elif "top" in query_lower or "risk" in query_lower or "default" in query_lower:
            num_match = re.search(r'top\s*(\d+)', query_lower)
            limit = int(num_match.group(1)) if num_match else 5
            result = supabase.table('ar_data')\
                .select('*')\
                .is_('paid_date', 'null')\
                .gt('days_past_due', 0)\
                .execute()
            if not result.data:
                return _empty()  # bug fix: used to return None here
            df = pd.DataFrame(result.data)
            # Simplified heuristic risk score; weights are ad hoc.
            df['risk_score'] = (
                df['days_past_due'] * 0.3 +
                df['num_late_12m'] * 20 +
                df['prior_promises_broken'] * 30 +
                (df['amount'] / 1000) * 0.1
            )
            # Aggregate per customer.
            risk_summary = df.groupby(['customer_id', 'company_name', 'country']).agg({
                'amount': 'sum',
                'days_past_due': 'max',
                'risk_score': 'sum',
                'num_late_12m': 'max'
            }).reset_index()
            risk_summary.rename(columns={'days_past_due': 'max_days_overdue'}, inplace=True)
            # Sort by risk score and keep the top N.
            risk_summary = risk_summary.nlargest(limit, 'risk_score')
            return {
                "success": True,
                "data": risk_summary.to_dict('records'),
                "row_count": len(risk_summary)
            }

        # 7. Unpaid invoices filtered by country.
        elif any(country in query_lower for country in
                 ['sweden', 'norway', 'denmark', 'swedish', 'norwegian', 'danish']):
            country_map = {
                'swedish': 'Sweden', 'norwegian': 'Norway', 'danish': 'Denmark',
                'sweden': 'Sweden', 'norway': 'Norway', 'denmark': 'Denmark'
            }
            country = None
            for key, value in country_map.items():
                if key in query_lower:
                    country = value
                    break
            if country is None:
                # Unreachable given the elif guard, but keeps the contract intact.
                return _empty()
            result = supabase.table('ar_data')\
                .select('*')\
                .eq('country', country)\
                .is_('paid_date', 'null')\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 8. Single most overdue unpaid invoice.
        elif "most" in query_lower and "overdue" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .limit(1)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # Default: all overdue unpaid invoices, most overdue first.
        else:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "data": [],
            "row_count": 0
        }
def get_sample_data() -> tuple:
    """Return (customers_df, invoices_df) preview frames for the UI.

    Pulls the five most-overdue rows from the single ar_data table and splits
    them into a deduplicated customer view and an invoice view for backward
    compatibility with the old two-table layout.
    """
    try:
        response = supabase.table('ar_data')\
            .select('customer_id, company_name, country, segment, vip_flag, invoice_id, amount, due_date, days_past_due')\
            .limit(5)\
            .order('days_past_due', desc=True)\
            .execute()
        if not response.data:
            return pd.DataFrame(), pd.DataFrame()
        frame = pd.DataFrame(response.data)
        # Customer view: one row per unique customer.
        customers = frame[['customer_id', 'company_name', 'country', 'segment', 'vip_flag']].drop_duplicates()
        # Invoice view, with the column renamed for the UI.
        invoices = frame[['invoice_id', 'amount', 'due_date', 'days_past_due', 'company_name']]\
            .rename(columns={'days_past_due': 'days_overdue'})
        return customers, invoices
    except Exception as e:
        print(f"Error getting sample data: {e}")
        return pd.DataFrame(), pd.DataFrame()
def get_full_customers(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Return one page of unique customers from the ar_data table.

    Fetches matching rows, deduplicates by customer_id in Python (the table is
    invoice-grained), then slices out the requested page.
    """
    try:
        columns = 'customer_id, representative_name, customer_email, company_name, country, segment, vip_flag'
        request = supabase.table('ar_data').select(columns)
        if search:
            request = request.or_(f'company_name.ilike.%{search}%,country.ilike.%{search}%,segment.ilike.%{search}%')
        result = request.execute()
        if not result.data:
            return {
                "success": True,
                "data": [],
                "total_count": 0,
                "page": page,
                "page_size": page_size,
                "total_pages": 0
            }
        # Deduplicate customers via pandas, then paginate the unique list.
        unique_customers = pd.DataFrame(result.data)\
            .drop_duplicates(subset=['customer_id'])\
            .to_dict('records')
        offset = page * page_size
        return {
            "success": True,
            "data": unique_customers[offset:offset + page_size],
            "total_count": len(unique_customers),
            "page": page,
            "page_size": page_size,
            "total_pages": (len(unique_customers) + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def get_full_invoices(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Get invoice data from the ar_data table with pagination and search.

    Args:
        page: Zero-based page index.
        page_size: Rows per page.
        search: Optional substring matched against invoice_id, company_name,
            and customer_email.

    Returns:
        Dict with success, data, total_count, page, page_size, total_pages
        (or success=False with an error string).
    """
    try:
        # NOTE(review): `search` is interpolated directly into a PostgREST
        # or() filter — a value containing ',' or '.' will break the filter.
        # Flagged, not changed, to keep behavior identical for normal input.
        search_filter = (
            f'invoice_id.ilike.%{search}%,company_name.ilike.%{search}%,customer_email.ilike.%{search}%'
            if search else None
        )
        query = supabase.table('ar_data').select('*')
        if search_filter:
            query = query.or_(search_filter)
        # Pagination bounds: PostgREST range() is inclusive on both ends.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()
        # Bug fix: the count query now applies the same search filter;
        # previously total_count/total_pages reflected the whole table
        # whenever a search term was active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search_filter:
            count_query = count_query.or_(search_filter)
        total_count = count_query.execute().count or 0
        return {
            "success": True,
            "data": result.data or [],
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def get_email_activity(page: int = 0, page_size: int = 50) -> Dict:
    """Get email activity from the mock_emails table with pagination.

    Falls back to demo_activity_log rows with action_type='mock_email_created'
    if the mock_emails table is missing or fails.

    Returns:
        Dict with success, data, total_count, page, page_size, total_pages
        (or success=False with an error string if both paths fail).
    """
    # Bug fix: compute the pagination bounds BEFORE the try block. They were
    # previously assigned inside it, so an early failure would have raised a
    # NameError in the fallback path, which reuses them.
    start_idx = page * page_size
    end_idx = start_idx + page_size - 1
    try:
        # Primary path: read directly from the dedicated mock_emails table.
        result = supabase.table('mock_emails')\
            .select('*')\
            .range(start_idx, end_idx)\
            .order('created_at', desc=True)\
            .execute()
        count_result = supabase.table('mock_emails')\
            .select('id', count='exact')\
            .execute()
        total_count = count_result.count or 0
        # Normalize records for display.
        formatted_data = [
            {
                "timestamp": record.get('created_at', ''),
                "recipient": record.get('recipient', ''),
                "subject": record.get('subject', ''),
                "status": record.get('status', 'UNKNOWN'),
                "invoice_id": record.get('invoice_id', ''),
                "tone": record.get('tone', ''),
                "body": record.get('body', '')
            }
            for record in result.data or []
        ]
        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        # Fallback: reconstruct the same view from the activity log.
        try:
            result = supabase.table('demo_activity_log')\
                .select('*')\
                .eq('action_type', 'mock_email_created')\
                .range(start_idx, end_idx)\
                .order('created_at', desc=True)\
                .execute()
            count_result = supabase.table('demo_activity_log')\
                .select('id', count='exact')\
                .eq('action_type', 'mock_email_created')\
                .execute()
            total_count = count_result.count or 0
            formatted_data = []
            for record in result.data or []:
                details = record.get('details', {})
                formatted_data.append({
                    "timestamp": details.get('timestamp', record.get('created_at', '')),
                    "recipient": details.get('recipient', ''),
                    "subject": details.get('subject', ''),
                    "status": details.get('status', 'UNKNOWN'),
                    "invoice_id": details.get('invoice_id', ''),
                    "tone": details.get('tone', ''),
                    "body": details.get('body', '')
                })
            return {
                "success": True,
                "data": formatted_data,
                "total_count": total_count,
                "page": page,
                "page_size": page_size,
                "total_pages": (total_count + page_size - 1) // page_size
            }
        except Exception:
            # Bug fix: was a bare `except:`. Report the ORIGINAL failure,
            # matching the previous message format.
            return {"success": False, "error": f"Email retrieval failed: {str(e)}", "data": []}
# Removed get_consolidated_ar_data - replaced by get_basic_ar_data using single ar_data table
def get_activity_log(page: int = 0, page_size: int = 50) -> Dict:
    """Return one page of the demo activity log, newest entries first."""
    try:
        offset = page * page_size
        # PostgREST range() bounds are inclusive on both ends.
        rows = supabase.table('demo_activity_log')\
            .select('*')\
            .range(offset, offset + page_size - 1)\
            .order('created_at', desc=True)\
            .execute()
        # Exact total for the pager.
        total = supabase.table('demo_activity_log')\
            .select('id', count='exact')\
            .execute()\
            .count or 0
        return {
            "success": True,
            "data": rows.data or [],
            "total_count": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def store_mock_email(email_record: Dict) -> None:
    """Persist a mock email record to the mock_emails table (best-effort).

    Failures are printed, not raised, so a storage hiccup never breaks the
    email-generation flow.
    """
    try:
        recipient = email_record.get("recipient", "")
        row = {
            "recipient": recipient,
            "subject": email_record.get("subject", ""),
            "body": email_record.get("body", ""),
            "status": email_record.get("status", "MOCK - NOT SENT"),
            "tone": email_record.get("tone", "friendly"),
            "invoice_id": email_record.get("invoice_id", ""),
            # The local part of the address is used as the customer id —
            # presumably the demo data is seeded that way; verify against
            # the data model before relying on it.
            "customer_id": recipient.split('@')[0]
        }
        supabase.table('mock_emails').insert(row).execute()
    except Exception as e:
        print(f"Mock email storage error: {e}")
def log_activity(action_type: str, customer_id: str, details: Dict) -> None:
    """Record an agent action in demo_activity_log (best-effort, demo only)."""
    entry = {
        "action_type": action_type,
        "customer_id": customer_id,
        "details": details,
        # All activity written through here is simulated demo traffic.
        "simulated": True
    }
    try:
        supabase.table('demo_activity_log').insert(entry).execute()
    except Exception as e:
        print(f"Activity logging error: {e}")
def get_basic_ar_data(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Get display-formatted AR data from the single ar_data table.

    Args:
        page: Zero-based page index.
        page_size: Rows per page.
        search: Optional substring matched against invoice_id, company_name,
            customer_email, country, and customer_id.

    Returns:
        Dict with success, data (display-formatted records), total_count,
        page, page_size, total_pages (or success=False with an error string).
    """
    try:
        # NOTE(review): `search` is interpolated directly into a PostgREST
        # or() filter — a value containing ',' or '.' will break the filter.
        search_filter = (
            f'invoice_id.ilike.%{search}%,company_name.ilike.%{search}%,'
            f'customer_email.ilike.%{search}%,country.ilike.%{search}%,customer_id.ilike.%{search}%'
            if search else None
        )
        query = supabase.table('ar_data').select('*')
        if search_filter:
            query = query.or_(search_filter)
        # Most overdue first; range() bounds are inclusive.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()
        formatted_data = []
        for record in result.data or []:
            # `or 0` guards against an explicit NULL amount/days value, which
            # record.get(..., 0) alone would pass through as None and crash
            # the float formatting below.
            amount = record.get('amount', 0) or 0
            formatted_data.append({
                'Invoice ID': record.get('invoice_id', ''),
                'Company Name': record.get('company_name', ''),
                'Email': record.get('customer_email', ''),
                'Country': record.get('country', ''),
                'Amount': f"€{amount:,.2f}",
                'Due Date': record.get('due_date', ''),
                'Days Overdue': record.get('days_past_due', 0) or 0,
                'VIP': 'Yes' if record.get('vip_flag', False) else 'No',
                'Status': 'Paid' if record.get('paid_date') else 'Outstanding',
                'Segment': record.get('segment', ''),
                'Rep': record.get('representative_name', '')
            })
        # Bug fix: the count query now applies the same search filter;
        # previously total_count/total_pages reflected the whole table
        # whenever a search term was active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search_filter:
            count_query = count_query.or_(search_filter)
        total_count = count_query.execute().count or 0
        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def validate_database_setup() -> Dict:
    """Validate database connection and required objects for the single-table schema.

    Returns:
        Dict with a success flag and errors/warnings/info message lists.
    """
    report: Dict = {
        "success": True,
        "errors": [],
        "warnings": [],
        "info": []
    }
    try:
        # Probe the main ar_data table to confirm basic connectivity.
        probe = supabase.table('ar_data').select('invoice_id').limit(1).execute()
        report["info"].append(f"βœ… Database connection: OK ({len(probe.data)} AR records found)")
        # Confirm each required table of the simplified schema is reachable.
        for table in ('ar_data', 'demo_activity_log', 'mock_emails'):
            try:
                supabase.table(table).select('*').limit(1).execute()
                report["info"].append(f"βœ… Table '{table}': OK")
            except Exception as e:
                report["errors"].append(f"❌ Table '{table}': Missing or inaccessible ({str(e)[:50]})")
                report["success"] = False
        # Confirm demo data has been seeded and report its distribution.
        try:
            ar_count = supabase.table('ar_data')\
                .select('invoice_id', count='exact')\
                .execute()\
                .count or 0
            if ar_count == 0:
                report["warnings"].append("⚠️ No AR data found. Run seeds.sql to populate demo data")
            else:
                report["info"].append(f"βœ… Data check: {ar_count} AR records in single table")
                vip_count = supabase.table('ar_data')\
                    .select('invoice_id', count='exact')\
                    .eq('vip_flag', True)\
                    .execute()\
                    .count or 0
                overdue_count = supabase.table('ar_data')\
                    .select('invoice_id', count='exact')\
                    .gt('days_past_due', 0)\
                    .execute()\
                    .count or 0
                report["info"].append(f"βœ… Data breakdown: {vip_count} VIP records, {overdue_count} overdue records")
        except Exception as e:
            report["warnings"].append(f"⚠️ Could not check AR data counts: {str(e)[:50]}")
    except Exception as e:
        report["errors"].append(f"❌ Critical database connection error: {str(e)}")
        report["success"] = False
    return report