"""Helpers for exchanging data files with Google Drive and GitHub.

Also contains a small image-resizing helper and date utilities used to pick
the weekly "ronda" rotation entry.
"""

import base64
import json
import os
import re
import time
from datetime import datetime, timedelta
from io import BytesIO

import cv2
import numpy as np
import pandas as pd
import pytz
import requests
import streamlit as st
from cryptography.fernet import Fernet
from google.auth.exceptions import RefreshError
from pyarrow.lib import ArrowInvalid
from pydrive.auth import GoogleAuth
from pydrive.auth import RefreshError as PyDriveRefreshError
from pydrive.drive import GoogleDrive

# Characters stripped from file names before uploading to Google Drive.
# NOTE(review): inside the character class ``\/`` matches only the forward
# slash, so a literal backslash is NOT removed -- confirm that is intended.
_RESTRICTED_CHARS_RE = re.compile(r'[\/:*?"<>|]')

# Location of the encrypted credentials and of the decrypted copy PyDrive reads.
_ENCRYPTED_CREDS_PATH = "creds/credentials.json.enc"
_CREDS_PATH = "creds/credentials.json"

# Socket timeout (seconds) for GitHub HTTP calls so a stalled connection
# cannot hang the caller forever.
_REQUEST_TIMEOUT_SECONDS = 30


def clean_file_name(file_name):
    """Return *file_name* with the characters ``/ : * ? " < > |`` removed."""
    return _RESTRICTED_CHARS_RE.sub('', file_name)


def _escape_drive_query_value(value):
    """Escape backslashes and single quotes for use inside a Drive query string."""
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def authenticate_google_drive():
    """Decrypt the stored credentials and return an authenticated GoogleDrive client.

    The Fernet key is read from the ``CREDS_KEY`` environment variable and used
    to decrypt ``creds/credentials.json.enc`` into ``creds/credentials.json``,
    which is then loaded by PyDrive.

    Returns:
        GoogleDrive: A client bound to the authenticated ``GoogleAuth`` object.
    """
    key = os.getenv("CREDS_KEY")

    # Decrypt the credentials.
    with open(_ENCRYPTED_CREDS_PATH, "rb") as encrypted_file:
        encrypted_data = encrypted_file.read()
    decrypted_data = Fernet(key).decrypt(encrypted_data)

    # NOTE(security): the decrypted credentials are written to disk in
    # plaintext and left there after this function returns.
    with open(_CREDS_PATH, "wb") as creds_file:
        creds_file.write(decrypted_data)

    gauth = GoogleAuth()
    try:
        gauth.LoadCredentialsFile(_CREDS_PATH)
    except FileNotFoundError:
        print("Credentials file not found. You need to authenticate for the first time.")

    if gauth.credentials is None:
        # Authenticate if credentials are not available (first-time use).
        gauth.LocalWebserverAuth()
    elif gauth.access_token_expired:
        # Refresh the access token if it has expired.
        try:
            gauth.Refresh()
        except (RefreshError, PyDriveRefreshError) as e:
            # PyDrive raises its own RefreshError; the google-auth one is kept
            # for backward compatibility with the original handler.
            print(f"Token refresh failed: {e}. Re-authenticating...")
            # Drop the stale credentials so PyDrive runs the full auth flow
            # instead of attempting the same failing refresh again.
            gauth.credentials = None
            gauth.LocalWebserverAuth()
    else:
        # Use the saved credentials.
        gauth.Authorize()

    return GoogleDrive(gauth)


def upload_file_to_drive(file, file_name, drive_folder_id, is_table=True):
    """Upload a file to a Google Drive folder and return its ``alternateLink``.

    Parameters:
        file: Uploaded file object exposing ``.name`` and ``.getbuffer()``.
            Only used when ``is_table`` is False.
        file_name (str): Name of the file. When ``is_table`` is True only the
            part after the last ``/`` is used, and the content is read from
            ``data/<file_name>``.
        drive_folder_id (str): ID of the destination Drive folder.
        is_table (bool): When True, overwrite the file with the same title in
            the folder (or create it if none exists). When False, upload
            ``file`` as a new, timestamp-prefixed file.

    Returns:
        The ``alternateLink`` of the uploaded Drive file.
    """
    drive = authenticate_google_drive()

    if is_table:
        file_name = file_name.split('/')[-1]
        query = (
            f"title = '{_escape_drive_query_value(file_name)}' "
            f"and '{_escape_drive_query_value(drive_folder_id)}' in parents "
            "and trashed = false"
        )
        matches = drive.ListFile({'q': query}).GetList()
        if matches:
            file_drive = matches[0]  # Reuse the existing file so it is overwritten.
        else:
            # Nothing to overwrite yet: create the file instead of failing
            # with an IndexError on an empty result list.
            file_drive = drive.CreateFile(
                {'title': file_name, 'parents': [{'id': drive_folder_id}]}
            )
        file_drive.SetContentFile(f'data/{file_name}')
        file_drive.Upload()
    else:
        # "/tmp" is used for deployment; basename() keeps the temp file inside
        # it even if the uploaded name contains path separators.
        temp_file_path = os.path.join("/tmp", os.path.basename(file.name))
        with open(temp_file_path, "wb") as f:
            f.write(file.getbuffer())

        # Prefix the title with the current Jakarta time.
        jakarta_tz = pytz.timezone('Asia/Jakarta')
        formatted_datetime = datetime.now(jakarta_tz).strftime("%Y_%m_%d-%H_%M_%S")
        ready_file_name = clean_file_name(file_name).replace(" ", "_")

        file_drive = drive.CreateFile({
            'title': f"{formatted_datetime}_{ready_file_name}",
            'parents': [{'id': drive_folder_id}],
        })
        file_drive.SetContentFile(temp_file_path)
        file_drive.Upload()

    return file_drive['alternateLink']


