Spaces:
Sleeping
Sleeping
| # database.py - Updated for new schema with view option | |
| from supabase import create_client | |
| from typing import Dict, List, Optional | |
| import pandas as pd | |
| from datetime import datetime | |
| from config import SUPABASE_URL, SUPABASE_KEY | |
# Shared Supabase client, created once at import time from the URL and key
# imported from config. The functions below all go through this object.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
def ensure_days_past_due_current():
    """
    Refresh the stored days_past_due values (best effort).

    Invokes the ``refresh_overdue_days`` RPC through the shared Supabase
    client. Call this at startup or periodically to keep data fresh.

    A failure is never propagated: if the RPC does not exist or the call
    fails, the table simply keeps whatever days_past_due values it already
    holds. The failure is printed so it is not completely invisible.
    """
    try:
        supabase.rpc('refresh_overdue_days', {}).execute()
    except Exception as e:
        # Deliberately best-effort, but only for ordinary errors: a bare
        # ``except`` would also swallow KeyboardInterrupt / SystemExit.
        print(f"Could not refresh days_past_due: {e}")
def _rows_response(rows) -> Dict:
    """Wrap raw result rows (or None) in the standard success payload."""
    rows = rows or []
    return {"success": True, "data": rows, "row_count": len(rows)}


def query_database(query: str) -> Dict:
    """
    Execute a keyword-routed AR query against the single ar_data table.

    The lower-cased query text is matched against the rules below, in this
    order; the first rule that matches decides which query runs. Every rule
    except the last default restricts to unpaid rows (paid_date IS NULL).

    1. "late" and "customer": overdue rows summarised per customer.
    2. "invoice" and "days": rows overdue by more than N days, where N is
       the number written before "day(s)" in the query (30 if absent).
    3. "vip": rows with vip_flag set.
    4. "repeat" or "12": rows with num_late_12m > 2.
    5. "total" and ("outstanding" or "money"): one summary row of totals.
    6. "top", "risk" or "default": top N customers by a simple risk score
       (N is taken from "top N", default 5).
    7. A Swedish/Norwegian/Danish country keyword: rows for that country.
    8. "most" and "overdue": the single most overdue row.
    Otherwise: all overdue unpaid rows, most overdue first.

    Returns:
        ``{"success": True, "data": [...], "row_count": int}`` on success.
        A query that matches no rows yields an empty ``data`` list (rule 5
        yields a zeroed summary row) rather than ``None``.
        ``{"success": False, "error": str, "data": [], "row_count": 0}``
        when anything raises.
    """
    import re  # function-scope import, as the original branches did

    # Insertion order matters: the first keyword found in the query wins.
    country_keywords = {
        'swedish': 'Sweden', 'norwegian': 'Norway', 'danish': 'Denmark',
        'sweden': 'Sweden', 'norway': 'Norway', 'denmark': 'Denmark'
    }

    def unpaid(columns: str = '*'):
        """Start a query over ar_data rows whose paid_date is NULL."""
        return supabase.table('ar_data').select(columns).is_('paid_date', 'null')

    try:
        query_lower = query.lower()

        # 1. Get all late payment customers
        if "late" in query_lower and "customer" in query_lower:
            result = unpaid().gt('days_past_due', 0).execute()
            if not result.data:
                # Previously fell through and returned None.
                return _rows_response([])
            df = pd.DataFrame(result.data)
            summary = df.groupby(['customer_id', 'company_name', 'customer_email']).agg({
                'amount': 'sum',
                'days_past_due': 'max',
                'invoice_id': 'count'
            }).reset_index()
            summary.columns = ['customer_id', 'company_name', 'email',
                               'total_overdue', 'max_days_overdue', 'invoice_count']
            return _rows_response(summary.to_dict('records'))

        # 2. Get invoices overdue by X days
        if "invoice" in query_lower and "days" in query_lower:
            days_match = re.search(r'(\d+)\s*days?', query_lower)
            days = int(days_match.group(1)) if days_match else 30
            result = unpaid()\
                .gt('days_past_due', days)\
                .order('days_past_due', desc=True)\
                .execute()
            return _rows_response(result.data)

        # 3. Get VIP customers with unpaid invoices
        if "vip" in query_lower:
            result = unpaid().eq('vip_flag', True).execute()
            return _rows_response(result.data)

        # 4. Get repeat late payers
        if "repeat" in query_lower or "12" in query_lower:
            result = unpaid().gt('num_late_12m', 2).execute()
            return _rows_response(result.data)

        # 5. Calculate total outstanding amount
        if "total" in query_lower and ("outstanding" in query_lower or "money" in query_lower):
            result = unpaid('amount, customer_id').execute()
            if not result.data:
                # Nothing outstanding: report zeros instead of returning None.
                return _rows_response([{
                    "total_outstanding": 0.0,
                    "invoice_count": 0,
                    "customer_count": 0,
                    "average_per_invoice": 0.0
                }])
            df = pd.DataFrame(result.data)
            total = df['amount'].sum()
            invoice_count = len(result.data)
            return _rows_response([{
                "total_outstanding": float(total),
                "invoice_count": invoice_count,
                "customer_count": df['customer_id'].nunique(),
                "average_per_invoice": float(total / invoice_count)
            }])

        # 6. Get top N customers at risk
        if "top" in query_lower or "risk" in query_lower or "default" in query_lower:
            num_match = re.search(r'top\s*(\d+)', query_lower)
            limit = int(num_match.group(1)) if num_match else 5
            result = unpaid().gt('days_past_due', 0).execute()
            if not result.data:
                return _rows_response([])
            df = pd.DataFrame(result.data)

            def score_input(column: str):
                # A NULL in any one input would otherwise make the whole
                # row's score NaN; treat missing values as 0 for scoring.
                return pd.to_numeric(df[column], errors='coerce').fillna(0)

            # Calculate risk score (simplified)
            df['risk_score'] = (
                score_input('days_past_due') * 0.3 +
                score_input('num_late_12m') * 20 +
                score_input('prior_promises_broken') * 30 +
                (score_input('amount') / 1000) * 0.1
            )
            # Group by customer
            risk_summary = df.groupby(['customer_id', 'company_name', 'country']).agg({
                'amount': 'sum',
                'days_past_due': 'max',
                'risk_score': 'sum',
                'num_late_12m': 'max'
            }).reset_index()
            # Rename for consistency with the late-customer summary
            risk_summary = risk_summary.rename(columns={'days_past_due': 'max_days_overdue'})
            # Sort by risk score and get top N
            risk_summary = risk_summary.nlargest(limit, 'risk_score')
            return _rows_response(risk_summary.to_dict('records'))

        # 7. Get by country
        country = next(
            (name for keyword, name in country_keywords.items() if keyword in query_lower),
            None
        )
        if country is not None:
            result = unpaid().eq('country', country).execute()
            return _rows_response(result.data)

        # 8. Get most overdue account(s)
        if "most" in query_lower and "overdue" in query_lower:
            result = unpaid()\
                .gt('days_past_due', 0)\
                .order('days_past_due', desc=True)\
                .limit(1)\
                .execute()
            return _rows_response(result.data)

        # Default: Get all overdue invoices
        result = unpaid()\
            .gt('days_past_due', 0)\
            .order('days_past_due', desc=True)\
            .execute()
        return _rows_response(result.data)

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "data": [],
            "row_count": 0
        }
