raflesia / utils.py
ikhbarikhbar's picture
update
5571d06
import pandas as pd
import numpy as np
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.auth.exceptions import RefreshError
from pyarrow.lib import ArrowInvalid
import streamlit as st
import os
import json
import base64
import re
from datetime import datetime, timedelta
import time
import pytz
from io import BytesIO
from cryptography.fernet import Fernet
import requests
import cv2
def clean_file_name(file_name):
    """Return ``file_name`` with filename-unsafe punctuation stripped out.

    The characters removed are the forward slash, colon, asterisk,
    question mark, double quote, angle brackets and the pipe. Every other
    character (including backslashes and spaces) is kept as-is.

    Args:
        file_name: The raw name to sanitise.

    Returns:
        The name with each of the characters above deleted.
    """
    # Character class of symbols the project treats as restricted in
    # Google Drive file names.
    restricted_pattern = re.compile(r'[\/:*?"<>|]')
    return restricted_pattern.sub('', file_name)
# Authenticate Google Drive
def authenticate_google_drive():
    """Decrypt the stored credentials and return an authenticated Drive client.

    The Fernet key is read from the ``CREDS_KEY`` environment variable and
    used to decrypt ``creds/credentials.json.enc``. The decrypted content is
    written to ``creds/credentials.json`` and loaded into PyDrive.

    Depending on the state of the loaded credentials the function then
    runs the local-webserver flow (no credentials), refreshes the access
    token (expired), or authorizes with the saved credentials. If the
    refresh fails, it falls back to the local-webserver flow so that the
    returned client is actually authenticated.

    Returns:
        GoogleDrive: A PyDrive client bound to the authenticated session.

    Raises:
        RuntimeError: If ``CREDS_KEY`` is not set or is empty.
    """
    # PyDrive raises its own RefreshError from GoogleAuth.Refresh(); it is
    # imported locally so this function does not depend on a file-level
    # import change. The google-auth RefreshError is still caught as before.
    from pydrive.auth import RefreshError as PyDriveRefreshError

    key = os.getenv("CREDS_KEY")
    if not key:
        # Fail early with an actionable message instead of an opaque error
        # from inside Fernet.
        raise RuntimeError(
            "CREDS_KEY environment variable is not set; "
            "cannot decrypt Google Drive credentials."
        )

    # Decrypt the credentials
    with open("creds/credentials.json.enc", "rb") as encrypted_file:
        encrypted_data = encrypted_file.read()
    decrypted_data = Fernet(key).decrypt(encrypted_data)

    creds_path = "creds/credentials.json"
    with open(creds_path, "wb") as creds_file:
        creds_file.write(decrypted_data)

    gauth = GoogleAuth()
    try:
        gauth.LoadCredentialsFile(creds_path)
    except FileNotFoundError:
        print("Credentials file not found. You need to authenticate for the first time.")

    if gauth.credentials is None:
        # Authenticate if credentials are not available (first-time use)
        gauth.LocalWebserverAuth()
    elif gauth.access_token_expired:
        # Refresh the access token if it has expired
        try:
            gauth.Refresh()
        except (RefreshError, PyDriveRefreshError) as e:
            print(f"Token refresh failed: {e}. Re-authenticating...")
            # Actually perform the re-authentication announced above;
            # otherwise the returned client would carry an expired token.
            gauth.LocalWebserverAuth()
    else:
        # Use the saved credentials
        gauth.Authorize()
    return GoogleDrive(gauth)
