Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from pydrive.auth import GoogleAuth | |
| from pydrive.drive import GoogleDrive | |
| from google.auth.exceptions import RefreshError | |
| from pyarrow.lib import ArrowInvalid | |
| import streamlit as st | |
| import os | |
| import json | |
| import base64 | |
| import re | |
| from datetime import datetime, timedelta | |
| import time | |
| import pytz | |
| from io import BytesIO | |
| from cryptography.fernet import Fernet | |
| import requests | |
| import cv2 | |
def clean_file_name(file_name):
    """Return *file_name* with every ``/ : * ? " < > |`` character removed.

    The characters are dropped (not replaced by a placeholder); everything
    else, including spaces and backslashes, is left untouched.
    """
    # Characters this module treats as not allowed in a Google Drive title.
    forbidden = re.compile(r'[\/:*?"<>|]')
    return forbidden.sub('', file_name)
# Authenticate Google Drive
def authenticate_google_drive():
    """Decrypt the stored credentials and return an authenticated ``GoogleDrive``.

    The Fernet key is read from the ``CREDS_KEY`` environment variable and used
    to decrypt ``creds/credentials.json.enc``; the plaintext is written to
    ``creds/credentials.json`` and loaded into a ``GoogleAuth`` object.

    Depending on the state of the loaded credentials the function then:
      * runs the local-webserver OAuth flow when no credentials were loaded,
      * refreshes the access token when it has expired, falling back to the
        local-webserver flow if the refresh fails,
      * otherwise authorizes with the saved credentials.

    Returns:
        GoogleDrive: a client bound to the authenticated ``GoogleAuth``.
    """
    # pydrive's GoogleAuth.Refresh() raises pydrive.auth.RefreshError, which is
    # a different class from the google.auth RefreshError imported at file
    # level. Imported locally under an alias so both can be caught.
    from pydrive.auth import RefreshError as PyDriveRefreshError

    creds_path = "creds/credentials.json"

    # Decrypt the credentials
    with open("creds/credentials.json.enc", "rb") as encrypted_file:
        encrypted_data = encrypted_file.read()
    fernet = Fernet(os.getenv("CREDS_KEY"))
    decrypted_data = fernet.decrypt(encrypted_data)
    # NOTE(review): the decrypted credentials stay on disk in plaintext after
    # this call; confirm that is acceptable for the deployment.
    with open(creds_path, "wb") as creds_file:
        creds_file.write(decrypted_data)

    gauth = GoogleAuth()
    try:
        gauth.LoadCredentialsFile(creds_path)
    except FileNotFoundError:
        print("Credentials file not found. You need to authenticate for the first time.")

    if gauth.credentials is None:
        # Authenticate if credentials are not available (first-time use)
        gauth.LocalWebserverAuth()
    elif gauth.access_token_expired:
        # Refresh the access token if it has expired
        try:
            gauth.Refresh()
        except (RefreshError, PyDriveRefreshError) as e:
            print(f"Token refresh failed: {e}. Re-authenticating...")
            # Actually perform the re-authentication the message announces;
            # otherwise the returned client would carry an expired token.
            gauth.LocalWebserverAuth()
    else:
        # Use the saved credentials
        gauth.Authorize()
    return GoogleDrive(gauth)
# Upload file to Google Drive and get the link
def upload_file_to_drive(file, file_name, drive_folder_id, is_table=True):
    """Upload a file to a Google Drive folder and return its ``alternateLink``.

    Parameters:
        file: Uploaded file object exposing ``.name`` and ``.getbuffer()``.
            Only used when ``is_table`` is False.
        file_name (str): For tables, the name (or ``/``-separated path) of a
            file that exists locally as ``data/<basename>``. Otherwise the
            human-readable title to give the uploaded file.
        drive_folder_id (str): ID of the destination Drive folder.
        is_table (bool): True to overwrite the same-titled file in the folder
            with ``data/<basename>``; False to upload ``file`` as a new,
            timestamp-prefixed Drive file.

    Returns:
        The ``alternateLink`` value of the uploaded Drive file.
    """
    drive = authenticate_google_drive()

    if is_table:
        # Only the last path component is used as the Drive title.
        file_name = file_name.split('/')[-1]
        # Escape backslashes and single quotes so the title cannot terminate
        # the quoted literal inside the Drive search query.
        escaped_title = file_name.replace("\\", "\\\\").replace("'", "\\'")
        query = (
            f"title = '{escaped_title}' and '{drive_folder_id}' in parents "
            "and trashed = false"
        )
        file_list = drive.ListFile({'q': query}).GetList()
        if file_list:
            file_drive = file_list[0]  # to overwrite
        else:
            # First upload of this table: create it instead of failing with
            # an IndexError on the empty result list.
            file_drive = drive.CreateFile(
                {'title': file_name, 'parents': [{'id': drive_folder_id}]}
            )
        file_drive.SetContentFile(f'data/{file_name}')  # Set the content of the file
        file_drive.Upload()
    else:
        jakarta_tz = pytz.timezone('Asia/Jakarta')
        formatted_datetime = datetime.now(jakarta_tz).strftime("%Y_%m_%d-%H_%M_%S")

        # basename() keeps a client-supplied name such as "../x" or "/etc/x"
        # from writing outside /tmp.
        temp_file_path = os.path.join("/tmp", os.path.basename(file.name))  # For deployment
        with open(temp_file_path, "wb") as f:
            f.write(file.getbuffer())

        ready_file_name = clean_file_name(file_name).replace(" ", "_")
        file_drive = drive.CreateFile(
            {
                'title': f"{formatted_datetime}_{ready_file_name}",
                'parents': [{'id': drive_folder_id}],
            }
        )
        file_drive.SetContentFile(temp_file_path)
        file_drive.Upload()
    return file_drive['alternateLink']