def get_sample_data() -> tuple:
    """
    Fetch a small preview of ar_data and split it into two frames.

    Returns:
        ``(customers, invoices)``: two DataFrames built from the same five
        sampled rows. ``customers`` holds the de-duplicated customer
        columns; ``invoices`` holds the invoice columns with
        ``days_past_due`` renamed to ``days_overdue``. Both are empty
        DataFrames when no rows come back or when the query raises (the
        error is printed).
    """
    sample_columns = 'customer_id, company_name, country, segment, vip_flag, invoice_id, amount, due_date, days_past_due'
    customer_columns = ['customer_id', 'company_name', 'country', 'segment', 'vip_flag']
    invoice_columns = ['invoice_id', 'amount', 'due_date', 'days_past_due', 'company_name']

    try:
        response = (
            supabase.table('ar_data')
            .select(sample_columns)
            .limit(5)
            .order('days_past_due', desc=True)
            .execute()
        )
        if not response.data:
            return pd.DataFrame(), pd.DataFrame()

        frame = pd.DataFrame(response.data)
        # Two views are kept for backward compatibility with callers that
        # expect separate customer and invoice previews.
        customers = frame[customer_columns].drop_duplicates()
        invoices = frame[invoice_columns].rename(columns={'days_past_due': 'days_overdue'})
        return customers, invoices
    except Exception as e:
        print(f"Error getting sample data: {e}")
        return pd.DataFrame(), pd.DataFrame()
