# app.py — Smear/Share consultation-distribution Gradio app
# (header residue from the Hugging Face Space page: drkvcsstvn, "Update app.py", commit 4b46dbf verified)
import gradio as gr
import threading
import time
import schedule
import requests
import urllib.parse
from huggingface_hub import HfApi, Repository
import os
import math
from datetime import datetime, timedelta
import pandas as pd
from workalendar.europe import Hungary
from pandas.tseries.offsets import CustomBusinessDay
import numpy as np
import gspread
from oauth2client.service_account import ServiceAccountCredentials
import base64
import json
from datasets import load_dataset, DatasetDict, Dataset
from gradio.themes.base import Base
import matplotlib.pyplot as plt
from io import BytesIO
from PIL import Image
import random
import httpx
from collections import defaultdict
# GET_MEDSERV_API AUTH:
def medserv_authenticate():
    """Authenticate against the Medserv API and return a JWT token.

    Reads a JSON credentials payload from the MEDSERV_CREDS environment
    variable; the payload must contain both 'azonosito' (user id) and
    'jelszo' (password).

    Returns:
        str | None: the JWT on success; None on any failure (missing or
        malformed credentials, or a non-200 login response).
    """
    medserv_creds = os.getenv('MEDSERV_CREDS')
    if not medserv_creds:
        # Bug fix: the old message referred to a non-existent PAYLOAD variable.
        print("Environment variable MEDSERV_CREDS not set.")
        return None
    try:
        # Parse the JSON string into a dictionary
        payload = json.loads(medserv_creds)
    except json.JSONDecodeError:
        print("Failed to decode the JSON from the environment variable.")
        return None
    # Both credential keys are required by the login endpoint.
    if 'azonosito' not in payload or 'jelszo' not in payload:
        print("Payload must contain both 'azonosito' and 'jelszo'.")
        return None
    login_url = 'https://api.lelet.medserv.hu/auth/login'
    # POST the credentials to the login endpoint.
    response = httpx.post(login_url, json=payload)
    if response.status_code == 200:
        token = response.json().get('jwt')
        if token:
            print("Authenticated successfully, token received.")
            return token
        print("Token not found in the response.")
        return None
    print(f"Failed to authenticate. Status code: {response.status_code}")
    print(f"Response: {response.text}")
    return None
# Shared HTTP headers for every Medserv API call; the JWT is obtained once
# at import time via medserv_authenticate().
headers = {
    "Authorization": f"Bearer {medserv_authenticate()}",
    "Accept": "application/json",
}
def url_conversion(include_list):
    """Serialize *include_list* to JSON and percent-encode the result for
    use as a query-string value in Medserv grid URLs."""
    return urllib.parse.quote_plus(json.dumps(include_list))
# GSHEETS_AUTH
# Base64-encoded Google service-account JSON, supplied via the GSHEETS_KEY env var.
service_account_base64 = os.getenv('GSHEETS_KEY')
# Decode the Base64 string to JSON
try:
    service_account_json_str = base64.b64decode(service_account_base64).decode('utf-8')
    #print("Decoded service account JSON:", service_account_json_str[:100]) # Print a part for debugging
except Exception as e:
    # NOTE(review): if decoding fails, service_account_json_str stays undefined
    # and the json.loads below raises NameError — the script aborts at import.
    print("Error decoding service account:", e)
