Spaces:
Sleeping
Sleeping
"""
environments/trace_env/tools/gmail_tool.py

Gmail integration — searches the user's Gmail for messages.
Handles credential loading with proper path resolution and token refresh.

Extended with:
  - fetch_gmail_attachments(message_id): download image/PDF attachments
  - search_gmail_with_attachments(query): search + auto-analyse image attachments
"""
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| import base64 | |
| import os | |
| import pickle | |
| import logging | |
logger = logging.getLogger(__name__)

# OAuth scope requested from Google: read-only Gmail access.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# Credential files live in the project root (trace/ directory), which is
# three directory levels above this module.
_PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
_CREDENTIALS_PATH = os.path.join(_PROJECT_ROOT, "credentials.json")
_TOKEN_PATH = os.path.join(_PROJECT_ROOT, "token_gmail.pkl")

# MIME types we treat as images for analysis.
_IMAGE_MIME_TYPES = {
    f"image/{subtype}"
    for subtype in ("jpeg", "jpg", "png", "gif", "webp", "bmp", "tiff")
}

# MIME types for documents that doc_tool can parse.
_DOCUMENT_MIME_TYPES = {
    "application/pdf",
    "application/msword",  # .doc
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # .docx
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # .pptx
}

# Everything we can analyse: images plus documents.
_ANALYSABLE_MIME_TYPES = _IMAGE_MIME_TYPES.union(_DOCUMENT_MIME_TYPES)
def _restore_secret_file(b64_value, target_path, ok_message, fail_prefix):
    """Write a base64-encoded secret to ``target_path`` unless it already exists.

    Args:
        b64_value: Base64 text taken from an environment variable (may be None/empty).
        target_path: Destination file; left untouched when it already exists.
        ok_message: Message logged at INFO level after a successful write.
        fail_prefix: Prefix of the ERROR log line emitted when decoding/writing fails.
    """
    if not b64_value or os.path.exists(target_path):
        return
    try:
        # Decode BEFORE opening the file: a malformed secret must not leave an
        # empty file behind, because later calls skip reconstruction as soon
        # as the path exists.
        decoded = base64.b64decode(b64_value)
        with open(target_path, "wb") as f:
            f.write(decoded)
    except (ValueError, OSError) as e:  # binascii.Error is a ValueError
        logger.error("%s: %s", fail_prefix, e)
    else:
        logger.info(ok_message)


def get_gmail_service():
    """Build an authorised Gmail API (v1) client.

    Steps:
      1. If the token / client-secret files are missing, reconstruct them from
         base64 environment variables (Hugging Face Secrets). The token is read
         from ``GMAIL_TOKEN_B64`` or ``TOKEN_GMAIL_B64``; the client secrets
         from ``GCP_CREDENTIALS_B64``.
      2. Load pickled credentials from ``_TOKEN_PATH`` when present.
      3. Refresh expired credentials; if that fails or no credentials exist,
         run the interactive ``InstalledAppFlow`` local-server login.
      4. Persist refreshed / newly obtained credentials back to ``_TOKEN_PATH``.

    Returns:
        The object returned by ``build("gmail", "v1", credentials=creds)``.

    Raises:
        FileNotFoundError: when a login is required but ``credentials.json``
            is not available at ``_CREDENTIALS_PATH``.
    """
    _restore_secret_file(
        os.environ.get("GMAIL_TOKEN_B64") or os.environ.get("TOKEN_GMAIL_B64"),
        _TOKEN_PATH,
        "[GMAIL_TOOL] Reconstructed token_gmail.pkl from HF Secret",
        "Failed to decode Gmail token secret",
    )
    _restore_secret_file(
        os.environ.get("GCP_CREDENTIALS_B64"),
        _CREDENTIALS_PATH,
        "[GMAIL_TOOL] Reconstructed credentials.json from HF Secret",
        "Failed to decode GCP_CREDENTIALS_B64",
    )

    creds = None
    # Token saved after first login
    if os.path.exists(_TOKEN_PATH):
        try:
            with open(_TOKEN_PATH, "rb") as f:
                # SECURITY: pickle.load executes arbitrary code embedded in the
                # file. The token file (and the secret it may be rebuilt from)
                # must only ever come from a trusted source.
                creds = pickle.load(f)
        except (pickle.UnpicklingError, EOFError, AttributeError,
                ImportError, IndexError) as e:
            # Corrupt or truncated token: fall back to a fresh login instead
            # of crashing on every call.
            logger.warning(
                "[GMAIL_TOOL] Ignoring unreadable token file %s: %s", _TOKEN_PATH, e
            )
            creds = None

    # Refresh expired credentials or run auth flow
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except Exception as e:
                # Refresh failed (revoked token, network error, ...): re-authenticate
                logger.warning("[GMAIL_TOOL] Token refresh failed: %s", e)
                creds = None
        if not creds:
            if not os.path.exists(_CREDENTIALS_PATH):
                raise FileNotFoundError(
                    f"Google credentials not found at {_CREDENTIALS_PATH}. "
                    f"Download from Google Cloud Console and place as credentials.json "
                    f"in the trace/ directory."
                )
            flow = InstalledAppFlow.from_client_secrets_file(
                _CREDENTIALS_PATH, SCOPES
            )
            creds = flow.run_local_server(port=0)
        # Persist whatever we ended up with so the next call can reuse it
        with open(_TOKEN_PATH, "wb") as f:
            pickle.dump(creds, f)

    return build("gmail", "v1", credentials=creds)
