Spaces:
Sleeping
Sleeping
| # agent.py - AR Collection Agent implementation | |
| from google import genai | |
| from google.genai import types | |
| from datetime import datetime | |
| from typing import Dict, List | |
| from config import GEMINI_API_KEY, SYSTEM_PROMPT, EMAIL_TEMPLATES | |
| from database import query_database as execute_query, log_activity, store_mock_email | |
| class ARCollectionAgent: | |
| def __init__(self): | |
| self.email_history = [] | |
| self.current_date = datetime.now() | |
| self.chat_history = [] | |
| # Initialize Gemini client | |
| self.client = genai.Client(api_key=GEMINI_API_KEY) | |
| # Create tools for function calling | |
| self.tools = self._create_tools() | |
| # Set system instruction | |
| self.system_instruction = SYSTEM_PROMPT.format( | |
| current_date=self.current_date.strftime("%Y-%m-%d") | |
| ) | |
| def _create_tools(self): | |
| """Create tool definitions for the new Gemini API.""" | |
| return [ | |
| self.query_database, | |
| self.create_mock_email, | |
| self.send_bulk_collection_emails, | |
| self.get_current_datetime | |
| ] | |
| def query_database(self, query: str) -> Dict: | |
| """Execute database query.""" | |
| return execute_query(query) | |
| def create_mock_email( | |
| self, | |
| customer_email: str, | |
| customer_name: str, | |
| subject: str, | |
| invoice_details: Dict, | |
| tone: str = "friendly" | |
| ) -> Dict: | |
| """Generate mock collection email.""" | |
| # Get appropriate template | |
| template = EMAIL_TEMPLATES.get(tone, EMAIL_TEMPLATES["friendly"]) | |
| # Format email body | |
| body = template.format( | |
| customer_name=customer_name, | |
| invoice_id=invoice_details.get("invoice_id", "N/A"), | |
| amount=float(invoice_details.get("amount", 0)), | |
| days_overdue=invoice_details.get("days_overdue", 0), | |
| due_date=invoice_details.get("due_date", "N/A") | |
| ) | |
| # Create email record | |
| email_record = { | |
| "timestamp": datetime.now().isoformat(), | |
| "recipient": customer_email, | |
| "subject": subject, | |
| "body": body, | |
| "status": "MOCK - NOT SENT", | |
| "tone": tone, | |
| "invoice_id": invoice_details.get("invoice_id") | |
| } | |
| # Add to history | |
| self.email_history.append(email_record) | |
| # Store in dedicated mock_emails table | |
| store_mock_email(email_record) | |
| # Log activity in demo_activity_log | |
| log_activity( | |
| "mock_email_created", | |
| customer_email.split('@')[0], | |
| email_record | |
| ) | |
| return email_record | |
| def _select_email_tone(self, days_past_due: int, vip_flag: bool, num_late_12m: int, prior_promises_broken: int) -> str: | |
| """Select appropriate email tone based on customer risk profile.""" | |
| # Always gentle with VIP customers | |
| if vip_flag: | |
| if days_past_due > 45: | |
| return "friendly" # Still gentle but noting urgency | |
| return "friendly" | |
| # Non-VIP customers - escalate based on behavior | |
| if days_past_due > 60 or prior_promises_broken > 2: | |
| return "final" # Final notice for seriously overdue or promise breakers | |
| elif days_past_due > 30 or num_late_12m > 2: | |
| return "firm" # Firm approach for repeat offenders | |
| else: | |
| return "friendly" # Standard friendly approach | |
| def send_bulk_collection_emails(self, target_segments: str = "all") -> Dict: | |
| """Send collection emails to overdue customers based on target segments. | |
| Args: | |
| target_segments: "all", "vip", "high_risk", "nordic", or country names like "sweden" | |
| """ | |
| from database import query_database as db_query | |
| try: | |
| # Query for overdue customers | |
| overdue_query_result = db_query("overdue invoices") | |
| if not overdue_query_result.get("success", False): | |
| return { | |
| "success": False, | |
| "error": "Failed to query overdue customers", | |
| "emails_sent": 0 | |
| } | |
| overdue_data = overdue_query_result.get("data", []) | |
| if not overdue_data: | |
| return { | |
| "success": True, | |
| "message": "No overdue customers found", | |
| "emails_sent": 0 | |
| } | |
| # Filter based on target segments | |
| filtered_customers = [] | |
| target_lower = target_segments.lower() | |
| for customer in overdue_data: | |
| include = False | |
| if target_lower == "all": | |
| include = True | |
| elif target_lower == "vip" and customer.get("vip_flag", False): | |
| include = True | |
| elif target_lower == "high_risk" and (customer.get("num_late_12m", 0) > 2 or customer.get("days_past_due", 0) >= 45): | |
| include = True | |
| elif target_lower in ["nordic", "sweden", "norway", "denmark"]: | |
| customer_country = customer.get("country", "").lower() | |
| if target_lower == "nordic" or target_lower in customer_country: | |
| include = True | |
| if include: | |
| filtered_customers.append(customer) | |
| # Generate emails for filtered customers | |
| emails_generated = [] | |
| for customer in filtered_customers: | |
| # Select appropriate tone | |
| tone = self._select_email_tone( | |
| customer.get("days_past_due", 0), | |
| customer.get("vip_flag", False), | |
| customer.get("num_late_12m", 0), | |
| customer.get("prior_promises_broken", 0) | |
| ) | |
| # Create subject based on tone and VIP status | |
| if customer.get("vip_flag", False): | |
| subject = f"Gentle Reminder: Invoice {customer.get('invoice_id')} - Valued Customer" | |
| elif tone == "final": | |
| subject = f"FINAL NOTICE: Invoice {customer.get('invoice_id')} - Immediate Action Required" | |
| elif tone == "firm": | |
| subject = f"Second Notice: Invoice {customer.get('invoice_id')} - Payment Due" | |
| else: | |
| subject = f"Payment Reminder: Invoice {customer.get('invoice_id')}" | |
| # Prepare invoice details | |
| invoice_details = { | |
| "invoice_id": customer.get("invoice_id"), | |
| "amount": customer.get("amount"), | |
| "days_overdue": customer.get("days_past_due", 0), | |
| "due_date": customer.get("due_date") | |
| } | |
| # Generate the email | |
| email_record = self.create_mock_email( | |
| customer_email=customer.get("customer_email", ""), | |
| customer_name=customer.get("company_name", ""), | |
| subject=subject, | |
| invoice_details=invoice_details, | |
| tone=tone | |
| ) | |
| emails_generated.append(email_record) | |
| return { | |
| "success": True, | |
| "message": f"Successfully generated {len(emails_generated)} collection emails for {target_segments} customers", | |
| "emails_sent": len(emails_generated), | |
| "target_segments": target_segments, | |
| "email_details": [ | |
| { | |
| "recipient": email["recipient"], | |
| "subject": email["subject"], | |
| "tone": email["tone"], | |
| "invoice_id": email["invoice_id"] | |
| } | |
| for email in emails_generated[:5] # Return first 5 for summary | |
| ] | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Error in bulk email generation: {str(e)}", | |
| "emails_sent": 0 | |
| } | |
| def get_current_datetime(self) -> Dict: | |
| """Return current datetime for calculations.""" | |
| return { | |
| "current_date": datetime.now().strftime("%Y-%m-%d"), | |
| "current_time": datetime.now().strftime("%H:%M:%S"), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def process_message(self, message: str) -> str: | |
| """Process user message through Gemini.""" | |
| try: | |
| # Add user message to chat history | |
| self.chat_history.append({"role": "user", "content": message}) | |
| # Prepare contents for the API call | |
| contents = [] | |
| for msg in self.chat_history: | |
| if msg["role"] == "user": | |
| contents.append(types.UserContent(parts=[types.Part.from_text(text=msg["content"])])) | |
| elif msg["role"] == "assistant": | |
| contents.append(types.ModelContent(parts=[types.Part.from_text(text=msg["content"])])) | |
| # Generate content with tools | |
| response = self.client.models.generate_content( | |
| model='gemini-2.5-flash', | |
| contents=contents, | |
| config=types.GenerateContentConfig( | |
| system_instruction=self.system_instruction, | |
| tools=self.tools, | |
| temperature=0.1, | |
| max_output_tokens=4000 | |
| ) | |
| ) | |
| # Get the response text | |
| response_text = response.text if response.text else "I apologize, but I couldn't generate a response." | |
| # Add assistant response to chat history | |
| self.chat_history.append({"role": "assistant", "content": response_text}) | |
| return response_text | |
| except Exception as e: | |
| error_msg = f"Error processing request: {str(e)}" | |
| print(error_msg) | |
| return error_msg | |
| def get_email_history(self) -> List[Dict]: | |
| """Get email history for display.""" | |
| return self.email_history | |
| def clear_history(self): | |
| """Clear chat and email history.""" | |
| self.email_history = [] | |
| self.chat_history = [] |