# Upload file to Google Drive and get the link
def upload_file_to_drive(file, file_name, drive_folder_id, is_table=True):
    """Upload a file to a Google Drive folder and return its link.

    Two modes are supported:

    * ``is_table=True``: overwrite the existing Drive file whose title is
      the last path component of ``file_name`` inside ``drive_folder_id``,
      using the local file ``data/<file_name>`` as the new content.
      ``file`` is not used in this mode.
    * ``is_table=False``: write ``file`` to a temporary file under ``/tmp``
      and upload it as a new Drive file titled
      ``<YYYY_mm_dd-HH_MM_SS>_<sanitised file_name>`` (Asia/Jakarta time).

    Args:
        file: Object exposing ``.name`` and ``.getbuffer()``; only read
            when ``is_table`` is False.
        file_name: Target file name (a leading path is dropped in table
            mode; restricted characters and spaces are replaced otherwise).
        drive_folder_id: ID of the destination Drive folder.
        is_table: Selects the overwrite mode described above.

    Returns:
        The ``alternateLink`` of the uploaded Drive file.

    Raises:
        FileNotFoundError: In table mode, when no matching file exists in
            the Drive folder to overwrite.
    """
    drive = authenticate_google_drive()

    if is_table:
        if '/' in file_name:
            file_name = file_name.split('/')[-1]
        # Backslashes and single quotes must be escaped inside a Drive
        # query string literal, otherwise such titles break the query.
        escaped_title = file_name.replace("\\", "\\\\").replace("'", "\\'")
        query = (
            f"title = '{escaped_title}' and '{drive_folder_id}' in parents "
            f"and trashed = false"
        )
        file_list = drive.ListFile({'q': query}).GetList()
        if not file_list:
            # Previously this surfaced as a bare IndexError on file_list[0].
            raise FileNotFoundError(
                f"No file titled '{file_name}' found in Drive folder "
                f"'{drive_folder_id}' to overwrite."
            )
        file_drive = file_list[0]  # to overwrite
        file_drive.SetContentFile(f'data/{file_name}')  # Set the content of the file
        file_drive.Upload()
    else:
        jakarta_tz = pytz.timezone('Asia/Jakarta')
        formatted_datetime = datetime.now(jakarta_tz).strftime("%Y_%m_%d-%H_%M_%S")

        # basename() keeps a client-supplied name from escaping /tmp.
        temp_file_path = os.path.join("/tmp", os.path.basename(file.name))  # For deployment
        with open(temp_file_path, "wb") as f:
            f.write(file.getbuffer())
        try:
            ready_file_name = clean_file_name(file_name).replace(" ", "_")
            file_drive = drive.CreateFile({
                'title': f"{formatted_datetime}_{ready_file_name}",
                'parents': [{'id': drive_folder_id}],
            })
            file_drive.SetContentFile(temp_file_path)
            file_drive.Upload()
        finally:
            # Best-effort cleanup so uploads do not accumulate in /tmp;
            # a failed removal must not mask the upload result.
            try:
                os.remove(temp_file_path)
            except OSError:
                pass
    return file_drive['alternateLink']
def get_data_gdrive(local_filename, file_id='1ti_IyICHZI5BOxEBSK6Iq0Kr0dFczsx_'):  # this is the file id of data_keuangan
    """Download a Drive file into ``data/`` and load it as a DataFrame.

    The file is first read as Parquet (pyarrow engine); if pyarrow reports
    the content as invalid, it is read as CSV instead.

    Args:
        local_filename: File name (relative to ``data/``) to save the
            download under.
        file_id: Google Drive file ID to download.

    Returns:
        pandas.DataFrame: The loaded table.

    Raises:
        Any error raised while downloading or parsing is propagated
        unchanged (the previous ``finally: return df`` replaced it with an
        unrelated ``UnboundLocalError``).
    """
    drive = authenticate_google_drive()
    local_filename = f"data/{local_filename}"
    # Make sure the download target directory exists.
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)

    file_drive = drive.CreateFile({'id': file_id})
    file_drive.GetContentFile(local_filename)
    try:
        return pd.read_parquet(local_filename, engine='pyarrow')
    except ArrowInvalid:
        # Not a Parquet file: fall back to CSV.
        return pd.read_csv(local_filename)
