File size: 5,451 Bytes
4a5269c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from __future__ import annotations
import base64
import os
from dataclasses import dataclass
from email.message import EmailMessage
from pathlib import Path
from typing import List, Optional, Tuple

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build


# OAuth scopes handed to Credentials.from_authorized_user_file when
# GmailClient.__init__ loads the stored token.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.send",
]


@dataclass
class GmailMessage:
    """Identifiers of one entry returned by a Gmail message search."""

    # Value of the entry's "id" field.
    msg_id: str
    # Value of the entry's "threadId" field; "" when the entry has none.
    thread_id: str


class GmailClient:
    """Thin wrapper around the Gmail v1 API for the authenticated user ("me").

    Covers label management, searching for unread messages with PDF
    attachments, downloading attachments, relabelling messages and sending
    mail.
    """

    def __init__(self, credentials_path: Path, token_path: Path):
        """Build the Gmail service from a stored user token.

        Args:
            credentials_path: OAuth client json; only its existence is checked.
            token_path: Authorized-user token json loaded with ``SCOPES``.

        Raises:
            FileNotFoundError: If either file does not exist.
        """
        if not credentials_path.exists():
            raise FileNotFoundError(f"Missing OAuth client json: {credentials_path}")
        if not token_path.exists():
            raise FileNotFoundError(f"Missing token json: {token_path}")

        creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
        self.service = build("gmail", "v1", credentials=creds, cache_discovery=False)

    def list_labels(self) -> List[dict]:
        """Return the label resources of the mailbox ([] when none are reported)."""
        resp = self.service.users().labels().list(userId="me").execute()
        return resp.get("labels", [])

    def get_label_id(self, name: str) -> Optional[str]:
        """Return the id of the first label whose name equals ``name``, else None."""
        for lbl in self.list_labels():
            if lbl.get("name") == name:
                return lbl.get("id")
        return None

    def _label_ids_by_name(self) -> dict:
        """Return a name -> id map built from a single ``list_labels`` call."""
        ids: dict = {}
        for lbl in self.list_labels():
            # setdefault keeps the first match, mirroring get_label_id.
            ids.setdefault(lbl.get("name"), lbl.get("id"))
        return ids

    def _create_label(self, name: str) -> str:
        """Create a visible user label called ``name`` and return its id."""
        body = {
            "name": name,
            "labelListVisibility": "labelShow",
            "messageListVisibility": "show",
        }
        created = self.service.users().labels().create(userId="me", body=body).execute()
        return created["id"]

    def ensure_label(self, name: str) -> str:
        """Return the id of label ``name``, creating the label if it is missing."""
        existing = self.get_label_id(name)
        if existing:
            return existing
        return self._create_label(name)

    def search_unread_pdf_messages(self, label_name: str, max_results: int = 10) -> List[GmailMessage]:
        """Search for unread messages under ``label_name`` that carry a PDF.

        Args:
            label_name: Label to search in (quoted inside the query).
            max_results: Value passed as ``maxResults`` to the list call.

        Returns:
            One ``GmailMessage`` per entry in the response; ``thread_id`` is
            "" when the entry has no "threadId".
        """
        # Gmail search query: label + unread + pdf attachments
        query = f'label:"{label_name}" is:unread has:attachment filename:pdf'
        resp = self.service.users().messages().list(userId="me", q=query, maxResults=max_results).execute()
        msgs = resp.get("messages", []) or []
        return [GmailMessage(msg_id=m["id"], thread_id=m.get("threadId", "")) for m in msgs]

    def get_message_full(self, msg_id: str) -> dict:
        """Fetch message ``msg_id`` with ``format="full"``."""
        return self.service.users().messages().get(userId="me", id=msg_id, format="full").execute()

    def _walk_parts(self, payload: dict) -> List[dict]:
        """Flatten a payload tree into a list of part dicts.

        The payload itself comes first, followed by its nested "parts" in
        depth-first, message order. Non-dict nodes are skipped.
        """
        parts = []
        stack = [payload]
        while stack:
            node = stack.pop()
            if not isinstance(node, dict):
                continue
            parts.append(node)
            children = node.get("parts")
            if children:
                # The stack is LIFO: push children reversed so the first
                # child is popped (and therefore reported) first.
                stack.extend(reversed(children))
        return parts

    def list_pdf_attachments(self, msg_full: dict) -> List[Tuple[str, str]]:
        """
        Returns [(filename, attachmentId), ...] for PDF parts, in message order.

        A part counts as a PDF when its filename ends in ".pdf"
        (case-insensitive) or its mimeType is application/pdf. Parts lacking
        a filename or an attachmentId are left out.
        """
        payload = msg_full.get("payload", {}) or {}

        out: List[Tuple[str, str]] = []
        for p in self._walk_parts(payload):
            filename = (p.get("filename") or "").strip()
            body = p.get("body") or {}
            att_id = body.get("attachmentId")
            mime = (p.get("mimeType") or "").lower()

            looks_like_pdf = filename.lower().endswith(".pdf") or mime == "application/pdf"
            if looks_like_pdf and filename and att_id:
                out.append((filename, att_id))
        return out

    @staticmethod
    def _decode_base64url(data: str) -> bytes:
        """Decode URL-safe base64 text, tolerating missing "=" padding."""
        data = data or ""
        # b64decode rejects input whose length is not a multiple of 4, so
        # restore any padding that was stripped.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded.encode("utf-8"))

    def download_attachment(self, msg_id: str, attachment_id: str) -> bytes:
        """Download one attachment and return its decoded bytes.

        Returns b"" when the response carries no "data" field.
        """
        att = (
            self.service.users()
            .messages()
            .attachments()
            .get(userId="me", messageId=msg_id, id=attachment_id)
            .execute()
        )
        return self._decode_base64url(att.get("data", ""))

    def move_message(
        self,
        msg_id: str,
        add_labels: List[str],
        remove_labels: List[str],
        mark_read: bool = True,
    ) -> None:
        """Add and remove labels on a message, optionally marking it read.

        Args:
            msg_id: Id of the message to modify.
            add_labels: Label names to add; missing labels are created.
            remove_labels: Label names to remove; names with no existing
                label are skipped rather than created.
            mark_read: When true, "UNREAD" is also removed.
        """
        # One label listing per call instead of one per label name.
        known = self._label_ids_by_name() if (add_labels or remove_labels) else {}

        add_ids: List[str] = []
        for name in add_labels:
            label_id = known.get(name)
            if not label_id:
                label_id = self._create_label(name)
                known[name] = label_id
            add_ids.append(label_id)

        # A label that does not exist cannot be on the message, so there is
        # nothing to remove and no reason to create it.
        remove_ids = [known[name] for name in remove_labels if known.get(name)]

        if mark_read and "UNREAD" not in remove_ids:
            remove_ids.append("UNREAD")

        if not add_ids and not remove_ids:
            # Nothing to change; skip the API round-trip.
            return

        body = {"addLabelIds": add_ids, "removeLabelIds": remove_ids}
        self.service.users().messages().modify(userId="me", id=msg_id, body=body).execute()

    def send_email(
        self,
        to_email: str,
        subject: str,
        body_text: str,
        from_email: Optional[str] = None,
        attachments: Optional[List[Tuple[str, bytes]]] = None,
    ) -> None:
        """Send a plain-text email, optionally with attachments.

        Args:
            to_email: Value of the "To" header.
            subject: Value of the "Subject" header.
            body_text: Plain-text body.
            from_email: Value of the "From" header; the header is omitted
                when this is empty or None.
            attachments: ``(filename, data)`` pairs. ".pdf" and ".json"
                names get application/pdf and application/json; anything
                else is sent as application/octet-stream.
        """
        msg = EmailMessage()
        msg["To"] = to_email
        msg["Subject"] = subject
        if from_email:
            msg["From"] = from_email
        msg.set_content(body_text)

        for filename, data in attachments or []:
            # basic content type guess for pdf/json
            lowered = filename.lower()
            if lowered.endswith(".pdf"):
                subtype = "pdf"
            elif lowered.endswith(".json"):
                subtype = "json"
            else:
                subtype = "octet-stream"
            msg.add_attachment(data, maintype="application", subtype=subtype, filename=filename)

        # The send call takes the whole RFC 822 message as URL-safe base64 in "raw".
        raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
        self.service.users().messages().send(userId="me", body={"raw": raw}).execute()