def search_gmail(query: str, max_results: int = 10) -> list[dict]:
    """Search Gmail and return simplified message dicts.

    Args:
        query: Gmail search expression, e.g. ``"receipt 2022"`` or
            ``"invoice after:2022/1/1 before:2023/1/1"``.
        max_results: Value passed as ``maxResults`` to the list call.

    Returns:
        One dict per matching message with the keys ``id``, ``date``,
        ``from``, ``subject`` and ``snippet`` (missing values become ``""``).
        Any error is logged and an empty list is returned.
    """
    try:
        service = get_gmail_service()
        listing = service.users().messages().list(
            userId="me", q=query, maxResults=max_results
        ).execute()

        output = []
        for msg in listing.get("messages", []):
            detail = service.users().messages().get(
                userId="me", id=msg["id"], format="metadata",
                metadataHeaders=["From", "Subject", "Date"],
            ).execute()
            # Header names are case-insensitive (RFC 5322), so normalise them
            # before lookup; tolerate a response without payload/headers
            # rather than discarding every result collected so far.
            headers = {
                h["name"].lower(): h["value"]
                for h in detail.get("payload", {}).get("headers", [])
            }
            output.append({
                "id": msg["id"],
                "date": headers.get("date", ""),
                "from": headers.get("from", ""),
                "subject": headers.get("subject", ""),
                "snippet": detail.get("snippet", ""),
            })
        return output
    except FileNotFoundError as e:
        # Raised by get_gmail_service() when credentials.json is missing
        logger.error("[GMAIL_TOOL] %s", e)
        return []
    except Exception as e:
        # Best-effort tool: report the failure and return no results
        logger.error("[GMAIL_TOOL] Error searching Gmail: %s", e)
        return []
# Per-image VLM analysis is switched off: it is very slow on CPU (30-60s per
# image) and amounts come from the PDF text layer, not from image logos.
# Flip to True only if VLM image OCR is specifically needed.
_IMAGE_VLM_ENABLED = False


def _b64url_decode(data: str) -> bytes:
    """Decode URL-safe base64 as used by Gmail, restoring stripped ``=`` padding."""
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


def _html_to_text(markup: str) -> str:
    """Reduce an HTML fragment to whitespace-normalised plain text."""
    import html
    import re

    # Strip tags first, THEN unescape, so an escaped "&lt;b&gt;" in the text
    # is not mistaken for markup.
    text = re.sub(r'<[^>]+>', ' ', markup)
    text = html.unescape(text)
    # Compress whitespace (also covers the non-breaking spaces from &nbsp;)
    return re.sub(r'\s+', ' ', text).strip()


