import base64
import json
from datetime import datetime
from email.message import EmailMessage

from googleapiclient.discovery import build

from utils.auth import authenticate_google_services


class Gmail_Agent:
    """Wrapper around the Gmail v1 API client for searching, reading and sending mail.

    The instance keeps one API client in ``self.service``. ``get_all_tools``
    returns JSON-schema style descriptions of the query and send operations.
    """

    def __init__(self, creds=None):
        """Build the Gmail API client.

        Args:
            creds: Credentials handed to ``googleapiclient.discovery.build``.
                When falsy, ``authenticate_google_services()`` is called to
                obtain them.
        """
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    @staticmethod
    def _as_list(value):
        """Return *value* as a list of items; a bare string counts as one item.

        Guards the ``str.join`` calls in this class: joining a plain string
        would otherwise split it into single characters.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    def read_gmail_messages(self, queries, max_results=5):
        """List messages of the authenticated user that match the given queries.

        Args:
            queries: Search terms, joined with single spaces into one ``q``
                string. A single string is accepted as one term; ``None`` is
                treated like an empty list.
            max_results: Value passed as ``maxResults`` to the list call.

        Returns:
            The ``messages`` list from the API response, or ``[]`` when the
            response has no such key.
        """
        query = " ".join(self._as_list(queries))
        response = (
            self.service.users()
            .messages()
            .list(userId='me', maxResults=max_results, q=query)
            .execute()
        )
        return response.get('messages', [])

    def format_messages(self, messages):
        """Fetch each listed message and render date and plain-text body as text.

        Args:
            messages: Iterable of dicts with an ``id`` key (as returned by
                ``read_gmail_messages``). ``None`` is treated as empty.

        Returns:
            A string that starts with a separator line and contains one
            date/content section per message, each followed by the separator.
            When no plain-text body is found the content renders as ``None``.
        """
        separator = "\n\n==========\n\n"
        sections = [separator]
        for message in messages or []:
            msg = (
                self.service.users()
                .messages()
                .get(userId='me', id=message['id'])
                .execute()
            )
            # internalDate is treated as epoch milliseconds; fromtimestamp()
            # without a tz argument yields local time.
            received_at = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
            formatted_date = received_at.strftime('%Y-%m-%d %H:%M:%S')
            # Hand over the payload itself instead of payload['parts']:
            # single-part messages have no 'parts' key and keep their body
            # directly on the payload, while multipart payloads are still
            # walked through their nested 'parts'.
            payload = msg.get('payload') or {}
            content = self._parse_gmail_response([payload])
            sections.append(f"信件日期: {formatted_date}\n")
            sections.append(f"信件內容: {content}{separator}")
        return "".join(sections)

    def send_email(self, to, subject, message_content):
        """Send a plain-text email through the Gmail API.

        Args:
            to: Recipient addresses, joined with ``", "`` into the ``To``
                header. A single string is accepted as one address.
            subject: Value of the ``Subject`` header.
            message_content: Text body of the message.

        Returns:
            A user-facing status string: a success notice, or a failure
            notice that embeds the error text. Errors are reported through
            the return value and are not raised.
        """
        try:
            recipients = self._as_list(to)
            if not recipients:
                # Fail before the network call; reported like any other error.
                raise ValueError("no recipient address was provided")
            message = EmailMessage()
            message.set_content(message_content)
            message["To"] = ", ".join(recipients)
            message["Subject"] = subject
            # The send call takes the whole message base64url-encoded under "raw".
            encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
            send_message = (
                self.service.users()
                .messages()
                .send(userId="me", body={"raw": encoded_message})
                .execute()
            )
            print(f'Message Id: {send_message["id"]}')
            formatted_content = "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            # Boundary handler: the caller receives the failure as text.
            print(f"An error occurred: {error}")
            formatted_content = f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"
        return formatted_content

    def get_all_tools(self):
        """Return the tool descriptions for the query and send operations.

        Returns:
            A list of two dicts (``query_gmail_tool`` and ``send_email_tool``),
            each with ``name``, ``description`` and a JSON-schema style
            ``parameters`` object.
        """
        query_tool = {
            "name": "query_gmail_tool",
            "description": "query the gmails with specified queries",
            "parameters": {
                "type": "object",
                "properties": {
                    "queries": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:abd@gmail.com'])",
                    },
                },
                "required": ["queries"]
            },
        }
        send_tool = {
            "name": "send_email_tool",
            "description": "Send an email to a specified recipient with a specified subject and message",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "An array of email addresses of all the recipients"
                    },
                    "subject": {
                        "type": "string",
                        "description": "The subject of the email"
                    },
                    "message_content": {
                        "type": "string",
                        "description": "The content of the email"
                    }
                },
                "required": ["to", "subject", "message_content"]
            }
        }
        return [query_tool, send_tool]

    def _parse_gmail_response(self, parts):
        """Return the first non-empty ``text/plain`` body found in *parts*.

        Nested ``parts`` are searched depth-first before the part itself is
        inspected.

        Args:
            parts: Iterable of message-part dicts; ``None`` is treated as empty.

        Returns:
            The decoded text, or ``None`` when no plain-text body is present.
        """
        for part in parts or []:
            if "parts" in part:
                content = self._parse_gmail_response(part["parts"])
                if content:
                    return content
            if part.get('mimeType') != 'text/plain':
                continue
            # Parts without inline 'data' (e.g. a body that only carries an
            # attachmentId) cannot be decoded here, so they are skipped.
            data = (part.get('body') or {}).get('data')
            if not data:
                continue
            # Restore any stripped base64 padding so decoding cannot fail on it.
            padded = data + "=" * (-len(data) % 4)
            # errors='replace' keeps one badly encoded mail from aborting the
            # whole listing.
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        return None