# pdf-trainer-api/backend/worker/gmail_client.py
# Avinash
# integrate real backend api
# 4a5269c
from __future__ import annotations
import base64
import os
from dataclasses import dataclass
from email.message import EmailMessage
from pathlib import Path
from typing import List, Optional, Tuple
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
# OAuth scopes handed to Credentials.from_authorized_user_file when the
# stored user token is loaded.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.send",
]
@dataclass
class GmailMessage:
    """Identifier pair for one Gmail message returned by a search."""

    # Value of the "id" field of a message entry in the list response.
    msg_id: str
    # Value of the "threadId" field of the same entry.
    thread_id: str
class GmailClient:
    """Convenience wrapper around a Gmail v1 API service object.

    Groups the label, search, attachment, relabel and send calls behind
    plain-Python methods. Every request is issued with ``userId="me"``.
    """

    def __init__(self, credentials_path: Path, token_path: Path):
        """Load the stored user token and build the Gmail service.

        Args:
            credentials_path: OAuth client JSON. Only its existence is
                checked here; the file itself is not read.
            token_path: Authorised-user token JSON the credentials are
                loaded from.

        Raises:
            FileNotFoundError: If either file does not exist.
        """
        if not credentials_path.exists():
            raise FileNotFoundError(f"Missing OAuth client json: {credentials_path}")
        if not token_path.exists():
            raise FileNotFoundError(f"Missing token json: {token_path}")
        creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
        # cache_discovery=False: do not use the discovery-document cache.
        self.service = build("gmail", "v1", credentials=creds, cache_discovery=False)

    def list_labels(self) -> List[dict]:
        """Return the label resources of the mailbox (empty list if none)."""
        resp = self.service.users().labels().list(userId="me").execute()
        return resp.get("labels", [])

    def get_label_id(self, name: str) -> Optional[str]:
        """Return the id of the first label whose name equals *name*.

        The comparison is an exact string match. Returns ``None`` when no
        label matches.
        """
        for lbl in self.list_labels():
            if lbl.get("name") == name:
                return lbl.get("id")
        return None

    def _create_label(self, name: str) -> str:
        """Create a visible user label called *name* and return its id."""
        body = {
            "name": name,
            "labelListVisibility": "labelShow",
            "messageListVisibility": "show",
        }
        created = self.service.users().labels().create(userId="me", body=body).execute()
        return created["id"]

    def ensure_label(self, name: str) -> str:
        """Return the id of label *name*, creating the label if needed."""
        existing = self.get_label_id(name)
        if existing:
            return existing
        return self._create_label(name)

    def search_unread_pdf_messages(self, label_name: str, max_results: int = 10) -> List[GmailMessage]:
        """Search for unread messages under *label_name* with PDF attachments.

        Only the first page of results is read, capped by *max_results*.

        Returns:
            One ``GmailMessage`` per hit; ``thread_id`` is ``""`` when the
            response entry carries no ``threadId``.
        """
        # Gmail search query: label + unread + pdf attachments
        query = f'label:"{label_name}" is:unread has:attachment filename:pdf'
        resp = self.service.users().messages().list(userId="me", q=query, maxResults=max_results).execute()
        return [
            GmailMessage(msg_id=m["id"], thread_id=m.get("threadId", ""))
            for m in resp.get("messages", []) or []
        ]

    def get_message_full(self, msg_id: str) -> dict:
        """Fetch message *msg_id* with ``format="full"`` and return the response."""
        return self.service.users().messages().get(userId="me", id=msg_id, format="full").execute()

    def _walk_parts(self, payload: dict) -> List[dict]:
        """Flatten a payload tree into a list of its part dicts.

        The payload itself comes first, followed by every nested entry
        under ``"parts"``. Non-dict entries are ignored.

        NOTE: the work list is a LIFO stack, so sibling parts are visited
        last-to-first. That order is kept on purpose because callers may
        depend on it.
        """
        parts = []
        stack = [payload]
        while stack:
            node = stack.pop()
            if not isinstance(node, dict):
                continue
            if node.get("parts"):
                stack.extend(node["parts"])
            parts.append(node)
        return parts

    def list_pdf_attachments(self, msg_full: dict) -> List[Tuple[str, str]]:
        """Return ``[(filename, attachmentId), ...]`` for the PDF parts.

        A part counts as a PDF when its filename ends in ``.pdf``
        (case-insensitive) or its MIME type is ``application/pdf``. Parts
        lacking a filename or an ``attachmentId`` are skipped. Results
        follow the traversal order of ``_walk_parts``.
        """
        payload = msg_full.get("payload", {}) or {}
        found: List[Tuple[str, str]] = []
        for part in self._walk_parts(payload):
            filename = (part.get("filename") or "").strip()
            attachment_id = (part.get("body") or {}).get("attachmentId")
            if not (filename and attachment_id):
                continue
            mime = (part.get("mimeType") or "").lower()
            if filename.lower().endswith(".pdf") or mime == "application/pdf":
                found.append((filename, attachment_id))
        return found

    def download_attachment(self, msg_id: str, attachment_id: str) -> bytes:
        """Download one attachment and return its decoded bytes.

        The ``data`` field of the response is decoded as URL-safe base64.
        A missing or empty field yields ``b""``.
        """
        att = (
            self.service.users()
            .messages()
            .attachments()
            .get(userId="me", messageId=msg_id, id=attachment_id)
            .execute()
        )
        data = att.get("data", "") or ""
        # base64url text may arrive without trailing "=" padding, which
        # urlsafe_b64decode rejects; restore it to a multiple of four.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded.encode("utf-8"))

    def move_message(
        self,
        msg_id: str,
        add_labels: List[str],
        remove_labels: List[str],
        mark_read: bool = True,
    ) -> None:
        """Add and remove labels on a message, optionally marking it read.

        Args:
            msg_id: Id of the message to modify.
            add_labels: Label names to attach; missing ones are created.
            remove_labels: Label names to detach. Names with no matching
                label are skipped rather than created, since a label that
                does not exist cannot be on the message.
            mark_read: When true, the ``UNREAD`` label is removed as well.

        No request is sent when there is nothing to add or remove.
        """
        name_to_id: dict = {}
        if add_labels or remove_labels:
            # One lookup for the whole call instead of one per label name.
            for lbl in self.list_labels():
                # setdefault keeps the first match, like get_label_id does.
                name_to_id.setdefault(lbl.get("name"), lbl.get("id"))

        add_ids: List[str] = []
        for name in add_labels:
            label_id = name_to_id.get(name)
            if not label_id:
                label_id = self._create_label(name)
                name_to_id[name] = label_id  # avoid creating it twice
            add_ids.append(label_id)

        remove_ids = [name_to_id[n] for n in remove_labels if name_to_id.get(n)]
        if mark_read and "UNREAD" not in remove_ids:
            remove_ids.append("UNREAD")

        if not add_ids and not remove_ids:
            # An empty modify request would change nothing.
            return

        body = {"addLabelIds": add_ids, "removeLabelIds": remove_ids}
        self.service.users().messages().modify(userId="me", id=msg_id, body=body).execute()

    def send_email(
        self,
        to_email: str,
        subject: str,
        body_text: str,
        from_email: Optional[str] = None,
        attachments: Optional[List[Tuple[str, bytes]]] = None,
    ) -> None:
        """Compose a plain-text email with optional attachments and send it.

        Args:
            to_email: Value of the ``To`` header.
            subject: Value of the ``Subject`` header.
            body_text: Plain-text body.
            from_email: Value of the ``From`` header; omitted when falsy.
            attachments: ``(filename, data)`` pairs. ``.pdf`` and ``.json``
                filenames get matching ``application/*`` content types;
                anything else is sent as ``application/octet-stream``.
        """
        msg = EmailMessage()
        msg["To"] = to_email
        msg["Subject"] = subject
        if from_email:
            msg["From"] = from_email
        msg.set_content(body_text)

        for filename, data in attachments or []:
            # basic content type guess for pdf/json
            lowered = filename.lower()
            if lowered.endswith(".pdf"):
                subtype = "pdf"
            elif lowered.endswith(".json"):
                subtype = "json"
            else:
                subtype = "octet-stream"
            msg.add_attachment(data, maintype="application", subtype=subtype, filename=filename)

        # The send endpoint takes the whole RFC 822 message as base64url text.
        raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
        self.service.users().messages().send(userId="me", body={"raw": raw}).execute()