Spaces:
Sleeping
Sleeping
File size: 5,451 Bytes
2d5e892 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | from __future__ import annotations
import base64
import os
from dataclasses import dataclass
from email.message import EmailMessage
from pathlib import Path
from typing import List, Optional, Tuple
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
SCOPES = [
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/gmail.send",
]
@dataclass
class GmailMessage:
msg_id: str
thread_id: str
class GmailClient:
def __init__(self, credentials_path: Path, token_path: Path):
if not credentials_path.exists():
raise FileNotFoundError(f"Missing OAuth client json: {credentials_path}")
if not token_path.exists():
raise FileNotFoundError(f"Missing token json: {token_path}")
creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
self.service = build("gmail", "v1", credentials=creds, cache_discovery=False)
def list_labels(self) -> List[dict]:
resp = self.service.users().labels().list(userId="me").execute()
return resp.get("labels", [])
def get_label_id(self, name: str) -> Optional[str]:
for lbl in self.list_labels():
if lbl.get("name") == name:
return lbl.get("id")
return None
def ensure_label(self, name: str) -> str:
existing = self.get_label_id(name)
if existing:
return existing
body = {
"name": name,
"labelListVisibility": "labelShow",
"messageListVisibility": "show",
}
created = self.service.users().labels().create(userId="me", body=body).execute()
return created["id"]
def search_unread_pdf_messages(self, label_name: str, max_results: int = 10) -> List[GmailMessage]:
# Gmail search query: label + unread + pdf attachments
query = f'label:"{label_name}" is:unread has:attachment filename:pdf'
resp = self.service.users().messages().list(userId="me", q=query, maxResults=max_results).execute()
msgs = resp.get("messages", []) or []
out: List[GmailMessage] = []
for m in msgs:
out.append(GmailMessage(msg_id=m["id"], thread_id=m.get("threadId", "")))
return out
def get_message_full(self, msg_id: str) -> dict:
return self.service.users().messages().get(userId="me", id=msg_id, format="full").execute()
def _walk_parts(self, payload: dict) -> List[dict]:
parts = []
stack = [payload]
while stack:
node = stack.pop()
if not isinstance(node, dict):
continue
if node.get("parts"):
stack.extend(node["parts"])
parts.append(node)
return parts
def list_pdf_attachments(self, msg_full: dict) -> List[Tuple[str, str]]:
"""
Returns [(filename, attachmentId), ...] for application/pdf parts.
"""
payload = msg_full.get("payload", {}) or {}
parts = self._walk_parts(payload)
out: List[Tuple[str, str]] = []
for p in parts:
filename = (p.get("filename") or "").strip()
body = p.get("body") or {}
att_id = body.get("attachmentId")
mime = (p.get("mimeType") or "").lower()
if filename.lower().endswith(".pdf") or mime == "application/pdf":
if filename and att_id:
out.append((filename, att_id))
return out
def download_attachment(self, msg_id: str, attachment_id: str) -> bytes:
att = (
self.service.users()
.messages()
.attachments()
.get(userId="me", messageId=msg_id, id=attachment_id)
.execute()
)
data = att.get("data", "")
return base64.urlsafe_b64decode(data.encode("utf-8"))
def move_message(
self,
msg_id: str,
add_labels: List[str],
remove_labels: List[str],
mark_read: bool = True,
) -> None:
add_ids = [self.ensure_label(n) for n in add_labels]
remove_ids = [self.ensure_label(n) for n in remove_labels]
if mark_read:
remove_ids.append("UNREAD")
body = {"addLabelIds": add_ids, "removeLabelIds": remove_ids}
self.service.users().messages().modify(userId="me", id=msg_id, body=body).execute()
def send_email(self, to_email: str, subject: str, body_text: str, from_email: Optional[str] = None, attachments: Optional[List[Tuple[str, bytes]]] = None) -> None:
msg = EmailMessage()
msg["To"] = to_email
msg["Subject"] = subject
if from_email:
msg["From"] = from_email
msg.set_content(body_text)
attachments = attachments or []
for filename, data in attachments:
# basic content type guess for pdf/json
if filename.lower().endswith(".pdf"):
maintype, subtype = "application", "pdf"
elif filename.lower().endswith(".json"):
maintype, subtype = "application", "json"
else:
maintype, subtype = "application", "octet-stream"
msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename)
raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
self.service.users().messages().send(userId="me", body={"raw": raw}).execute() |