def get_data_github(file_name):
    """Fetch a Parquet dataset from the project's GitHub repository.

    The file is downloaded from the ``dataset/`` directory on the ``main``
    branch, saved to ``data/<file_name>`` and parsed with the pyarrow engine.

    Args:
        file_name: Name of the file inside the repository's ``dataset/``
            directory.

    Returns:
        pandas.DataFrame on success, or ``None`` when the server answers
        with a status code other than 200 (the status is printed).
    """
    url = f"https://github.com/ikhbarfirman/raflesia/raw/refs/heads/main/dataset/{file_name}"
    # Bounded wait so a stalled connection cannot hang the caller forever.
    response = requests.get(url, timeout=30)

    # Check if the request was successful
    if response.status_code != 200:
        print(f"Failed to fetch file: {response.status_code}")
        return None

    local_filename = f"data/{file_name}"
    # Make sure the target directory exists before writing the local copy.
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)
    with open(local_filename, "wb") as f:
        f.write(response.content)

    # Parse from memory; the bytes are identical to the file just written.
    return pd.read_parquet(BytesIO(response.content), engine='pyarrow')
def push_data_github(file_local, file_github, token):
    """Create or overwrite a file in the repository's ``dataset/`` directory.

    Uses the GitHub "contents" API: the SHA of an existing file is looked
    up first and, when present, sent along so the upload overwrites it.
    The outcome is reported via ``print``.

    Args:
        file_local: Path of the local file to upload.
        file_github: Destination file name inside ``dataset/``.
        token: GitHub token sent in the ``Authorization`` header.
    """
    REPO = 'ikhbarfirman/raflesia'  # Replace with your GitHub repository
    FILE_PATH = f'dataset/{file_github}'  # Path where you want to save it
    GITHUB_API_URL = f'https://api.github.com/repos/{REPO}/contents/{FILE_PATH}'
    headers = {'Authorization': f'token {token}'}
    request_timeout = 30  # seconds; avoids hanging forever on a stalled connection

    # Step 1: Get the existing file SHA if it exists
    response = requests.get(GITHUB_API_URL, headers=headers, timeout=request_timeout)
    sha = None
    if response.status_code == 200:
        sha = response.json().get('sha')  # Get the SHA of the existing file

    # Step 2: Read the file and encode it in base64
    with open(file_local, 'rb') as file:
        content_base64 = base64.b64encode(file.read()).decode('utf-8')

    # Step 3: Prepare the request payload
    payload = {
        'message': 'Updated Parquet file with modified data',
        'content': content_base64,
    }
    # Include SHA if it exists (for overwriting)
    if sha:
        payload['sha'] = sha

    # Step 4: Make the request to the GitHub API
    response = requests.put(
        GITHUB_API_URL,
        headers=headers,
        data=json.dumps(payload),
        timeout=request_timeout,
    )

    # Step 5: Check the response
    if response.status_code in [201, 200]:
        print('File uploaded successfully!')
    else:
        # An error body is not guaranteed to be JSON (e.g. a gateway error
        # page); fall back to the raw text so reporting itself cannot crash.
        try:
            details = response.json()
        except ValueError:
            details = response.text
        print('Failed to upload file:', details)