# Parse the JSON string to a Python dictionary
service_account_info = json.loads(service_account_json_str)
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
creds = ServiceAccountCredentials.from_json_keyfile_dict(service_account_info, scope)
client = gspread.authorize(creds)
# Retrieve the smearshare_config_lims Google Sheet (peeler roster + tuning knobs).
SMEARSHARE_URL ='https://docs.google.com/spreadsheets/d/1D7w_ewX37953fAZdGfJAc_pIHpCinlpK2YSsTuT4a3U/edit?gid=0#gid=0'
smearshare_config = client.open_by_url(SMEARSHARE_URL).sheet1
smearshare_config_data = smearshare_config.get_all_records()
smearshare_config_df = pd.DataFrame(smearshare_config_data)
# Retrieve the vacation ("szabik") Google Sheet.
SZABIK_URL = 'https://docs.google.com/spreadsheets/d/1Dh8wCA5i3w-t7sQWecbtcj2ZlidF3KhS8S-OS2JrMPg/edit?gid=0#gid=0'
# Open the Google Sheet by URL or ID
szabik_munkalap_1 = client.open_by_url(SZABIK_URL).sheet1 # or open_by_key(SHEET_ID)
# Load the data into a DataFrame
munkalap1_data = szabik_munkalap_1.get_all_records()
szabik_df = pd.DataFrame(munkalap1_data)
# FOR TESTING:
# "Tomorrow" is effectively today here (days=0); adjust the delta or use the
# fixed date below to simulate other days.
tomorrow_date = datetime.now() + timedelta(days=0)
#tomorrow_date = datetime(2025, 1, 6)
# Format the target date as a "YYYY-MM-DD" string
current_date = tomorrow_date.strftime("%Y-%m-%d")
# FOR DEPLOYING:
#current_date = datetime.now().strftime("%Y-%m-%d")
current_date_dt = pd.to_datetime(current_date)
# Initialize HfApi for the Space that hosts this app
repo_id = "drkvcsstvn/smear_share_testing"
api_token = os.getenv("HF_API_TOKEN")
api = HfApi(token=api_token)
# Hugging Face dataset repos used for persistence (TEST variants).
DATASET_NAME = "drkvcsstvn/TEST_smearshare_cumulative_distribution_lims"
DISTRIBUTION_ACTIVITY_DATASET = "drkvcsstvn/TEST_smearshare_distribution_activity_lims"
ALLOCATION_ACTIVITY_DATASET = "drkvcsstvn/TEST_smearshare_allocation_activity_lims"
def restart_space():
    """Restart the Hugging Face Space via the API (scheduler hook)."""
    api.restart_space(repo_id=repo_id)
def delete_dataset():
    """Best-effort deletion of the cumulative-distribution dataset repo."""
    try:
        api.delete_repo(repo_id=DATASET_NAME, repo_type="dataset")
    except Exception:
        # Missing repo is fine — nothing to clean up.
        print("Dataset not found or already deleted.")  # Optional logging
    return None
def schedule_restart():
    """Scheduler loop: run pending jobs forever, polling once a minute.

    NOTE(review): despite the name, only the 5-minute activity push is
    currently active; the daily delete/restart jobs below are commented out.
    """
    # Schedule the restart every minute
    # FOR TESTING:
    #schedule.every(3).minutes.do(restart_space)
    schedule.every(5).minutes.do(push_daily_activity)
    # Schedule daily push before restart
    #schedule.every().day.at("02:50").do(push_daily_activity) # Push at 02:50
    #schedule.every().day.at("02:55").do(delete_dataset)
    #schedule.every().day.at("03:00").do(restart_space)
    while True:
        schedule.run_pending()
        time.sleep(60)
def start_scheduler():
    """Run schedule_restart() on a daemon thread so it dies with the app.

    NOTE(review): this function is defined but never called anywhere in this
    file — confirm whether the scheduler is intentionally disabled.
    """
    worker = threading.Thread(target=schedule_restart, daemon=True)
    worker.start()
# Persist the cumulative distribution to the Hugging Face dataset hub.
def save_cumulative_distribution(cumulative_distribution):
    """Push the {peeler_id: total} mapping to DATASET_NAME ('train' split)."""
    rows = pd.DataFrame(list(cumulative_distribution.items()), columns=['Peeler', 'Total'])
    Dataset.from_pandas(rows).push_to_hub(DATASET_NAME, split="train", token=api_token)
    print(f"Dataset pushed to {DATASET_NAME}")
# In-memory accumulators, flushed to the Hub by push_daily_activity().
daily_distribution_activity = {}  # {peeler_id: cases distributed today}
daily_allocation_activity = []    # per-allocation audit records
def save_distribution_activity(cumulative_distribution):
    """Add today's per-peeler counts to the in-memory accumulator."""
    for peeler, total in cumulative_distribution.items():
        daily_distribution_activity[peeler] = daily_distribution_activity.get(peeler, 0) + total
def save_allocation_activity(onbehalf_of_user, target_selection):
    """Record one allocation event (who asked, and which routing mode)."""
    record = {
        "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "User": onbehalf_of_user,
        "Választó": target_selection,
    }
    daily_allocation_activity.append(record)
