Spaces:
Runtime error
Runtime error
File size: 4,528 Bytes
129cd69 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | """Gmail tool utils."""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING, List, Optional, Tuple
if TYPE_CHECKING:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import Resource
from googleapiclient.discovery import build as build_resource
# Module-level logger; clean_email_body reports its fallbacks through it.
logger = logging.getLogger(__name__)
def import_google() -> Tuple[Request, Credentials]:
    """Import the google-auth classes used for authentication.

    The imports happen inside the function so this module can be loaded
    without the optional Google dependencies installed.

    Returns:
        Tuple[Request, Credentials]: Request and Credentials classes.

    Raises:
        ImportError: If the google libraries cannot be imported. The original
            import failure is attached as the cause of the raised error.
    """
    # google-auth-httplib2
    try:
        from google.auth.transport.requests import Request  # noqa: F401
        from google.oauth2.credentials import Credentials  # noqa: F401
    except ImportError as e:
        # Chain explicitly so the traceback shows which import actually failed.
        raise ImportError(
            "You need to install google-auth-httplib2 to use this toolkit. "
            "Try running pip install --upgrade google-auth-httplib2"
        ) from e
    return Request, Credentials
def import_installed_app_flow() -> InstalledAppFlow:
    """Import the InstalledAppFlow class.

    The import happens inside the function so this module can be loaded
    without google-auth-oauthlib installed.

    Returns:
        InstalledAppFlow: InstalledAppFlow class.

    Raises:
        ImportError: If google_auth_oauthlib cannot be imported. The original
            import failure is attached as the cause of the raised error.
    """
    try:
        from google_auth_oauthlib.flow import InstalledAppFlow
    except ImportError as e:
        # Chain explicitly so the traceback shows which import actually failed.
        raise ImportError(
            "You need to install google-auth-oauthlib to use this toolkit. "
            "Try running pip install --upgrade google-auth-oauthlib"
        ) from e
    return InstalledAppFlow
def import_googleapiclient_resource_builder() -> build_resource:
    """Import the googleapiclient.discovery.build function.

    The import happens inside the function so this module can be loaded
    without google-api-python-client installed.

    Returns:
        build_resource: googleapiclient.discovery.build function.

    Raises:
        ImportError: If googleapiclient cannot be imported. The original
            import failure is attached as the cause of the raised error.
    """
    try:
        from googleapiclient.discovery import build
    except ImportError as e:
        # Chain explicitly so the traceback shows which import actually failed.
        raise ImportError(
            "You need to install googleapiclient to use this toolkit. "
            "Try running pip install --upgrade google-api-python-client"
        ) from e
    return build
# OAuth scopes requested when get_gmail_credentials is called without scopes.
DEFAULT_SCOPES = ["https://mail.google.com/"]
# Path get_gmail_credentials reads cached credentials from, and writes them
# to, when no token_file is given.
DEFAULT_CREDS_TOKEN_FILE = "token.json"
# Path of the client secrets file handed to InstalledAppFlow when no
# client_secrets_file is given.
DEFAULT_CLIENT_SECRETS_FILE = "credentials.json"
def get_gmail_credentials(
    token_file: Optional[str] = None,
    client_secrets_file: Optional[str] = None,
    scopes: Optional[List[str]] = None,
) -> Credentials:
    """Load cached Gmail credentials, refreshing or re-authorizing if needed.

    Args:
        token_file: Path of the cached token file. Falls back to
            DEFAULT_CREDS_TOKEN_FILE when not provided.
        client_secrets_file: Path of the OAuth client secrets file. Falls back
            to DEFAULT_CLIENT_SECRETS_FILE when not provided.
        scopes: OAuth scopes to request. Falls back to DEFAULT_SCOPES when not
            provided.

    Returns:
        Credentials: The loaded, refreshed, or newly authorized credentials.
    """
    # From https://developers.google.com/gmail/api/quickstart/python
    request_cls, credentials_cls = import_google()
    flow_cls = import_installed_app_flow()

    if not scopes:
        scopes = DEFAULT_SCOPES
    if not token_file:
        token_file = DEFAULT_CREDS_TOKEN_FILE
    if not client_secrets_file:
        client_secrets_file = DEFAULT_CLIENT_SECRETS_FILE

    # The token file holds the user's access and refresh tokens; it is
    # written below the first time the authorization flow completes.
    creds = (
        credentials_cls.from_authorized_user_file(token_file, scopes)
        if os.path.exists(token_file)
        else None
    )

    # Cached credentials that are still valid need no further work.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(request_cls())
    else:
        # No usable cached credentials: let the user log in.
        # https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application # noqa
        flow = flow_cls.from_client_secrets_file(client_secrets_file, scopes)
        creds = flow.run_local_server(port=0)

    # Persist the credentials for the next run.
    with open(token_file, "w") as token_fp:
        token_fp.write(creds.to_json())
    return creds
def build_resource_service(
    credentials: Optional[Credentials] = None,
    service_name: str = "gmail",
    service_version: str = "v1",
) -> Resource:
    """Build a Google API service resource (Gmail v1 by default).

    Args:
        credentials: Credentials to authorize the service with. When not
            provided, they are obtained via get_gmail_credentials().
        service_name: Name of the API service to build.
        service_version: Version of the API service to build.

    Returns:
        Resource: The object returned by googleapiclient.discovery.build.
    """
    if not credentials:
        credentials = get_gmail_credentials()
    make_service = import_googleapiclient_resource_builder()
    service = make_service(service_name, service_version, credentials=credentials)
    return service
def clean_email_body(body: str) -> str:
    """Strip HTML markup from an email body, best effort.

    Args:
        body: The email body, possibly containing HTML.

    Returns:
        str: The text extracted by BeautifulSoup, or the body converted to a
            string unchanged when BeautifulSoup is not installed or parsing
            fails.
    """
    # bs4 is optional: without it the body is passed through untouched.
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        logger.warning("BeautifulSoup not installed. Skipping cleaning.")
        return str(body)

    # Any parsing problem is logged and the body is returned as-is.
    try:
        parsed = BeautifulSoup(str(body), "html.parser")
        return str(parsed.get_text())
    except Exception as err:
        logger.error(err)
        return str(body)
|