Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import logging | |
| import smtplib | |
| import ssl | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
logger = logging.getLogger(__name__)

# SMTP settings, resolved from the environment once at import time.
# EMAIL_ID / EMAIL_PASSWORD win over SMTP_USERNAME / SMTP_PASSWORD when they
# are set to a non-empty value.
SMTP_USERNAME = os.environ.get("EMAIL_ID") or os.environ.get("SMTP_USERNAME", "sender@domain.com")
SMTP_PASSWORD = os.environ.get("EMAIL_PASSWORD") or os.environ.get("SMTP_PASSWORD", "yourpassword")

SMTP_SERVER = os.environ.get("SMTP_SERVER", "127.0.0.1")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "1025"))

# When the server is still the local default but the account looks like a
# Gmail address, switch to Gmail's implicit-TLS SMTP endpoint.
if "gmail.com" in SMTP_USERNAME and SMTP_SERVER == "127.0.0.1":
    SMTP_SERVER, SMTP_PORT = "smtp.gmail.com", 465

# The sender address defaults to the login name unless overridden.
SMTP_SENDER = os.environ.get("SMTP_SENDER", SMTP_USERNAME)
class GmailService:
    """Send plain-text mail through the Gmail API using an OAuth2 refresh token.

    Credentials are read from the environment when the instance is created:
    ``SERVER_GOOGLE_CLIENT_ID`` / ``SERVER_GOOGLE_CLIENT_SECRET`` /
    ``SERVER_GOOGLE_REFRESH_TOKEN`` take precedence over the un-prefixed
    ``GOOGLE_*`` variables. The sender address comes from ``SMTP_SENDER`` or,
    failing that, ``EMAIL_ID``.
    """

    # Only the "send mail" scope is requested.
    SCOPES = ['https://www.googleapis.com/auth/gmail.send']

    def __init__(self) -> None:
        """Read the OAuth client settings and sender address from the environment."""
        self.creds = None
        self.service = None
        # Server-side credentials for Gmail API
        self.client_id = os.getenv('SERVER_GOOGLE_CLIENT_ID') or os.getenv('GOOGLE_CLIENT_ID')
        self.client_secret = os.getenv('SERVER_GOOGLE_CLIENT_SECRET') or os.getenv('GOOGLE_CLIENT_SECRET')
        self.refresh_token = os.getenv('SERVER_GOOGLE_REFRESH_TOKEN') or os.getenv('GOOGLE_REFRESH_TOKEN')
        self.sender_email = os.getenv('SMTP_SENDER') or os.getenv('EMAIL_ID')

    def authenticate(self) -> bool:
        """Exchange the refresh token for an access token and build the API client.

        Returns:
            True when an access token was obtained and ``self.service`` was
            built; False when any credential is missing or the exchange or
            client construction raised.
        """
        if not all([self.client_id, self.client_secret, self.refresh_token]):
            # No log on purpose: missing credentials just means the Gmail API
            # is not configured, and the caller decides what to do next.
            return False
        try:
            # Create credentials object from the refresh token
            self.creds = Credentials(
                None,  # No access token initially
                refresh_token=self.refresh_token,
                token_uri="https://oauth2.googleapis.com/token",
                client_id=self.client_id,
                client_secret=self.client_secret,
                scopes=self.SCOPES,
            )
            # Credentials created without a token have no expiry, and
            # google-auth reports such credentials as *not* expired. Checking
            # `expired` therefore never triggered the refresh; `valid` is
            # False until a token has actually been fetched, so a bad refresh
            # token is detected here rather than on the first send.
            if not self.creds.valid:
                self.creds.refresh(Request())
            self.service = build('gmail', 'v1', credentials=self.creds)
            return True
        except Exception as e:
            # Boundary handler: any failure is reported as "not authenticated".
            logger.error(f"Failed to authenticate with Gmail API: {e}")
            return False

    def send_email(self, to_email: str, subject: str, body: str) -> bool:
        """Send a plain-text email using the Gmail API.

        Args:
            to_email: Recipient address, used as the ``to`` header.
            subject: Subject line.
            body: Plain-text message body.

        Returns:
            True if the API accepted the message, False if authentication
            failed or the send raised.
        """
        # Authenticate lazily on first use.
        if not self.service and not self.authenticate():
            return False
        try:
            message = MIMEMultipart()
            message['to'] = to_email
            # Format sender with display name unless one is already present.
            sender_name = "AnimateImage"
            if self.sender_email and "<" not in self.sender_email:
                message['from'] = f"{sender_name} <{self.sender_email}>"
            else:
                message['from'] = self.sender_email or "unknown@example.com"
            message['subject'] = subject
            message.attach(MIMEText(body, 'plain'))

            # The API expects the whole RFC 822 message, URL-safe base64 encoded.
            raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
            payload = {'raw': raw_message}
            response = self.service.users().messages().send(userId="me", body=payload).execute()
            logger.info(f"Email sent to {to_email} (Message Id: {response['id']})")
            return True
        except HttpError as error:
            logger.error(f"An error occurred sending email to {to_email}: {error}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error sending email: {e}")
            return False
# Module-level GmailService instance used by the send_email() helper in this
# file. Constructing it only reads environment variables; nothing is contacted
# until authenticate() or send_email() is called on it.
gmail_service = GmailService()
def send_email(to_email: str, subject: str, body: str) -> bool:
    """Send a plain-text email using the Gmail API with an SMTP fallback.

    This function is blocking and should be run in a background task.

    The Gmail API is tried first. If it is not configured, cannot
    authenticate, or the send fails, the message goes over implicit-TLS SMTP
    (``smtplib.SMTP_SSL``) using the module-level ``SMTP_*`` settings.

    The server certificate is verified unless ``SMTP_SERVER`` is a loopback
    host or the environment variable ``SMTP_SKIP_TLS_VERIFY`` is set to
    ``1``/``true``/``yes``.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.

    Returns:
        True if either transport accepted the message, False otherwise.
        Exceptions from the SMTP path are logged and not propagated.
    """
    # Try Gmail API first
    if gmail_service.authenticate():
        if gmail_service.send_email(to_email, subject, body):
            return True
        # Authentication worked, so the credentials are not the problem.
        logger.warning("Gmail API send failed. Falling back to SMTP.")
    else:
        logger.warning("Gmail API credentials not found or invalid. Falling back to SMTP.")

    # Fallback to SMTP
    try:
        message = MIMEMultipart()
        message["From"] = SMTP_SENDER
        message["To"] = to_email
        message["Subject"] = subject
        message.attach(MIMEText(body, "plain"))

        # The default context verifies the certificate chain and hostname.
        # The login below sends the account password, so verification is only
        # relaxed for local test servers or on explicit opt-out.
        context = ssl.create_default_context()
        skip_verify = (
            SMTP_SERVER in ("127.0.0.1", "localhost", "::1")
            or os.getenv("SMTP_SKIP_TLS_VERIFY", "").strip().lower() in ("1", "true", "yes")
        )
        if skip_verify:
            # check_hostname must be cleared before verify_mode can be CERT_NONE.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as server:
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.sendmail(SMTP_SENDER, to_email, message.as_string())
        logger.info(f"Email sent successfully to {to_email} via SMTP")
        return True
    except Exception as e:
        # Best-effort boundary: report failure to the caller instead of raising.
        logger.error(f"Failed to send email to {to_email}: {e}")
        return False