def get_data_gdrive(local_filename, file_id='1ti_IyICHZI5BOxEBSK6Iq0Kr0dFczsx_'):
    """Download a Drive file into ``data/`` and load it as a DataFrame.

    Parameters:
        local_filename (str): File name (relative to ``data/``) to save to.
        file_id (str): Drive file ID. The default is the file ID of
            ``data_keuangan``.

    Returns:
        pandas.DataFrame: The file parsed as Parquet, or as CSV when pyarrow
        reports the content is not valid Parquet (``ArrowInvalid``).
    """
    drive = authenticate_google_drive()

    local_path = f"data/{local_filename}"
    os.makedirs(os.path.dirname(local_path), exist_ok=True)

    file_drive = drive.CreateFile({'id': file_id})
    file_drive.GetContentFile(local_path)

    # Return directly from each branch: a ``return`` inside ``finally`` would
    # swallow any real error and surface a confusing UnboundLocalError.
    try:
        return pd.read_parquet(local_path, engine='pyarrow')
    except ArrowInvalid:
        return pd.read_csv(local_path)


def get_data_github(file_name):
    """Fetch a Parquet file from the GitHub repository's ``dataset/`` folder.

    The raw content is also saved to ``data/<file_name>``.

    Parameters:
        file_name (str): Name of the file inside ``dataset/``.

    Returns:
        pandas.DataFrame | None: The parsed data, or None when the HTTP status
        is not 200.
    """
    url = f"https://github.com/ikhbarfirman/raflesia/raw/refs/heads/main/dataset/{file_name}"
    response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)

    # Check if the request was successful.
    if response.status_code != 200:
        print(f"Failed to fetch file: {response.status_code}")
        return None

    # Write the content to a local file.
    local_filename = f"data/{file_name}"
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)
    with open(local_filename, "wb") as f:
        f.write(response.content)

    # Load the Parquet data from memory using the pyarrow engine.
    return pd.read_parquet(BytesIO(response.content), engine='pyarrow')