def get_data_gdrive(local_filename, file_id='1ti_IyICHZI5BOxEBSK6Iq0Kr0dFczsx_'):  # default is the file id of data_keuangan
    """Download a Drive file into ``data/`` and load it as a DataFrame.

    Parameters:
        local_filename (str): File name to save the download under, inside
            the ``data/`` directory.
        file_id (str): Google Drive ID of the file to download.

    Returns:
        pandas.DataFrame: the file parsed as Parquet, or as CSV when pyarrow
        rejects it with ``ArrowInvalid``.

    Any other read error propagates to the caller. The previous
    ``finally: return df`` masked such errors with ``UnboundLocalError``.
    """
    drive = authenticate_google_drive()
    local_path = f"data/{local_filename}"
    file_drive = drive.CreateFile({'id': file_id})
    file_drive.GetContentFile(local_path)
    try:
        return pd.read_parquet(local_path, engine='pyarrow')
    except ArrowInvalid:
        # Not a Parquet file; fall back to CSV.
        return pd.read_csv(local_path)
def get_data_github(file_name):
    """Fetch a Parquet dataset from the GitHub repository and load it.

    The raw file is downloaded from the ``dataset/`` folder on the ``main``
    branch of ``ikhbarfirman/raflesia``, saved to ``data/<file_name>`` and
    parsed with the pyarrow engine.

    Parameters:
        file_name (str): Name of the file inside the repository's
            ``dataset/`` folder.

    Returns:
        pandas.DataFrame | None: the parsed data, or ``None`` (after printing
        the status code) when the response status is not 200.
    """
    url = f"https://github.com/ikhbarfirman/raflesia/raw/refs/heads/main/dataset/{file_name}"
    # A timeout keeps the app from hanging forever on a stalled connection.
    response = requests.get(url, timeout=30)

    # Check if the request was successful
    if response.status_code != 200:
        print(f"Failed to fetch file: {response.status_code}")
        return None

    local_filename = f"data/{file_name}"
    # Make sure the target directory exists before writing the local copy.
    os.makedirs(os.path.dirname(local_filename), exist_ok=True)
    # Write the content to a local file
    with open(local_filename, "wb") as f:
        f.write(response.content)

    # Load the Parquet file using pyarrow engine
    return pd.read_parquet(BytesIO(response.content), engine='pyarrow')
def push_data_github(file_local, file_github, token):
    """Create or overwrite ``dataset/<file_github>`` in the GitHub repository.

    Uses the GitHub contents API of ``ikhbarfirman/raflesia``: the current
    blob SHA is looked up first (it is included in the request when the file
    already exists), then the base64-encoded local file is PUT to the same URL.
    The outcome is printed; nothing is returned.

    Parameters:
        file_local (str): Path of the local file to upload.
        file_github (str): Target file name inside the ``dataset/`` folder.
        token (str): GitHub token sent in the ``Authorization`` header.
    """
    repo = 'ikhbarfirman/raflesia'
    file_path = f'dataset/{file_github}'
    api_url = f'https://api.github.com/repos/{repo}/contents/{file_path}'
    headers = {'Authorization': f'token {token}'}
    request_timeout = 30  # seconds; avoids hanging on a stalled connection

    # Read the file and encode it in base64
    with open(file_local, 'rb') as file:
        content_base64 = base64.b64encode(file.read()).decode('utf-8')

    payload = {
        'message': 'Updated Parquet file with modified data',
        'content': content_base64,
    }

    # Get the existing file SHA if it exists (needed for overwriting)
    response = requests.get(api_url, headers=headers, timeout=request_timeout)
    if response.status_code == 200:
        sha = response.json().get('sha')
        if sha:
            payload['sha'] = sha

    # Make the request to the GitHub API
    response = requests.put(
        api_url, headers=headers, data=json.dumps(payload), timeout=request_timeout
    )

    # Check the response
    if response.status_code in (200, 201):
        print('File uploaded successfully!')
    else:
        # The error body is not guaranteed to be JSON (e.g. a proxy error
        # page); fall back to the raw text instead of raising while reporting.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        print('Failed to upload file:', detail)
