import os
import imaplib
import email
from email import policy
from email.parser import BytesParser
import re
import zipfile
import shutil
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import date, datetime, timedelta
import datetime as dt
import pandas as pd
import numpy as np
import string
from io import BytesIO
import io
import csv
import urllib.request
import certifi
import base64
import json
import streamlit as st
import streamlit.components.v1 as components

# NOTE: hard-coded credentials; consider loading these from environment
# variables (like GCP_CREDENTIALS below) instead.
gmail_username = "rasikajadhav@gofynd.com"
gmail_password = "wmjl srmk hcpv fati"

insertion_date = datetime.now().strftime('%d-%b-%Y')
# Date used by the IMAP SENTON search in download_attachment(); IMAP expects
# the same '%d-%b-%Y' format (e.g. 01-Jan-2024).
current_date = datetime.now().strftime('%d-%b-%Y')
def is_attachment_supported(filename):
    supported_extensions = ['.zip', '.xlsx', '.xls', '.csv']
    return any(filename.lower().endswith(ext) for ext in supported_extensions)

def extract_attachment_name(filename, label, email_body, code):
    # Rename the attachment to embed the merchant code, if one is provided.
    # Modify as per your requirements.
    if code:
        # rsplit keeps filenames with multiple dots intact
        base, ext = filename.rsplit('.', 1)
        return f"{base}_{code}.{ext}"
    return filename

def extract_csv_link(email_content):
    # Regular expression for matching URLs
    url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Find all matches in the email content
    urls = url_pattern.findall(email_content)
    return urls
def upload_status(label, bq_client, email_subject, imap_server):
    project_id = "fynd-db"
    dataset_id = "finance_recon_tool_asia"
    table_id = "COD_Email_Upload_Status"
    label_value = label
    # Parameterised query against the upload-status table
    sql_query = f"""
        SELECT Subject_Date
        FROM `{project_id}.{dataset_id}.{table_id}`
        WHERE Label = @label_value"""
    query_params = [bigquery.ScalarQueryParameter("label_value", "STRING", label_value)]
    # Execute the query and convert the result to a pandas DataFrame
    try:
        df = bq_client.query(sql_query, job_config=bigquery.QueryJobConfig(query_parameters=query_params)).to_dataframe()
    except Exception as e:
        print(f"Error executing query: {e}")
        # If the status lookup fails, treat the email as not yet uploaded
        return False
    return any(df["Subject_Date"] == email_subject)
def compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log):
    try:
        log = log + '.\n' + 'comparison'
        table = bq_client.get_table(table_id)
        bq_schema_dict = {field.name: field.field_type for field in table.schema}
        bq_schema_df = pd.DataFrame(list(bq_schema_dict.items()), columns=['column_name', 'bq_data_type'])
        # Compare column names
        columns_df1 = set(df.columns)
        columns_df2 = set(bq_schema_df['column_name'])
        # Column order must come from the original sequences, not the
        # (unordered) sets
        columns_sequence_df1 = tuple(df.columns)
        columns_sequence_df2 = tuple(bq_schema_df['column_name'])
        common_columns = columns_df1.intersection(columns_df2)
        if len(common_columns) != len(columns_df1):
            data_status = "Extra columns received in the attachment"
            return data_status
        elif len(common_columns) != len(columns_df2):
            data_status = "Fewer columns received in the attachment"
            return data_status
        elif columns_sequence_df1 == columns_sequence_df2:
            data_status = "Both DataFrames have the same columns in same sequence."
            return data_status
        else:
            return "Columns match but their sequence differs."
    except Exception as e:
        print(e)
        return f"Schema comparison failed: {e}"
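# A minimal sketch of the schema check in isolation (the client, table, and
# frame here are hypothetical): the exact status string it returns is what the
# loaders below branch on before loading.
# sample_df = pd.DataFrame(columns=['AWBNO', 'ORGLOC'])
# status = compare_bigquery_schema_with_dataframe(
#     bq_client, 'fynd-db.finance_recon_tool_asia.COD_bluedart', sample_df, 'Blue_dart', log="")
# ok_to_load = (status == "Both DataFrames have the same columns in same sequence.")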
# Blue Dart
def Blue_dart_attachment(username, password, label, code, bq_client, email_ids, imap_server):
    dataset_id = 'finance_recon_tool_asia'
    project_id = 'fynd-db'
    table_id = 'COD_bluedart'
    log = ""
    log = log + ".\n" + ".\n" + label
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        log = log + ".\n" + email_message["Subject"]
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Extract email body text
            email_body = ""
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == "text/plain":
                        email_body += part.get_payload(decode=True).decode("utf-8")
            # Iterate over the email parts
            for part in email_message.walk():
                # Check if the part has a filename (attachment)
                if part.get_filename():
                    # Extract the filename and check if it is supported
                    attachment_filename = part.get_filename()
                    if attachment_filename and is_attachment_supported(attachment_filename):
                        # Decode the attachment payload into an in-memory text buffer
                        csv_file = io.StringIO(part.get_payload(decode=True).decode())
                        # Read the CSV file using csv.reader()
                        try:
                            csv_reader = csv.reader(csv_file)
                        except Exception as e:
                            log = log + ".\n" + " ERROR - " + str(e)
                            continue
                        log = log + ".\n" + "file read"
                        file = []
                        for row in csv_reader:
                            file.append(row)
                        column_names = ['AWBNO', 'ORGLOC', 'DSTLOC', 'AMOUNT', 'Random', 'REF NO', 'PU DATE', 'DEPOSIT DATE', 'BANK PAY REF NO', 'PAY REF DATE', 'N', 'O', 'P']
                        # The first eight rows and last two rows of the Blue Dart
                        # report are headers/footers
                        df = pd.DataFrame(file[8:-2], columns=column_names)
                        df = df[['AWBNO', 'ORGLOC', 'DSTLOC', 'AMOUNT', 'REF NO', 'PU DATE', 'DEPOSIT DATE', 'BANK PAY REF NO', 'PAY REF DATE']].copy()
                        new_column_names = []
                        for column_name in df.columns:
                            # Remove invalid characters
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', column_name)
                            # Prepend an underscore if the name starts with a digit
                            if cleaned_name and cleaned_name[0].isdigit():
                                cleaned_name = '_' + cleaned_name
                            # Truncate the name to BigQuery's 300-character limit
                            cleaned_name = cleaned_name[:300]
                            new_column_names.append(cleaned_name)
                        # Assign the cleaned column names back to the DataFrame
                        df.columns = new_column_names
                        df = df.replace({np.nan: ''})
                        # Rename the attachment if a merchant code is supplied
                        new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
                        if new_attachment_filename:
                            date = dt.date.today()
                            df['inserted_dt'] = date
                            df = df.astype(str)
                            table_id = 'fynd-db.finance_recon_tool_asia.COD_bluedart'
                            # Compare DataFrame schema with BigQuery table schema
                            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
                            log = log + ".\n" + str(data_status)
                            if data_status != "Both DataFrames have the same columns in same sequence.":
                                continue
                            else:
                                # Insert the data into BigQuery
                                job_config = bigquery.LoadJobConfig()
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                                job.result()  # Wait for the job to complete
                                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                                # Record the upload in the status table
                                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                                try:
                                    bq_client.insert_rows_json(status_table_id, [new_row])
                                except Exception as e:
                                    print(f"Errors : {e}")
                    else:
                        log = log + ".\n" + "No processing required."
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Ecom
def Ecom_remittance_attachment(username, password, label, code, bq_client, email_ids, imap_server):
    log = ""
    log = log + ".\n" + ".\n" + label
    dataset_id = 'finance_recon_tool_asia'
    project_id = 'fynd-db'
    table_id = 'COD_ecom'
    # Iterate over the email IDs
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        log = log + ".\n" + email_message["Subject"]
        print(email_subject)
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Extract email body text
            email_body = ""
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == "text/plain":
                        email_body += part.get_payload(decode=True).decode("utf-8")
            # Iterate over the email parts
            for part in email_message.walk():
                # Check if the part has a filename (attachment)
                if part.get_filename():
                    # Extract the filename and check if it is supported
                    attachment_filename = part.get_filename()
                    if attachment_filename and is_attachment_supported(attachment_filename):
                        # Read the attachment file data into a BytesIO object
                        attachment_data = BytesIO(part.get_payload(decode=True))
                        # Read the file using pandas without saving it locally
                        try:
                            df = pd.read_excel(attachment_data)
                        except Exception as e:
                            log = log + ".\n" + " ERROR - " + str(e)
                            continue
                        log = log + ".\n" + "file read"
                        new_column_names = []
                        for column_name in df.columns:
                            # Remove invalid characters
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', column_name)
                            # Prepend an underscore if the name starts with a digit
                            if cleaned_name and cleaned_name[0].isdigit():
                                cleaned_name = '_' + cleaned_name
                            # Truncate the name to BigQuery's 300-character limit
                            cleaned_name = cleaned_name[:300]
                            new_column_names.append(cleaned_name)
                        # Assign the cleaned column names back to the DataFrame
                        df.columns = new_column_names
                        df = df.replace({np.nan: ''})
                        # Rename the attachment if a merchant code is supplied
                        new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
                        if new_attachment_filename:
                            date = dt.date.today()
                            df['inserted_dt'] = date
                            # Schema comparison
                            table_id = 'fynd-db.finance_recon_tool_asia.COD_ecom'
                            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
                            log = log + ".\n" + str(data_status)
                            if data_status != "Both DataFrames have the same columns in same sequence.":
                                continue
                            else:
                                # Insert the data into BigQuery
                                job_config = bigquery.LoadJobConfig()
                                df = df.astype(str)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                                job.result()  # Wait for the job to complete
                                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                                # Record the upload in the status table
                                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                                try:
                                    bq_client.insert_rows_json(status_table_id, [new_row])
                                except Exception as e:
                                    print(f"Errors : {e}")
                    else:
                        log = log + ".\n" + "No processing required."
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Shadowfax
def Shadowfax_attachment(username, password, label, code, bq_client, email_ids, imap_server):
    dataset_id = 'opex_analytics'
    project_id = 'fynd-db'
    table_id = 'COD_shadowfax'
    log = ""
    log = log + ".\n" + ".\n" + label
    # Iterate over the email IDs
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        log = log + ".\n" + email_message["Subject"]
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Extract email body text
            email_body = ""
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == "text/plain":
                        email_body += part.get_payload(decode=True).decode("utf-8")
            # Iterate over the email parts
            for part in email_message.walk():
                # Check if the part has a filename (attachment)
                if part.get_filename():
                    # Extract the filename and check if it is supported
                    attachment_filename = part.get_filename()
                    if attachment_filename and is_attachment_supported(attachment_filename):
                        # Read the attachment file data into a BytesIO object
                        attachment_data = BytesIO(part.get_payload(decode=True))
                        # Read the file using pandas without saving it locally
                        try:
                            df = pd.read_excel(attachment_data)
                        except Exception as e:
                            log = log + ".\n" + " ERROR - " + str(e)
                            continue
                        log = log + ".\n" + "file read"
                        # Older reports carry 'UTR Date' instead of 'Date of Payment'
                        if 'UTR Date' in df.columns:
                            df['Date of Payment'] = df['UTR Date']
                        df = df[['AWB', 'client_order_id', 'Request Date', 'Hub', 'Client', 'COD Amount', 'Delivery Date', 'Payment Mode', 'Order Type', 'Order Status', 'Date of Payment', 'UTR']].copy()
                        new_column_names = []
                        for column_name in df.columns:
                            # Remove invalid characters
                            cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', column_name)
                            # Prepend an underscore if the name starts with a digit
                            if cleaned_name and cleaned_name[0].isdigit():
                                cleaned_name = '_' + cleaned_name
                            # Truncate the name to BigQuery's 300-character limit
                            cleaned_name = cleaned_name[:300]
                            new_column_names.append(cleaned_name)
                        # Assign the cleaned column names back to the DataFrame
                        df.columns = new_column_names
                        df = df.replace({np.nan: ''})
                        # Rename the attachment if a merchant code is supplied
                        new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
                        if new_attachment_filename:
                            date = dt.date.today()
                            df['inserted_dt'] = date
                            table_id = 'fynd-db.opex_analytics.COD_shadowfax'
                            # Compare DataFrame schema with BigQuery table schema
                            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
                            log = log + ".\n" + str(data_status)
                            if data_status != "Both DataFrames have the same columns in same sequence.":
                                continue
                            else:
                                # Insert the data into BigQuery
                                job_config = bigquery.LoadJobConfig()
                                df = df.astype(str)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                                job.result()  # Wait for the job to complete
                                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                                # Record the upload in the status table
                                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                                try:
                                    bq_client.insert_rows_json(status_table_id, [new_row])
                                except Exception as e:
                                    print(f"Errors : {e}")
                    else:
                        log = log + ".\n" + "No processing required."
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Delhivery
def Delhivery_collection_attachment(username, password, label, bq_client, email_ids, imap_server):
    log = ""
    log = log + ".\n" + ".\n" + label
    dataset_id = 'finance_recon_tool_asia'
    project_id = 'fynd-db'
    table_id = 'COD_delhivery'
    # Iterate over the email IDs
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        log = log + ".\n" + email_message["Subject"]
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Get the email body
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == 'text/plain':
                        email_body = part.get_payload(decode=True).decode('utf-8')
                        break
            else:
                email_body = email_message.get_payload(decode=True).decode('utf-8')
            # Find the report link in the email body
            start_index = email_body.find("https://")
            end_index = email_body.find("\n", start_index)
            csv_file_link = email_body[start_index:end_index]
            # Find the Transaction ID in the email body
            start_index = email_body.find("Transaction ID: ")
            end_index = email_body.find("\n", start_index)
            transaction_id = email_body[start_index + len("Transaction ID: "):end_index]
            try:
                df = pd.read_csv(csv_file_link)
            except Exception as e:
                # If the file is not present, this raises an unknown-URL error
                log = log + ".\n" + " ERROR - " + str(e)
                continue
            log = log + ".\n" + "file read"
            df['Transaction_ID'] = transaction_id
            start_index = email_body.find("Remittance Date: ")
            end_index = email_body.find("\n", start_index)
            remittance_date = email_body[start_index + len("Remittance Date: "):end_index]
            df['Remittance_Date'] = remittance_date
            date = dt.date.today()
            df['inserted_dt'] = date
            table_id = 'fynd-db.finance_recon_tool_asia.COD_delhivery'
            # Compare DataFrame schema with BigQuery table schema
            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
            log = log + ".\n" + str(data_status)
            if data_status != "Both DataFrames have the same columns in same sequence.":
                continue
            else:
                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                df = df.astype(str)
                job_config = bigquery.LoadJobConfig(schema=schema)
                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                job.result()  # Wait for the job to complete
                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                # Record the upload in the status table
                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                try:
                    bq_client.insert_rows_json(status_table_id, [new_row])
                except Exception as e:
                    print(f"Errors : {e}")
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Xpressbees
def BusyBee_attachment(username, password, label, bq_client, email_ids, imap_server):
    log = ""
    log = log + ".\n" + ".\n" + label
    dataset_id = 'finance_recon_tool_asia'
    project_id = 'fynd-db'
    table_id = 'COD_xpressbees'
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        log = log + ".\n" + email_message["Subject"]
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Get the email body (the report link is embedded in the HTML part)
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == 'text/html':
                        email_body = part.get_payload(decode=True)
                        charset = part.get_content_charset()
                        if charset:
                            email_body = email_body.decode(charset)
                        else:
                            email_body = email_body.decode('utf-8', 'ignore')
                        break
            else:
                email_body = email_message.get_payload(decode=True)
                charset = email_message.get_content_charset()
                if charset:
                    email_body = email_body.decode(charset)
                else:
                    email_body = email_body.decode('utf-8', 'ignore')
            csv_file_link = extract_csv_link(email_body)
            try:
                # Read the zipped CSV straight from the first extracted link
                df = pd.read_csv(csv_file_link[0], compression='zip')
            except Exception as e:
                print(f"An error occurred: {e}")
                log = log + ".\n" + " ERROR - " + str(e)
                continue
            log = log + ".\n" + "file read"
            new_column_names = []
            for column_name in df.columns:
                # Remove invalid characters
                cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', column_name)
                # Prepend an underscore if the name starts with a digit
                if cleaned_name and cleaned_name[0].isdigit():
                    cleaned_name = '_' + cleaned_name
                # Truncate the name to BigQuery's 300-character limit
                cleaned_name = cleaned_name[:300]
                new_column_names.append(cleaned_name)
            # Assign the cleaned column names back to the DataFrame
            df.columns = new_column_names
            date = dt.date.today()
            df['inserted_dt'] = date
            table_id = 'fynd-db.finance_recon_tool_asia.COD_xpressbees'
            # Compare DataFrame schema with BigQuery table schema
            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
            log = log + ".\n" + str(data_status)
            if data_status != "Both DataFrames have the same columns in same sequence.":
                continue
            else:
                schema = [bigquery.SchemaField(name, 'STRING') for name in df.columns]
                df = df.astype(str)
                job_config = bigquery.LoadJobConfig(schema=schema)
                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                job.result()  # Wait for the job to complete
                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                # Record the upload in the status table
                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                try:
                    bq_client.insert_rows_json(status_table_id, [new_row])
                except Exception as e:
                    print(f"Errors : {e}")
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Mega merge: route each label to its dedicated handler
def download_attachment(username, password, labels, code, log):
    log = ""
    imap_server = imaplib.IMAP4_SSL('imap.gmail.com')
    # Log in to the Gmail account
    imap_server.login(gmail_username, gmail_password)
    # Authenticate with BigQuery using the credentials file
    bq_client = bigquery.Client.from_service_account_json('gcp_credentials.json')
    # Iterate over the labels
    for label in labels:
        # Search for emails sent on the specified date
        imap_server.select('"' + label + '"')
        _, email_ids = imap_server.search(None, f'(SENTON "{current_date}")')
        if email_ids[0] == b'':
            print(f"No matching email found in label '{label}'.")
            log = log + ".\n" + f"No matching email found in label '{label}'"
            continue
        if label == 'Blue_dart':
            print(label)
            log = log + ".\n" + Blue_dart_attachment(username, password, label, code, bq_client, email_ids, imap_server)
        elif label == 'Ecom_remittance':
            print(label)
            log = log + ".\n" + Ecom_remittance_attachment(username, password, label, code, bq_client, email_ids, imap_server)
        elif label == 'Shadowfax':
            print(label)
            log = log + ".\n" + Shadowfax_attachment(username, password, label, code, bq_client, email_ids, imap_server)
        elif label == 'Delhivery_Collection':
            print(label)
            log = log + ".\n" + Delhivery_collection_attachment(username, password, label, bq_client, email_ids, imap_server)
        elif label == 'BusyBee':
            print(label)
            log = log + ".\n" + BusyBee_attachment(username, password, label, bq_client, email_ids, imap_server)
    imap_server.logout()
    return log
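# A minimal sketch of invoking the pipeline entry point. The label names match
# the ones routed above; the merchant code here is a hypothetical placeholder.
# labels = ['Blue_dart', 'Ecom_remittance', 'Shadowfax', 'Delhivery_Collection', 'BusyBee']
# run_log = download_attachment(gmail_username, gmail_password, labels, 'MERCHANT01', log="")
# print(run_log)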
def get_pipeline_html():
    """Returns the HTML for the pipeline visualization with golden-yellow
    electricity flow, 3D hover effects, and improved tooltips."""
    return """
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Data Pipeline Visualization</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/react/18.2.0/umd/react.production.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/react-dom/18.2.0/umd/react-dom.production.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/babel-standalone/7.21.4/babel.min.js"></script>
    <style>
        .pipeline-container {
            width: 100%;
            overflow-x: auto;
            margin-bottom: 20px;
            border-radius: 10px;
            box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
        }
        .table-box {
            stroke: #800000;
            stroke-width: 2px;
            rx: 8px;
            ry: 8px;
            filter: drop-shadow(0 3px 3px rgba(0, 0, 0, 0.15));
            transition: all 0.3s ease;
        }
        .pipe {
            stroke: #888888;
            stroke-width: 20px;
            stroke-linecap: round;
        }
        .flow {
            stroke-width: 12px;
            stroke-linecap: round;
            stroke-dasharray: 12 8;
            animation: flowAnimation 1.5s linear infinite;
            filter: drop-shadow(0 0 3px #FFFF00);
        }
        @keyframes flowAnimation {
            from { stroke-dashoffset: 40; }
            to { stroke-dashoffset: 0; }
        }
        .table-text {
            font-family: 'Arial', sans-serif;
            font-size: 13px;
            font-weight: bold;
            fill: white;
            text-anchor: middle;
            dominant-baseline: middle;
            transition: all 0.3s ease;
        }
        .title-text {
            font-family: 'Arial', sans-serif;
            font-size: 18px;
            font-weight: bold;
            fill: #444;
            text-anchor: middle;
        }
        .tooltip-background {
            fill: #f5f5f5;
            stroke: #666666;
            stroke-width: 1px;
            rx: 6px;
            ry: 6px;
            opacity: 0;
            transition: opacity 0.3s;
            filter: drop-shadow(0 2px 4px rgba(0, 0, 0, 0.25));
        }
        .tooltip-text {
            font-family: 'Arial', sans-serif;
            font-size: 12px;
            fill: #333333;
            opacity: 0;
            transition: opacity 0.3s;
        }
        .tooltip-arrow {
            fill: #666666;
            opacity: 0;
            transition: opacity 0.3s;
        }
        .tooltip-trigger {
            fill: transparent;
            cursor: pointer;
        }
        .tooltip-group:hover .tooltip-background,
        .tooltip-group:hover .tooltip-text,
        .tooltip-group:hover .tooltip-arrow {
            opacity: 1;
        }
        .block-group:hover .table-box {
            transform: translateY(-5px);
            filter: drop-shadow(0 8px 8px rgba(0, 0, 0, 0.25));
        }
        .block-group:hover .table-text {
            transform: translateY(-5px);
        }
        /* Electricity effect */
        .electricity-glow {
            filter: drop-shadow(0 0 5px #FFD700);
        }
    </style>
</head>
<body>
    <div id="pipeline-root"></div>
    <script type="text/babel">
        const PipelineVisualization = () => {
            const [animationOffset, setAnimationOffset] = React.useState(0);
            React.useEffect(() => {
                const interval = setInterval(() => {
                    setAnimationOffset(prev => (prev + 1) % 40);
                }, 100);
                return () => clearInterval(interval);
            }, []);
            // SVG dimensions
            const svgWidth = 1000;
            const svgHeight = 350; // Extra height to make room for tooltips
            // Block dimensions
            const blockWidth = 180;
            const blockHeight = 90;
            const blockSpacing = 30;
            // Calculate positions
            const blockCount = 6;
            const totalBlockWidth = blockCount * blockWidth;
            const totalSpacingWidth = (blockCount - 1) * blockSpacing;
            const totalWidth = totalBlockWidth + totalSpacingWidth;
            const startX = (svgWidth - totalWidth) / 2;
            // Table definitions with tooltips
            const tables = [
                {
                    id: 1,
                    name: "COD_Email_Upload_Status",
                    x: startX,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A log table that contains the timestamp of mails received by Delivery Partner & Payment Gateway"
                },
                {
                    id: 2,
                    name: "COD_dp_name",
                    x: startX + blockWidth + blockSpacing,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A table which contains individual DP data from the mail's label. [For e.g. COD_Ecom, COD_Delhivery, etc.]"
                },
                {
                    id: 3,
                    name: "COD_Awb_wise_details",
                    x: startX + (blockWidth + blockSpacing) * 2,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A union table that contains data from all the DPs on a consolidated label"
                },
                {
                    id: 4,
                    name: "COD_Raw_Data",
                    x: startX + (blockWidth + blockSpacing) * 3,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A table that contains DP data aggregated to the level of awbs"
                },
                {
                    id: 5,
                    name: "bag_wise COD_data",
                    x: startX + (blockWidth + blockSpacing) * 4,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A table that contains DP data aggregated to the level of bags"
                },
                {
                    id: 6,
                    name: "05_partner_collection",
                    x: startX + (blockWidth + blockSpacing) * 5,
                    y: 140,
                    width: blockWidth,
                    tooltip: "A table that contains unified collections from Fynd"
                }
            ];
            // Pipe segments
            const pipes = [];
            for (let i = 0; i < tables.length - 1; i++) {
                pipes.push({
                    id: i + 1,
                    x1: tables[i].x + tables[i].width,
                    y1: tables[i].y + blockHeight / 2,
                    x2: tables[i + 1].x,
                    y2: tables[i + 1].y + blockHeight / 2
                });
            }
            // Create a tooltip with a pointer arrow for a table block
            const renderTooltip = (table) => {
                const tooltipWidth = Math.max(table.width + 60, 240);
                const tooltipHeight = 70;
                const tooltipX = table.x + table.width / 2 - tooltipWidth / 2;
                const tooltipY = table.y - 85;
                const arrowX = table.x + table.width / 2;
                const arrowY = tooltipY + tooltipHeight;
                // Split tooltip text into multiple lines if needed
                const words = table.tooltip.split(' ');
                const maxCharsPerLine = 35;
                const lines = [];
                let currentLine = '';
                words.forEach(word => {
                    if ((currentLine + ' ' + word).length <= maxCharsPerLine) {
                        currentLine = currentLine ? `${currentLine} ${word}` : word;
                    } else {
                        lines.push(currentLine);
                        currentLine = word;
                    }
                });
                if (currentLine) {
                    lines.push(currentLine);
                }
                return (
                    <g className="tooltip-group" key={`tooltip-${table.id}`}>
                        {/* Invisible trigger area covering the entire box */}
                        <rect
                            x={table.x}
                            y={table.y}
                            width={table.width}
                            height={blockHeight}
                            className="tooltip-trigger"
                        />
                        {/* Tooltip background */}
                        <rect
                            x={tooltipX}
                            y={tooltipY}
                            width={tooltipWidth}
                            height={tooltipHeight}
                            className="tooltip-background"
                        />
                        {/* Arrow pointing to the block */}
                        <polygon
                            points={`${arrowX - 10},${arrowY} ${arrowX + 10},${arrowY} ${arrowX},${arrowY + 10}`}
                            className="tooltip-arrow"
                        />
                        {/* Tooltip text */}
                        <g className="tooltip-text">
                            {lines.map((line, index) => (
                                <text
                                    key={`line-${index}`}
                                    x={tooltipX + tooltipWidth / 2}
                                    y={tooltipY + 20 + (index * 16)}
                                    textAnchor="middle"
                                >
                                    {line}
                                </text>
                            ))}
                        </g>
                    </g>
                );
            };
            return (
                <svg width="100%" height={svgHeight} viewBox={`0 0 ${svgWidth} ${svgHeight}`} preserveAspectRatio="xMidYMid meet">
                    {/* Title */}
                    <text x={svgWidth / 2} y="30" className="title-text"></text>
                    {/* Draw pipes first (behind tables) */}
                    {pipes.map(pipe => (
                        <g key={`pipe-${pipe.id}`}>
                            <line
                                x1={pipe.x1} y1={pipe.y1}
                                x2={pipe.x2} y2={pipe.y2}
                                className="pipe"
                            />
                            <line
                                x1={pipe.x1} y1={pipe.y1}
                                x2={pipe.x2} y2={pipe.y2}
                                className="flow electricity-glow"
                                strokeDashoffset={animationOffset}
                                stroke="url(#electricityGradient)"
                            />
                        </g>
                    ))}
                    {/* Gradient definitions */}
                    <defs>
                        <linearGradient id="blockGradient" x1="0%" y1="0%" x2="100%" y2="0%">
                            <stop offset="0%" stopColor="#800000" />
                            <stop offset="100%" stopColor="#ff0000" />
                        </linearGradient>
                        <linearGradient id="electricityGradient" x1="0%" y1="0%" x2="100%" y2="0%">
                            <stop offset="0%" stopColor="#FFD700" />
                            <stop offset="50%" stopColor="#FFFF00" />
                            <stop offset="100%" stopColor="#FFD700" />
                        </linearGradient>
                    </defs>
                    {/* Draw tables with tooltips */}
                    {tables.map(table => (
                        <React.Fragment key={`table-${table.id}`}>
                            <g className="block-group">
                                <rect
                                    x={table.x}
                                    y={table.y}
                                    width={table.width}
                                    height={blockHeight}
                                    className="table-box"
                                    fill="url(#blockGradient)"
                                />
                                <text
                                    x={table.x + table.width / 2}
                                    y={table.y + blockHeight / 2}
                                    className="table-text"
                                >
                                    {table.name}
                                </text>
                            </g>
                            {/* Tooltip with arrow */}
                            {renderTooltip(table)}
                        </React.Fragment>
                    ))}
                </svg>
            );
        };
        ReactDOM.render(<PipelineVisualization />, document.getElementById('pipeline-root'));
    </script>
</body>
</html>
"""
def combined_report(username, password, label, code, bq_client, email_ids, imap_server):
    log = ""
    log = log + ".\n" + ".\n" + label
    dataset_id = 'finance_recon_tool_asia'
    project_id = 'fynd-db'
    table_id = 'pg_combined_report'
    # Iterate over the email IDs
    for email_id in email_ids[0].split():
        # Fetch the email content
        _, email_data = imap_server.fetch(email_id, '(RFC822)')
        # Parse the email message
        raw_email = email_data[0][1]
        email_message = email.message_from_bytes(raw_email)
        email_subject = email_message["Subject"]
        log = log + ".\n" + email_message["from"]
        log = log + ".\n" + email_message["Subject"]
        received_time_str = email_message.get("Received")
        match = re.search(r"(\d+\s\w+\s\d+\s\d+:\d+:\d+\s[+-]\d+)", received_time_str)
        if match:
            received_time_str = match.group(1)
            received_time = datetime.strptime(received_time_str, "%d %b %Y %H:%M:%S %z")
        else:
            received_time = None
        email_subject = email_subject + " " + str(received_time)
        value_check = upload_status(label, bq_client, email_subject, imap_server)
        if value_check is False:
            # Extract email body text (the MID is embedded in the HTML part)
            email_body = ""
            if email_message.is_multipart():
                for part in email_message.walk():
                    if part.get_content_type() == "text/html":
                        email_body += part.get_payload(decode=True).decode("utf-8")
            # Iterate over the email parts
            for part in email_message.walk():
                # Check if the part has a filename (attachment)
                if part.get_filename():
                    # Extract the filename and check if it is supported
                    attachment_filename = part.get_filename()
                    if attachment_filename and is_attachment_supported(attachment_filename):
                        # Read the attachment file data into a BytesIO object
                        attachment_data = BytesIO(part.get_payload(decode=True))
                        try:
                            # Read the zipped CSV using pandas without saving it locally
                            df = pd.read_csv(attachment_data, compression='zip')
                        except Exception as e:
                            log = log + ".\n" + " ERROR - " + str(e)
                            continue
                        log = log + ".\n" + "file read"
                        # The 14-character MID follows this marker in the email body
                        marker = "HI SHOPSENSE RETAIL TECHNOLOGIES LIMITED ("
                        idx = email_body.upper().index(marker) + len(marker)
                        df = df.replace({np.nan: ''})
                        # Rename the attachment if a merchant code is supplied
                        new_attachment_filename = extract_attachment_name(attachment_filename, label, email_body, code)
                        if new_attachment_filename:
                            # Add the extracted code as a new column named "MID"
                            df['MID'] = email_body[idx:idx + 14]
                            # Add the insertion date column
                            date = dt.date.today()
                            df['inserted_dt'] = date
                            table_id = 'fynd-db.finance_recon_tool_asia.pg_combined_report'
                            # Compare DataFrame schema with BigQuery table schema
                            data_status = compare_bigquery_schema_with_dataframe(bq_client, table_id, df, label, log)
                            log = log + ".\n" + str(data_status)
                            if data_status != "Both DataFrames have the same columns in same sequence.":
                                continue
                            else:
                                # Insert the data into BigQuery
                                job_config = bigquery.LoadJobConfig()
                                df = df.astype(str)
                                job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
                                job.result()  # Wait for the job to complete
                                log = log + ".\n" + "Data uploaded to: " + table_id + f" with columns '{df.shape[1]}' and rows '{df.shape[0]}'" + "\n"
                                # Record the upload in the status table
                                status_table_id = 'fynd-db.finance_recon_tool_asia.COD_Email_Upload_Status'
                                new_row = {'Label': label, 'Date': insertion_date, 'Subject_Date': email_subject}
                                try:
                                    bq_client.insert_rows_json(status_table_id, [new_row])
                                except Exception as e:
                                    print(f"Errors : {e}")
                    else:
                        log = log + ".\n" + "No processing required."
        else:
            log = log + ".\n" + "Data has already been uploaded." + "\n"
    return log
# Old utils
def load_gcp_credentials():
    try:
        # Retrieve GCP credentials from the environment variable
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        # Parse the secret (assuming it's a JSON string)
        gcp_credentials = json.loads(gcp_credentials_str)
        # Save to a temporary file (Google Cloud authenticates from a JSON file)
        with open("gcp_credentials.json", "w") as f:
            json.dump(gcp_credentials, f)
        # Authenticate using the Google Cloud SDK
        credentials_from_file = service_account.Credentials.from_service_account_file("gcp_credentials.json")
        # Return the credentials to be used later
        return credentials_from_file
    except Exception as e:
        print(f"Error retrieving or loading GCP credentials: {str(e)}")
        return None
# Upload to BigQuery
def upload_to_bigquery(df, table_id):
    try:
        # Load the GCP credentials from the Hugging Face secret
        bigquery_creds = load_gcp_credentials()
        if not bigquery_creds:
            st.error("Unable to load GCP credentials.")
            return
        # Initialize the BigQuery client with the loaded credentials
        client = bigquery.Client(credentials=bigquery_creds)
        # Convert the DataFrame to a list of dictionaries
        records = df.to_dict(orient='records')
        # WRITE_APPEND appends to the table; use WRITE_TRUNCATE to overwrite
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",
        )
        # Load the data into BigQuery
        load_job = client.load_table_from_json(records, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        st.success("Data submitted")
    except Exception as e:
        st.error(f"An error occurred while uploading to BigQuery: {e}")

# Authenticate BigQuery
def authenticate_bigquery():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for BigQuery authentication.")
        return None
    return creds
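# A minimal usage sketch (the table name and DataFrame below are hypothetical).
# It assumes the GCP_CREDENTIALS environment variable is set, as
# load_gcp_credentials() expects, and that this runs inside a Streamlit app so
# st.success()/st.error() render.
# report_df = pd.DataFrame({"Label": ["Blue_dart"], "Date": [insertion_date]})
# upload_to_bigquery(report_df, "fynd-db.finance_recon_tool_asia.example_table")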