Spaces:
Sleeping
Sleeping
| import requests | |
| import base64 | |
| from bs4 import BeautifulSoup | |
| import re | |
| import jwt | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
| from cryptography.hazmat.backends import default_backend | |
| import os | |
| import hashlib | |
| class GmailDataExtractor: | |
| def __init__(self,jwt:str , user_input: str = None) -> None: | |
| if jwt is None : | |
| self.error = "Error" | |
| else: | |
| self.__jwt = jwt | |
| self.__user_input = user_input | |
| self.error = None | |
| self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU' | |
| def __validate_jwt_token(self): | |
| try: | |
| payload = jwt.decode(self.jwt, self.secret_key, algorithms=["HS256"]) | |
| access_token = payload.get("access_token") | |
| if access_token: | |
| return access_token | |
| else: | |
| raise ValueError("Invalid JWT token: Missing access token") | |
| except jwt.ExpiredSignatureError: | |
| raise ValueError("Invalid JWT token: Expired token") | |
| except jwt.InvalidTokenError: | |
| raise ValueError("Invalid JWT token: Token verification failed") | |
| def __fetch_messages(self) -> list: | |
| """ | |
| Fetches messages from the Gmail API. | |
| Args: | |
| gmail_url (str): The URL for the Gmail API request. | |
| access_token (str): The access token for authenticating with Gmail API. | |
| Returns: | |
| list: A list of message objects retrieved from the Gmail API. | |
| Raises: | |
| RuntimeError: If there is an issue while fetching messages from the Gmail API. | |
| """ | |
| """currently not implementing jwt for testing purposes | |
| replace every access_token with jwt function directly which returns the access token""" | |
| access_token = self.__jwt | |
| print("access token") | |
| print(access_token) | |
| receipt_query = f'(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoice OR category:purchases)' | |
| # if self.__user_input is not None: | |
| # receipt_query = f'((subject:"your order" OR subject:receipts OR subject:receipt OR subject:invoice OR subject:invoice OR category:purchases) AND subject:{self.__user_input})&maxResults=15' | |
| gmail_url = f"https://www.googleapis.com/gmail/v1/users/me/messages?q={receipt_query}&maxResults=10" | |
| gmail_response = requests.get(gmail_url, headers={"Authorization": f"Bearer {access_token}"}) | |
| gmail_data = gmail_response.json() | |
| messages=[] | |
| messages.extend(gmail_data.get("messages",[])) | |
| # def __fetch_page(url): | |
| # response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"}) | |
| # response.raise_for_status() # Raise error if the request fails | |
| # data = response.json() | |
| # return data.get("messages", []), data.get("nextPageToken") | |
| # messages = [] | |
| # page_token = None | |
| # try: | |
| # while True: | |
| # url = f"{gmail_url}&pageToken={page_token}" if page_token else gmail_url | |
| # page_messages, page_token = __fetch_page(url) | |
| # messages.extend(page_messages) | |
| # if not page_token: | |
| # break | |
| # except requests.RequestException as e: | |
| # raise RuntimeError(f"Error fetching messages from Gmail API: {str(e)}") | |
| print(len(messages)) | |
| return messages | |
| def __fetch_message_data(self, message_id: str) -> dict: | |
| """ | |
| Fetches message data from the Gmail API. | |
| Args: | |
| message_id (str): The ID of the message to fetch. | |
| Returns: | |
| dict: Message data retrieved from the Gmail API. | |
| Raises: | |
| RuntimeError: If there is an issue while fetching message data from the Gmail API. | |
| """ | |
| print("fetch_message_data") | |
| message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}" | |
| try: | |
| response = requests.get(message_url, headers={"Authorization": f"Bearer {self.__jwt}"}) | |
| response.raise_for_status() # Raise error if the request fails | |
| return response.json() | |
| except requests.RequestException as e: | |
| raise RuntimeError(f"Error fetching message data from Gmail API: {str(e)}") | |
| def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict: | |
| """ | |
| Fetches attachment data from the Gmail API. | |
| Args: | |
| message_id (str): The ID of the message containing the attachment. | |
| attachment_id (str): The ID of the attachment to fetch. | |
| Returns: | |
| dict: Attachment data retrieved from the Gmail API. | |
| Raises: | |
| RuntimeError: If there is an issue while fetching attachment data from the Gmail API. | |
| """ | |
| print("fetch_attachment_data") | |
| attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}" | |
| try: | |
| response = requests.get(attachment_url, headers={"Authorization": f"Bearer {self.__jwt}"}) | |
| response.raise_for_status() # Raise error if the request fails | |
| return response.json() | |
| except requests.RequestException as e: | |
| raise RuntimeError(f"Error fetching attachment data from Gmail API: {str(e)}") | |
| def __process_message(self, message: dict) -> tuple: | |
| """ | |
| Processes a single message. | |
| Args: | |
| message (dict): The message to process. | |
| Returns: | |
| tuple: A tuple containing the subject (str), body (str), links (list of str), | |
| and base64 data if it contains an document attachment in the form of pdf, docx, ppt or any file format indicating whether the message contains an attachment. | |
| Raises: | |
| RuntimeError: If there is an issue while fetching message data from the Gmail API. | |
| """ | |
| print("process_messages") | |
| message_id = message.get("id") | |
| encrypted_message_id = self.encrypt_message_id(message_id) | |
| if not message_id: | |
| return None, None, [], False | |
| subject='' | |
| message_data = self.__fetch_message_data(message_id) | |
| if 'payload' in message_data and 'headers' in message_data['payload']: | |
| headers = message_data['payload']['headers'] | |
| for header in headers: | |
| if header['name'] == 'Subject': | |
| subject = header['value'] | |
| body = '' | |
| text='' | |
| links = [] | |
| has_attachment = False | |
| company_from_gmail = 'others' | |
| if 'payload' in message_data and 'parts' in message_data['payload']: | |
| parts = message_data['payload']['parts'] | |
| payload = message_data['payload']['headers'] | |
| print("printing headers response") | |
| print(payload) | |
| #Extracting the domain name from the senders email | |
| for fromdata in payload: | |
| if fromdata['name'] == 'From': | |
| company_from_gmail = self.extract_domain_from_email(fromdata['value']) | |
| break | |
| if 'chanel' in subject.lower(): | |
| company_from_gmail = 'chanel' | |
| if 'louis vuitton' in subject.lower(): | |
| company_from_gmail = 'Louis Vuitton' | |
| for part in parts: | |
| if 'mimeType' not in part: | |
| continue | |
| mime_type = part['mimeType'] | |
| if mime_type == 'text/plain' or mime_type == 'text/html': | |
| body_data = part['body'].get('data', '') | |
| body = base64.urlsafe_b64decode(body_data) | |
| text= self.extract_text(body) | |
| if 'body' in part and 'attachmentId' in part['body']: | |
| attachment_id = part['body']['attachmentId'] | |
| attachment_data = self.__fetch_attachment_data(message_id, attachment_id) | |
| data = attachment_data.get("data", "") | |
| filename = part.get("filename", "untitled.txt") | |
| if data: | |
| # Save only the first 10 characters of the attachment data | |
| return subject,text ,{"filename":filename , "data":data} , company_from_gmail , message_id | |
| return subject, text,None , company_from_gmail , message_id | |
| def encrypt_message_id(self,message_id:str): | |
| key = os.getenv('AES_KEY').encode('utf-8')[:32] | |
| message_id_bytes = message_id.encode('utf-8') | |
| iv = os.urandom(16) | |
| # Initialize AES cipher with the key and CBC mode | |
| cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) | |
| # Create a encryptor object | |
| encryptor = cipher.encryptor() | |
| # Pad the message_id to be a multiple of 16 bytes (AES block size) | |
| # This is necessary for AES encryption | |
| message_id_padded = message_id_bytes.ljust(32, b'\0') | |
| # Encrypt the padded message_id | |
| ciphertext = encryptor.update(message_id_padded) + encryptor.finalize() | |
| return ciphertext | |
| def extract_domain_from_email(self,email_string): | |
| # Extracting the email address using regex | |
| email_address = re.search(r'[\w\.-]+@[\w\.-]+', email_string).group() | |
| # Extracting the domain name from the email address | |
| domain = email_address.split('@')[-1].split('.')[0] | |
| if email_address and domain : | |
| return domain | |
| else: | |
| return None | |
| def extract_text(self,html_content:str): | |
| """ | |
| Extracts text and links from HTML content. | |
| Args: | |
| html_content (str): The HTML content to process. | |
| Returns: | |
| tuple: A tuple containing the extracted text (str) and links (list of tuples). | |
| Raises: | |
| ValueError: If the input HTML content is empty or None. | |
| """ | |
| if not html_content: | |
| raise ValueError("HTML content is empty or None") | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| # Extract text | |
| text = soup.get_text(separator=' ') | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| print("Printing the extracted text from the html") | |
| print(text) | |
| print() | |
| print() | |
| # Extract links | |
| links = [(link.text, link['href']) for link in soup.find_all('a', href=True)] | |
| return text | |
| def extract_messages(self) -> dict: | |
| """ | |
| Extracts messages based on the provided brand name. | |
| Args: | |
| brand_name (str): The brand name to search for in email subjects. | |
| jwt_token (str): The JWT token for authentication. | |
| Returns: | |
| dict: A dictionary containing the extracted messages with their subjects, bodies, links, and attachment statuses. | |
| format:{"results":[{"subjec":"test subject" , "body":"it would be text" , "attachment_data":{"filename":base64URL format}},{second message with same content of subject , body , attachment_data}]} | |
| """ | |
| print("entered the extract messages") | |
| messages = self.__fetch_messages() | |
| results = [] | |
| for message in messages: | |
| subject, body, attachment_data , company_name , encrypt_mssg_id = self.__process_message(message) | |
| """ Handling None values """ | |
| body = body if body is not None else '' | |
| attachment_data = attachment_data if attachment_data is not None else {} | |
| company_associated = company_name if company_name is not None else '' | |
| en_msg_id = encrypt_mssg_id if encrypt_mssg_id is not None else None | |
| results.append({"body": body, "attachment_data": [attachment_data] ,'company_associated':company_associated , "message_id":en_msg_id}) | |
| return {"results": results} | |
| # obj = GmailDataExtractor("abcd","user_input") | |
| # print(obj.error) |