# ARCollectionAgent / agent.py
# faerazo's picture
# Commit to HFS
# 3bb6958 verified
# agent.py - AR Collection Agent implementation
from google import genai
from google.genai import types
from datetime import datetime
from typing import Dict, List
from config import GEMINI_API_KEY, SYSTEM_PROMPT, EMAIL_TEMPLATES
from database import query_database as execute_query, log_activity, store_mock_email
class ARCollectionAgent:
    """Chat agent that drafts mock accounts-receivable collection emails.

    The agent wraps a Gemini client and hands a few of its own bound methods
    to the model as tools. Emails are never actually sent: each one is kept
    in ``self.email_history`` and passed to ``store_mock_email`` and
    ``log_activity``.
    """

    # Lower-case country-name fragments that make up the "nordic" segment.
    _NORDIC_COUNTRIES = ("sweden", "norway", "denmark", "finland", "iceland")

    def __init__(self):
        # Every mock email produced by create_mock_email, oldest first.
        self.email_history = []
        # Captured once; only used to stamp the system prompt below.
        self.current_date = datetime.now()
        # Alternating {"role": "user" | "assistant", "content": str} turns.
        self.chat_history = []
        # Initialize Gemini client
        self.client = genai.Client(api_key=GEMINI_API_KEY)
        # Create tools for function calling
        self.tools = self._create_tools()
        # Set system instruction
        self.system_instruction = SYSTEM_PROMPT.format(
            current_date=self.current_date.strftime("%Y-%m-%d")
        )

    def _create_tools(self):
        """Return the bound methods that are passed as ``tools`` to Gemini.

        NOTE(review): the google-genai SDK appears to derive each function
        declaration from the callable's signature and docstring, so the
        signatures and docstrings of the methods listed here are probably
        model-facing text -- confirm before rewording them.
        """
        return [
            self.query_database,
            self.create_mock_email,
            self.send_bulk_collection_emails,
            self.get_current_datetime,
        ]

    def query_database(self, query: str) -> Dict:
        """Execute database query."""
        return execute_query(query)

    def create_mock_email(
        self,
        customer_email: str,
        customer_name: str,
        subject: str,
        invoice_details: Dict,
        tone: str = "friendly"
    ) -> Dict:
        """Generate mock collection email."""
        # Unknown tones fall back to the "friendly" template.
        template = EMAIL_TEMPLATES.get(tone, EMAIL_TEMPLATES["friendly"])

        def field(key, default):
            # A key that is present but None must behave like a missing key;
            # the bulk path always supplies every key, possibly as None, and
            # float(None) would otherwise raise.
            value = invoice_details.get(key)
            return default if value is None else value

        # Format email body
        body = template.format(
            customer_name=customer_name,
            invoice_id=field("invoice_id", "N/A"),
            amount=float(field("amount", 0)),
            days_overdue=field("days_overdue", 0),
            due_date=field("due_date", "N/A"),
        )

        # Create email record
        email_record = {
            "timestamp": datetime.now().isoformat(),
            "recipient": customer_email,
            "subject": subject,
            "body": body,
            "status": "MOCK - NOT SENT",
            "tone": tone,
            "invoice_id": invoice_details.get("invoice_id"),
        }

        # Add to history
        self.email_history.append(email_record)
        # Store in dedicated mock_emails table
        store_mock_email(email_record)
        # Log activity in demo_activity_log; the part of the address before
        # "@" is what gets passed as the second argument.
        log_activity(
            "mock_email_created",
            customer_email.split('@')[0],
            email_record
        )
        return email_record

    def _select_email_tone(
        self,
        days_past_due: int,
        vip_flag: bool,
        num_late_12m: int,
        prior_promises_broken: int,
    ) -> str:
        """Select the email tone for a customer's risk profile.

        Returns:
            ``"friendly"`` for every VIP customer; otherwise ``"final"`` when
            more than 60 days past due or more than 2 broken promises,
            ``"firm"`` when more than 30 days past due or more than 2 late
            payments in the last 12 months, and ``"friendly"`` in all other
            cases.
        """
        # VIP customers always get the gentle template, however overdue.
        if vip_flag:
            return "friendly"
        # Non-VIP customers escalate with lateness and payment behaviour.
        if days_past_due > 60 or prior_promises_broken > 2:
            return "final"
        if days_past_due > 30 or num_late_12m > 2:
            return "firm"
        return "friendly"

    def _matches_segment(self, customer: Dict, segment: str) -> bool:
        """Return True when ``customer`` belongs to the lower-cased ``segment``."""
        if segment == "all":
            return True
        if segment == "vip":
            return bool(customer.get("vip_flag", False))
        if segment == "high_risk":
            return (
                (customer.get("num_late_12m") or 0) > 2
                or (customer.get("days_past_due") or 0) >= 45
            )
        # Everything else is matched against the customer's country.
        # Substring matching is kept from the original country filter.
        country = str(customer.get("country") or "").lower()
        if segment == "nordic":
            return any(name in country for name in self._NORDIC_COUNTRIES)
        # An empty segment would be a substring of every country, so reject it.
        return bool(segment) and segment in country

    def _build_subject(self, invoice_id, tone: str, is_vip: bool) -> str:
        """Return the subject line; VIP status takes precedence over tone."""
        if is_vip:
            return f"Gentle Reminder: Invoice {invoice_id} - Valued Customer"
        if tone == "final":
            return f"FINAL NOTICE: Invoice {invoice_id} - Immediate Action Required"
        if tone == "firm":
            return f"Second Notice: Invoice {invoice_id} - Payment Due"
        return f"Payment Reminder: Invoice {invoice_id}"

    def send_bulk_collection_emails(self, target_segments: str = "all") -> Dict:
        """Send collection emails to overdue customers based on target segments.

        Args:
            target_segments: "all", "vip", "high_risk", "nordic", or country names like "sweden"
        """
        # Declared outside the try so the error path can report how many
        # emails were already recorded before a failure.
        emails_generated = []
        try:
            # Query for overdue customers
            overdue_query_result = execute_query("overdue invoices")
            if not overdue_query_result.get("success", False):
                return {
                    "success": False,
                    "error": "Failed to query overdue customers",
                    "emails_sent": 0
                }

            overdue_data = overdue_query_result.get("data", [])
            if not overdue_data:
                return {
                    "success": True,
                    "message": "No overdue customers found",
                    "emails_sent": 0
                }

            # Filter based on target segments
            segment = str(target_segments).strip().lower()
            selected = [
                customer for customer in overdue_data
                if self._matches_segment(customer, segment)
            ]

            # Generate emails for filtered customers
            for customer in selected:
                is_vip = bool(customer.get("vip_flag", False))
                # None values are treated as 0 so the comparisons in
                # _select_email_tone cannot raise.
                days_past_due = customer.get("days_past_due") or 0
                tone = self._select_email_tone(
                    days_past_due,
                    is_vip,
                    customer.get("num_late_12m") or 0,
                    customer.get("prior_promises_broken") or 0,
                )
                invoice_id = customer.get("invoice_id")
                email_record = self.create_mock_email(
                    customer_email=customer.get("customer_email", ""),
                    customer_name=customer.get("company_name", ""),
                    subject=self._build_subject(invoice_id, tone, is_vip),
                    invoice_details={
                        "invoice_id": invoice_id,
                        "amount": customer.get("amount"),
                        "days_overdue": days_past_due,
                        "due_date": customer.get("due_date"),
                    },
                    tone=tone,
                )
                emails_generated.append(email_record)

            return {
                "success": True,
                "message": f"Successfully generated {len(emails_generated)} collection emails for {target_segments} customers",
                "emails_sent": len(emails_generated),
                "target_segments": target_segments,
                "email_details": [
                    {
                        "recipient": email["recipient"],
                        "subject": email["subject"],
                        "tone": email["tone"],
                        "invoice_id": email["invoice_id"]
                    }
                    for email in emails_generated[:5]  # Return first 5 for summary
                ]
            }
        except Exception as e:
            # Tool boundary: the failure is reported back in the result dict
            # instead of being raised.
            return {
                "success": False,
                "error": f"Error in bulk email generation: {str(e)}",
                "emails_sent": len(emails_generated)
            }

    def get_current_datetime(self) -> Dict:
        """Return current datetime for calculations."""
        # Sample the clock once so the three fields describe the same instant.
        now = datetime.now()
        return {
            "current_date": now.strftime("%Y-%m-%d"),
            "current_time": now.strftime("%H:%M:%S"),
            "timestamp": now.isoformat()
        }

    async def process_message(self, message: str) -> str:
        """Send ``message`` plus the prior chat turns to Gemini and return the reply.

        On success both the user turn and the assistant reply are appended to
        ``self.chat_history``. On any exception the error text is printed and
        returned, and the user turn is removed again so a failed message is
        not re-sent with every later request.

        NOTE(review): ``generate_content`` below is the synchronous client
        call, so this coroutine blocks the event loop while it runs. Moving
        it to a worker thread or the SDK's async client would also change the
        thread the tool functions run on -- confirm the database layer
        tolerates that before switching.
        """
        user_turn = {"role": "user", "content": message}
        # Add user message to chat history
        self.chat_history.append(user_turn)
        try:
            # Prepare contents for the API call
            contents = []
            for msg in self.chat_history:
                part = types.Part.from_text(text=msg["content"])
                if msg["role"] == "user":
                    contents.append(types.UserContent(parts=[part]))
                elif msg["role"] == "assistant":
                    contents.append(types.ModelContent(parts=[part]))

            # Generate content with tools
            response = self.client.models.generate_content(
                model='gemini-2.5-flash',
                contents=contents,
                config=types.GenerateContentConfig(
                    system_instruction=self.system_instruction,
                    tools=self.tools,
                    temperature=0.1,
                    max_output_tokens=4000
                )
            )

            # Get the response text
            response_text = response.text or "I apologize, but I couldn't generate a response."
            # Add assistant response to chat history
            self.chat_history.append({"role": "assistant", "content": response_text})
            return response_text
        except Exception as e:
            # Drop the unanswered user turn so the history stays in
            # user/assistant pairs.
            if self.chat_history and self.chat_history[-1] is user_turn:
                self.chat_history.pop()
            error_msg = f"Error processing request: {str(e)}"
            print(error_msg)
            return error_msg

    def get_email_history(self) -> List[Dict]:
        """Get email history for display."""
        # Returns the live list, not a copy.
        return self.email_history

    def clear_history(self):
        """Clear chat and email history."""
        self.email_history = []
        self.chat_history = []