Spaces:
Running
Running
| from googleapiclient.discovery import build | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.auth.transport.requests import Request | |
| from email.mime.text import MIMEText | |
| import pickle | |
| import os.path | |
| import base64 | |
| import email | |
# OAuth scopes requested during authorisation (adjust to your needs).
# NOTE(review): 'https://mail.google.com/' is the full-mailbox scope, so the
# three narrower Gmail scopes below look redundant -- confirm before trimming.
# NOTE(review): no Drive scope is listed, yet gmail_tool's
# "verificar_almacenamiento" action calls the Drive API -- confirm whether a
# Drive scope must be added for that action to succeed.
SCOPES = [
    'https://mail.google.com/',
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify',
]
def _load_gmail_credentials():
    """Return usable OAuth credentials, refreshing or re-authorising if needed.

    Credentials are cached in ``token.pickle`` (current working directory).
    When no valid cached credentials exist, they are refreshed if possible;
    otherwise the installed-app flow is run from ``credentials.json``. The
    refreshed or newly obtained credentials are written back to the cache.
    """
    creds = None
    if os.path.exists('token.pickle'):
        # SECURITY: unpickling can execute arbitrary code. This is only safe
        # while token.pickle is written exclusively by this program.
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    if creds and creds.valid:
        return creds
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)
    with open('token.pickle', 'wb') as token:
        pickle.dump(creds, token)
    return creds


def _decode_body_data(data):
    """Decode a base64url-encoded body string into text ('' for empty input)."""
    if not data:
        return ""
    # Restore any stripped '=' padding so the decoder does not reject the data.
    padded = data + "=" * (-len(data) % 4)
    # errors="replace": a non-UTF-8 body must not abort the whole read.
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _collect_plain_text(parts):
    """Yield the decoded text of every text/plain part, descending into nested parts."""
    for part in parts:
        nested = part.get('parts')
        if nested:
            yield from _collect_plain_text(nested)
        elif part.get('mimeType') == 'text/plain':
            yield _decode_body_data(part.get('body', {}).get('data', ''))


def _extract_plain_text(payload):
    """Return the text body of a message payload.

    With parts, only text/plain parts contribute; without parts, the
    payload's own body is decoded whatever its MIME type.
    """
    parts = payload.get('parts', [])
    if not parts:
        return _decode_body_data(payload.get('body', {}).get('data', ''))
    return "".join(_collect_plain_text(parts))


def gmail_tool(accion, parametros=None):
    """Interact with the Gmail API.

    Args:
        accion: Action to perform. One of ``"leer_correos"``,
            ``"enviar_correo"`` or ``"verificar_almacenamiento"``.
        parametros: Optional dict of per-action settings. Every key is
            optional and falls back to the value that used to be hard-coded:
            ``"max_resultados"`` (read, default 10), ``"cuerpo"``,
            ``"destinatario"``, ``"remitente"`` and ``"asunto"`` (send).

    Returns:
        * ``"leer_correos"``: ``{"messages": [{"message_id": ..., "body": ...}]}``;
          each body is also printed.
        * ``"enviar_correo"``: ``{"message_id": <id of the sent message>}``;
          the id is also printed.
        * ``"verificar_almacenamiento"``: the ``storageQuota`` value from the
          Drive ``about`` resource, or ``{"error": ...}`` on failure.
        * Any other action: an empty dict.
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    parametros = {} if parametros is None else parametros

    creds = _load_gmail_credentials()
    service = build('gmail', 'v1', credentials=creds)
    result = {}

    if accion == "leer_correos":
        listing = service.users().messages().list(
            userId='me', maxResults=parametros.get('max_resultados', 10)
        ).execute()
        leidos = []
        for message in listing.get('messages', []):
            msg = service.users().messages().get(userId='me', id=message['id']).execute()
            body = _extract_plain_text(msg['payload'])
            print(body)
            leidos.append({"message_id": message['id'], "body": body})
        result["messages"] = leidos

    elif accion == "enviar_correo":
        message = MIMEText(parametros.get('cuerpo', 'Este es el cuerpo del correo.'))
        message['to'] = parametros.get('destinatario', 'destinatario@example.com')
        message['from'] = parametros.get('remitente', 'tu_correo@gmail.com')
        message['subject'] = parametros.get('asunto', 'Asunto del correo')
        create_message = {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
        send_message = service.users().messages().send(userId='me', body=create_message).execute()
        print(F'Message Id: {send_message["id"]}')
        # Store the sent message's id here; the read branch has no send_message.
        result["message_id"] = send_message["id"]

    elif accion == "verificar_almacenamiento":
        # NOTE(review): SCOPES lists only Gmail scopes; confirm the cached
        # credentials are authorised for this Drive call.
        try:
            drive_service = build('drive', 'v3', credentials=creds)
            about = drive_service.about().get(fields="storageQuota").execute()
            return about.get('storageQuota')
        except Exception as e:  # best-effort: report the failure, do not raise
            print(f"Error al verificar el almacenamiento: {e}")
            return {"error": "Error al verificar el almacenamiento."}

    return result
# ... (Integración con el prompt del sistema)