def push_daily_activity():
    """Push the in-memory distribution/allocation accumulators to the Hub.

    Merges today's counts into the existing datasets (summing per-peeler
    totals, appending allocation rows), then clears both accumulators.
    Intended to run once per day (currently scheduled every 5 minutes).
    """
    global daily_distribution_activity, daily_allocation_activity
    # Push distribution activity
    if daily_distribution_activity:
        df = pd.DataFrame(list(daily_distribution_activity.items()), columns=['Peeler', 'Total'])
        try:
            existing_dataset = load_dataset(DISTRIBUTION_ACTIVITY_DATASET, split="train", token=api_token)
            existing_df = existing_dataset.to_pandas()
            # Outer merge keeps peelers present on either side; missing totals count as 0.
            updated_df = pd.merge(existing_df, df, on='Peeler', how='outer', suffixes=('_existing', '_new'))
            updated_df['Total'] = updated_df['Total_existing'].fillna(0) + updated_df['Total_new'].fillna(0)
            updated_df = updated_df[['Peeler', 'Total']]
        except Exception:
            # First push (or load failure): start the dataset from today's data.
            updated_df = df
        Dataset.from_pandas(updated_df).push_to_hub(DISTRIBUTION_ACTIVITY_DATASET, split="train", token=api_token)
    # Push allocation activity (append-only event log)
    if daily_allocation_activity:
        df = pd.DataFrame(daily_allocation_activity)
        try:
            existing_dataset = load_dataset(ALLOCATION_ACTIVITY_DATASET, split="train", token=api_token)
            existing_df = existing_dataset.to_pandas()
            updated_df = pd.concat([existing_df, df], ignore_index=True)
        except Exception:
            updated_df = df
        Dataset.from_pandas(updated_df).push_to_hub(ALLOCATION_ACTIVITY_DATASET, split="train", token=api_token)
    # Clear in-memory storage so the next window starts fresh
    daily_distribution_activity.clear()
    daily_allocation_activity.clear()
    print("Daily activity pushed and cleared.")
# Load the persisted cumulative distribution from the Hugging Face dataset.
def load_cumulative_distribution():
    """Return the stored {peeler_id: total} mapping from DATASET_NAME.

    Falls back to the Medserv-derived snapshot when the stored dataset is
    empty, and to an all-zeros mapping when loading fails entirely.
    """
    try:
        stored = load_dataset(DATASET_NAME, split="train", token=api_token)
        if len(stored) == 0:
            print("Dataset is empty. Initializing cumulative_distribution from medserv.")
            return cumulative_distribution_from_medserv
        frame = stored.to_pandas()
        return dict(zip(frame['Peeler'], frame['Total']))
    except Exception as e:
        print(f"Error loading cumulative distribution: {e}")
        return {peeler: 0 for peeler in peeler_capacities}
def update_cumulative_distribution(new_distribution):
    """Fold a {peeler_name: count} allocation into the running totals.

    Mutates (and returns) the module-level `cumulative_distribution` dict,
    mapping each display name back to its peeler ID via `peeler_names`.
    Names with no matching ID are ignored.
    """
    totals = cumulative_distribution
    for name, count in new_distribution.items():
        # Reverse-lookup the peeler ID for this display name (first match wins).
        matching_id = next((pid for pid, pname in peeler_names.items() if pname == name), None)
        if matching_id is None:
            continue
        totals[matching_id] = totals.get(matching_id, 0) + count
    return totals