def resize_image_with_aspect_ratio(uploaded_file, width=None, height=None, interpolation=cv2.INTER_AREA):
    """
    Resize an uploaded image while maintaining its aspect ratio and return it as a BytesIO object.

    When both ``width`` and ``height`` are given, ``width`` wins and the height
    is derived from it. When neither is given the image is re-encoded at its
    original size.

    Parameters:
        uploaded_file (UploadedFile): The uploaded file from Streamlit; it is read via ``.read()``.
        width (int, optional): The desired width. Defaults to None.
        height (int, optional): The desired height. Defaults to None.
        interpolation (int, optional): Interpolation method. Defaults to cv2.INTER_AREA.

    Returns:
        BytesIO: The resized image, encoded using the extension of the original
        file name when OpenCV can encode it and as JPEG otherwise. Its ``name``
        attribute is the original file name.

    Raises:
        ValueError: If the uploaded bytes cannot be decoded as an image.
    """
    # Convert the uploaded file to a NumPy array
    file_bytes = np.asarray(bytearray(uploaded_file.read()), dtype=np.uint8)

    # Decode the file bytes to an OpenCV image
    image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals an unreadable image by returning None; fail with a
        # clear message instead of an AttributeError on `.shape` below.
        raise ValueError("Uploaded file could not be decoded as an image")

    # Get the original dimensions
    original_height, original_width = image.shape[:2]

    if width is None and height is None:
        resized_image = image  # No resizing needed
    else:
        if width is not None:
            # Calculate the new height based on the aspect ratio
            aspect_ratio = original_height / original_width
            new_width = width
            new_height = int(new_width * aspect_ratio)
        else:
            # Calculate the new width based on the aspect ratio
            aspect_ratio = original_width / original_height
            new_height = height
            new_width = int(new_height * aspect_ratio)
        # Truncation can yield 0 for very elongated images, which cv2.resize
        # rejects; keep the derived dimension at least one pixel.
        new_width = max(1, new_width)
        new_height = max(1, new_height)
        resized_image = cv2.resize(image, (new_width, new_height), interpolation=interpolation)

    # Encode the resized image back to the original format. This is a
    # deliberate best-effort: any failure (unknown extension, missing name,
    # encoder reporting failure) falls back to JPEG.
    try:
        encoded_ok, buffer = cv2.imencode('.' + uploaded_file.name.split('.')[-1], resized_image)
    except Exception:
        encoded_ok = False
    if not encoded_ok:
        _, buffer = cv2.imencode('.jpg', resized_image)

    # Convert to BytesIO for compatibility with Streamlit's file-like handling
    resized_file = BytesIO(buffer)
    resized_file.name = uploaded_file.name  # Retain original filename
    return resized_file
def _as_calendar_date(value):
    """Coerce a ``YYYY-MM-DD`` string or a ``datetime`` to a plain calendar date."""
    if isinstance(value, str):
        value = datetime.strptime(value, "%Y-%m-%d")
    if isinstance(value, datetime):
        # Dropping time-of-day and tzinfo lets naive and aware inputs mix.
        return value.date()
    return value


def get_ronda_index(ronda_list, reference_date, current_date):
    """
    Calculate the index of ronda_list based on the number of weeks between reference_date and current_date.
    The index cycles each Saturday.

    Both dates are reduced to calendar dates before subtracting, so a
    timezone-aware ``datetime`` can be combined with a ``YYYY-MM-DD`` string or
    a naive ``datetime``, and the time of day never shifts the week boundary.

    :param ronda_list: Non-empty list of items to cycle through.
    :param reference_date: A reference Saturday as the starting point
        (``YYYY-MM-DD`` string, ``datetime`` or ``date``).
    :param current_date: The date for which to calculate the index (same accepted types).
    :return: The index in ronda_list and the corresponding value.
    :raises ValueError: If ronda_list is empty.
    """
    if len(ronda_list) == 0:
        raise ValueError("ronda_list must not be empty")

    # Whole calendar days between the two dates (negative before the reference)
    delta_days = (_as_calendar_date(current_date) - _as_calendar_date(reference_date)).days

    # Floor division keeps weeks before the reference cycling backwards
    weeks_elapsed = delta_days // 7

    # Determine the index in the list
    ronda_index = weeks_elapsed % len(ronda_list)
    return ronda_index, ronda_list[ronda_index]
def get_previous_or_current_saturday(today=None):
    """Return the most recent Saturday on or before *today*.

    :param today: ``None`` to use the current time in Asia/Jakarta, a
        ``YYYY-MM-DD`` string, or an object providing ``weekday()`` that
        supports subtracting a ``timedelta``.
    :return: *today* moved back by 0-6 days so that it falls on a Saturday;
        any time-of-day or tzinfo the value carries is kept.
    """
    tz = pytz.timezone('Asia/Jakarta')

    if today is None:
        anchor = datetime.now(tz)
    elif isinstance(today, str):
        anchor = datetime.strptime(today, "%Y-%m-%d")
    else:
        anchor = today

    # weekday() is Monday=0 ... Saturday=5, Sunday=6, so shifting by two makes
    # Saturday map to 0 days back, Sunday to 1, ..., Friday to 6.
    days_back = (anchor.weekday() + 2) % 7
    return anchor - timedelta(days=days_back)