Avinash
integrate real backend api
4a5269c
from __future__ import annotations

import os
import shutil
import time
import traceback
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

from dotenv import load_dotenv

from .gmail_client import GmailClient
from .openai_classifier import classify_with_openai
from .pdf_render import render_pdf_to_pngs
# Configuration always comes from repo_root/backend/.env (single source of truth).
# This file sits three directory levels below the repository root. Because
# override=True, values from that .env file replace variables already present in
# the process environment.
REPO_ROOT = Path(__file__).resolve().parents[2]
load_dotenv(dotenv_path=REPO_ROOT.joinpath("backend", ".env"), override=True)
@dataclass
class Settings:
    """Runtime configuration for the PDF pipeline worker.

    Instances are built by ``load_settings()`` from environment variables.
    Field order defines the positional constructor, so do not reorder fields.
    """

    # File paths handed to GmailClient(creds_path, token_path).
    creds_path: Path
    token_path: Path
    # Gmail label names: INCOMING is polled; KNOWN/UNKNOWN are the destinations after
    # classification; TRAIN messages are stored for the trainer without classification.
    label_incoming: str
    label_known: str
    label_unknown: str
    label_train: str
    # Rep email for UNKNOWN detection (recipient of "Action required" emails)
    rep_notify_to: str
    # Sender address for those emails (the OAuth Gmail account, per main()'s check).
    notify_from: str
    # OpenAI credentials and model name passed to classify_with_openai.
    openai_api_key: str
    openai_model: str
    # Seconds to sleep when there is nothing to do or after an error.
    poll_seconds: int
    # max_results for each Gmail unread-PDF search.
    max_messages_per_poll: int
    # Page count and DPI passed to render_pdf_to_pngs before classification.
    render_pages: int
    render_dpi: int
    # Base URL of the PDF Trainer; links are built as "<base>/?pdf_id=<id>".
    trainer_base_url: str
def _env_str(name: str, default: str = "") -> str:
    """Return environment variable *name* with surrounding whitespace removed.

    Unset, empty and whitespace-only values all yield *default*.
    """
    return (os.environ.get(name) or "").strip() or default


def _env_int(name: str, default: int, minimum: int) -> int:
    """Parse environment variable *name* as an int that is at least *minimum*.

    Unset or blank values yield *default* (the default is not range-checked).

    Raises:
        ValueError: with the variable name in the message, if the value is not an
            integer or is smaller than *minimum*.
    """
    raw = (os.environ.get(name) or "").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value


def load_settings() -> Settings:
    """Build the worker Settings from the process environment.

    Blank or whitespace-only variables are treated like unset ones and fall back to
    their defaults. This covers labels, model name, trainer URL and credential
    paths, so an empty entry in .env cannot produce an empty Gmail label name.

    Raises:
        ValueError: if a numeric variable is not an integer or is out of range.
            Poll seconds must be >= 0; the other numeric settings must be >= 1.
    """
    base = Path(__file__).resolve().parents[1]  # backend/
    creds = Path(_env_str("GMAIL_CREDENTIALS_JSON", str(base / "credentials.json")))
    token = Path(_env_str("GMAIL_TOKEN_JSON", str(base / "token.json")))
    # Prefer the test key. A blank OPENAI_API_KEY_TEST must not hide OPENAI_API_KEY.
    openai_api_key = _env_str("OPENAI_API_KEY_TEST") or _env_str("OPENAI_API_KEY")
    return Settings(
        creds_path=creds,
        token_path=token,
        label_incoming=_env_str("PDF_PIPELINE_LABEL_INCOMING", "PDF_PIPELINE/INCOMING"),
        label_known=_env_str("PDF_PIPELINE_LABEL_KNOWN", "PDF_PIPELINE/KNOWN"),
        label_unknown=_env_str("PDF_PIPELINE_LABEL_UNKNOWN", "PDF_PIPELINE/UNKNOWN"),
        label_train=_env_str("PDF_PIPELINE_LABEL_TRAIN", "PDF_PIPELINE/TRAIN"),
        notify_from=_env_str("PDF_PIPELINE_NOTIFY_FROM"),
        rep_notify_to=_env_str("PDF_PIPELINE_NOTIFY_TO"),
        openai_api_key=openai_api_key,
        openai_model=_env_str("OPENAI_MODEL", "gpt-4o-mini"),
        # A negative sleep would raise inside main()'s error handler and kill the worker.
        poll_seconds=_env_int("PDF_PIPELINE_POLL_SECONDS", 20, minimum=0),
        max_messages_per_poll=_env_int("PDF_PIPELINE_MAX_PER_POLL", 5, minimum=1),
        render_pages=_env_int("PDF_PIPELINE_RENDER_PAGES", 2, minimum=1),
        render_dpi=_env_int("PDF_PIPELINE_RENDER_DPI", 200, minimum=1),
        trainer_base_url=_env_str("PDF_TRAINER_BASE_URL", "http://localhost:5173"),
    )