# Lookup tables derived from the config sheet:
# peeler_names: peeler ID -> display name
peeler_names = dict(zip(smearshare_config_df['peelers'], smearshare_config_df['peeler_names']))
# peeler_capacities: peeler ID -> daily capacity
peeler_capacities = dict(zip(smearshare_config_df['peelers'], smearshare_config_df['peeler_capacities']))
# peeler_azo: personnel code ("azonosito") -> peeler ID
peeler_azo = dict(zip(smearshare_config_df['peeler_azo'], smearshare_config_df['peelers']))
# peeler_name_azo: display name -> personnel code
peeler_name_azo = dict(zip(smearshare_config_df['peeler_names'], smearshare_config_df['peeler_azo']))
#print(peeler_azo)
# Seed the cumulative distribution from Medserv's last ~6 months of reports.
cumulative_include_list = [
    'id',
    'lelet_kelte',
    'adatok_taj',
    'lelet_eloszuro_nev',
    'lelet_leletezo_orvos_1_nev'
]
# Calculate the date six months ago
half_year_earlier = tomorrow_date - timedelta(days=180) # 180
lelet_kelte_start_date = half_year_earlier.strftime("%Y-%m-%d")
# Grid query: reports with lelet_kelte >= start date, newest first.
cumulative_distribution_url = f"https://api.lelet.medserv.hu/cytology/grid?include={url_conversion(cumulative_include_list)}&sort=%5B%7B%22letrehozva%22:-1%7D%5D&where=%7B%22and%22:%5B%7B%22lelet_kelte%22:%7B%22gte%22:%22{lelet_kelte_start_date}%22%7D%7D%5D%7D"
def fetch_all_data(url, headers, batch_size=12000, timeout=None):
    """Fetch every page of a Medserv grid endpoint.

    Args:
        url: base grid URL (query string already present; '&skip=…&limit=…'
            is appended per page).
        headers: HTTP headers, including the Authorization bearer token.
        batch_size: page size requested per call.
        timeout: optional per-request timeout in seconds. Defaults to None
            (no timeout), matching the previous behavior.

    Returns:
        list: row dicts accumulated across all pages; pagination stops at
        the first empty page or non-200 response.
    """
    all_rows = []
    skip = 0
    while True:
        paginated_url = f"{url}&skip={skip}&limit={batch_size}"
        response = requests.get(paginated_url, headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"Error: {response.status_code}")
            print(response.text)
            break
        rows = response.json().get('rows', [])
        if not rows:  # no more data
            break
        all_rows.extend(rows)
        skip += batch_size  # advance to the next page
    return all_rows
# Fetch all cytology rows and count reports per reporting doctor.
fetched_df_for_cumulative = pd.DataFrame(fetch_all_data(cumulative_distribution_url, headers))
format_fetched_df_for_cumulative = fetched_df_for_cumulative.groupby(['lelet_leletezo_orvos_1_nev']).count()['id'].reset_index()
# Normalize doctor names coming from Medserv.
def format_name(name):
    """Strip any 'Dr. ' prefix and keep the first two remaining name parts.

    Returns '' when fewer than two parts remain.
    """
    parts = name.replace("Dr. ", "").split()
    if len(parts) < 2:
        return ''
    return ' '.join(parts[:2])
# Apply the formatting function to the doctor-name column
format_fetched_df_for_cumulative['peeler_names'] = format_fetched_df_for_cumulative['lelet_leletezo_orvos_1_nev'].apply(format_name)
format_fetched_df_for_cumulative = format_fetched_df_for_cumulative[['peeler_names','id']]
format_fetched_df_for_cumulative.rename(columns={'id': 'Total'}, inplace=True)
# Exclude 'Kiss Zsombor' from the cumulative counts.
# NOTE(review): the old comment said "Dropping Babarczi Edit", but the code
# filters Kiss Zsombor — confirm which exclusion is intended.
format_fetched_df_for_cumulative = format_fetched_df_for_cumulative[format_fetched_df_for_cumulative['peeler_names'] != 'Kiss Zsombor']
# Attach peeler IDs from the config sheet (left merge keeps unknown doctors)
merged_cumulative = pd.merge(format_fetched_df_for_cumulative, smearshare_config_df, on= 'peeler_names',how='left')[['peelers','peeler_names','Total']]
# Extract the numeric part of the peeler ID (e.g. 'P12' -> 12.0)
merged_cumulative['numeric_part'] = merged_cumulative['peelers'].str.extract('(\d+)').astype(float)
# Highest existing peeler number
pmax = merged_cumulative['numeric_part'].max()
# Doctors with no config entry get freshly generated IDs Pmax+1, Pmax+2, ...
na_indices = merged_cumulative[merged_cumulative['peelers'].isna()].index
for i, index in enumerate(na_indices):
    merged_cumulative.at[index, 'peelers'] = f'P{int(pmax) + i + 1}'
# Drop the temporary helper column
merged_cumulative = merged_cumulative.drop(columns='numeric_part')
# peeler ID -> total reports in the 6-month window
cumulative_distribution_from_medserv = dict(zip(merged_cumulative['peelers'], merged_cumulative['Total']))
save_cumulative_distribution(cumulative_distribution_from_medserv)
# Sync the (possibly extended) peeler roster back to the config Google Sheet.
try:
    # Updating smearshare_config_df: outer merge keeps both known and new peelers
    update_smearshare_config_df = pd.merge(merged_cumulative, smearshare_config_df, on=['peelers', 'peeler_names'], how='outer')
    # Step 1: Handle NaN values in the numeric config columns
    numeric_columns = ['peeler_azo','peeler_capacities', 'capacity_weight', 'vacation_buffer', 'cumulative_impact', 'tomorrow_date']
    update_smearshare_config_df[numeric_columns] = update_smearshare_config_df[numeric_columns].fillna(0)
    # Dropping the 'Total' column (not part of the sheet schema)
    update_smearshare_config_df = update_smearshare_config_df.drop('Total', axis=1)
    # Sorting by 'peelers'
    update_smearshare_config_df = update_smearshare_config_df.sort_values(by=['peelers'])
    # Clear existing content in the Google Sheet
    smearshare_config.clear()
    # Convert DataFrame to list of lists (header row first)
    data_to_update = [update_smearshare_config_df.columns.values.tolist()] + update_smearshare_config_df.values.tolist()
    # Update the Google Sheet
    response = smearshare_config.update(data_to_update)
    print("Update response:", response)
