# database.py - Updated for new schema with view option
import re

from supabase import create_client
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime
from config import SUPABASE_URL, SUPABASE_KEY

# Initialize Supabase client
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)


def ensure_days_past_due_current():
    """
    Update days_past_due to current values.

    Call this at startup or periodically to keep data fresh.
    Best-effort: if the server-side function does not exist, the
    table's existing days_past_due values are used as-is.
    """
    try:
        # This runs the UPDATE query to refresh days_past_due
        supabase.rpc('refresh_overdue_days', {}).execute()
    except Exception:
        # If the function doesn't exist, we can skip it.
        # The data will use whatever days_past_due values are in the table.
        pass


def query_database(query: str) -> Dict:
    """Execute AR queries against the single ar_data table.

    Routes a natural-language query to one of several canned Supabase
    queries via keyword matching on the lowercased text.

    Args:
        query: Free-text question about accounts receivable.

    Returns:
        Dict with keys "success", "data" (list of records), "row_count",
        and "error" on failure. Every path returns a dict (previously
        some empty-result paths fell through and returned None).
    """
    try:
        query_lower = query.lower()

        # 1. Get all late payment customers
        if "late" in query_lower and "customer" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .execute()
            if result.data:
                df = pd.DataFrame(result.data)
                # Group by customer (simple grouping)
                summary = df.groupby(['customer_id', 'company_name', 'customer_email']).agg({
                    'amount': 'sum',
                    'days_past_due': 'max',
                    'invoice_id': 'count'
                }).reset_index()
                summary.columns = ['customer_id', 'company_name', 'email',
                                   'total_overdue', 'max_days_overdue', 'invoice_count']
                return {
                    "success": True,
                    "data": summary.to_dict('records'),
                    "row_count": len(summary)
                }
            # BUGFIX: previously returned None when no rows matched
            return {"success": True, "data": [], "row_count": 0}

        # 2. Get invoices overdue by X days
        elif "invoice" in query_lower and "days" in query_lower:
            days_match = re.search(r'(\d+)\s*days?', query_lower)
            days = int(days_match.group(1)) if days_match else 30  # default threshold
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', days)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 3. Get VIP customers with unpaid invoices
        elif "vip" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .eq('vip_flag', True)\
                .is_('paid_date', 'null')\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 4. Get repeat late payers (3+ late payments in last 12 months)
        elif "repeat" in query_lower or "12" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('num_late_12m', 2)\
                .is_('paid_date', 'null')\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # 5. Calculate total outstanding amount
        elif "total" in query_lower and ("outstanding" in query_lower or "money" in query_lower):
            result = supabase.table('ar_data')\
                .select('amount, customer_id')\
                .is_('paid_date', 'null')\
                .execute()
            if result.data:
                df = pd.DataFrame(result.data)
                total = df['amount'].sum()
                customer_count = df['customer_id'].nunique()
                return {
                    "success": True,
                    "data": [{
                        "total_outstanding": float(total),
                        "invoice_count": len(result.data),
                        "customer_count": customer_count,
                        "average_per_invoice": float(total / len(result.data))
                    }],
                    "row_count": 1
                }
            # BUGFIX: previously returned None when nothing is outstanding;
            # keep the single-summary-row shape with zeroed totals.
            return {
                "success": True,
                "data": [{
                    "total_outstanding": 0.0,
                    "invoice_count": 0,
                    "customer_count": 0,
                    "average_per_invoice": 0.0
                }],
                "row_count": 1
            }

        # 6. Get top N customers at risk
        elif "top" in query_lower or "risk" in query_lower or "default" in query_lower:
            num_match = re.search(r'top\s*(\d+)', query_lower)
            limit = int(num_match.group(1)) if num_match else 5
            result = supabase.table('ar_data')\
                .select('*')\
                .is_('paid_date', 'null')\
                .gt('days_past_due', 0)\
                .execute()
            if result.data:
                df = pd.DataFrame(result.data)
                # Calculate risk score (simplified heuristic weighting)
                df['risk_score'] = (
                    df['days_past_due'] * 0.3 +
                    df['num_late_12m'] * 20 +
                    df['prior_promises_broken'] * 30 +
                    (df['amount'] / 1000) * 0.1
                )
                # Group by customer
                risk_summary = df.groupby(['customer_id', 'company_name', 'country']).agg({
                    'amount': 'sum',
                    'days_past_due': 'max',
                    'risk_score': 'sum',
                    'num_late_12m': 'max'
                }).reset_index()
                # Rename for consistency
                risk_summary.rename(columns={'days_past_due': 'max_days_overdue'}, inplace=True)
                # Sort by risk score and get top N
                risk_summary = risk_summary.nlargest(limit, 'risk_score')
                return {
                    "success": True,
                    "data": risk_summary.to_dict('records'),
                    "row_count": len(risk_summary)
                }
            # BUGFIX: previously returned None when no overdue rows exist
            return {"success": True, "data": [], "row_count": 0}

        # 7. Get by country
        elif any(country in query_lower for country in
                 ['sweden', 'norway', 'denmark', 'swedish', 'norwegian', 'danish']):
            country_map = {
                'swedish': 'Sweden', 'norwegian': 'Norway', 'danish': 'Denmark',
                'sweden': 'Sweden', 'norway': 'Norway', 'denmark': 'Denmark'
            }
            country = None
            for key, value in country_map.items():
                if key in query_lower:
                    country = value
                    break
            if country:
                result = supabase.table('ar_data')\
                    .select('*')\
                    .eq('country', country)\
                    .is_('paid_date', 'null')\
                    .execute()
                return {
                    "success": True,
                    "data": result.data,
                    "row_count": len(result.data) if result.data else 0
                }
            # Defensive: the elif guard should guarantee a match, but never
            # fall through to an implicit None.
            return {"success": True, "data": [], "row_count": 0}

        # 8. Get most overdue account(s)
        elif "most" in query_lower and "overdue" in query_lower:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .limit(1)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

        # Default: Get all overdue invoices
        else:
            result = supabase.table('ar_data')\
                .select('*')\
                .gt('days_past_due', 0)\
                .is_('paid_date', 'null')\
                .order('days_past_due', desc=True)\
                .execute()
            return {
                "success": True,
                "data": result.data,
                "row_count": len(result.data) if result.data else 0
            }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "data": [],
            "row_count": 0
        }


