sifars-chatbot-demo / src /utils /_email_client.py
Aryan Jain
migrate to langchain pinecone and use groq tool call model
6bd76d2
import os
from email.message import EmailMessage
import aiosmtplib
from ._config import logger
class EmailClient:
    """Async helper that composes and sends notification emails over SMTP.

    Configuration is read from environment variables when the client is
    constructed: ``SENDER_EMAIL``, ``RECIPIENT_EMAIL``, ``SMTP_HOST``,
    ``SMTP_PORT``, ``SMTP_USERNAME`` and ``SMTP_PASSWORD``.

    The client can be used as an async context manager; entering returns the
    client itself and exiting performs no cleanup (each send opens and closes
    its own SMTP connection through ``aiosmtplib.send``).

    A fresh ``EmailMessage`` is built for every send. ``EmailMessage``'s
    default policy permits only one ``Subject`` header, so re-assigning the
    subject on a single shared message would raise ``ValueError`` on the
    second send from the same client.
    """

    # Port used when SMTP_PORT is unset or empty. 587 is the standard mail
    # submission port used with STARTTLS, which is how every send connects.
    DEFAULT_SMTP_PORT = 587

    def __init__(self):
        """Load sender, recipient and SMTP settings from the environment."""
        # ``self.message`` only carries the From/To headers. It serves as a
        # template for the per-send messages and is never sent itself.
        self.message = EmailMessage()
        self.message["From"] = os.getenv("SENDER_EMAIL")
        self.message["To"] = os.getenv("RECIPIENT_EMAIL")
        self.smtp_host = os.getenv("SMTP_HOST")
        self.smtp_port = int(os.getenv("SMTP_PORT") or self.DEFAULT_SMTP_PORT)
        self.smtp_username = os.getenv("SMTP_USERNAME")
        self.smtp_password = os.getenv("SMTP_PASSWORD")

    async def __aenter__(self):
        """Return the client itself; no connection is opened here."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions raised in the body propagate."""
        pass

    def _build_message(self, subject, content, attachment_paths):
        """Return a new message with the template's From/To, subject and body.

        Args:
            subject: Subject text. Line breaks are collapsed to spaces because
                the email header policy rejects multi-line header values.
            content: Plain-text body.
            attachment_paths: Iterable of file paths to attach, or ``None``.
                Each file is attached as ``application/octet-stream`` under
                its base name.

        Raises:
            OSError: If an attachment file cannot be opened or read.
        """
        message = EmailMessage()
        for header in ("From", "To"):
            value = self.message[header]
            if value is not None:
                message[header] = value
        message["Subject"] = " ".join(str(subject).splitlines())
        message.set_content(content)

        for attachment_path in attachment_paths or ():
            with open(attachment_path, "rb") as f:
                file_data = f.read()
            message.add_attachment(
                file_data,
                maintype="application",
                subtype="octet-stream",
                filename=os.path.basename(attachment_path),
            )
        return message

    async def _send(self, message):
        """Deliver ``message`` via the configured SMTP server and log the reply."""
        response = await aiosmtplib.send(
            message,
            hostname=self.smtp_host,
            port=self.smtp_port,
            start_tls=True,
            username=self.smtp_username,
            password=self.smtp_password,
        )
        logger.info(response)

    async def send_email(self, details: dict, attachment_paths=None):
        """Email the team a contact request described by ``details``.

        Args:
            details: Mapping with the keys ``name``, ``email``,
                ``phone_number`` and ``reason_for_contact`` (all required) and
                ``subject`` (optional; a generic subject is used when it is
                missing or empty).
            attachment_paths: Optional list of file paths to attach.

        Returns:
            A status string: a success message, or a prompt asking for the
            missing parameters when a required key is absent or empty (in
            which case nothing is sent).
        """
        logger.info("Sending email...")
        name = details.get("name")
        email = details.get("email")
        phone_number = details.get("phone_number")
        reason_for_contact = details.get("reason_for_contact")
        if not name or not email or not phone_number or not reason_for_contact:
            return "All parameters are required. Please ask missing parameters from the user."

        # The subject is not part of the required set above, so fall back to
        # a generated one instead of failing while building the header.
        subject = details.get("subject") or f"Contact request from {name}"
        content = (
            f"Hello Team, \n\n{name} wants to connect with us regarding the "
            f"following reason: \n{reason_for_contact}. \n\n"
            f"Contact Details:\nEmail: {email}\nPhone Number: {phone_number}\n\n"
            "Thanks for your consideration."
        )
        message = self._build_message(subject, content, attachment_paths)
        await self._send(message)
        return "Email sent successfully."

    async def report_missing_context(self, details: dict, attachment_paths=None):
        """Email the team a query the chatbot could not answer.

        Args:
            details: Mapping with the key ``unresolved_query`` (required).
            attachment_paths: Optional list of file paths to attach.

        Returns:
            A status string: a success message, or a request for a valid
            query when ``unresolved_query`` is absent or empty (in which case
            nothing is sent).
        """
        logger.info("Sending missing context report...")
        unresolved_query = details.get("unresolved_query")
        if not unresolved_query:
            return "Please pass a valid unresolved query."

        subject = "Missing Context Report"
        content = f"Hello Team, \n\nThis is the query that user asked for our chatbot could not find an answer in our knowledge base:\n\nUnresolved Query: {unresolved_query}\n\nThanks for your consideration."
        message = self._build_message(subject, content, attachment_paths)
        await self._send(message)
        return "Missing Context Report sent successfully."