def _safe_name(s: str, fallback: str = "attachment.pdf", max_bytes: int = 150) -> str:
    """Sanitize an attachment filename for local paths and outgoing emails.

    Characters that are not alphanumeric (``str.isalnum``, so Unicode letters and
    digits are kept) and not one of ``-``, ``_``, ``.`` or space are replaced with
    ``_``. Surrounding whitespace is then stripped. Path separators therefore never
    survive.

    Args:
        s: Raw filename, e.g. as reported for a Gmail attachment.
        fallback: Returned when nothing usable remains, i.e. the name is empty,
            whitespace-only, or consists only of dots such as ``"."`` or ``".."``.
        max_bytes: Upper bound on the UTF-8 length of the result. Longer names are
            shortened while keeping a short extension, so callers that add a prefix
            stay below typical 255-byte filesystem name limits.

    Returns:
        The sanitized (possibly shortened) name, or *fallback*.
    """
    cleaned = "".join(c if c.isalnum() or c in "-_. " else "_" for c in s).strip()
    if not cleaned.strip("."):
        return fallback
    if len(cleaned.encode("utf-8")) <= max_bytes:
        return cleaned

    suffix = Path(cleaned).suffix
    # Keep the extension only if it is short relative to the budget.
    if len(suffix.encode("utf-8")) * 2 > max_bytes:
        suffix = ""
    stem = cleaned[: len(cleaned) - len(suffix)]
    budget = max_bytes - len(suffix.encode("utf-8"))
    # errors="ignore" drops a multi-byte character that the byte cut split in half.
    stem = stem.encode("utf-8")[:budget].decode("utf-8", "ignore").rstrip()
    return stem + suffix if stem.strip(".") else fallback
def _write_pipeline_pdf(root_worker_dir: Path, filename: str, pdf_bytes: bytes) -> Tuple[str, Path]:
    """Save a PDF under ``<root_worker_dir>/uploads`` so the trainer can fetch it later.

    Two files are written, keyed by a fresh random hex id:
    ``<id>.pdf`` holds the raw bytes and ``<id>.name.txt`` holds the display
    filename (UTF-8). The uploads directory is created if needed.

    Returns:
        ``(pdf_id, path_of_stored_pdf)``.
    """
    target_dir = root_worker_dir / "uploads"
    target_dir.mkdir(parents=True, exist_ok=True)

    new_id = uuid.uuid4().hex
    stored = target_dir / (new_id + ".pdf")
    stored.write_bytes(pdf_bytes)
    (target_dir / (new_id + ".name.txt")).write_text(filename, encoding="utf-8")
    return new_id, stored
def _store_train_message(gmail: GmailClient, s: Settings, root: Path, msg_id: str) -> None:
    """Store every PDF attachment of one TRAIN message, then mark the message read."""
    msg_full = gmail.get_message_full(msg_id)
    for filename, att_id in gmail.list_pdf_attachments(msg_full):
        filename = _safe_name(filename or "attachment.pdf")
        pdf_bytes = gmail.download_attachment(msg_id, att_id)
        pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes)
        trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}"
        print(
            f"[worker][TRAIN] stored PDF msg={msg_id} file={filename} "
            f"pdf_id={pdf_id} stored={stored_pdf_path}"
        )
        print(f"[worker][TRAIN] open: {trainer_link}")
    # Mark read once, and only after every attachment was stored. Marking it inside
    # the loop would lose the remaining attachments if a later download failed.
    gmail.move_message(msg_id, add_labels=[], remove_labels=[], mark_read=True)