except Exception as e:
    # Best-effort: the app keeps running even if the sheet sync fails.
    print("An error occurred:", e)
    # Log additional details for debugging
    print("Error details:")
    #print(f"DataFrame to update:\n{update_smearshare_config_df.head()}")
    #print(f"Data sent to Google Sheets:\n{data_to_update[:5]}")
# Create a custom business day excluding the holidays / NOT IN USE NOW
# Initialize the calendar for Hungary
cal = Hungary()
current_year = datetime.now().year
special_days_include_list = ['datum','tipus','megnevezes']
# Medserv "special day" grid for the current year (extra holidays / workdays).
special_days_url = f"https://api.lelet.medserv.hu/special-day/grid?include={url_conversion(special_days_include_list)}&sort=%5B%7B%22datum%22:-1%7D%5D&where=%7B%22and%22:%5B%7B%22datum%22:%7B%22contain%22:%22{current_year}%22%7D%7D%5D%7D&skip=0&limit=100"
response = requests.get(special_days_url, headers=headers)
data = response.json()
# Check if the response contains any data
if isinstance(data, dict) and 'rows' in data and len(data['rows']) == 0:
    print("Problem: No data found.")
else:
    print("Data retrieved successfully.")
# 'Munkaszüneti nap' = public holiday; 'Munkanap' = decreed extra working day.
# NOTE(review): indentation was lost in this paste — these assignments are
# reconstructed at top level so custom_holidays always exists; confirm.
custom_holidays = pd.to_datetime([item['datum'] for item in data['rows'] if item['tipus'] == 'Munkaszüneti nap']).tolist()
additional_workdays = pd.to_datetime([item['datum'] for item in data['rows'] if item['tipus'] == 'Munkanap'])
# Get the statutory holidays for the year
hungarian_holidays = cal.holidays(int(current_year))
hungarian_holidays_dates = [pd.Timestamp(date) for date, _ in hungarian_holidays]
# Union of statutory + custom holidays, minus decreed extra workdays.
combined_holidays = sorted(set(hungarian_holidays_dates + custom_holidays) - set(additional_workdays))
custom_business_day = CustomBusinessDay(holidays=combined_holidays)
# Generate all business days from today through the end of next year
remaining_workdays = pd.date_range(start=current_date_dt, end=f"{str(int(current_year)+1)}-12-31", freq=custom_business_day)
# Combine remaining workdays with additional workdays, remove duplicates, and sort
#workdays_df = pd.DataFrame(pd.concat([pd.Series(remaining_workdays), pd.Series(additional_workdays)]).drop_duplicates().sort_values(), columns=["Workday"])
# Build a calendar of ALL days from 6 months back to 6 months ahead.
workdays_df_start_date = datetime.now() - pd.DateOffset(months=6)
workdays_df_end_date = datetime.now() + pd.DateOffset(months=6)
# Generate a date range for all days between the start and end dates
workdays_df_all_days = pd.date_range(start=workdays_df_start_date, end=workdays_df_end_date)
# Create a DataFrame with the 'Workday' column in datetime64[ns] format
workdays_df = pd.DataFrame(workdays_df_all_days, columns=["Workday"])
# Truncate to day precision: format as 'YYYY-MM-DD' ...
workdays_df["Workday"] = workdays_df["Workday"].dt.strftime('%Y-%m-%d')
# ... then parse back to datetime64[ns] (midnight timestamps)
workdays_df["Workday"] = pd.to_datetime(workdays_df["Workday"])
#print(workdays_df)
#print(workdays_df.dtypes)
workers = smearshare_config_df['peeler_names'].to_list()
# Return a new, randomly ordered copy of the worker list.
def shuffle_workers(workers):
    """Shuffle *workers* into a fresh list without mutating the input."""
    permuted = list(workers)
    np.random.shuffle(permuted)
    return permuted
