# Alerter_v3.0 / mailer_utils.py
import os
import imaplib
import email
import re
import zipfile
import shutil
import string
import io
import csv
import json
import base64
import urllib.request
import certifi
from io import BytesIO
from datetime import date, datetime, timedelta
import datetime as dt
import numpy as np
import pandas as pd
from email import policy
from email.parser import BytesParser
from google.cloud import bigquery
from google.oauth2 import service_account
import streamlit as st  # needed by upload_to_bigquery / authenticate_bigquery below
import streamlit.components.v1 as components
# Gmail credentials are read from the environment; an app password should not be
# hardcoded in source. GMAIL_USERNAME / GMAIL_PASSWORD are assumed variable names.
gmail_username = os.getenv("GMAIL_USERNAME", "rasikajadhav@gofynd.com")
gmail_password = os.getenv("GMAIL_PASSWORD", "")
insertion_date = datetime.now().strftime('%d-%b-%Y')
# Date used by the IMAP SENTON search in download_attachment (same %d-%b-%Y format)
current_date = datetime.now().strftime('%d-%b-%Y')
def is_attachment_supported(filename):
supported_extensions = ['.zip', '.xlsx', '.xls', '.csv']
return any(filename.lower().endswith(ext) for ext in supported_extensions)
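# Illustrative check (hypothetical filenames): is_attachment_supported("report.XLSX")
# returns True, while is_attachment_supported("notes.pdf") returns False.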
def extract_attachment_name(filename, label, email_body, code):
    # If a code is supplied, append it to the base name before the extension.
    # rsplit keeps multi-dot names like 'remit.2024.csv' intact.
    if code:
        base, ext = filename.rsplit('.', 1)
        return f"{base}_{code}.{ext}"
    return filename
def extract_csv_link(email_content):
    # Regular expression for matching URLs
    url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Return every URL found in the email content
    return url_pattern.findall(email_content)
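# Illustrative call with a hypothetical email body:
#   extract_csv_link("Your report is ready: https://example.com/remit.zip")
#   -> ['https://example.com/remit.zip']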
def upload_status(label, bq_client, email_subject, imap_server):
project_id = "fynd-db"
dataset_id = "finance_recon_tool_asia" #### Changed
table_id = "COD_Email_Upload_Status"
label_value = label
    # Query the upload-status table for this label's recorded subjects
sql_query = f"""
SELECT Subject_Date
FROM `{project_id}.{dataset_id}.{table_id}`
WHERE Label = @label_value"""
# Set query parameters
query_params = [bigquery.ScalarQueryParameter("label_value", "STRING", label_value)]
# Execute the query and convert the result to a Pandas DataFrame
    try:
        df = bq_client.query(sql_query, job_config=bigquery.QueryJobConfig(query_parameters=query_params)).to_dataframe()
    except Exception as e:
        print(f"Error executing query: {e}")
        # Without this early return, df would be undefined below; treat a failed
        # lookup as "not yet uploaded" so the email is still processed.
        return False
    return any(df["Subject_Date"] == email_subject)
def compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log):
    try:
        log = log + '.\n' + 'comparison'
        table = bq_client.get_table(table_id)
        bq_schema_dict = {field.name: field.field_type for field in table.schema}
        bq_schema_df = pd.DataFrame(list(bq_schema_dict.items()), columns=['column_name', 'bq_data_type'])
        # Compare column names as sets, and column order as ordered tuples
        # (tuple(set(...)) has no stable order, so it cannot check sequence)
        columns_df1 = set(df.columns)
        columns_df2 = set(bq_schema_df['column_name'])
        columns_sequence_df1 = tuple(df.columns)
        columns_sequence_df2 = tuple(bq_schema_df['column_name'])
        common_columns = columns_df1.intersection(columns_df2)
        if len(common_columns) != len(columns_df1):
            data_status = "Extra columns received in the attachment"
        elif len(common_columns) != len(columns_df2):
            data_status = "Fewer columns received in the attachment"
        elif columns_sequence_df1 == columns_sequence_df2:
            data_status = "Both DataFrames have the same columns in same sequence."
        else:
            data_status = "Same columns received, but in a different sequence"
        return data_status
    except Exception as e:
        print(e)
        return f"Schema comparison failed: {e}"
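# Note: the callers below gate their BigQuery load on the exact status string
# "Both DataFrames have the same columns in same sequence."; any other status
# is logged and the attachment is skipped.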
# bluedart
def Blue_dart_attachment(username, password, label, code, bq_client, email_ids, imap_server):
dataset_id = 'finance_recon_tool_asia'
project_id = 'fynd-db'
table_id = 'COD_bluedart'
# data_status = "Both DataFrames have the same columns in same sequence."
log = ""
log = log + ".\n"+".\n" + label
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
log = log + ".\n"+ email_message["Subject"]
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Extract email body text
email_body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
email_body += part.get_payload(decode=True).decode("utf-8")
# Iterate over the email parts
for part in email_message.walk():
# Check if the part has a filename (attachment)
if part.get_filename():
# Extract the filename and check if it is supported
attachment_filename = part.get_filename()
if attachment_filename and is_attachment_supported(attachment_filename):
                        # Decode the attachment payload into a StringIO for CSV parsing
                        csv_file = io.StringIO(part.get_payload(decode=True).decode())
# Read the CSV file using the csv.reader() function
try:
csv_reader = csv.reader(csv_file)
except Exception as e:
# Code to handle the exception
# print(f"An error occurred: {e}")
log = log +".\n"+ " ERROR - " +str(e)
continue
log = log + ".\n" + "file read"
file = []
for i in csv_reader:
file.append(i)
                        column_names = ['AWBNO', 'ORGLOC', 'DSTLOC', 'AMOUNT', 'Random', 'REF NO', 'PU DATE', 'DEPOSIT DATE', 'BANK PAY REF NO', 'PAY REF DATE', 'N', 'O', 'P']
                        # Skip the 8 header rows and 2 trailer rows of the Blue Dart report layout
                        df = pd.DataFrame(file[8:-2], columns=column_names)
# Display the updated dataframe
df =df[['AWBNO', 'ORGLOC', 'DSTLOC', 'AMOUNT','REF NO', 'PU DATE', 'DEPOSIT DATE', 'BANK PAY REF NO', 'PAY REF DATE']].copy()
new_column_names = []
for column_name in df.columns:
                            # Remove invalid characters (coerce to str; headers can be non-string)
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', str(column_name))
                            # Check if the cleaned name starts with a number
                            if cleaned_name and cleaned_name[0].isdigit():
# Prepend an underscore if it starts with a number
cleaned_name = '_' + cleaned_name
# Truncate the name if it exceeds 300 characters
cleaned_name = cleaned_name[:300]
new_column_names.append(cleaned_name)
# Assign the cleaned column names back to the DataFrame
df.columns = new_column_names
                        df = df.replace({np.nan: ''})
                        # Rename the attachment if a code was supplied
new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
if new_attachment_filename:
date = dt.date.today()
df['inserted_dt'] = date
df=df.astype(str)
table_id = 'fynd-db.finance_recon_tool_asia.COD_bluedart'
# Compare DataFrame schema with BigQuery table schema
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                                # Insert the data into BigQuery with an explicit all-STRING schema
                                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                                job_config = bigquery.LoadJobConfig(schema=schema)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result() # Wait for the job to complete
# print(f"Data from attachment '{new_attachment_filename}' inserted into BigQuery table successfully.")
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' "+"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                                # Fetch the status table so insert_rows serializes against its real schema
                                table_ref = bq_client.get_table(table_id)
                                try:
                                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
# print(f"Attachment '{attachment_filename}' does not require processing.")
log = log + ".\n" + "No processing required."
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
# print (log)
    return log
# ecom
def Ecom_remittance_attachment(username, password, label, code, bq_client, email_ids, imap_server):
log = ""
log = log + ".\n"+".\n" + label
dataset_id = 'finance_recon_tool_asia'
project_id = 'fynd-db'
table_id = 'COD_ecom'
# Iterate over the email IDs
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
log = log + ".\n"+ email_message["Subject"]
print(email_subject)
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Extract email body text
email_body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
email_body += part.get_payload(decode=True).decode("utf-8")
# Iterate over the email parts
for part in email_message.walk():
# Check if the part has a filename (attachment)
if part.get_filename():
# Extract the filename and check if it is supported
attachment_filename = part.get_filename()
if attachment_filename and is_attachment_supported(attachment_filename):
# Read the attachment file data into a BytesIO object
attachment_data = BytesIO(part.get_payload(decode=True))
# Read the file using pandas without saving it locally
try:
df = pd.read_excel(attachment_data)
except Exception as e:
# print(f"An error occurred: {e}")
log = log +".\n"+ " ERROR - " +str(e)
continue
log = log + ".\n" + "file read"
new_column_names = []
for column_name in df.columns:
                            # Remove invalid characters (coerce to str; headers can be non-string)
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', str(column_name))
                            # Check if the cleaned name starts with a number
                            if cleaned_name and cleaned_name[0].isdigit():
# Prepend an underscore if it starts with a number
cleaned_name = '_' + cleaned_name
# Truncate the name if it exceeds 300 characters
cleaned_name = cleaned_name[:300]
new_column_names.append(cleaned_name)
# Assign the cleaned column names back to the DataFrame
df.columns = new_column_names
df = df.replace({np.nan: ''})
                        # Rename the attachment if a code was supplied
new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
if new_attachment_filename:
date = dt.date.today()
df['inserted_dt'] = date
# Schema comparison
# table_id = 'fynd-db.rishabh_test.test_ecom'
table_id = 'fynd-db.finance_recon_tool_asia.COD_ecom'
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                                # Insert the data into BigQuery with an explicit all-STRING schema
                                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                                df = df.astype(str)
                                job_config = bigquery.LoadJobConfig(schema=schema)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result() # Wait for the job to complete
# print(f"Data from attachment '{new_attachment_filename}' inserted into BigQuery table successfully.")
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' "+"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                                # Fetch the status table so insert_rows serializes against its real schema
                                table_ref = bq_client.get_table(table_id)
                                try:
                                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
# print(f"Attachment '{attachment_filename}' does not require processing.")
log = log + ".\n" + "No processing required."
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
# print(log)
    return log
# shadowfax
def Shadowfax_attachment(username, password, label, code, bq_client, email_ids, imap_server):
dataset_id = 'opex_analytics'
project_id = 'fynd-db'
table_id = 'COD_shadowfax'
log = ""
log = log + ".\n"+".\n" + label
# Iterate over the email IDs
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
log = log + ".\n"+ email_message["Subject"]
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Extract email body text
email_body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
email_body += part.get_payload(decode=True).decode("utf-8")
# Iterate over the email parts
for part in email_message.walk():
# Check if the part has a filename (attachment)
if part.get_filename():
# Extract the filename and check if it is supported
attachment_filename = part.get_filename()
if attachment_filename and is_attachment_supported(attachment_filename):
# Read the attachment file data into a BytesIO object
attachment_data = BytesIO(part.get_payload(decode=True))
# Read the file using pandas without saving it locally
try:
df = pd.read_excel(attachment_data)
except Exception as e:
# print(f"An error occurred: {e}")
log = log +".\n"+ " ERROR - " + str(e)
continue
log = log + ".\n" + "file read"
                        # Some Shadowfax files carry the payment date in a 'UTR Date' column
                        if 'UTR Date' in df.columns:
                            df['Date of Payment'] = df['UTR Date']
                        df = df[['AWB', 'client_order_id', 'Request Date', 'Hub', 'Client', 'COD Amount', 'Delivery Date', 'Payment Mode', 'Order Type', 'Order Status', 'Date of Payment', 'UTR']].copy()
new_column_names = []
for column_name in df.columns:
                            # Remove invalid characters (coerce to str; headers can be non-string)
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', str(column_name))
                            # Check if the cleaned name starts with a number
                            if cleaned_name and cleaned_name[0].isdigit():
# Prepend an underscore if it starts with a number
cleaned_name = '_' + cleaned_name
# Truncate the name if it exceeds 300 characters
cleaned_name = cleaned_name[:300]
new_column_names.append(cleaned_name)
# Assign the cleaned column names back to the DataFrame
df.columns = new_column_names
df = df.replace({np.nan: ''})
                        # Rename the attachment if a code was supplied
new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
if new_attachment_filename:
date = dt.date.today()
df['inserted_dt'] = date
table_id = 'fynd-db.opex_analytics.COD_shadowfax'
# table_id = 'fynd-db.rishabh_test.test_shadowfax'
# Compare DataFrame schema with BigQuery table schema
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                                # Insert the data into BigQuery with an explicit all-STRING schema
                                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                                df = df.astype(str)
                                job_config = bigquery.LoadJobConfig(schema=schema)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result() # Wait for the job to complete
# print(f"Data from attachment '{new_attachment_filename}' inserted into BigQuery table successfully.")
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' "+"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                                # Fetch the status table so insert_rows serializes against its real schema
                                table_ref = bq_client.get_table(table_id)
                                try:
                                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
# print(f"Attachment '{attachment_filename}' does not require processing.")
log = log + ".\n" + "No processing required."
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
# print (log)
    return log
# Delhivery
def Delhivery_collection_attachment(username, password, label, bq_client, email_ids, imap_server):
# data_status = "Both DataFrames have the same columns in same sequence."
# dataset_id = 'rishabh_test'
# project_id = 'fynd-db'
# table_id = 'test_delhivery'
log = ""
log = log + ".\n"+".\n" + label
dataset_id = 'finance_recon_tool_asia'
project_id = 'fynd-db'
table_id = 'COD_delhivery'
# Iterate over the email IDs
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
# print(repr(email_message))
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
log = log + ".\n"+ email_message["Subject"]
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Get the email body
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == 'text/plain':
email_body = part.get_payload(decode=True).decode('utf-8')
break
else:
email_body = email_message.get_payload(decode=True).decode('utf-8')
# print("getting url")
# Find the link in the email body
start_index = email_body.find("https://")
end_index = email_body.find("\n", start_index)
            csv_file_link = email_body[start_index:end_index].strip()  # strip any trailing \r/whitespace
# print(csv_file_link)
# Find the Transaction ID in the email body
start_index = email_body.find("Transaction ID: ")
end_index = email_body.find("\n", start_index)
            transaction_id = email_body[start_index + len("Transaction ID: "):end_index].strip()
try:
df = pd.read_csv(csv_file_link)
except Exception as e:
# If the file is not present it will give an error of unknown url
log = log +".\n"+ " ERROR - " +str(e)
continue
log = log + ".\n" + "file read"
df['Transaction_ID']=transaction_id
start_index = email_body.find("Remittance Date: ")
end_index = email_body.find("\n", start_index)
            remittance_date = email_body[start_index + len("Remittance Date: "):end_index].strip()
df['Remittance_Date']=remittance_date
date = dt.date.today()
df['inserted_dt'] = date
# table_id = 'fynd-db.rishabh_test.test_delhivery'
table_id = 'fynd-db.finance_recon_tool_asia.COD_delhivery'
# Compare DataFrame schema with BigQuery table schema
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                # Reuse the client passed in; re-authenticating here is unnecessary
                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                df = df.astype(str)
                job_config = bigquery.LoadJobConfig(schema=schema)
                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                job.result()  # Wait for the job to complete before logging success
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' "+"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                # Fetch the status table so insert_rows serializes against its real schema
                table_ref = bq_client.get_table(table_id)
                try:
                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
    return log
# Xpress bees
def BusyBee_attachment(username, password, label, bq_client, email_ids, imap_server):
    # data_status = "Both DataFrames have the same columns in same sequence."
# dataset_id = 'rishabh_test'
# project_id = 'fynd-db'
# table_id = 'test_xpressbees'
log = ""
log = log + ".\n"+".\n" + label
    dataset_id = 'finance_recon_tool_asia'
project_id = 'fynd-db'
table_id = 'COD_xpressbees'
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
log = log + ".\n"+ email_message["Subject"]
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Get the email body
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == 'text/html':
email_body = part.get_payload(decode=True)
charset = part.get_content_charset()
if charset:
email_body = email_body.decode(charset)
else:
email_body = email_body.decode('utf-8', 'ignore')
break
else:
email_body = email_message.get_payload(decode=True)
charset = email_message.get_content_charset()
if charset:
email_body = email_body.decode(charset)
else:
email_body = email_body.decode('utf-8', 'ignore')
            csv_file_link = extract_csv_link(email_body)
try:
# Read the CSV file
df = pd.read_csv(csv_file_link[0],compression='zip')
except Exception as e:
print(f"An error occurred: {e}")
log = log +".\n"+ " ERROR - " + str(e)
continue
log = log + ".\n" + "file read"
new_column_names = []
for column_name in df.columns:
                # Remove invalid characters (coerce to str; headers can be non-string)
                cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', str(column_name))
                # Check if the cleaned name starts with a number
                if cleaned_name and cleaned_name[0].isdigit():
# Prepend an underscore if it starts with a number
cleaned_name = '_' + cleaned_name
# Truncate the name if it exceeds 300 characters
cleaned_name = cleaned_name[:300]
new_column_names.append(cleaned_name)
# Assign the cleaned column names back to the DataFrame
df.columns = new_column_names
date = dt.date.today()
df['inserted_dt'] = date
# table_id = 'fynd-db.rishabh_test.test_xpressbees'
table_id = 'fynd-db.finance_recon_tool_asia.COD_xpressbees'
# Compare DataFrame schema with BigQuery table schema
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                df = df.astype(str)
                job_config = bigquery.LoadJobConfig(schema=schema)
                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                job.result()  # Wait for the job to complete before logging success
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' " +"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                # Fetch the status table so insert_rows serializes against its real schema
                table_ref = bq_client.get_table(table_id)
                try:
                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
# print (log)
    return log
# Mega Merge
def download_attachment(username, password, labels, code, log):
log = ""
imap_server = imaplib.IMAP4_SSL('imap.gmail.com')
# Log in to your Gmail account
imap_server.login(gmail_username, gmail_password)
# Authenticate with BigQuery using the credentials file
bq_client = bigquery.Client.from_service_account_json('gcp_credentials.json')
#Iterate over the labels
for label in labels:
# Search for emails on the specified date
imap_server.select('"' + label + '"')
_, email_ids = imap_server.search(None, f'(SENTON "{current_date}")')
if email_ids[0] == b'':
print(f"No matching email found in label '{label}'.")
log = log + ".\n" + f"No matching email found in label '{label}'"
continue
if label=='Blue_dart':
print(label)
log = log + ".\n" + Blue_dart_attachment(username, password, label, code, bq_client, email_ids, imap_server)
elif label=='Ecom_remittance':
print(label)
log = log + ".\n" + Ecom_remittance_attachment(username, password, label, code, bq_client, email_ids, imap_server)
elif label=='Shadowfax':
print(label)
log = log + ".\n" + Shadowfax_attachment(username, password, label, code, bq_client, email_ids, imap_server)
elif label=='Delhivery_Collection':
print(label)
log = log + ".\n" + Delhivery_collection_attachment(username, password, label, bq_client, email_ids, imap_server)
elif label=='BusyBee':
print(label)
log = log + ".\n" + BusyBee_attachment(username, password, label, bq_client, email_ids, imap_server)
# break
imap_server.logout()
    return log
def get_pipeline_html():
"""Returns the HTML for the pipeline visualization with golden yellow electricity flow,
3D hover effects, and improved tooltips"""
return """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Data Pipeline Visualization</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/react/18.2.0/umd/react.production.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/react-dom/18.2.0/umd/react-dom.production.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/babel-standalone/7.21.4/babel.min.js"></script>
<style>
.pipeline-container {
width: 100%;
overflow-x: auto;
margin-bottom: 20px;
border-radius: 10px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}
.table-box {
stroke: #800000;
stroke-width: 2px;
rx: 8px;
ry: 8px;
filter: drop-shadow(0 3px 3px rgba(0, 0, 0, 0.15));
transition: all 0.3s ease;
}
.pipe {
stroke: #888888;
stroke-width: 20px;
stroke-linecap: round;
}
.flow {
stroke-width: 12px;
stroke-linecap: round;
stroke-dasharray: 12 8;
animation: flowAnimation 1.5s linear infinite;
filter: drop-shadow(0 0 3px #FFFF00);
}
@keyframes flowAnimation {
from { stroke-dashoffset: 40; }
to { stroke-dashoffset: 0; }
}
.table-text {
font-family: 'Arial', sans-serif;
font-size: 13px;
font-weight: bold;
fill: white;
text-anchor: middle;
dominant-baseline: middle;
transition: all 0.3s ease;
}
.title-text {
font-family: 'Arial', sans-serif;
font-size: 18px;
font-weight: bold;
fill: #444;
text-anchor: middle;
}
.tooltip-background {
fill: #f5f5f5;
stroke: #666666;
stroke-width: 1px;
rx: 6px;
ry: 6px;
opacity: 0;
transition: opacity 0.3s;
filter: drop-shadow(0 2px 4px rgba(0, 0, 0, 0.25));
}
.tooltip-text {
font-family: 'Arial', sans-serif;
font-size: 12px;
fill: #333333;
opacity: 0;
transition: opacity 0.3s;
}
.tooltip-arrow {
fill: #666666;
opacity: 0;
transition: opacity 0.3s;
}
.tooltip-trigger {
fill: transparent;
cursor: pointer;
}
.tooltip-group:hover .tooltip-background,
.tooltip-group:hover .tooltip-text,
.tooltip-group:hover .tooltip-arrow {
opacity: 1;
}
.block-group:hover .table-box {
transform: translateY(-5px);
filter: drop-shadow(0 8px 8px rgba(0, 0, 0, 0.25));
}
.block-group:hover .table-text {
transform: translateY(-5px);
}
/* Electricity effect */
.electricity-glow {
filter: drop-shadow(0 0 5px #FFD700);
}
</style>
</head>
<body>
<div id="pipeline-root"></div>
<script type="text/babel">
const PipelineVisualization = () => {
const [animationOffset, setAnimationOffset] = React.useState(0);
React.useEffect(() => {
const interval = setInterval(() => {
setAnimationOffset(prev => (prev + 1) % 40);
}, 100);
return () => clearInterval(interval);
}, []);
// SVG dimensions
const svgWidth = 1000;
const svgHeight = 350; // Increased height for tooltips
// Block dimensions
const blockWidth = 180;
const blockHeight = 90;
const blockSpacing = 30;
// Calculate positions
const blockCount = 6;
const totalBlockWidth = blockCount * blockWidth;
const totalSpacingWidth = (blockCount - 1) * blockSpacing;
const totalWidth = totalBlockWidth + totalSpacingWidth;
const startX = (svgWidth - totalWidth) / 2;
// Table definitions with tooltips
const tables = [
{
id: 1,
name: "COD_Email_Upload_Status",
x: startX,
y: 140,
width: blockWidth,
tooltip: "A log table that contains the timestamp of mails received by Delivery Partner & Payment Gateway"
},
{
id: 2,
name: "COD_dp_name",
x: startX + blockWidth + blockSpacing,
y: 140,
width: blockWidth,
tooltip: "A table which contains individual DP data from the mail's label. [For e.g. COD_Ecom, COD_Delhivery, etc.]"
},
{
id: 3,
name: "COD_Awb_wise_details",
x: startX + (blockWidth + blockSpacing) * 2,
y: 140,
width: blockWidth,
tooltip: "A union table that contains data from all the DPs on a consolidated label"
},
{
id: 4,
name: "COD_Raw_Data",
x: startX + (blockWidth + blockSpacing) * 3,
y: 140,
width: blockWidth,
tooltip: "A table that contains DP data aggregated to the level of awbs"
},
{
id: 5,
name: "bag_wise COD_data",
x: startX + (blockWidth + blockSpacing) * 4,
y: 140,
width: blockWidth,
tooltip: "A table that contains DP data aggregated to the level of bags"
},
{
id: 6,
name: "05_partner_collection",
x: startX + (blockWidth + blockSpacing) * 5,
y: 140,
width: blockWidth,
tooltip: "A table that contains unified collections from Fynd"
}
];
// Pipe segments
const pipes = [];
for (let i = 0; i < tables.length - 1; i++) {
pipes.push({
id: i + 1,
x1: tables[i].x + tables[i].width,
y1: tables[i].y + blockHeight/2,
x2: tables[i+1].x,
y2: tables[i+1].y + blockHeight/2
});
}
// Function to create tooltip with arrow
const renderTooltip = (table) => {
const tooltipWidth = Math.max(table.width + 60, 240); // Increased width
const tooltipHeight = 70; // Increased height
const tooltipX = table.x + table.width/2 - tooltipWidth/2;
const tooltipY = table.y - 85;
const arrowX = table.x + table.width / 2;
const arrowY = tooltipY + tooltipHeight;
// Split tooltip text into multiple lines if needed
const words = table.tooltip.split(' ');
const maxCharsPerLine = 35;
const lines = [];
let currentLine = '';
words.forEach(word => {
if ((currentLine + ' ' + word).length <= maxCharsPerLine) {
currentLine = currentLine ? `${currentLine} ${word}` : word;
} else {
lines.push(currentLine);
currentLine = word;
}
});
if (currentLine) {
lines.push(currentLine);
}
return (
<g className="tooltip-group" key={`tooltip-${table.id}`}>
{/* Invisible trigger area for the entire box */}
<rect
x={table.x}
y={table.y}
width={table.width}
height={blockHeight}
className="tooltip-trigger"
/>
{/* Tooltip background */}
<rect
x={tooltipX}
y={tooltipY}
width={tooltipWidth}
height={tooltipHeight}
className="tooltip-background"
/>
{/* Arrow pointing to the block */}
<polygon
points={`${arrowX-10},${arrowY} ${arrowX+10},${arrowY} ${arrowX},${arrowY+10}`}
className="tooltip-arrow"
/>
{/* Tooltip text */}
<g className="tooltip-text">
{lines.map((line, index) => (
<text
key={`line-${index}`}
x={tooltipX + tooltipWidth/2}
y={tooltipY + 20 + (index * 16)}
textAnchor="middle"
>
{line}
</text>
))}
</g>
</g>
);
};
return (
<svg width="100%" height={svgHeight} viewBox={`0 0 ${svgWidth} ${svgHeight}`} preserveAspectRatio="xMidYMid meet">
{/* Title */}
<text x={svgWidth/2} y="30" className="title-text"></text>
{/* Draw pipes first (behind tables) */}
{pipes.map(pipe => (
<g key={`pipe-${pipe.id}`}>
<line
x1={pipe.x1} y1={pipe.y1}
x2={pipe.x2} y2={pipe.y2}
className="pipe"
/>
<line
x1={pipe.x1} y1={pipe.y1}
x2={pipe.x2} y2={pipe.y2}
className="flow electricity-glow"
strokeDashoffset={animationOffset}
stroke="url(#electricityGradient)"
/>
</g>
))}
{/* Gradient definitions */}
<defs>
<linearGradient id="blockGradient" x1="0%" y1="0%" x2="100%" y2="0%">
<stop offset="0%" stopColor="#800000" />
<stop offset="100%" stopColor="#ff0000" />
</linearGradient>
<linearGradient id="electricityGradient" x1="0%" y1="0%" x2="100%" y2="0%">
<stop offset="0%" stopColor="#FFD700" />
<stop offset="50%" stopColor="#FFFF00" />
<stop offset="100%" stopColor="#FFD700" />
</linearGradient>
</defs>
{/* Draw tables with tooltips */}
{tables.map(table => (
<React.Fragment key={`table-${table.id}`}>
<g className="block-group">
<rect
x={table.x}
y={table.y}
width={table.width}
height={blockHeight}
className="table-box"
fill="url(#blockGradient)"
/>
<text
x={table.x + table.width/2}
y={table.y + blockHeight/2}
className="table-text"
>
{table.name}
</text>
</g>
{/* Tooltip with arrow */}
{renderTooltip(table)}
</React.Fragment>
))}
</svg>
);
};
ReactDOM.render(<PipelineVisualization />, document.getElementById('pipeline-root'));
</script>
</body>
</html>
"""
def combined_report(username, password, label, code, bq_client, email_ids, imap_server):
# Get the dataset and project ID
# data_status = "Both DataFrames have the same columns in same sequence."
# dataset_id = 'rishabh_test'
# project_id = 'fynd-db'
# table_id = 'test_pg_combined_report'
log = ""
log = log + ".\n"+".\n" + label
dataset_id = 'finance_recon_tool_asia'
project_id = 'fynd-db'
table_id = 'pg_combined_report'
# Iterate over the email IDs
for email_id in email_ids[0].split():
# Fetch the email content
_, email_data = imap_server.fetch(email_id, '(RFC822)')
# Parse the email message
raw_email = email_data[0][1]
email_message = email.message_from_bytes(raw_email)
email_subject = email_message["Subject"]
log = log + ".\n"+ email_message["from"]
log = log + ".\n"+ email_message["Subject"]
received_time_str = email_message.get("Received")
match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
if match:
received_time_str = match.group(1)
received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
else:
received_time = None
email_subject = email_subject +" "+str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
if value_check is False:
# Extract email body text
email_body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/html":
email_body += part.get_payload(decode=True).decode("utf-8")
# Iterate over the email parts
for part in email_message.walk():
# Check if the part has a filename (attachment)
if part.get_filename():
# Extract the filename and check if it is supported
attachment_filename = part.get_filename()
if attachment_filename and is_attachment_supported(attachment_filename):
# Read the attachment file data into a BytesIO object
attachment_data = BytesIO(part.get_payload(decode=True))
try:
# Read the file using pandas without saving it locally
df = pd.read_csv(attachment_data, compression='zip')
except Exception as e:
# print(f"An error occurred: {e}")
log = log +".\n"+ " ERROR - " +str(e)
continue
log = log + ".\n" + "file read"
                        # 42 = len("HI SHOPSENSE RETAIL TECHNOLOGIES LIMITED ("), so idx lands just past the "("
                        idx = email_body.upper().index("HI SHOPSENSE RETAIL TECHNOLOGIES LIMITED (") + 42
df = df.replace({np.nan: ''})
# Rename the attachment if label is 'PG_Combine report'
new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
if new_attachment_filename:
                            # Add the 14-character merchant ID (MID) that follows the "("
                            df['MID'] = email_body[idx:idx+14]
# Add the insertion date column
date = dt.date.today()
df['inserted_dt'] = date
table_id = 'fynd-db.finance_recon_tool_asia.pg_combined_report'
# table_id = 'fynd-db.rishabh_test.test_pg_combined_report'
# Compare DataFrame schema with BigQuery table schema
data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
log = log + ".\n" + str(data_status)
if data_status != "Both DataFrames have the same columns in same sequence.":
continue
else:
                                # Insert the data into BigQuery with an explicit all-STRING schema
                                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                                df = df.astype(str)
                                job_config = bigquery.LoadJobConfig(schema=schema)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result() # Wait for the job to complete
log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows'{df.shape[0]}' "+"\n"
table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
row_to_insert = [new_row]
                                # Fetch the status table so insert_rows serializes against its real schema
                                table_ref = bq_client.get_table(table_id)
                                try:
                                    bq_client.insert_rows(table_ref, rows=row_to_insert)
except Exception as e:
print(f"Errors : {e}")
else:
log = log + ".\n" + "No processing required."
else:
log = log + ".\n" + "Data has already been uploaded."+"\n"
# print (log)
    return log
#old utils
def load_gcp_credentials():
try:
# Retrieve GCP credentials from the environment variable
gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
if not gcp_credentials_str:
raise ValueError("GCP_CREDENTIALS environment variable not defined")
# Parse the secret (assuming it's a JSON string)
gcp_credentials = json.loads(gcp_credentials_str)
# Save to a temporary file (Google Cloud uses a JSON file for authentication)
with open("gcp_credentials.json", "w") as f:
json.dump(gcp_credentials, f)
# Authenticate using Google Cloud SDK
credentials_from_file = service_account.Credentials.from_service_account_file("gcp_credentials.json")
# Return the credentials to be used later
return credentials_from_file
except Exception as e:
print(f"Error retrieving or loading GCP credentials: {str(e)}")
return None
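# Illustrative shape of the GCP_CREDENTIALS secret: a service-account key as a
# single JSON string (field values below are placeholders):
#   export GCP_CREDENTIALS='{"type": "service_account", "project_id": "...", "private_key": "...", "client_email": "..."}'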
# Upload to BQ
def upload_to_bigquery(df, table_id):
try:
# Load the GCP credentials from Hugging Face secret
bigquery_creds = load_gcp_credentials()
if not bigquery_creds:
st.error("Unable to load GCP credentials.")
return
# Initialize BigQuery client with the loaded credentials
client = bigquery.Client(credentials=bigquery_creds)
# Convert the DataFrame to a list of dictionaries
records = df.to_dict(orient='records')
# Prepare the table schema if needed (optional)
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_APPEND", # Use WRITE_TRUNCATE to overwrite, WRITE_APPEND to append
)
# Load the data to BigQuery
load_job = client.load_table_from_json(records, table_id, job_config=job_config)
load_job.result() # Wait for the job to complete
st.success("Data submitted")
except Exception as e:
st.error(f"An error occurred while uploading to BigQuery: {e}")
#Authenticate BigQuery
def authenticate_bigquery():
creds = load_gcp_credentials()
if not creds:
st.error("Unable to load GCP credentials for BigQuery authentication.")
return None
return creds
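

# Minimal usage sketch, assuming the Gmail/GCP credentials described above are
# configured. The label names mirror those dispatched in download_attachment;
# code=None simply skips the attachment-renaming step.
if __name__ == "__main__":
    labels = ["Blue_dart", "Ecom_remittance", "Shadowfax", "Delhivery_Collection", "BusyBee"]
    run_log = download_attachment(gmail_username, gmail_password, labels, code=None, log="")
    print(run_log)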