def _process_train_label(gmail: GmailClient, s: Settings, root: Path) -> None:
    """
    TRAIN behavior:
    - Pull unread PDFs from TRAIN label (at most s.max_messages_per_poll messages)
    - Store every PDF attachment into uploads/ and print a trainer link for each
    - Mark the message read once all of its attachments are stored
    - Do NOT classify
    - Do NOT move labels

    A failure on one message is logged and skipped, so it cannot block other TRAIN
    messages or the INCOMING pipeline that runs after this call. The failed message
    is left unread, so the unread search should pick it up again on a later poll.
    Attachments stored before the failure may then be stored a second time.
    """
    msgs = gmail.search_unread_pdf_messages(s.label_train, max_results=s.max_messages_per_poll)
    for m in msgs or []:
        try:
            _store_train_message(gmail, s, root, m.msg_id)
        except Exception as e:
            print(f"[worker][TRAIN] ERROR msg={m.msg_id}: {e}")
# PDFs at or above this size are not attached to the rep email; the body says so instead.
_MAX_NOTIFY_ATTACHMENT_BYTES = 20 * 1024 * 1024


def _validate_settings(s: Settings) -> None:
    """Raise RuntimeError for settings the worker cannot run without."""
    if not s.rep_notify_to:
        raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_TO (rep email for UNKNOWN detection)")
    if not s.notify_from:
        raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_FROM (OAuth Gmail account email)")
    if not s.trainer_base_url:
        raise RuntimeError("Missing PDF_TRAINER_BASE_URL (base URL for trainer link)")
    if not s.openai_api_key:
        raise RuntimeError("Missing OPENAI_API_KEY_TEST (or OPENAI_API_KEY) in backend/.env")


def _classify_attachment(s: Settings, tmp_dir: Path, msg_id: str, pdf_bytes: bytes) -> Tuple[str, float]:
    """Render the first pages of one PDF and classify them with OpenAI.

    Scratch files go into a per-attachment directory under *tmp_dir*. That directory
    is removed afterwards, even on failure, so tmp/ does not grow without bound.

    Returns:
        ``(template_id, confidence)``. template_id is "UNKNOWN" when the classifier
        returned a missing/blank id or any casing of "unknown". confidence is 0.0 when
        the value is missing or not numeric.
    """
    work_dir = tmp_dir / f"{int(time.time())}_{msg_id}_{uuid.uuid4().hex[:8]}"
    work_dir.mkdir(parents=True, exist_ok=True)
    try:
        # A fixed file name avoids any interaction between the attachment name and the path.
        pdf_path = work_dir / "source.pdf"
        pdf_path.write_bytes(pdf_bytes)
        rendered = render_pdf_to_pngs(
            pdf_path, work_dir / "pages", pages=s.render_pages, dpi=s.render_dpi
        )
        result = classify_with_openai(
            [str(r.path) for r in rendered],
            api_key=s.openai_api_key,
            model=s.openai_model,
        )
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)

    template_id = str(result.get("template_id") or "").strip()
    if not template_id or template_id.upper() == "UNKNOWN":
        template_id = "UNKNOWN"
    try:
        conf = float(result.get("confidence") or 0.0)
    except (TypeError, ValueError):
        conf = 0.0
    return template_id, conf


def _notify_rep_unknown(
    gmail: GmailClient, s: Settings, root: Path, msg_id: str, filename: str, pdf_bytes: bytes
) -> None:
    """Store one UNKNOWN PDF for the trainer and email the rep a trainer link to it."""
    pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes)
    trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}"
    subject = "Action required: Unknown PDF format (template not found)"
    body = (
        "Hi,\n\n"
        "We received a PDF that does not match any existing templates in the system.\n\n"
        "Please open the PDF Trainer using the link below and create or update the template configuration:\n"
        f"{trainer_link}\n\n"
        "The original PDF is attached for reference.\n\n"
        "Thank you,\n"
        "Inserio Automation\n"
    )
    attachments: List[Tuple[str, bytes]] = []
    if len(pdf_bytes) < _MAX_NOTIFY_ATTACHMENT_BYTES:
        attachments.append((filename, pdf_bytes))
    else:
        body += "\nNote: The PDF was too large to attach.\n"
    gmail.send_email(
        to_email=s.rep_notify_to,
        from_email=s.notify_from,
        subject=subject,
        body_text=body,
        attachments=attachments,
    )
    print(
        f"[worker] UNKNOWN: emailed rep {s.rep_notify_to} msg={msg_id} file={filename} "
        f"pdf_id={pdf_id} stored={stored_pdf_path}"
    )