def _decode_text_part(raw: bytes, mime_type: str, charset: str = "utf-8") -> str:
    """Decode a text/plain or text/html body to ``str``; HTML is reduced to text."""
    try:
        text = raw.decode(charset, errors="ignore")
    except LookupError:
        # Unknown/misspelled charset label: fall back to UTF-8
        text = raw.decode("utf-8", errors="ignore")
    return _html_to_text(text) if mime_type == "text/html" else text


def _download_attachment(service, message_id: str, attachment_id: str) -> bytes:
    """Fetch one attachment via the Gmail API and return its decoded bytes."""
    att_data = service.users().messages().attachments().get(
        userId="me", messageId=message_id, id=attachment_id
    ).execute()
    return _b64url_decode(att_data.get("data", ""))


def _collect_nested_message(eml_bytes: bytes, attachments: list, text_chunks: list) -> None:
    """Parse a forwarded (message/rfc822) email and harvest its contents.

    Body text is appended to ``text_chunks``; image/document parts are appended
    to ``attachments`` (documents are run through ``doc_tool.extract_document``).
    The accumulators are filled in place so that results gathered before an
    unexpected error are not lost.
    """
    import email

    ext_map = {"application/pdf": ".pdf", "image/jpeg": ".jpg", "image/png": ".png"}
    nested_msg = email.message_from_bytes(eml_bytes)

    for nested_part in nested_msg.walk():
        nested_ct = nested_part.get_content_type()
        nested_fn = nested_part.get_filename() or ""
        payload = nested_part.get_payload(decode=True)
        if not payload:
            continue  # multipart containers / empty parts carry no bytes

        # Un-named text parts are the nested email's body
        if nested_ct in ("text/plain", "text/html") and not nested_fn:
            text_chunks.append(
                _decode_text_part(
                    payload, nested_ct, nested_part.get_content_charset() or "utf-8"
                )
            )
            continue

        if nested_ct not in _DOCUMENT_MIME_TYPES and nested_ct not in _IMAGE_MIME_TYPES:
            continue

        # Give it a fallback filename based on content type
        if not nested_fn:
            nested_fn = f"nested_attachment{ext_map.get(nested_ct, '.bin')}"

        nested_att = {
            "attachment_id": f"nested_{nested_fn}",
            "filename": nested_fn,
            "mime_type": nested_ct,
            "size_bytes": len(payload),
            "data": payload,
            "image_analysis": None,
            "doc_analysis": None,
        }

        if nested_ct in _DOCUMENT_MIME_TYPES:
            try:
                from .doc_tool import extract_document
                logger.info("[GMAIL_TOOL] Extracting nested doc: %s", nested_fn)
                doc_result = extract_document(
                    file_bytes=payload,
                    filename=nested_fn,
                    mime_type=nested_ct,
                    analyse_images=False,
                )
                nested_att["doc_analysis"] = doc_result
                logger.info(
                    "[GMAIL_TOOL] Nested doc extracted: parser=%s, text_len=%s, amounts=%s",
                    doc_result.get('parser_used'),
                    len(doc_result.get('extracted_text', '')),
                    doc_result.get('entities', {}).get('amounts', []),
                )
            except Exception as e:
                # Extraction is best-effort; keep the raw attachment regardless
                logger.warning(f"[GMAIL_TOOL] Nested doc extraction failed: {e}")

        attachments.append(nested_att)