# Assign a freshly shuffled roster of all workers to every calendar day.
workdays_df['Active'] = workdays_df['Workday'].apply(lambda x: shuffle_workers(workers))
# Vacation data
#szabik_df = pd.read_csv('szabik - Munkalap1.csv')
# Parse the vacation start/end columns ("Szabi Kezdete" / "Szabi Vége") as datetimes
szabik_df["Szabi Kezdete"] = pd.to_datetime(szabik_df["Szabi Kezdete"])
szabik_df["Szabi Vége"] = pd.to_datetime(szabik_df["Szabi Vége"])
# One business day before the vacation start, minus one extra calendar day
szabik_df["Two Days Before Start Date"] = szabik_df["Szabi Kezdete"] - custom_business_day - timedelta(days=1)
# One calendar week before the vacation start (used for selected supervisors)
szabik_df["One Week Before Vacation"] = szabik_df["Szabi Kezdete"] - timedelta(weeks=1)
# Drop workers from a day's roster when they are on (or just before) vacation.
def remove_vacationers(workday, active_workers):
    """Remove vacationing workers from *active_workers* (mutated in place).

    A worker is excluded from "Two Days Before Start Date" up to (but not
    including) the vacation end. Bori Rita and Serényi Péter are additionally
    excluded starting one full week before their vacation.
    """
    for _, vacation in szabik_df.iterrows():
        name = vacation["Név"]
        # Standard exclusion window around the vacation itself
        if vacation["Two Days Before Start Date"] <= workday < vacation["Szabi Vége"]:
            if name in active_workers:
                active_workers.remove(name)
        # Extended one-week exclusion for the named supervisors
        if name in ("Bori Rita", "Serényi Péter"):
            if vacation["One Week Before Vacation"] <= workday < vacation["Szabi Vége"]:
                if name in active_workers:
                    active_workers.remove(name)
    # Exclude Bori Rita and Serényi Péter on Thursdays and Fridays
    #if workday.weekday() not in [2, 3, 4]: # Wednesday:2, Thursday: 3, Friday: 4
    # active_workers = [worker for worker in active_workers if worker not in ["Bori Rita", "Serényi Péter"]]
    return active_workers
# Apply the removal function to update the 'Active' column for every day
workdays_df['Active'] = workdays_df.apply(
    lambda row: remove_vacationers(row['Workday'], row['Active']), axis=1
)
#print(szabik_df[['Név', 'Szabi Kezdete', 'Two Days Before Start Date']])
#print(workdays_df[workdays_df['Workday']>= current_date])
# Track cumulative distribution
# Function to get active peelers for the current date
def get_active_peelers(workdays_df, current_date):
    """Return the 'Active' worker list for *current_date*, or [] when the
    date is not present in *workdays_df*."""
    matches = workdays_df.loc[workdays_df['Workday'] == current_date, 'Active']
    if matches.empty:
        return []
    return matches.values[0]
# Define the function to calculate dynamic vacation buffer during the grace period
def dynamic_vacation_buffer(worker_name, workday, szabik_df, custom_business_day, grace_period_days=4, default_vacation_buffer=smearshare_config_df['vacation_buffer'].iloc[0]):
    """Return a workload-damping factor for a worker just back from vacation.

    On the first business day back the full `default_vacation_buffer`
    applies; it then diminishes linearly toward 1 over `grace_period_days`
    business days (vacations longer than a week get one extra grace day).
    Returns 1 outside any grace period.

    NOTE(review): `default_vacation_buffer` is evaluated once at import time
    from the config sheet — later sheet edits are not picked up.
    """
    #print(f"Default vacation buffer: {default_vacation_buffer}") # Debugging: Print the default buffer value
    # Filter for the specific worker's vacation records
    worker_vacations = szabik_df[szabik_df["Név"] == worker_name]
    for _, vacation in worker_vacations.iterrows():
        # The grace-window test uses the original grace length; the +1
        # extension below only widens the diminishing-rate denominator.
        if vacation["Szabi Vége"] < workday <= vacation["Szabi Vége"] + timedelta(days=grace_period_days):
            # Check if the vacation lasted more than one week (7 days)
            vacation_length = (vacation["Szabi Vége"] - vacation["Szabi Kezdete"]).days
            if vacation_length > 7:
                grace_period_days += 1 # Add one day for vacations longer than a week
            # Count business days elapsed since the vacation ended
            grace_period_start = vacation["Szabi Vége"] + timedelta(days=1) # Start from the next day after vacation ends
            valid_workdays = pd.date_range(start=grace_period_start, end=workday, freq=custom_business_day)
            days_since_vacation = len(valid_workdays)
            # If it's the first day back, return the default vacation buffer
            if days_since_vacation == 1:
                #print(f'First day after vacation for {worker_name}, default vacation buffer:{default_vacation_buffer}')
                return default_vacation_buffer
            # Linearly diminishing buffer, floored at 1
            diminishing_buffer = max(1, default_vacation_buffer - ((days_since_vacation / grace_period_days) * (default_vacation_buffer - 1)))
            #print(f'Diminishing buffer for {worker_name}: {diminishing_buffer}')
            return diminishing_buffer
    #print(f'No relevant vacation data found for {worker_name} on {workday}. Returning default factor of 1.')
    return 1 # If not in grace period, return 1