def get_sample_data() -> tuple:
    """Get sample data for UI preview from single ar_data table.

    Returns:
        Tuple (df_customers, df_invoices) of pandas DataFrames; both
        empty on error or when the table has no rows.
    """
    try:
        # Get sample AR data (simplified - no need for separate customer/invoice queries)
        ar_sample = supabase.table('ar_data')\
            .select('customer_id, company_name, country, segment, vip_flag, invoice_id, amount, due_date, days_past_due')\
            .limit(5)\
            .order('days_past_due', desc=True)\
            .execute()

        # Split into customer and invoice views for backward compatibility
        if ar_sample.data:
            df = pd.DataFrame(ar_sample.data)
            # Customer view (unique customers only)
            df_customers = df[['customer_id', 'company_name', 'country', 'segment', 'vip_flag']].drop_duplicates()
            # Invoice view
            df_invoices = df[['invoice_id', 'amount', 'due_date', 'days_past_due', 'company_name']].copy()
            df_invoices.rename(columns={'days_past_due': 'days_overdue'}, inplace=True)
            return df_customers, df_invoices
        else:
            return pd.DataFrame(), pd.DataFrame()
    except Exception as e:
        print(f"Error getting sample data: {e}")
        return pd.DataFrame(), pd.DataFrame()


def get_full_customers(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Get customer data from ar_data table (unique customers only).

    Fetches all matching rows, deduplicates by customer_id in Python,
    then paginates the deduplicated list (server-side pagination would
    miscount because one customer spans many invoice rows).

    Returns:
        Dict with "success", "data", "total_count", "page", "page_size",
        "total_pages"; {"success": False, "error": ...} on failure.
    """
    try:
        # Get all customer data, then deduplicate in Python
        query = supabase.table('ar_data').select(
            'customer_id, representative_name, customer_email, company_name, country, segment, vip_flag'
        )

        # Add search filter if provided
        if search:
            query = query.or_(f'company_name.ilike.%{search}%,country.ilike.%{search}%,segment.ilike.%{search}%')

        result = query.execute()

        if result.data:
            # Deduplicate customers using pandas
            df = pd.DataFrame(result.data)
            unique_customers = df.drop_duplicates(subset=['customer_id']).to_dict('records')

            # Apply pagination to deduplicated results
            start_idx = page * page_size
            end_idx = start_idx + page_size
            paginated_customers = unique_customers[start_idx:end_idx]

            return {
                "success": True,
                "data": paginated_customers,
                "total_count": len(unique_customers),
                "page": page,
                "page_size": page_size,
                "total_pages": (len(unique_customers) + page_size - 1) // page_size
            }
        else:
            return {
                "success": True,
                "data": [],
                "total_count": 0,
                "page": page,
                "page_size": page_size,
                "total_pages": 0
            }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}


def get_full_invoices(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Get invoice data from ar_data table with pagination and search.

    Returns:
        Dict with "success", "data", "total_count", "page", "page_size",
        "total_pages"; {"success": False, "error": ...} on failure.
    """
    try:
        # Query ar_data table directly
        query = supabase.table('ar_data').select('*')

        # Add search filter if provided
        if search:
            query = query.or_(f'invoice_id.ilike.%{search}%,company_name.ilike.%{search}%,customer_email.ilike.%{search}%')

        # Add pagination (range() bounds are inclusive)
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()

        # BUGFIX: total count must honor the same search filter, otherwise
        # total_pages is wrong whenever a search term is active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search:
            count_query = count_query.or_(f'invoice_id.ilike.%{search}%,company_name.ilike.%{search}%,customer_email.ilike.%{search}%')
        count_result = count_query.execute()
        total_count = count_result.count or 0

        return {
            "success": True,
            "data": result.data or [],
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def get_email_activity(page: int = 0, page_size: int = 50) -> Dict:
    """Get email activity from dedicated mock_emails table with pagination.

    Falls back to demo_activity_log (action_type = 'mock_email_created')
    when the mock_emails table is missing or fails.

    Returns:
        Dict with "success", "data", "total_count", "page", "page_size",
        "total_pages"; {"success": False, "error": ...} on failure.
    """
    # BUGFIX: compute slice bounds before the try block so the fallback
    # path can use them even if the primary query fails immediately.
    start_idx = page * page_size
    end_idx = start_idx + page_size - 1
    try:
        # Get data directly from mock_emails table for better performance
        result = supabase.table('mock_emails')\
            .select('*')\
            .range(start_idx, end_idx)\
            .order('created_at', desc=True)\
            .execute()

        # Get total count
        count_result = supabase.table('mock_emails')\
            .select('id', count='exact')\
            .execute()
        total_count = count_result.count or 0

        # Format the data for display
        formatted_data = []
        for record in result.data or []:
            formatted_data.append({
                "timestamp": record.get('created_at', ''),
                "recipient": record.get('recipient', ''),
                "subject": record.get('subject', ''),
                "status": record.get('status', 'UNKNOWN'),
                "invoice_id": record.get('invoice_id', ''),
                "tone": record.get('tone', ''),
                "body": record.get('body', '')
            })

        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        # Fallback to demo_activity_log if mock_emails table fails
        try:
            result = supabase.table('demo_activity_log')\
                .select('*')\
                .eq('action_type', 'mock_email_created')\
                .range(start_idx, end_idx)\
                .order('created_at', desc=True)\
                .execute()
            count_result = supabase.table('demo_activity_log')\
                .select('id', count='exact')\
                .eq('action_type', 'mock_email_created')\
                .execute()
            total_count = count_result.count or 0

            formatted_data = []
            for record in result.data or []:
                # Email fields live inside the JSON "details" column here
                details = record.get('details', {})
                formatted_data.append({
                    "timestamp": details.get('timestamp', record.get('created_at', '')),
                    "recipient": details.get('recipient', ''),
                    "subject": details.get('subject', ''),
                    "status": details.get('status', 'UNKNOWN'),
                    "invoice_id": details.get('invoice_id', ''),
                    "tone": details.get('tone', ''),
                    "body": details.get('body', '')
                })
            return {
                "success": True,
                "data": formatted_data,
                "total_count": total_count,
                "page": page,
                "page_size": page_size,
                "total_pages": (total_count + page_size - 1) // page_size
            }
        except Exception:
            # Report the original (primary-path) error, as before
            return {"success": False, "error": f"Email retrieval failed: {str(e)}", "data": []}


# Removed get_consolidated_ar_data - replaced by get_basic_ar_data using single ar_data table


def get_activity_log(page: int = 0, page_size: int = 50) -> Dict:
    """Get all activity log data with pagination.

    Returns:
        Dict with "success", "data", "total_count", "page", "page_size",
        "total_pages"; {"success": False, "error": ...} on failure.
    """
    try:
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = supabase.table('demo_activity_log')\
            .select('*')\
            .range(start_idx, end_idx)\
            .order('created_at', desc=True)\
            .execute()

        # Get total count
        count_result = supabase.table('demo_activity_log')\
            .select('id', count='exact')\
            .execute()
        total_count = count_result.count or 0

        return {
            "success": True,
            "data": result.data or [],
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}


def store_mock_email(email_record: Dict) -> None:
    """Store mock email in dedicated mock_emails table.

    Best-effort: failures are printed, never raised, so email logging
    cannot break the calling workflow.
    """
    try:
        supabase.table('mock_emails').insert({
            "recipient": email_record.get("recipient", ""),
            "subject": email_record.get("subject", ""),
            "body": email_record.get("body", ""),
            "status": email_record.get("status", "MOCK - NOT SENT"),
            "tone": email_record.get("tone", "friendly"),
            # Extract customer ID from email local part
            # NOTE(review): assumes the address local part IS the customer id - verify
            "customer_id": email_record.get("recipient", "").split('@')[0]
        }).execute()
    except Exception as e:
        print(f"Mock email storage error: {e}")


def log_activity(action_type: str, customer_id: str, details: Dict) -> None:
    """Log agent activity for demo purposes.

    Best-effort: failures are printed, never raised.
    """
    try:
        supabase.table('demo_activity_log').insert({
            "action_type": action_type,
            "customer_id": customer_id,
            "details": details,
            "simulated": True
        }).execute()
    except Exception as e:
        print(f"Activity logging error: {e}")


def get_basic_ar_data(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """Get AR data from single ar_data table - ultra-simple approach.

    Rows are formatted for direct UI display (currency string, Yes/No
    flags, Paid/Outstanding status), ordered by most overdue first.

    Returns:
        Dict with "success", "data", "total_count", "page", "page_size",
        "total_pages"; {"success": False, "error": ...} on failure.
    """
    try:
        # Query the single ar_data table - no JOINs needed!
        query = supabase.table('ar_data').select('*')

        # Simple search across key fields
        search_filter = f'invoice_id.ilike.%{search}%,company_name.ilike.%{search}%,customer_email.ilike.%{search}%,country.ilike.%{search}%,customer_id.ilike.%{search}%'
        if search:
            query = query.or_(search_filter)

        # Get data with pagination, ordered by most overdue first
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()

        # Simple data formatting - no complex processing needed
        formatted_data = []
        for record in result.data or []:
            formatted_record = {
                'Invoice ID': record.get('invoice_id', ''),
                'Company Name': record.get('company_name', ''),
                'Email': record.get('customer_email', ''),
                'Country': record.get('country', ''),
                # "or 0" also covers an explicit NULL amount, which .get() alone would not
                'Amount': f"€{record.get('amount') or 0:,.2f}",
                'Due Date': record.get('due_date', ''),
                'Days Overdue': record.get('days_past_due', 0),
                'VIP': 'Yes' if record.get('vip_flag', False) else 'No',
                'Status': 'Paid' if record.get('paid_date') else 'Outstanding',
                'Segment': record.get('segment', ''),
                'Rep': record.get('representative_name', '')
            }
            formatted_data.append(formatted_record)

        # BUGFIX: total count must honor the same search filter, otherwise
        # total_pages is wrong whenever a search term is active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search:
            count_query = count_query.or_(search_filter)
        count_result = count_query.execute()
        total_count = count_result.count or 0

        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}


def validate_database_setup() -> Dict:
    """Validate database connection and required objects exist - single table approach.

    Returns:
        Dict with "success" flag plus "errors", "warnings", "info"
        message lists (info/warning strings are prefixed with emoji).
    """
    validation_results = {
        "success": True,
        "errors": [],
        "warnings": [],
        "info": []
    }
    try:
        # Test basic connection using the main ar_data table
        result = supabase.table('ar_data').select('invoice_id').limit(1).execute()
        validation_results["info"].append(f"✅ Database connection: OK ({len(result.data)} AR records found)")

        # Test required tables exist (simplified structure)
        tables_to_check = ['ar_data', 'demo_activity_log', 'mock_emails']
        for table in tables_to_check:
            try:
                result = supabase.table(table).select('*').limit(1).execute()
                validation_results["info"].append(f"✅ Table '{table}': OK")
            except Exception as e:
                validation_results["errors"].append(f"❌ Table '{table}': Missing or inaccessible ({str(e)[:50]})")
                validation_results["success"] = False

        # Check if AR data exists
        try:
            ar_result = supabase.table('ar_data').select('invoice_id', count='exact').execute()
            ar_count = ar_result.count or 0
            if ar_count == 0:
                validation_results["warnings"].append("⚠️ No AR data found. Run seeds.sql to populate demo data")
            else:
                validation_results["info"].append(f"✅ Data check: {ar_count} AR records in single table")
                # Check data distribution
                vip_result = supabase.table('ar_data').select('invoice_id', count='exact').eq('vip_flag', True).execute()
                overdue_result = supabase.table('ar_data').select('invoice_id', count='exact').gt('days_past_due', 0).execute()
                vip_count = vip_result.count or 0
                overdue_count = overdue_result.count or 0
                validation_results["info"].append(f"✅ Data breakdown: {vip_count} VIP records, {overdue_count} overdue records")
        except Exception as e:
            validation_results["warnings"].append(f"⚠️ Could not check AR data counts: {str(e)[:50]}")

    except Exception as e:
        validation_results["errors"].append(f"❌ Critical database connection error: {str(e)}")
        validation_results["success"] = False

    return validation_results