def fetch_gmail_attachments(message_id: str, analyse_images: bool = True) -> dict:
    """Download the attachments and body text of one Gmail message.

    Args:
        message_id: Gmail message ID (from a ``search_gmail`` result).
        analyse_images: Forwarded to ``doc_tool.extract_document`` for
            top-level document attachments. Direct analysis of image
            attachments through ``image_tool`` is additionally gated by
            ``_IMAGE_VLM_ENABLED``, which is currently ``False``.

    Returns:
        A dict with two keys:
          ``"attachments"``: list of dicts, each
              {
                  "attachment_id": str,
                  "filename": str,
                  "mime_type": str,
                  "size_bytes": int,
                  "data": bytes,                 # raw decoded bytes
                  "image_analysis": dict | None,
                  "doc_analysis": dict | None,   # doc_tool result for PDF/DOC/DOCX/PPTX
              }
          ``"body_text"``: str, plain text gathered from text/plain and
              text/html parts (including those of forwarded emails).

        Errors are logged, never raised; whatever was collected before the
        failure is still returned.
    """
    attachments: list[dict] = []
    text_chunks: list[str] = []

    try:
        service = get_gmail_service()

        # Get full message to inspect payload parts
        message = service.users().messages().get(
            userId="me", id=message_id, format="full"
        ).execute()

        for part in _flatten_parts(message.get("payload", {})):
            filename = part.get("filename", "")
            mime_type = part.get("mimeType", "")
            body = part.get("body", {})
            attachment_id = body.get("attachmentId")
            size_bytes = body.get("size", 0)

            # Extract email body text (plain or HTML)
            if mime_type in ("text/plain", "text/html") and body.get("data"):
                try:
                    text_chunks.append(
                        _decode_text_part(_b64url_decode(body["data"]), mime_type)
                    )
                except (ValueError, TypeError) as e:
                    # Malformed base64 in one part must not abort the message
                    logger.debug(
                        "[GMAIL_TOOL] Could not decode %s body part: %s", mime_type, e
                    )

            # Forwarded email containers (message/rfc822) are stored by Gmail
            # as an attachment: download the .eml, parse it with the stdlib
            # email module and pull out body text plus PDF/doc/image parts.
            if mime_type == "message/rfc822" and attachment_id:
                try:
                    eml_bytes = _download_attachment(service, message_id, attachment_id)
                    _collect_nested_message(eml_bytes, attachments, text_chunks)
                except Exception as e:
                    logger.warning(f"[GMAIL_TOOL] Failed to parse nested RFC822 message: {e}")
                continue  # Skip the normal attachment flow for rfc822 parts

            # Only process named standard attachments
            if not filename or not attachment_id:
                continue

            raw_bytes = _download_attachment(service, message_id, attachment_id)

            attachment = {
                "attachment_id": attachment_id,
                "filename": filename,
                "mime_type": mime_type,
                "size_bytes": size_bytes,
                "data": raw_bytes,
                "image_analysis": None,
                "doc_analysis": None,
            }

            # Analyse image attachments (disabled via _IMAGE_VLM_ENABLED)
            if _IMAGE_VLM_ENABLED and analyse_images and mime_type in _IMAGE_MIME_TYPES:
                try:
                    from .image_tool import analyse_image_from_bytes
                    logger.info(
                        f"[GMAIL_TOOL] Analysing image attachment: {filename} ({mime_type})"
                    )
                    analysis = analyse_image_from_bytes(
                        image_bytes=raw_bytes,
                        mime_type=mime_type,
                        filename=filename,
                    )
                    attachment["image_analysis"] = analysis
                    logger.info(
                        f"[GMAIL_TOOL] Image analysis complete for {filename}: "
                        f"extracted {len(analysis.get('extracted_text', ''))} chars"
                    )
                except Exception as e:
                    logger.warning(f"[GMAIL_TOOL] Image analysis failed for {filename}: {e}")

            # Extract text/images from document attachments (PDF, DOCX, PPTX)
            if mime_type in _DOCUMENT_MIME_TYPES:
                try:
                    from .doc_tool import extract_document
                    logger.info(
                        f"[GMAIL_TOOL] Extracting document: {filename} ({mime_type})"
                    )
                    doc_result = extract_document(
                        file_bytes=raw_bytes,
                        filename=filename,
                        mime_type=mime_type,
                        analyse_images=analyse_images,
                    )
                    attachment["doc_analysis"] = doc_result
                    logger.info(
                        f"[GMAIL_TOOL] Document extraction complete for {filename}: "
                        f"parser={doc_result.get('parser_used')}, "
                        f"text_len={len(doc_result.get('extracted_text', ''))}, "
                        f"amounts={doc_result.get('entities', {}).get('amounts', [])}"
                    )
                except Exception as e:
                    logger.warning(f"[GMAIL_TOOL] Document extraction failed for {filename}: {e}")

            attachments.append(attachment)
            logger.info(f"[GMAIL_TOOL] Downloaded attachment: {filename} ({size_bytes} bytes)")

    except FileNotFoundError as e:
        # Raised by get_gmail_service() when credentials.json is missing
        logger.error(f"[GMAIL_TOOL] {e}")
    except Exception as e:
        logger.error(f"[GMAIL_TOOL] Error fetching attachments for {message_id}: {e}")

    return {"attachments": attachments, "body_text": "\n".join(text_chunks).strip()}