def push_data_github(file_local, file_github, token):
    """Create or overwrite ``dataset/<file_github>`` in the GitHub repository.

    Parameters:
        file_local (str): Path of the local file to upload.
        file_github (str): Target file name inside ``dataset/``.
        token (str): GitHub token sent in the ``Authorization`` header.

    The outcome is reported with ``print``; nothing is returned.
    """
    repo = 'ikhbarfirman/raflesia'
    file_path = f'dataset/{file_github}'
    github_api_url = f'https://api.github.com/repos/{repo}/contents/{file_path}'
    headers = {'Authorization': f'token {token}'}

    # Step 1: get the SHA of the existing file, if any (needed to overwrite).
    response = requests.get(
        github_api_url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    sha = response.json().get('sha') if response.status_code == 200 else None

    # Step 2: read the file and encode it in base64.
    with open(file_local, 'rb') as file:
        content_base64 = base64.b64encode(file.read()).decode('utf-8')

    # Step 3: prepare the request payload.
    payload = {
        'message': 'Updated Parquet file with modified data',
        'content': content_base64,
    }
    if sha:
        payload['sha'] = sha

    # Step 4: make the request to the GitHub API.
    response = requests.put(
        github_api_url,
        headers=headers,
        data=json.dumps(payload),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )

    # Step 5: check the response.
    if response.status_code in (200, 201):
        print('File uploaded successfully!')
        return

    try:
        details = response.json()
    except ValueError:
        # The error body is not JSON; report the raw text instead of raising.
        details = response.text
    print('Failed to upload file:', details)


def resize_image_with_aspect_ratio(uploaded_file, width=None, height=None, interpolation=cv2.INTER_AREA):
    """
    Resize an uploaded image while maintaining its aspect ratio and return it
    as a BytesIO object.

    Parameters:
        uploaded_file (UploadedFile): The uploaded file from Streamlit; must
            provide ``read()`` and ``name``.
        width (int, optional): The desired width. Defaults to None. When both
            ``width`` and ``height`` are given, ``width`` wins and ``height``
            is ignored.
        height (int, optional): The desired height. Defaults to None.
        interpolation (int, optional): Interpolation method. Defaults to
            cv2.INTER_AREA.

    Returns:
        BytesIO: The resized image, encoded using the extension of the
        original file name, or as JPEG when that encoding fails. Its ``name``
        attribute is the original file name.

    Raises:
        ValueError: If the uploaded content cannot be decoded as an image.
    """
    # Convert the uploaded file to a NumPy array and decode it with OpenCV.
    file_bytes = np.asarray(bytearray(uploaded_file.read()), dtype=np.uint8)
    image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR) if file_bytes.size else None
    if image is None:
        raise ValueError(
            f"Could not decode {getattr(uploaded_file, 'name', 'uploaded file')!r} as an image."
        )

    original_height, original_width = image.shape[:2]

    if width is not None:
        # Derive the height from the requested width.
        new_width = width
        # Clamp to 1 so extreme aspect ratios never produce a 0-pixel side.
        new_height = max(1, int(new_width * (original_height / original_width)))
        resized_image = cv2.resize(image, (new_width, new_height), interpolation=interpolation)
    elif height is not None:
        # Derive the width from the requested height.
        new_height = height
        new_width = max(1, int(new_height * (original_width / original_height)))
        resized_image = cv2.resize(image, (new_width, new_height), interpolation=interpolation)
    else:
        resized_image = image  # No resizing needed.

    # Encode back to the original format, falling back to JPEG.
    try:
        encoded_ok, buffer = cv2.imencode('.' + uploaded_file.name.split('.')[-1], resized_image)
    except Exception:
        # Best-effort: any failure with the original extension falls back to JPEG.
        encoded_ok = False
    if not encoded_ok:
        _, buffer = cv2.imencode('.jpg', resized_image)

    # Convert to BytesIO for compatibility with Streamlit's file-like handling.
    resized_file = BytesIO(buffer.tobytes())
    resized_file.name = uploaded_file.name  # Retain original filename.
    return resized_file


def get_ronda_index(ronda_list, reference_date, current_date):
    """
    Calculate the index of ronda_list based on the number of whole weeks
    between reference_date and current_date.

    :param ronda_list: Non-empty list of items to cycle through.
    :param reference_date: The starting point, as a date/datetime or a
        ``"%Y-%m-%d"`` string.
    :param current_date: The date for which to calculate the index, as a
        date/datetime or a ``"%Y-%m-%d"`` string.
    :return: Tuple of (index in ronda_list, corresponding value).
    :raises ValueError: If ronda_list is empty.
    """
    if not ronda_list:
        raise ValueError("ronda_list must not be empty")

    # Ensure both dates are datetime objects.
    if isinstance(reference_date, str):
        reference_date = datetime.strptime(reference_date, "%Y-%m-%d")
    if isinstance(current_date, str):
        current_date = datetime.strptime(current_date, "%Y-%m-%d")

    # A timezone-aware value cannot be subtracted from a naive one. When
    # exactly one side is aware, compare wall-clock values by dropping its
    # tzinfo. Inputs that are both naive or both aware are left untouched.
    reference_tz = getattr(reference_date, "tzinfo", None)
    current_tz = getattr(current_date, "tzinfo", None)
    if reference_tz is not None and current_tz is None:
        reference_date = reference_date.replace(tzinfo=None)
    elif current_tz is not None and reference_tz is None:
        current_date = current_date.replace(tzinfo=None)

    # Whole weeks elapsed (floor division), wrapped around the list length.
    delta_days = (current_date - reference_date).days
    weeks_elapsed = delta_days // 7
    ronda_index = weeks_elapsed % len(ronda_list)

    return ronda_index, ronda_list[ronda_index]


def get_previous_or_current_saturday(today=None):
    """Return the most recent Saturday on or before *today*.

    Parameters:
        today: A date/datetime, a ``"%Y-%m-%d"`` string, or None to use the
            current time in the ``Asia/Jakarta`` timezone.

    Returns:
        *today* moved back by 0-6 days; the time of day and tzinfo (if any)
        are left as they were.
    """
    if today is None:
        today = datetime.now(pytz.timezone('Asia/Jakarta'))
    elif isinstance(today, str):
        today = datetime.strptime(today, "%Y-%m-%d")

    # weekday() is Monday=0 ... Sunday=6, so this offset is 0 on Saturday,
    # 1 on Sunday, ..., 6 on Friday.
    days_to_subtract = (today.weekday() + 2) % 7

    return today - timedelta(days=days_to_subtract)