import os
import json
import base64
import smtplib
import pandas as pd
import requests
import streamlit as st
import pyperclip
from google.cloud import bigquery
from google.oauth2 import service_account
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from queries import queries
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.auth import default, exceptions
from google.auth.transport.requests import Request
# Email and Slack settings; secrets are read from the environment instead of
# being hardcoded in source control
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_HOST_USER = os.getenv('EMAIL_HOST_USER', 'alerter.response@gmail.com')
EMAIL_HOST_PASSWORD = os.getenv('EMAIL_HOST_PASSWORD')  # e.g. a Gmail app password
SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')
# Authenticate BigQuery
def authenticate_bigquery():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for BigQuery authentication.")
        return None
    return creds
def authenticate_bigquery_updated():
    gcp_credentials = load_gcp_credentials()
    if gcp_credentials:
        gcp_credentials = gcp_credentials.with_scopes([
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/drive"
        ])
        return gcp_credentials
    return None
# Load GCP credentials
def load_gcp_credentials():
    try:
        # Retrieve GCP credentials from the environment variable
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        # Parse the secret (assuming it's a JSON string)
        gcp_credentials = json.loads(gcp_credentials_str)
        # Save to a temporary file (the client library authenticates from a JSON key file)
        with open("gcp_credentials.json", "w") as f:
            json.dump(gcp_credentials, f)
        # Build service-account credentials from the key file
        credentials_from_file = service_account.Credentials.from_service_account_file("gcp_credentials.json")
        # Return the credentials to be used later
        return credentials_from_file
    except Exception as e:
        print(f"Error retrieving or loading GCP credentials: {str(e)}")
        return None
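# Example usage (a minimal sketch; assumes GCP_CREDENTIALS holds a service
# account JSON string, e.g. set as a Hugging Face Space secret):
#   creds = load_gcp_credentials()
#   if creds:
#       print(creds.service_account_email)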
# Upload to BQ
def upload_to_bigquery(df, table_id):
    try:
        # Load the GCP credentials from the Hugging Face secret
        bigquery_creds = load_gcp_credentials()
        if not bigquery_creds:
            st.error("Unable to load GCP credentials.")
            return
        # Initialize the BigQuery client with the loaded credentials
        client = bigquery.Client(credentials=bigquery_creds)
        # Convert the DataFrame to a list of dictionaries
        records = df.to_dict(orient='records')
        # Configure the load job
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",  # WRITE_TRUNCATE to overwrite, WRITE_APPEND to append
        )
        # Load the data into BigQuery
        load_job = client.load_table_from_json(records, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        st.success("Data submitted")
    except Exception as e:
        st.error(f"An error occurred while uploading to BigQuery: {e}")
def preprocess_csv(file_path):
    # Load the CSV file
    df = pd.read_csv(file_path)
    # Columns to convert to timestamps
    date_columns = ['Order_Date', 'State_Date', 'Entry_Month']
    # Strip any decimal suffix from bag IDs before casting to a nullable integer
    if 'bag_id_cn' in df.columns:
        df['bag_id_cn'] = df['bag_id_cn'].replace({r'\..*': ''}, regex=True).astype('Int64')
    # Convert specified columns from DD/MM/YY to 'YYYY-MM-DD 00:00:00 UTC'
    for column in date_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column], format='%d/%m/%y', errors='coerce').dt.strftime('%Y-%m-%d 00:00:00 UTC')
    # Save the preprocessed CSV
    preprocessed_file_path = 'preprocessed_' + os.path.basename(file_path)
    df.to_csv(preprocessed_file_path, index=False)
    return preprocessed_file_path
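# Example: a row with Order_Date "25/12/23" becomes "2023-12-25 00:00:00 UTC";
# unparseable dates are coerced to NaT and appear as empty cells in the output.
#   cleaned_path = preprocess_csv("orders.csv")  # writes preprocessed_orders.csv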
# Function to read files from a local path
def read_file(path):
    try:
        with open(path, 'rb') as file:
            return file.read()
    except Exception as e:
        st.error(f"Failed to read file from {path}: {str(e)}")
        return None
# Function to get the content type based on file extension
def get_content_type(file_path):
    if file_path.lower().endswith('.pdf'):
        return 'application/pdf'
    elif file_path.lower().endswith('.csv'):
        return 'text/csv'
    elif file_path.lower().endswith('.xlsx'):
        return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    else:
        return 'application/octet-stream'
# Encode an image to Base64
def get_base64_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode()
def send_message_via_email(message, email_address, files, subject=None, body=None):
    try:
        # Set up the server
        server = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
        server.starttls()
        server.login(EMAIL_HOST_USER, EMAIL_HOST_PASSWORD)
        # Create the email
        msg = MIMEMultipart()
        msg['From'] = EMAIL_HOST_USER
        msg['To'] = email_address
        msg['Subject'] = subject if subject else "🚨 Alerter"
        # Attach the message body
        msg.attach(MIMEText(body if body else message, 'plain'))
        # Attach each file if provided
        if files:
            for uploaded_file in files:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(uploaded_file.read())
                encoders.encode_base64(part)
                part.add_header('Content-Disposition', f'attachment; filename={uploaded_file.name}')
                msg.attach(part)
        # Send the email
        server.sendmail(EMAIL_HOST_USER, email_address, msg.as_string())
        server.quit()
        return True, "Message sent successfully"
    except Exception as e:
        return False, str(e)
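# Example usage (hypothetical address; `files` expects objects exposing .read()
# and .name, such as Streamlit uploaded files):
#   ok, info = send_message_via_email("Daily alert", "ops@example.com",
#                                     files=None, subject="🚨 Alerter")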
def send_message_via_webhook(message, webhook_url):
    try:
        payload = {"text": message}
        response = requests.post(webhook_url, json=payload)
        if response.status_code == 200:
            return True, "Message sent successfully"
        else:
            return False, f"Error {response.status_code}: {response.text}"
    except Exception as e:
        return False, str(e)
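# Example usage (hypothetical Slack incoming-webhook URL):
#   ok, info = send_message_via_webhook("Deploy finished",
#                                       "https://hooks.slack.com/services/T000/B000/XXXX")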
def send_file_to_slack(file, webhook_url):
    files = {
        'file': (file.name, file, 'application/octet-stream')  # send as binary
    }
    # Let requests generate the multipart Content-Type header itself; setting
    # it manually omits the boundary and corrupts the upload.
    response = requests.post(webhook_url, files=files)
    return response
def send_file_to_slack_up(file, webhook_url, channel_id):
    # Slack API URL for file upload (note: Slack has deprecated files.upload
    # in favour of files.getUploadURLExternal / files.completeUploadExternal)
    slack_api_url = 'https://slack.com/api/files.upload'
    # Prepare headers for the API call, using the bot token variable
    headers = {
        'Authorization': 'Bearer ' + SLACK_BOT_TOKEN,
    }
    # Send the file in its original binary format; the channel ID belongs in
    # the form data, not in the file parts
    files = {
        'file': (file.name, file, 'application/octet-stream'),
    }
    data = {'channels': channel_id}
    # Make the POST request to upload the file to Slack
    response = requests.post(slack_api_url, headers=headers, files=files, data=data)
    return response
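# Example usage (hypothetical channel ID; requires a bot token with the
# files:write scope; webhook_url is unused by this function):
#   with open("report.csv", "rb") as f:
#       resp = send_file_to_slack_up(f, webhook_url=None, channel_id="C0123456789")
#   print(resp.json().get("ok"))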
def check_duplicates(client=None):
    """Check for duplicates using BigQuery, building a client if none is provided."""
    results = {}
    if client is None:
        bigquery_creds = authenticate_bigquery()
        client = bigquery.Client(credentials=bigquery_creds)
    # Inject the button styling once, rather than on every loop iteration
    button_styles = """
        <style>
        div.stButton > button {
            color: #ffffff; /* Text color */
            font-size: 30px;
            background-image: linear-gradient(to right, #800000, #ff0000); /* Maroon to light red gradient */
            border: none;
            padding: 10px 20px;
            cursor: pointer;
            border-radius: 15px;
            display: inline-block;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1), 0 8px 15px rgba(0, 0, 0, 0.1); /* Box shadow */
            transition: all 0.3s ease; /* Smooth transition on hover */
        }
        div.stButton > button:hover {
            background-color: #00ff00; /* Hover background color */
            color: #ff0000; /* Hover text color */
            box-shadow: 0 6px 10px rgba(0, 0, 0, 0.2), 0 12px 20px rgba(0, 0, 0, 0.2); /* Box shadow on hover */
        }
        </style>
    """
    st.markdown(button_styles, unsafe_allow_html=True)
    for i, (query_name, query) in enumerate(queries.items()):
        query_job = client.query(query)
        df = query_job.result().to_dataframe()
        # For debugging, write the DataFrame to the Streamlit app
        st.write(f"{query_name}:", df)
        if st.button("Copy Query", key=f"copy_query_{i}"):
            pyperclip.copy(query)
            st.success('Query copied to clipboard!')
        if not df.empty:
            results[query_name] = len(df)
    return results
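# Example usage (runs every query in `queries` and returns duplicate counts):
#   counts = check_duplicates()
#   st.write(counts)  # e.g. {"orders_dupes": 3}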
def upload_to_drive(file_path, folder_id):
    try:
        # Authenticate with Google Drive using Hugging Face secrets
        creds = authenticate_google_drive()
        if not creds:
            return
        # Build the Google Drive service
        service = build('drive', 'v3', credentials=creds)
        # Define the file metadata
        file_metadata = {'name': os.path.basename(file_path), 'parents': [folder_id]}
        # Determine the MIME type from the extension (.xlsx uses the OpenXML
        # spreadsheet type; 'application/vnd.ms-excel' is the legacy .xls type)
        mime_type = (
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
            if file_path.endswith('.xlsx') else 'text/csv'
        )
        media = MediaFileUpload(file_path, mimetype=mime_type)
        # Upload the file to Google Drive
        file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
        st.success(f"Uploaded to Drive (file ID: {file.get('id')})")
    except Exception as e:
        st.error(f"An error occurred: {e}")
        st.error("Ensure the folder ID is correct and the service account has permission to access the folder.")
def authenticate_google_drive():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for Google Drive authentication.")
        return None
    return creds
def get_oauth_token():
    try:
        # Define the required scopes for Dataform
        required_scopes = [
            "https://www.googleapis.com/auth/cloud-platform",  # General GCP access
            "https://www.googleapis.com/auth/bigquery",        # BigQuery access
            "https://www.googleapis.com/auth/dataform"
        ]
        # Load application default credentials with the specified scopes
        creds, _ = default(scopes=required_scopes)
        if creds is None:
            raise exceptions.DefaultCredentialsError("No valid credentials found.")
        # Refresh the credentials to get a current access token
        creds.refresh(Request())
        return creds.token
    except exceptions.GoogleAuthError as e:
        print(f"Authentication error: {e}")
        return None
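# Example usage (a minimal sketch; the project and location are hypothetical):
#   token = get_oauth_token()
#   resp = requests.get(
#       "https://dataform.googleapis.com/v1beta1/projects/my-project/locations/us-central1/repositories",
#       headers={"Authorization": f"Bearer {token}"},
#   )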
def get_task_logs(task_name="Tally_backup_Ninad"):
    # Assumes an `eventlog` helper exposing read(channel, limit=...) over the
    # Windows Event Log; it is not imported above and must be supplied separately.
    logs = eventlog.read('Microsoft-Windows-TaskScheduler/Operational', limit=100)
    task_logs = []
    for log in logs:
        if task_name in log['Message']:
            task_logs.append(log['Message'])
    return task_logs
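# Example usage (Windows only; depends on the eventlog helper noted above):
#   for entry in get_task_logs("Tally_backup_Ninad"):
#       print(entry)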