# Load the cumulative distribution ONCE at startup; it is mutated in place afterwards.
cumulative_distribution = load_cumulative_distribution()
# Function to calculate potato (case) distribution
def calculate_distribution(num_potatoes, active_peelers, peeler_names, workday, szabik_df):
    """Split *num_potatoes* cases among the active peelers.

    Each peeler's share is weighted by configured capacity, damped by the
    share of work they have already handled (non-linearly, via
    `cumulative_impact`) and by their post-vacation buffer. Fractional
    shares are floored and the remainder handed out round-robin in roster
    order.

    Returns:
        dict: {peeler_display_name: whole_case_count}.
    """
    eligible = [p for p in active_peelers if p in peeler_capacities]
    if not eligible:
        return {peeler_names[p]: 0 for p in active_peelers}
    # Tuning knobs from the config sheet (first row applies globally).
    capacity_weight = smearshare_config_df['capacity_weight'].iloc[0]
    cumulative_impact = smearshare_config_df['cumulative_impact'].iloc[0]
    # Total processed so far; `or 1` avoids division by zero on a fresh start.
    grand_total = sum(cumulative_distribution.values()) or 1
    weights = {}
    for pid in eligible:
        share_so_far = cumulative_distribution.get(pid, 0) / grand_total
        # Non-linear damping: raise the share to `cumulative_impact`.
        damped_share = share_so_far ** cumulative_impact
        # Returning vacationers get an additional temporary damping factor.
        buffer_factor = dynamic_vacation_buffer(peeler_names[pid], workday, szabik_df, custom_business_day)
        weights[pid] = (peeler_capacities[pid] * capacity_weight) / (1 + damped_share * buffer_factor)
    weight_sum = sum(weights.values())
    if weight_sum == 0:
        return {peeler_names[p]: 0 for p in eligible}
    fractional = {pid: (num_potatoes * weights[pid]) / weight_sum for pid in eligible}
    allocation = {pid: math.floor(amount) for pid, amount in fractional.items()}
    # Hand out the flooring remainder one-by-one, cycling through the roster.
    leftover = num_potatoes - sum(allocation.values())
    for i in range(leftover):
        allocation[eligible[i % len(eligible)]] += 1
    return {peeler_names[pid]: allocation[pid] for pid in eligible}
