Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import os | |
| from dataclasses import dataclass | |
| from email.message import EmailMessage | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| SCOPES = [ | |
| "https://www.googleapis.com/auth/gmail.modify", | |
| "https://www.googleapis.com/auth/gmail.send", | |
| ] | |
| class GmailMessage: | |
| msg_id: str | |
| thread_id: str | |
| class GmailClient: | |
| def __init__(self, credentials_path: Path, token_path: Path): | |
| if not credentials_path.exists(): | |
| raise FileNotFoundError(f"Missing OAuth client json: {credentials_path}") | |
| if not token_path.exists(): | |
| raise FileNotFoundError(f"Missing token json: {token_path}") | |
| creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) | |
| self.service = build("gmail", "v1", credentials=creds, cache_discovery=False) | |
| def list_labels(self) -> List[dict]: | |
| resp = self.service.users().labels().list(userId="me").execute() | |
| return resp.get("labels", []) | |
| def get_label_id(self, name: str) -> Optional[str]: | |
| for lbl in self.list_labels(): | |
| if lbl.get("name") == name: | |
| return lbl.get("id") | |
| return None | |
| def ensure_label(self, name: str) -> str: | |
| existing = self.get_label_id(name) | |
| if existing: | |
| return existing | |
| body = { | |
| "name": name, | |
| "labelListVisibility": "labelShow", | |
| "messageListVisibility": "show", | |
| } | |
| created = self.service.users().labels().create(userId="me", body=body).execute() | |
| return created["id"] | |
| def search_unread_pdf_messages(self, label_name: str, max_results: int = 10) -> List[GmailMessage]: | |
| # Gmail search query: label + unread + pdf attachments | |
| query = f'label:"{label_name}" is:unread has:attachment filename:pdf' | |
| resp = self.service.users().messages().list(userId="me", q=query, maxResults=max_results).execute() | |
| msgs = resp.get("messages", []) or [] | |
| out: List[GmailMessage] = [] | |
| for m in msgs: | |
| out.append(GmailMessage(msg_id=m["id"], thread_id=m.get("threadId", ""))) | |
| return out | |
| def get_message_full(self, msg_id: str) -> dict: | |
| return self.service.users().messages().get(userId="me", id=msg_id, format="full").execute() | |
| def _walk_parts(self, payload: dict) -> List[dict]: | |
| parts = [] | |
| stack = [payload] | |
| while stack: | |
| node = stack.pop() | |
| if not isinstance(node, dict): | |
| continue | |
| if node.get("parts"): | |
| stack.extend(node["parts"]) | |
| parts.append(node) | |
| return parts | |
| def list_pdf_attachments(self, msg_full: dict) -> List[Tuple[str, str]]: | |
| """ | |
| Returns [(filename, attachmentId), ...] for application/pdf parts. | |
| """ | |
| payload = msg_full.get("payload", {}) or {} | |
| parts = self._walk_parts(payload) | |
| out: List[Tuple[str, str]] = [] | |
| for p in parts: | |
| filename = (p.get("filename") or "").strip() | |
| body = p.get("body") or {} | |
| att_id = body.get("attachmentId") | |
| mime = (p.get("mimeType") or "").lower() | |
| if filename.lower().endswith(".pdf") or mime == "application/pdf": | |
| if filename and att_id: | |
| out.append((filename, att_id)) | |
| return out | |
| def download_attachment(self, msg_id: str, attachment_id: str) -> bytes: | |
| att = ( | |
| self.service.users() | |
| .messages() | |
| .attachments() | |
| .get(userId="me", messageId=msg_id, id=attachment_id) | |
| .execute() | |
| ) | |
| data = att.get("data", "") | |
| return base64.urlsafe_b64decode(data.encode("utf-8")) | |
| def move_message( | |
| self, | |
| msg_id: str, | |
| add_labels: List[str], | |
| remove_labels: List[str], | |
| mark_read: bool = True, | |
| ) -> None: | |
| add_ids = [self.ensure_label(n) for n in add_labels] | |
| remove_ids = [self.ensure_label(n) for n in remove_labels] | |
| if mark_read: | |
| remove_ids.append("UNREAD") | |
| body = {"addLabelIds": add_ids, "removeLabelIds": remove_ids} | |
| self.service.users().messages().modify(userId="me", id=msg_id, body=body).execute() | |
| def send_email(self, to_email: str, subject: str, body_text: str, from_email: Optional[str] = None, attachments: Optional[List[Tuple[str, bytes]]] = None) -> None: | |
| msg = EmailMessage() | |
| msg["To"] = to_email | |
| msg["Subject"] = subject | |
| if from_email: | |
| msg["From"] = from_email | |
| msg.set_content(body_text) | |
| attachments = attachments or [] | |
| for filename, data in attachments: | |
| # basic content type guess for pdf/json | |
| if filename.lower().endswith(".pdf"): | |
| maintype, subtype = "application", "pdf" | |
| elif filename.lower().endswith(".json"): | |
| maintype, subtype = "application", "json" | |
| else: | |
| maintype, subtype = "application", "octet-stream" | |
| msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename) | |
| raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8") | |
| self.service.users().messages().send(userId="me", body={"raw": raw}).execute() |