def _process_incoming_message(
    gmail: GmailClient, s: Settings, root: Path, tmp_dir: Path, msg_id: str
) -> None:
    """Classify every PDF on one INCOMING message, relabel it, and notify for UNKNOWNs.

    The message moves to KNOWN only if every PDF attachment matched a template, and
    to UNKNOWN otherwise. In both cases INCOMING is removed and the message is marked
    read. A message without PDF attachments just loses INCOMING and is marked read,
    so it does not loop forever.
    """
    msg_full = gmail.get_message_full(msg_id)
    pdf_atts = gmail.list_pdf_attachments(msg_full)
    if not pdf_atts:
        gmail.move_message(msg_id, add_labels=[], remove_labels=[s.label_incoming], mark_read=True)
        return

    unknown_payloads: List[Tuple[str, bytes]] = []
    for filename, att_id in pdf_atts:
        filename = _safe_name(filename or "attachment.pdf")
        pdf_bytes = gmail.download_attachment(msg_id, att_id)
        template_id, conf = _classify_attachment(s, tmp_dir, msg_id, pdf_bytes)
        if template_id == "UNKNOWN":
            unknown_payloads.append((filename, pdf_bytes))
            print(f"[worker] UNKNOWN attachment conf={conf:.3f} msg={msg_id} file={filename}")
        else:
            print(
                f"[worker] KNOWN attachment template={template_id} conf={conf:.3f} "
                f"msg={msg_id} file={filename}"
            )

    # Apply the Gmail label ONCE per message, before notifying. A notification
    # failure therefore never causes the message to be re-classified.
    target_label = s.label_unknown if unknown_payloads else s.label_known
    gmail.move_message(
        msg_id,
        add_labels=[target_label],
        remove_labels=[s.label_incoming],
        mark_read=True,
    )

    for filename, pdf_bytes in unknown_payloads:
        try:
            _notify_rep_unknown(gmail, s, root, msg_id, filename, pdf_bytes)
        except Exception as e:
            # The message has already left INCOMING and will not be retried, so log
            # loudly and still notify about the remaining attachments.
            print(f"[worker] ERROR: failed to notify rep msg={msg_id} file={filename}: {e}")
            traceback.print_exc()


def main():
    """Run the Gmail PDF worker forever.

    Each iteration first handles the TRAIN lane, then classifies unread INCOMING
    messages. Failures are isolated per message, and the worker sleeps
    ``poll_seconds`` when idle or after any failure.
    """
    s = load_settings()
    _validate_settings(s)

    gmail = GmailClient(s.creds_path, s.token_path)
    # Ensure labels exist
    for label in (s.label_incoming, s.label_known, s.label_unknown, s.label_train):
        gmail.ensure_label(label)

    root = Path(__file__).resolve().parents[0]  # backend/worker
    tmp_dir = root / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    print(f"[worker] Watching label: {s.label_incoming}")
    print(f"[worker] Known label: {s.label_known}")
    print(f"[worker] Unknown label: {s.label_unknown}")
    print(f"[worker] Train label: {s.label_train}")
    print(f"[worker] Rep notify to: {s.rep_notify_to}")
    print(f"[worker] OpenAI model: {s.openai_model}")

    while True:
        try:
            # 1) TRAIN lane
            _process_train_label(gmail, s, root)

            # 2) Main pipeline (INCOMING -> KNOWN/UNKNOWN)
            msgs = gmail.search_unread_pdf_messages(s.label_incoming, max_results=s.max_messages_per_poll)
            if not msgs:
                time.sleep(s.poll_seconds)
                continue

            had_failure = False
            for m in msgs:
                # One bad message must not prevent the rest of the batch from being processed.
                try:
                    _process_incoming_message(gmail, s, root, tmp_dir, m.msg_id)
                except Exception as e:
                    had_failure = True
                    print(f"[worker] ERROR msg={m.msg_id}: {e}")
                    traceback.print_exc()
            if had_failure:
                # Back off so a persistently failing message is not retried in a tight loop.
                time.sleep(s.poll_seconds)
        except Exception as e:
            print(f"[worker] ERROR: {e}")
            traceback.print_exc()
            time.sleep(s.poll_seconds)


if __name__ == "__main__":
    main()