Agent_studio / modules /emailer.py
Corin1998's picture
Update modules/emailer.py
d7db711 verified
import os, smtplib, hmac, hashlib, json, base64
from email.mime.text import MIMEText
from email.utils import formataddr
from modules.utils import log_event
def build_tracking_url(identifier: str, payload: dict) -> str:
"""
クエリパラメータ ?t=TOKEN で計測。
payload 例: {"id": "lead-123", "redirect": "https://your.landing.page"}
"""
secret = os.getenv("TRACKING_SECRET", "dev")
data = json.dumps(payload, ensure_ascii=False)
sig = hmac.new(secret.encode(), data.encode(), hashlib.sha256).digest()
token = base64.urlsafe_b64encode(data.encode() + b"." + sig).decode()
base = os.getenv("PUBLIC_BASE_URL", "https://hf.space")
# /?t=TOKEN の形式にする
joiner = "&" if "?" in base else "?"
return f"{base}{joiner}t={token}"
def send_email(to_email: str, subject: str, body_text: str):
host = os.getenv("SMTP_HOST")
port = int(os.getenv("SMTP_PORT", "587"))
user = os.getenv("SMTP_USER")
password = os.getenv("SMTP_PASSWORD")
from_name = os.getenv("SMTP_FROM_NAME", "Agent Studio")
from_email = os.getenv("SMTP_FROM_EMAIL", user)
if not (host and user and password and from_email):
raise RuntimeError("SMTP設定が不足しています。")
msg = MIMEText(body_text, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = formataddr((from_name, from_email))
msg["To"] = to_email
with smtplib.SMTP(host, port) as server:
server.starttls()
server.login(user, password)
server.sendmail(from_email, [to_email], msg.as_string())
log_event("email_sent", {"to": to_email, "subject": subject})