Spaces:
Runtime error
Runtime error
File size: 2,870 Bytes
# app/core/integrations/email_handler.py
from typing import Optional, List, Dict
import imaplib
import email
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailHandler:
    """Thin IMAP/SMTP client for a single mail account.

    The handler keeps at most one IMAP connection (``self.imap``) and one
    SMTP connection (``self.smtp``); both are ``None`` while disconnected.

    NOTE(review): ``connect``, ``disconnect`` and ``fetch_recent_emails`` are
    declared ``async`` but call the blocking ``imaplib``/``smtplib`` APIs
    directly, so they block the event loop while network I/O is in flight.
    """

    def __init__(self, config):
        """Store the configuration; no network connection is opened here.

        Args:
            config: Object exposing the attributes read by ``connect``:
                ``IMAP_HOST``, ``SMTP_HOST``, ``SMTP_PORT``, ``EMAIL`` and
                ``PASSWORD``.
        """
        self.logger = logging.getLogger(__name__)
        self.config = config
        self.imap = None
        self.smtp = None

    async def connect(self) -> bool:
        """Open and authenticate the IMAP and SMTP connections.

        Returns:
            ``True`` when both connections are logged in, ``False`` otherwise.
            On failure any connection that was already opened is closed again
            so a half-connected handler is never left behind.
        """
        try:
            # IMAP connection
            self.imap = imaplib.IMAP4_SSL(self.config.IMAP_HOST)
            self.imap.login(self.config.EMAIL, self.config.PASSWORD)
            # SMTP connection
            self.smtp = smtplib.SMTP(self.config.SMTP_HOST, self.config.SMTP_PORT)
            self.smtp.starttls()
            self.smtp.login(self.config.EMAIL, self.config.PASSWORD)
            return True
        except Exception as e:
            self.logger.error("Connection error: %s", e)
            # Release whichever side did connect (e.g. IMAP succeeded but
            # SMTP failed) instead of leaking an open socket.
            await self.disconnect()
            return False

    async def disconnect(self) -> None:
        """Close both connections and reset the handles to ``None``.

        Each connection is closed independently, so an error while logging
        out of IMAP does not prevent the SMTP session from being closed.
        Errors are logged, never raised.
        """
        # Detach the handles first so the handler is in a clean
        # "disconnected" state even if closing fails.
        imap, self.imap = self.imap, None
        smtp, self.smtp = self.smtp, None

        if imap is not None:
            try:
                imap.logout()
            except Exception as e:
                self.logger.error("Disconnect error: %s", e)
        if smtp is not None:
            try:
                smtp.quit()
            except Exception as e:
                self.logger.error("Disconnect error: %s", e)

    async def fetch_recent_emails(self, folder="INBOX", limit=10) -> List[Dict]:
        """Fetch the last ``limit`` messages of ``folder``.

        Args:
            folder: Mailbox name passed to IMAP ``SELECT``.
            limit: Maximum number of messages to return. Zero, negative or
                non-integer values yield an empty list.

        Returns:
            A list of dicts with the keys ``'subject'``, ``'from'``,
            ``'date'`` (raw header values) and ``'body'`` (decoded plain
            text), in the order the server listed the messages. An empty
            list is returned when not connected or on any error.
        """
        # ``seq[-0:]`` is the *whole* sequence and a negative limit would
        # slice from the front, so reject those values explicitly.
        if not isinstance(limit, int) or limit <= 0:
            return []
        if self.imap is None:
            self.logger.error("Error fetching emails: not connected")
            return []

        try:
            # NOTE(review): the folder is opened read-write and RFC822 is
            # fetched without PEEK, which per RFC 3501 sets the \Seen flag on
            # the server. Kept as-is because callers may rely on messages
            # being marked read; use select(folder, readonly=True) otherwise.
            status, _ = self.imap.select(folder)
            if status != "OK":
                self.logger.error(
                    "Error fetching emails: cannot select folder %r", folder
                )
                return []

            status, messages = self.imap.search(None, "ALL")
            if status != "OK" or not messages or not messages[0]:
                return []

            email_list = []
            for num in messages[0].split()[-limit:]:
                status, msg = self.imap.fetch(num, "(RFC822)")
                # A successful response contains a (envelope, raw_bytes)
                # tuple; anything else (None, a lone b')') carries no message.
                raw = next(
                    (
                        item[1]
                        for item in (msg or [])
                        if isinstance(item, tuple) and len(item) > 1
                    ),
                    None,
                )
                if status != "OK" or raw is None:
                    # Skip the one bad message rather than dropping the batch.
                    self.logger.warning("Skipping unreadable message %r", num)
                    continue

                email_message = email.message_from_bytes(raw)
                email_list.append(
                    {
                        'subject': email_message['subject'],
                        'from': email_message['from'],
                        'date': email_message['date'],
                        'body': self.get_email_body(email_message),
                    }
                )
            return email_list
        except Exception as e:
            self.logger.error("Error fetching emails: %s", e)
            return []

    def get_email_body(self, email_message) -> str:
        """Extract the text body of a parsed message.

        For multipart messages the decoded ``text/plain`` parts that are not
        attachments are concatenated in document order. For single-part
        messages the decoded payload is returned regardless of content type.

        Args:
            email_message: An ``email.message.Message`` instance.

        Returns:
            The decoded body, or ``""`` if extraction fails.
        """
        try:
            if email_message.is_multipart():
                return "".join(
                    self._decode_part(part)
                    for part in email_message.walk()
                    if part.get_content_type() == "text/plain"
                    and part.get_content_disposition() != "attachment"
                )
            return self._decode_part(email_message)
        except Exception as e:
            self.logger.error("Error extracting email body: %s", e)
            return ""

    @staticmethod
    def _decode_part(part) -> str:
        """Decode one message part to ``str`` using its declared charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            # errors="replace": one bad byte must not cost the whole body.
            return payload.decode(charset, errors="replace")
        except LookupError:
            # The message declared a charset Python does not know.
            return payload.decode("utf-8", errors="replace")
|