def get_full_customers(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """
    Get one page of unique customers from the ar_data table.

    All matching rows are fetched, de-duplicated on ``customer_id`` in
    Python (first occurrence wins), and the requested page is sliced from
    the de-duplicated list.

    Args:
        page: Zero-based page index.
        page_size: Number of customers per page.
        search: Optional case-insensitive substring matched against
            company_name, country and segment.

    Returns:
        On success: ``success``, ``data`` (the page), ``total_count``
        (unique customers), ``page``, ``page_size`` and ``total_pages``.
        On failure: ``{"success": False, "error": str, "data": []}``.
    """
    try:
        start_idx = page * page_size
        end_idx = start_idx + page_size

        query = supabase.table('ar_data').select(
            'customer_id, representative_name, customer_email, company_name, country, segment, vip_flag'
        )
        if search:
            # The search text comes from the UI. Double-quote the pattern
            # (escaping backslash and quote) so that commas or parentheses
            # in it cannot terminate the value and alter the filter.
            escaped = search.replace('\\', '\\\\').replace('"', '\\"')
            pattern = f'"%{escaped}%"'
            query = query.or_(
                f'company_name.ilike.{pattern},country.ilike.{pattern},segment.ilike.{pattern}'
            )
        result = query.execute()

        if not result.data:
            return {
                "success": True,
                "data": [],
                "total_count": 0,
                "page": page,
                "page_size": page_size,
                "total_pages": 0
            }

        # ar_data has one row per invoice, so customers repeat; keep one
        # row per customer_id.
        df = pd.DataFrame(result.data)
        unique_customers = df.drop_duplicates(subset=['customer_id']).to_dict('records')

        return {
            "success": True,
            "data": unique_customers[start_idx:end_idx],
            "total_count": len(unique_customers),
            "page": page,
            "page_size": page_size,
            # Ceiling division without floats.
            "total_pages": (len(unique_customers) + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def get_full_invoices(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """
    Get one page of invoice rows from ar_data, most overdue first.

    Args:
        page: Zero-based page index.
        page_size: Number of rows per page.
        search: Optional case-insensitive substring matched against
            invoice_id, company_name and customer_email.

    Returns:
        On success: ``success``, ``data`` (the page of rows),
        ``total_count``, ``page``, ``page_size`` and ``total_pages``.
        ``total_count`` / ``total_pages`` describe the rows matching
        ``search``, so they agree with the rows actually being paged.
        On failure: ``{"success": False, "error": str, "data": []}``.
    """
    try:
        # range() is inclusive on both ends, hence the -1.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1

        search_filter = None
        if search:
            # The search text comes from the UI. Double-quote the pattern
            # (escaping backslash and quote) so that commas or parentheses
            # in it cannot terminate the value and alter the filter.
            escaped = search.replace('\\', '\\\\').replace('"', '\\"')
            pattern = f'"%{escaped}%"'
            search_filter = (
                f'invoice_id.ilike.{pattern},company_name.ilike.{pattern},'
                f'customer_email.ilike.{pattern}'
            )

        query = supabase.table('ar_data').select('*')
        if search_filter:
            query = query.or_(search_filter)
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()

        # Count with the SAME filter as the page query; counting the whole
        # table would report too many pages whenever a search is active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search_filter:
            count_query = count_query.or_(search_filter)
        total_count = count_query.execute().count or 0

        return {
            "success": True,
            "data": result.data or [],
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def _email_entry(fields: Dict, timestamp) -> Dict:
    """Build one display record for an email from a dict of its fields."""
    return {
        "timestamp": timestamp,
        "recipient": fields.get('recipient', ''),
        "subject": fields.get('subject', ''),
        "status": fields.get('status', 'UNKNOWN'),
        "invoice_id": fields.get('invoice_id', ''),
        "tone": fields.get('tone', ''),
        "body": fields.get('body', '')
    }


def get_email_activity(page: int = 0, page_size: int = 50) -> Dict:
    """
    Get one page of email activity, newest first.

    The dedicated ``mock_emails`` table is read first. If that fails for
    any reason, the same page is rebuilt from ``demo_activity_log`` rows
    whose ``action_type`` is ``'mock_email_created'``, reading the email
    fields from each row's ``details`` value.

    Args:
        page: Zero-based page index.
        page_size: Number of records per page.

    Returns:
        On success: ``success``, ``data`` (records with timestamp,
        recipient, subject, status, invoice_id, tone, body),
        ``total_count``, ``page``, ``page_size`` and ``total_pages``.
        If both sources fail: ``{"success": False, "error": "Email
        retrieval failed: <primary error>", "data": []}``.
    """
    def page_payload(formatted_data, total_count) -> Dict:
        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }

    try:
        # range() is inclusive on both ends, hence the -1.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = supabase.table('mock_emails')\
            .select('*')\
            .range(start_idx, end_idx)\
            .order('created_at', desc=True)\
            .execute()
        count_result = supabase.table('mock_emails')\
            .select('id', count='exact')\
            .execute()
        total_count = count_result.count or 0
        formatted_data = [
            _email_entry(record, record.get('created_at', ''))
            for record in result.data or []
        ]
        return page_payload(formatted_data, total_count)
    except Exception as e:
        # Keep the primary error: it is what gets reported if the fallback
        # fails as well (``e`` itself is unbound after this block).
        primary_error = e

    # Fallback to demo_activity_log if mock_emails table fails
    try:
        # Recomputed here so the fallback never depends on names bound
        # inside the failed attempt above.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1
        result = supabase.table('demo_activity_log')\
            .select('*')\
            .eq('action_type', 'mock_email_created')\
            .range(start_idx, end_idx)\
            .order('created_at', desc=True)\
            .execute()
        count_result = supabase.table('demo_activity_log')\
            .select('id', count='exact')\
            .eq('action_type', 'mock_email_created')\
            .execute()
        total_count = count_result.count or 0
        formatted_data = []
        for record in result.data or []:
            details = record.get('details')
            if not isinstance(details, dict):
                # NULL or non-object details: fall back to the defaults
                # instead of failing the whole page.
                details = {}
            timestamp = details.get('timestamp', record.get('created_at', ''))
            formatted_data.append(_email_entry(details, timestamp))
        return page_payload(formatted_data, total_count)
    except Exception:
        return {"success": False, "error": f"Email retrieval failed: {str(primary_error)}", "data": []}
| # Removed get_consolidated_ar_data - replaced by get_basic_ar_data using single ar_data table | |
def get_activity_log(page: int = 0, page_size: int = 50) -> Dict:
    """
    Return one page of ``demo_activity_log`` rows, newest first.

    Args:
        page: Zero-based page index.
        page_size: Number of rows per page.

    Returns:
        On success: ``success``, ``data`` (raw rows), ``total_count``,
        ``page``, ``page_size`` and ``total_pages``.
        On failure: ``{"success": False, "error": str, "data": []}``.
    """
    try:
        # range() takes an inclusive last index.
        first = page * page_size
        last = first + page_size - 1

        rows_response = (
            supabase.table('demo_activity_log')
            .select('*')
            .range(first, last)
            .order('created_at', desc=True)
            .execute()
        )
        # Separate exact-count query over the whole table.
        counted = (
            supabase.table('demo_activity_log')
            .select('id', count='exact')
            .execute()
        )
        total = counted.count or 0
        pages = (total + page_size - 1) // page_size

        return {
            "success": True,
            "data": rows_response.data or [],
            "total_count": total,
            "page": page,
            "page_size": page_size,
            "total_pages": pages
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def store_mock_email(email_record: Dict) -> None:
    """
    Insert one mock email into the ``mock_emails`` table.

    Missing keys fall back to defaults (empty strings, status
    ``"MOCK - NOT SENT"``, tone ``"friendly"``). ``customer_id`` is taken
    as the part of the recipient address before the first ``@``.
    Any error is printed and swallowed; nothing is returned.
    """
    try:
        recipient = email_record.get("recipient", "")
        row = {
            "recipient": recipient,
            "subject": email_record.get("subject", ""),
            "body": email_record.get("body", ""),
            "status": email_record.get("status", "MOCK - NOT SENT"),
            "tone": email_record.get("tone", "friendly"),
            "invoice_id": email_record.get("invoice_id", ""),
            # Local part of the address doubles as the customer id.
            "customer_id": recipient.split('@')[0]
        }
        supabase.table('mock_emails').insert(row).execute()
    except Exception as e:
        print(f"Mock email storage error: {e}")
def log_activity(action_type: str, customer_id: str, details: Dict) -> None:
    """
    Append one row to ``demo_activity_log``.

    The row records the action type, the customer id and the free-form
    ``details`` payload, and is always flagged ``simulated``. Any error
    during the insert is printed and swallowed; nothing is returned.
    """
    entry = {
        "action_type": action_type,
        "customer_id": customer_id,
        "details": details,
        "simulated": True
    }
    try:
        supabase.table('demo_activity_log').insert(entry).execute()
    except Exception as e:
        print(f"Activity logging error: {e}")
def get_basic_ar_data(page: int = 0, page_size: int = 50, search: str = "") -> Dict:
    """
    Get one page of display-ready AR rows from ar_data, most overdue first.

    Args:
        page: Zero-based page index.
        page_size: Number of rows per page.
        search: Optional case-insensitive substring matched against
            invoice_id, company_name, customer_email, country and
            customer_id.

    Returns:
        On success: ``success``, ``data`` (rows re-keyed with display
        labels such as ``'Invoice ID'`` and ``'Amount'``), ``total_count``,
        ``page``, ``page_size`` and ``total_pages``. ``total_count`` /
        ``total_pages`` describe the rows matching ``search``.
        On failure: ``{"success": False, "error": str, "data": []}``.
    """
    try:
        # range() is inclusive on both ends, hence the -1.
        start_idx = page * page_size
        end_idx = start_idx + page_size - 1

        search_filter = None
        if search:
            # The search text comes from the UI. Double-quote the pattern
            # (escaping backslash and quote) so that commas or parentheses
            # in it cannot terminate the value and alter the filter.
            escaped = search.replace('\\', '\\\\').replace('"', '\\"')
            pattern = f'"%{escaped}%"'
            search_filter = ','.join(
                f'{column}.ilike.{pattern}'
                for column in ('invoice_id', 'company_name', 'customer_email',
                               'country', 'customer_id')
            )

        query = supabase.table('ar_data').select('*')
        if search_filter:
            query = query.or_(search_filter)
        result = query.range(start_idx, end_idx).order('days_past_due', desc=True).execute()

        formatted_data = []
        for record in result.data or []:
            # A NULL amount arrives as None, which the numeric format spec
            # rejects; treat it as 0 so one bad row cannot fail the page.
            amount = float(record.get('amount') or 0)
            formatted_data.append({
                'Invoice ID': record.get('invoice_id', ''),
                'Company Name': record.get('company_name', ''),
                'Email': record.get('customer_email', ''),
                'Country': record.get('country', ''),
                'Amount': f"€{amount:,.2f}",
                'Due Date': record.get('due_date', ''),
                'Days Overdue': record.get('days_past_due', 0),
                'VIP': 'Yes' if record.get('vip_flag', False) else 'No',
                'Status': 'Paid' if record.get('paid_date') else 'Outstanding',
                'Segment': record.get('segment', ''),
                'Rep': record.get('representative_name', '')
            })

        # Count with the SAME filter as the page query; counting the whole
        # table would report too many pages whenever a search is active.
        count_query = supabase.table('ar_data').select('invoice_id', count='exact')
        if search_filter:
            count_query = count_query.or_(search_filter)
        total_count = count_query.execute().count or 0

        return {
            "success": True,
            "data": formatted_data,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": []}
def validate_database_setup() -> Dict:
    """
    Validate the database connection and the objects this module relies on.

    Checks, in order: that ar_data can be queried at all, that each of
    ``ar_data``, ``demo_activity_log`` and ``mock_emails`` can be selected
    from, and how many AR rows exist (with VIP / overdue counts).

    Returns:
        A dict with ``success`` (False if the connection test or any table
        check failed) and three message lists: ``errors``, ``warnings``
        and ``info``. This function does not raise; failures are reported
        through those lists.
    """
    validation_results = {
        "success": True,
        "errors": [],
        "warnings": [],
        "info": []
    }
    try:
        # Reachability probe. Because of limit(1) the length reported in
        # the message is 0 or 1; the real total is reported further down.
        result = supabase.table('ar_data').select('invoice_id').limit(1).execute()
        validation_results["info"].append(
            f"✅ Database connection: OK ({len(result.data or [])} AR records found)"
        )

        # Test required tables exist (simplified structure)
        tables_to_check = ['ar_data', 'demo_activity_log', 'mock_emails']
        for table in tables_to_check:
            try:
                supabase.table(table).select('*').limit(1).execute()
                validation_results["info"].append(f"✅ Table '{table}': OK")
            except Exception as e:
                validation_results["errors"].append(
                    f"❌ Table '{table}': Missing or inaccessible ({str(e)[:50]})"
                )
                validation_results["success"] = False

        # Check if AR data exists. Problems here are only warnings: the
        # tables are reachable, the demo data may simply not be seeded.
        try:
            ar_result = supabase.table('ar_data').select('invoice_id', count='exact').execute()
            ar_count = ar_result.count or 0
            if ar_count == 0:
                validation_results["warnings"].append(
                    "⚠️ No AR data found. Run seeds.sql to populate demo data"
                )
            else:
                validation_results["info"].append(
                    f"✅ Data check: {ar_count} AR records in single table"
                )
                # Check data distribution
                vip_result = supabase.table('ar_data').select('invoice_id', count='exact').eq('vip_flag', True).execute()
                overdue_result = supabase.table('ar_data').select('invoice_id', count='exact').gt('days_past_due', 0).execute()
                vip_count = vip_result.count or 0
                overdue_count = overdue_result.count or 0
                validation_results["info"].append(
                    f"✅ Data breakdown: {vip_count} VIP records, {overdue_count} overdue records"
                )
        except Exception as e:
            validation_results["warnings"].append(
                f"⚠️ Could not check AR data counts: {str(e)[:50]}"
            )
    except Exception as e:
        validation_results["errors"].append(f"❌ Critical database connection error: {str(e)}")
        validation_results["success"] = False
    return validation_results