from __future__ import annotations import base64 import os from dataclasses import dataclass from email.message import EmailMessage from pathlib import Path from typing import List, Optional, Tuple from google.oauth2.credentials import Credentials from googleapiclient.discovery import build SCOPES = [ "https://www.googleapis.com/auth/gmail.modify", "https://www.googleapis.com/auth/gmail.send", ] @dataclass class GmailMessage: msg_id: str thread_id: str class GmailClient: def __init__(self, credentials_path: Path, token_path: Path): if not credentials_path.exists(): raise FileNotFoundError(f"Missing OAuth client json: {credentials_path}") if not token_path.exists(): raise FileNotFoundError(f"Missing token json: {token_path}") creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) self.service = build("gmail", "v1", credentials=creds, cache_discovery=False) def list_labels(self) -> List[dict]: resp = self.service.users().labels().list(userId="me").execute() return resp.get("labels", []) def get_label_id(self, name: str) -> Optional[str]: for lbl in self.list_labels(): if lbl.get("name") == name: return lbl.get("id") return None def ensure_label(self, name: str) -> str: existing = self.get_label_id(name) if existing: return existing body = { "name": name, "labelListVisibility": "labelShow", "messageListVisibility": "show", } created = self.service.users().labels().create(userId="me", body=body).execute() return created["id"] def search_unread_pdf_messages(self, label_name: str, max_results: int = 10) -> List[GmailMessage]: # Gmail search query: label + unread + pdf attachments query = f'label:"{label_name}" is:unread has:attachment filename:pdf' resp = self.service.users().messages().list(userId="me", q=query, maxResults=max_results).execute() msgs = resp.get("messages", []) or [] out: List[GmailMessage] = [] for m in msgs: out.append(GmailMessage(msg_id=m["id"], thread_id=m.get("threadId", ""))) return out def get_message_full(self, msg_id: str) -> dict: return self.service.users().messages().get(userId="me", id=msg_id, format="full").execute() def _walk_parts(self, payload: dict) -> List[dict]: parts = [] stack = [payload] while stack: node = stack.pop() if not isinstance(node, dict): continue if node.get("parts"): stack.extend(node["parts"]) parts.append(node) return parts def list_pdf_attachments(self, msg_full: dict) -> List[Tuple[str, str]]: """ Returns [(filename, attachmentId), ...] for application/pdf parts. """ payload = msg_full.get("payload", {}) or {} parts = self._walk_parts(payload) out: List[Tuple[str, str]] = [] for p in parts: filename = (p.get("filename") or "").strip() body = p.get("body") or {} att_id = body.get("attachmentId") mime = (p.get("mimeType") or "").lower() if filename.lower().endswith(".pdf") or mime == "application/pdf": if filename and att_id: out.append((filename, att_id)) return out def download_attachment(self, msg_id: str, attachment_id: str) -> bytes: att = ( self.service.users() .messages() .attachments() .get(userId="me", messageId=msg_id, id=attachment_id) .execute() ) data = att.get("data", "") return base64.urlsafe_b64decode(data.encode("utf-8")) def move_message( self, msg_id: str, add_labels: List[str], remove_labels: List[str], mark_read: bool = True, ) -> None: add_ids = [self.ensure_label(n) for n in add_labels] remove_ids = [self.ensure_label(n) for n in remove_labels] if mark_read: remove_ids.append("UNREAD") body = {"addLabelIds": add_ids, "removeLabelIds": remove_ids} self.service.users().messages().modify(userId="me", id=msg_id, body=body).execute() def send_email(self, to_email: str, subject: str, body_text: str, from_email: Optional[str] = None, attachments: Optional[List[Tuple[str, bytes]]] = None) -> None: msg = EmailMessage() msg["To"] = to_email msg["Subject"] = subject if from_email: msg["From"] = from_email msg.set_content(body_text) attachments = attachments or [] for filename, data in attachments: # basic content type guess for pdf/json if filename.lower().endswith(".pdf"): maintype, subtype = "application", "pdf" elif filename.lower().endswith(".json"): maintype, subtype = "application", "json" else: maintype, subtype = "application", "octet-stream" msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename) raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8") self.service.users().messages().send(userId="me", body={"raw": raw}).execute()