# Valid routing targets for distribute_potatoes().
target_list = ['supervisor','orvos','special']
def distribute_potatoes(onbehalf_of_user, target, spec_peeler_azo = None):
    """Allocate one smear case to a consultant and record the allocation.

    Args:
        onbehalf_of_user: personnel code of the requesting user (e.g. "S34151").
        target: 'supervisor' | 'orvos' | 'special' routing mode.
        spec_peeler_azo: personnel code of a specific consultant; required
            only when target='special' (e.g. "O12345").

    Returns:
        The chosen consultant's personnel code, or a Hungarian error string
        when no allocation is possible.
    """
    active_peelers = get_active_peelers(workdays_df, current_date_dt)
    if not active_peelers:
        return "A mai napon nem adható fel kenet konzultációra."
    # Peeler IDs whose display name is on today's active roster.
    active_peeler_ids = {id: name for id, name in peeler_names.items() if name in active_peelers}
    # Supervisors ("red") and regular doctors ("blue"); the requester is
    # always excluded so nobody gets assigned their own case.
    red_peelers = [id for id in active_peeler_ids if id in ["P7", "P8"] and id != peeler_azo.get(onbehalf_of_user)]
    blue_peelers = [id for id in active_peeler_ids if id in ["P1", "P13", "P2","P3", "P4","P5","P6"] and id != peeler_azo.get(onbehalf_of_user)]
    # Shuffle the red and blue peelers to avoid bias in distribution
    random.shuffle(red_peelers)
    random.shuffle(blue_peelers)
    # Calculate initial distribution for the given candidate pool
    def calculate_initial_distribution(peelers):
        # Distribute exactly one case among the candidates.
        return calculate_distribution(1, peelers, peeler_names, current_date_dt, szabik_df)
    if target == 'supervisor':
        final_distribution = calculate_initial_distribution(red_peelers)
        # update_cumulative_distribution mutates the module-level totals in place.
        cumulative_distribution = update_cumulative_distribution(final_distribution)
        print(cumulative_distribution)
        save_distribution_activity(final_distribution)
        save_allocation_activity(onbehalf_of_user, "general")
        # Get the names where the value is greater than 0
        final_name = [key for key, value in final_distribution.items() if value > 0]
        return peeler_name_azo.get(final_name[0])
    elif target == 'orvos':
        final_distribution = calculate_initial_distribution(blue_peelers)
        cumulative_distribution = update_cumulative_distribution(final_distribution)
        print(cumulative_distribution)
        save_distribution_activity(final_distribution)
        save_allocation_activity(onbehalf_of_user, "general")
        final_name = [key for key, value in final_distribution.items() if value > 0]
        return peeler_name_azo.get(final_name[0])
    elif target == 'special':
        if spec_peeler_azo is None:
            return "No spec_peeler_azo provided for special distribution."
        if spec_peeler_azo not in get_active_peelers_azo():
            return " A kiválasztott konzulens nem aktív"
        # Build the reverse mapping: personnel code -> display name
        azo_to_peeler_name = {v: k for k, v in peeler_name_azo.items()}
        # Look up the consultant's name from spec_peeler_azo
        spec_peeler_name = azo_to_peeler_name.get(spec_peeler_azo)
        # A "special" allocation always gives this consultant exactly one case
        final_distribution = {spec_peeler_name: 1}
        cumulative_distribution = update_cumulative_distribution(final_distribution)
        print(cumulative_distribution)
        save_distribution_activity(final_distribution)
        save_allocation_activity(onbehalf_of_user, "special")
        return spec_peeler_azo
# Create Gradio interface
# Custom theme placeholder — currently identical to the Base theme.
class Seafoam(Base):
    pass
gradio_theme = Seafoam()
def get_active_peelers_azo():
    """Personnel codes ("azo") of everyone on today's active roster."""
    active_names = get_active_peelers(workdays_df, current_date_dt)
    return [azo for name, azo in peeler_name_azo.items() if name in active_names]
# Gradio app definition
with gr.Blocks(theme=gradio_theme) as demo:
    gr.Markdown(f"<h1 style='text-align: center; font-weight: bold;'>Smear/Share - {current_date}</h1>")
    with gr.Column():
        # Inputs: requester's code, routing mode, optional specific consultant.
        user_input = gr.Number(label="Onbehalf_of_user")
        target_input = gr.Dropdown(choices=target_list, label="Target_list")
        special_input = gr.Dropdown(choices=get_active_peelers_azo(), label="Specifikus konzulens")
    with gr.Row():
        share_button = gr.Button("Szétosztás", variant="primary")
        active_peelers_button = gr.Button("Aktív konzulensek", variant="primary")
    with gr.Column():
        output_box = gr.Textbox(label="Konzulens")
        active_peelers_output = gr.Textbox(label="Aktív dolgozók")
    # Share button action: allocate a case, then clear the inputs.
    share_button.click(
        fn=lambda user, target, special: (
            distribute_potatoes(user, target, special)
        ),
        inputs=[user_input, target_input, special_input],
        outputs=[output_box]
    ).then(
        fn=lambda: (None, None, None), # Reset all inputs
        inputs=[],
        outputs=[user_input, target_input, special_input]
    )
    # List today's active consultants on demand.
    active_peelers_button.click(
        fn= get_active_peelers_azo,
        outputs=[active_peelers_output]
    )
# Launch the main Gradio app with the API endpoint exposed
demo.launch()