def resize_image_with_aspect_ratio(uploaded_file, width=None, height=None, interpolation=cv2.INTER_AREA):
    """
    Resize an uploaded image while maintaining its aspect ratio and return it as a BytesIO object.

    If ``width`` is given it determines the output size (``height`` is then
    ignored); otherwise ``height`` does. With neither, the image is only
    decoded and re-encoded at its original size.

    Parameters:
        uploaded_file: File-like object exposing ``.read()`` and ``.name``
            (e.g. the uploaded file from Streamlit).
        width (int, optional): The desired width. Defaults to None.
        height (int, optional): The desired height. Defaults to None.
        interpolation (int, optional): Interpolation method. Defaults to cv2.INTER_AREA.

    Returns:
        BytesIO: The encoded image, carrying the original file name in its
        ``name`` attribute. The encoding follows the extension of
        ``uploaded_file.name``; if OpenCV cannot encode to that extension,
        JPEG is used instead.

    Raises:
        ValueError: If the uploaded data is empty or cannot be decoded as
            an image, or if the result cannot be encoded.
    """
    # Convert the uploaded file to a NumPy array
    file_bytes = np.frombuffer(uploaded_file.read(), dtype=np.uint8)

    # Decode the file bytes to an OpenCV image. imdecode returns None for
    # undecodable data and rejects an empty buffer, so both are handled here.
    image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR) if file_bytes.size else None
    if image is None:
        raise ValueError("Uploaded file is empty or is not a decodable image.")

    # Get the original dimensions
    original_height, original_width = image.shape[:2]

    if width is not None:
        # Calculate the new height based on the aspect ratio
        aspect_ratio = original_height / original_width
        new_width = width
        # Clamp to 1 so extreme aspect ratios cannot truncate to a 0-pixel side.
        new_height = max(1, int(new_width * aspect_ratio))
        resized_image = cv2.resize(image, (new_width, new_height), interpolation=interpolation)
    elif height is not None:
        # Calculate the new width based on the aspect ratio
        aspect_ratio = original_width / original_height
        new_height = height
        new_width = max(1, int(new_height * aspect_ratio))
        resized_image = cv2.resize(image, (new_width, new_height), interpolation=interpolation)
    else:
        resized_image = image  # No resizing needed

    # Encode the resized image back to the original format, falling back to
    # JPEG when OpenCV has no encoder for the file's extension.
    extension = '.' + uploaded_file.name.split('.')[-1]
    try:
        encoded_ok, buffer = cv2.imencode(extension, resized_image)
    except cv2.error:
        encoded_ok = False
    if not encoded_ok:
        encoded_ok, buffer = cv2.imencode('.jpg', resized_image)
        if not encoded_ok:
            raise ValueError("Failed to encode the resized image.")

    # Convert to BytesIO for compatibility with Streamlit's file-like handling
    resized_file = BytesIO(buffer.tobytes())
    resized_file.name = uploaded_file.name  # Retain original filename
    return resized_file
def _as_calendar_date(value):
    """Normalise a ``YYYY-MM-DD`` string or datetime to a plain calendar date."""
    if isinstance(value, str):
        value = datetime.strptime(value, "%Y-%m-%d")
    if isinstance(value, datetime):
        # Dropping time and tzinfo lets naive and aware values be compared.
        return value.date()
    return value


def get_ronda_index(ronda_list, reference_date, current_date):
    """
    Calculate the index of ronda_list based on the number of weeks between reference_date and current_date.
    The index cycles each Saturday.

    Only the calendar dates are compared, so a timezone-aware datetime may
    be combined with a naive datetime or a date string, and the time of day
    does not shift the week boundary.

    :param ronda_list: Non-empty list of items to cycle through.
    :param reference_date: A reference Saturday as the starting point
        (``YYYY-MM-DD`` string, datetime or date).
    :param current_date: The date for which to calculate the index
        (``YYYY-MM-DD`` string, datetime or date).
    :return: The index in ronda_list and the corresponding value.
    :raises ValueError: If ronda_list is empty.
    """
    if not ronda_list:
        raise ValueError("ronda_list must not be empty")

    # Calculate the number of whole days between the two calendar dates
    delta_days = (_as_calendar_date(current_date) - _as_calendar_date(reference_date)).days

    # Floor division and modulo keep the index valid for dates before the
    # reference as well (negative week counts wrap around the list).
    weeks_elapsed = delta_days // 7
    ronda_index = weeks_elapsed % len(ronda_list)
    return ronda_index, ronda_list[ronda_index]
def get_previous_or_current_saturday(today=None):
    """Return the most recent Saturday on or before ``today``.

    Args:
        today: ``None`` to use the current time in the Asia/Jakarta
            timezone, a ``YYYY-MM-DD`` string, or an object that supports
            ``weekday()`` and ``timedelta`` subtraction (e.g. a datetime).

    Returns:
        ``today`` moved back by zero to six days so that it falls on a
        Saturday; any time-of-day component is left untouched.
    """
    jakarta_tz = pytz.timezone('Asia/Jakarta')

    if today is None:
        anchor = datetime.now(jakarta_tz)
    elif isinstance(today, str):
        anchor = datetime.strptime(today, "%Y-%m-%d")
    else:
        anchor = today

    # weekday() numbers Monday as 0 and Saturday as 5; shifting by 2 modulo 7
    # yields the days elapsed since Saturday (Saturday -> 0, ..., Friday -> 6).
    days_since_saturday = (anchor.weekday() + 2) % 7
    return anchor - timedelta(days=days_since_saturday)