def search_gmail_with_attachments(
    query: str,
    max_results: int = 10,
    analyse_images: bool = True,
) -> list[dict]:
    """
    Run a Gmail search, then fetch and analyse each hit's attachments in PARALLEL.

    Every message dict returned by ``search_gmail`` is copied and extended with:
      - "attachments":    attachment metadata (the raw "data" bytes are dropped)
      - "image_analyses": image_analysis results that carry no "error"
      - "doc_analyses":   doc_analysis results that carry no "error"
      - "body_text":      first 2000 characters of the body, only when non-empty

    Returns an empty list when the search yields nothing. A failure while
    enriching one message is logged and leaves that message with empty lists.
    """
    from concurrent.futures import ThreadPoolExecutor

    found = search_gmail(query, max_results)
    if not found:
        return []

    def process_single_msg(summary):
        record = {
            **summary,
            "attachments": [],
            "image_analyses": [],
            "doc_analyses": [],
        }
        try:
            fetched = fetch_gmail_attachments(summary["id"], analyse_images=analyse_images)
            files = fetched["attachments"]
            text = fetched["body_text"]
            if text:
                record["body_text"] = text[:2000]

            # Metadata only: strip the raw bytes from what we hand back
            record["attachments"] = [
                {key: value for key, value in item.items() if key != "data"}
                for item in files
            ]

            for item in files:
                for source_key, target_key in (
                    ("image_analysis", "image_analyses"),
                    ("doc_analysis", "doc_analyses"),
                ):
                    analysis = item.get(source_key)
                    if analysis and not analysis.get("error"):
                        record[target_key].append(analysis)
        except Exception as e:
            logger.warning(f"[GMAIL_TOOL] Error in parallel fetch for {summary['id']}: {e}")
        return record

    # Use 4 workers to stay safe on 16GB RAM while boosting speed
    logger.info(f"[GMAIL_TOOL] Starting parallel processing of {len(found)} messages with 4 workers...")
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(process_single_msg, found))
# ── Internal helpers ──────────────────────────────────────────────────────────
def _flatten_parts(payload: dict) -> list[dict]:
    """
    Flatten a (possibly nested) message payload into a list of parts,
    depth-first and in document order.

    Rules applied to each node:
      - has a 'parts' key   : not emitted itself; its children are visited
      - message/rfc822      : emitted only when body.attachmentId is set, so
                              the caller can download and parse the forwarded
                              email; otherwise dropped
      - anything else       : emitted as-is (text/plain, application/pdf, ...)
    """
    leaves = []
    pending = [payload]  # explicit stack instead of recursion

    while pending:
        node = pending.pop()
        kind = node.get("mimeType", "")

        if "parts" in node:
            # Push children reversed so they pop in their original order
            pending.extend(reversed(node["parts"]))
            continue

        if kind == "message/rfc822":
            if node.get("body", {}).get("attachmentId"):
                leaves.append(node)
            continue

        leaves.append(node)

    return leaves