# Spaces: Runtime error
| import pandas as pd | |
| import copy | |
| import os | |
| import gradio as gr | |
| from collections import Counter | |
| import random | |
# CONSTANTS
# Column labels looked up in the availability DataFrame.
NAME_COL = 'Juggler_Name'                 # instructor's name
NUM_WORKSHOPS_COL = 'Num_Workshops'       # how many workshops that person teaches
AVAIL_COL = 'Availability'                # DELIMITER-separated timeslot labels
DESCRIP_COL = 'Workshop_Descriptions'     # workshop description(s) for that person
# Separator between entries inside a single cell.
DELIMITER = ';'
class Schedule:
    """A timeslot -> instructors mapping that keeps running fill counters.

    Attributes:
        timeslots: dict mapping each timeslot label to the list of people
            teaching in it. The dict handed to the constructor is stored
            and mutated in place (it is not copied).
        num_timeslots_filled: number of timeslots holding at least one person.
        total_num_workshops: total number of entries across all timeslots.
    """

    def __init__(self, timeslots: dict):
        self.timeslots = timeslots
        # Derive both counters from whatever is already in the mapping.
        self.num_timeslots_filled = sum(
            1 for instructors in timeslots.values() if len(instructors) > 0
        )
        self.total_num_workshops = sum(
            len(instructors) for instructors in timeslots.values()
        )

    def add(self, person: str, time: str):
        """Append `person` to the slot `time` and update the counters.

        Raises:
            KeyError: if `time` is not a known timeslot. The lookup happens
                first, so the counters are left untouched in that case.
        """
        slot = self.timeslots[time]
        if len(slot) == 0:
            # The slot goes from empty to filled.
            self.num_timeslots_filled += 1
        slot.append(person)
        self.total_num_workshops += 1

    def remove(self, person: str, time: str):
        """Remove one occurrence of `person` from the slot `time`.

        Raises:
            KeyError: if `time` is not a known timeslot.
            ValueError: if `person` is not in that slot.
            In both cases the counters are left untouched.
        """
        slot = self.timeslots[time]
        # Mutate the list before the counters so a failed removal cannot
        # leave the counters out of sync with the data.
        slot.remove(person)
        self.total_num_workshops -= 1
        if len(slot) == 0:
            # The slot just became empty.
            self.num_timeslots_filled -= 1

    def __repr__(self):
        return (
            f"{type(self).__name__}(timeslots={self.timeslots!r}, "
            f"num_timeslots_filled={self.num_timeslots_filled}, "
            f"total_num_workshops={self.total_num_workshops})"
        )
# Returns True if the person can teach during the slot, and False otherwise
def can_teach(person: str, slot: list, capacity: int) -> bool:
    """Decide whether `person` may be added to `slot`.

    Args:
        person: name of the candidate instructor.
        slot: people already assigned to the timeslot.
        capacity: maximum number of people allowed in one timeslot.

    Returns:
        False when the slot already holds `capacity` (or more) people or
        already contains `person`; True otherwise.
    """
    # A slot that is at or over capacity accepts no one.
    if len(slot) >= capacity:
        return False
    # No one can teach two workshops at once
    return person not in slot
# Extracts relevant information from the df with availability and puts it into a useable format
def convert_df(df):
    """Turn the availability DataFrame into a people list and an availability map.

    Args:
        df: DataFrame with the NAME_COL, NUM_WORKSHOPS_COL and AVAIL_COL columns.

    Returns:
        A tuple `(people, availability)`:
        people: list of names in row order; a person teaching k workshops
            appears k times (zero or negative counts contribute nothing).
        availability: dict mapping each name to the list of stripped
            timeslot labels obtained by splitting their AVAIL_COL cell on
            DELIMITER.
    """
    people = []
    # Key: person's name
    # Value: a list of their availability
    availability = {}
    # Walk the columns positionally so the result does not depend on the
    # DataFrame having a default 0..n-1 index.
    rows = zip(df[NAME_COL], df[NUM_WORKSHOPS_COL], df[AVAIL_COL])
    for name, number, raw_avail in rows:
        # TODO: make sure no people with the same name fill out the form
        # (a repeated name currently overwrites the earlier availability).
        # Add people who are teaching multiple workshops to the list more than once.
        # int() accepts numpy integers and whole-number floats alike.
        people.extend([name] * int(number))
        availability[name] = [elem.strip() for elem in raw_avail.split(DELIMITER)]
    return people, availability
# Returns False if curr is NaN, and True otherwise
def is_defined(curr):
    """Report whether `curr` is something other than NaN.

    The check relies on NaN being a value that compares unequal to itself,
    so no type inspection is needed.
    """
    return not (curr != curr)
# Returns True if curr is defined and its length is greater than 0
def is_valid(curr):
    """Return True only for a non-NaN value whose length is positive."""
    # Reject NaN first so len() is never called on it.
    if not is_defined(curr):
        return False
    return len(curr) > 0
# Makes a dictionary where each key is a timeslot and each value is an empty list.
def initialize_timeslots(df) -> dict:
    """Collect every distinct timeslot mentioned in the availability column.

    Args:
        df: DataFrame with an AVAIL_COL column whose cells are
            DELIMITER-separated timeslot labels.

    Returns:
        dict mapping each stripped timeslot label to a new empty list.
        Keys are in order of first appearance, so the result is the same
        on every run for the same input.
    """
    to_return = {}
    for elem in df[AVAIL_COL]:
        for inner in elem.split(DELIMITER):
            # setdefault keeps the first-seen position of a repeated label.
            to_return.setdefault(inner.strip(), [])
    return to_return
# Recursive function that generates all possible schedules
def find_all_schedules(people: list, availability: dict, schedule_obj: Schedule, capacity: int, schedules: list, max_list: list) -> None:
    """Backtracking search over assignments of people to timeslots.

    Args:
        people: names still to be placed (a name repeats once per workshop).
        availability: maps each name to the timeslots they can teach in.
        schedule_obj: the schedule being built; mutated during the search
            and restored before this call returns.
        capacity: maximum number of people per timeslot.
        schedules: output list; receives a deep copy of `schedule_obj`
            whenever its filled-timeslot count is at least the best seen so far.
        max_list: one-element list holding the best filled-timeslot count
            seen so far (a list so that recursive calls can update it).
    """
    filled = schedule_obj.num_timeslots_filled
    if filled >= max_list[0]:
        schedules.append(copy.deepcopy(schedule_obj))
        max_list[0] = filled
    # Base case
    if len(people) == 0:
        return
    # Recursive cases
    person = people[0]
    # Slicing a one-element list already yields [], so no special case is needed.
    remaining = people[1:]
    times = availability[person]
    if len(times) == 0:
        # With nothing to iterate over, the loop below would never recurse
        # and everyone after this person would be dropped. Skip the person.
        find_all_schedules(remaining, availability, schedule_obj, capacity, schedules, max_list)
        return
    for time in times:
        if can_teach(person, schedule_obj.timeslots[time], capacity):
            # Choose (put that person in that timeslot)
            schedule_obj.add(person, time)
            # Explore (assign everyone else to timeslots based on that decision)
            find_all_schedules(remaining, availability, schedule_obj, capacity, schedules, max_list)
            # Unchoose (remove that person from the timeslot)
            schedule_obj.remove(person, time)
        else:
            # NOTE: this will not generate a full timeslot, but could still lead to a good schedule
            find_all_schedules(remaining, availability, schedule_obj, capacity, schedules, max_list)
    return
# Formats the instructors of one timeslot as a single display string
def _format_instructors(instructors: list, descrip_dict: dict) -> str:
    """Join one slot's instructors with "; ", adding descriptions when available."""
    if len(descrip_dict) == 0:
        return "; ".join(instructors)
    # The format will be: Instructor (Workshop); Instructor (Workshop)
    parts = []
    for person in instructors:
        descrip = descrip_dict.get(person, "Workshop")
        if not isinstance(descrip, str):
            # A blank spreadsheet cell arrives as NaN; treat it as "no description".
            descrip = "" if pd.isna(descrip) else str(descrip)
        if len(descrip) > 0:
            descrip = descrip.replace(DELIMITER, " OR ")
            parts.append(f"{person} ({descrip})")
        else:
            parts.append(f"{person}")
    # Joining a list puts a separator between every pair of names, including
    # those without a description.
    return "; ".join(parts)


# Makes an organized DataFrame given a list of schedules
def make_df(schedules: list, descrip_dict: dict):
    """Lay out the distinct schedules as rows of a two-column DataFrame.

    Args:
        schedules: list of dicts mapping timeslot -> list of instructor names.
            Exact duplicates (same dict contents) are shown only once.
        descrip_dict: maps instructor name -> workshop description; when it
            is empty, only names are shown.

    Returns:
        A tuple `(new_df, count)`: the DataFrame with columns "Schedule" and
        "Instructor(s)", and the number of distinct schedules it contains.
        Each schedule starts with a "Schedule #k" header row, lists its
        timeslots sorted by label, and is separated from the previous
        schedule by a blank row.
    """
    all_times = []
    all_instructors = []
    # Dicts are unhashable, so distinct schedules are tracked in a list.
    seen = []
    for curr_sched in schedules:
        if curr_sched in seen:
            continue
        seen.append(curr_sched)
        # Include an empty row between schedules
        if len(seen) > 1:
            all_times.append("")
            all_instructors.append("")
        all_times.append(f"Schedule #{len(seen)}")
        all_instructors.append("")
        # Sort by timeslot label so every schedule lists its slots in the same order
        for slot, instructors in sorted(curr_sched.items(), key=lambda item: item[0]):
            all_times.append(slot)
            all_instructors.append(_format_instructors(instructors, descrip_dict))
    new_df = pd.DataFrame({
        "Schedule": all_times,
        "Instructor(s)": all_instructors
    })
    return new_df, len(seen)
# Returns the stripped version of the column name
# or the default one if the user didn't input a column name
def get_var_name(var, default):
    """Return `var` with surrounding whitespace removed, or `default`.

    `default` is returned when `var` is None, empty, or made up only of
    whitespace, so a blank-looking input never yields an empty name.
    """
    if var is None:
        return default
    stripped = var.strip()
    return stripped if len(stripped) > 0 else default
# Returns an error message, placeholder DataFrame, and path to a placeholder csv file
def error_msg(message: str):
    """Build the three-part error result and write its placeholder csv.

    Args:
        message: human-readable description of what went wrong.

    Returns:
        A tuple `("ERROR: " + message, frame, path)` where `frame` is a
        one-row DataFrame filled with "ERROR" and `path` is the csv file it
        was written to (`schedules/ERROR.csv` under the current working
        directory).
    """
    empty = pd.DataFrame({"Schedule": ["ERROR"], "Instructor": ["ERROR"]})
    out_dir = os.path.join(os.path.abspath(os.getcwd()), "schedules")
    # to_csv does not create missing directories, so make sure it exists.
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "ERROR.csv")
    empty.to_csv(path, index=False)
    return "ERROR: " + message, empty, path
# Returns a message naming the column names that aren't in the csv file
def find_missing_cols(df_columns: list, names: list, file: str) -> str:
    """Describe which of `names` are absent from `df_columns`.

    Args:
        df_columns: column labels present in the uploaded file.
        names: column names the program needs.
        file: short label for the file, inserted into the message.

    Returns:
        "" when nothing is missing; otherwise a sentence listing the missing
        column(s) followed by the columns that are present.
    """
    missing = [elem for elem in names if elem not in df_columns]
    if len(missing) == 0:
        return ""
    # str() lets non-string column labels (e.g. integers) be listed too.
    present = "; ".join(str(col) for col in df_columns)
    double_check = (
        f"These are the columns in your file: {present}. "
        "Please double check your spelling/punctuation and try again."
    )
    if len(missing) == 1:
        return f'I cannot find this column in the {file} file you uploaded: {missing[0]}. {double_check}'
    if len(missing) == 2:
        return f'I cannot find these columns in the {file} file you uploaded: {missing[0]} and {missing[1]}. {double_check}'
    # Three or more: comma-separated, with "and" before the last one.
    *head, last = missing
    listed = ", ".join(str(col) for col in head) + f", and {last}"
    return f"I cannot find these columns in the {file} file you uploaded: {listed}. {double_check}"
# Makes a dictionary where each key is the instructor's name and
# the value is the workshop(s) they're teaching
def get_description_dict(df):
    """Map each NAME_COL value to the DESCRIP_COL value on the same row.

    The columns are paired positionally, so the result does not depend on
    the DataFrame having a default 0..n-1 index. When a name occurs on
    several rows, the last row wins.
    """
    return dict(zip(df[NAME_COL], df[DESCRIP_COL]))
# Classifies schedules into two categories: complete and incomplete:
# Complete = everyone is teaching desired number of timeslots and each timeslot is filled
# NOTE: I'm using "valid" instead of "complete" as a variable name so that I don't mix it up
# Incomplete = not complete
def classify_schedules(people: list, schedules: list, partial_names: list, total_timeslots: int, max_timeslots_filled: int) -> tuple:
    """Split the best-filled schedules into complete and incomplete ones.

    Only schedules whose `num_timeslots_filled` equals `max_timeslots_filled`
    are considered; the rest are ignored.

    Args:
        people: names to schedule, repeated once per desired workshop.
        schedules: objects exposing `num_timeslots_filled` and `timeslots`
            (a dict of timeslot -> list of instructor names).
        partial_names: further names whose occurrences are added to the
            desired counts.
        total_timeslots: number of timeslots that exist.
        max_timeslots_filled: the best filled-timeslot count found.

    Returns:
        `(valid_schedules, [])` when at least one complete schedule exists,
        otherwise `([], incomplete_schedules)`. Both lists are empty when no
        schedule reaches `max_timeslots_filled`.
    """
    # Key: person
    # Value: number of workshops they WANT to teach
    pref_dict = Counter(people)
    pref_dict.update(Counter(partial_names))

    # Every schedule that survives the filter below has exactly
    # max_timeslots_filled slots filled, so this check is the same for all.
    filled_all_timeslots = (max_timeslots_filled == total_timeslots)

    valid_schedules = []
    incomplete_schedules = []
    for sched in schedules:
        if sched.num_timeslots_filled != max_timeslots_filled:
            continue
        # Key: person
        # Value: how many workshops they're ACTUALLY teaching in this schedule
        freq_dict = Counter()
        for instructor_list in sched.timeslots.values():
            for instructor in instructor_list:
                if instructor in pref_dict:
                    freq_dict[instructor] += 1
                else:
                    # Someone appears in the schedule who was never requested.
                    print("there is a serious issue!!!!")
        # See if everyone is teaching their desired number of workshops
        everyone_is_teaching = all(
            freq_dict[teacher] == wanted for teacher, wanted in pref_dict.items()
        )
        if everyone_is_teaching and filled_all_timeslots:
            valid_schedules.append(sched)
        elif len(valid_schedules) == 0:
            # No need to track incomplete schedules once a valid one exists
            incomplete_schedules.append(sched)

    if len(valid_schedules) > 0:
        return valid_schedules, []
    return [], incomplete_schedules
# Parameters: schedules that have the max number of timeslots filled
# Returns: a list of all schedules that have the max number of workshops
# To make it less overwhelming, it will return {cutoff} randomly
def get_best_schedules(schedules: list, cutoff: str) -> list:
    """Pick the timeslot dicts of the schedules with the most workshops.

    Args:
        schedules: objects exposing `total_num_workshops` and `timeslots`.
        cutoff: maximum number of results, as an int or anything `int()`
            accepts. -1 (or any negative value) means "no limit".

    Returns:
        The `timeslots` dicts of every schedule tied for the highest
        `total_num_workshops`. When there are more than `cutoff` of them, a
        random sample of `cutoff` is returned instead. An empty input gives
        an empty list.
    """
    cutoff = int(cutoff)
    if len(schedules) == 0:
        return []
    overall_max = max(sched.total_num_workshops for sched in schedules)
    all_best_schedules = [
        sched.timeslots
        for sched in schedules
        if sched.total_num_workshops == overall_max
    ]
    if cutoff < 0 or len(all_best_schedules) <= cutoff:
        return all_best_schedules
    # Sample without replacement
    return random.sample(all_best_schedules, cutoff)
# Big wrapper function that calls the other functions
def main(df, capacity: int, num_results: int):
    """Build schedules from the availability DataFrame and save them as csv.

    Args:
        df: DataFrame with the name, number-of-workshops, availability and
            description columns.
        capacity: maximum number of people per timeslot.
        num_results: passed to `get_best_schedules` as its cutoff.

    Returns:
        A tuple `(results, new_df, path)`: a status sentence, the DataFrame
        of schedules, and the path of the csv it was written to
        (`schedules/schedule.csv` under the current working directory).
    """
    descrip_dict = get_description_dict(df)
    # Convert the df with everyone's availability to a usable format
    people, availability = convert_df(df)
    partial_names = []
    timeslots = initialize_timeslots(df)
    total_timeslots = len(timeslots)

    # Search every assignment; max_list[0] ends up as the best filled-slot count.
    schedules = []
    max_list = [0]
    find_all_schedules(people, availability, Schedule(timeslots), capacity, schedules, max_list)

    valid_schedules, decent_schedules = classify_schedules(
        people, schedules, partial_names, total_timeslots, max_list[0]
    )

    # Prefer complete schedules; otherwise fall back to the best incomplete ones.
    found_complete = len(valid_schedules) > 0
    candidates = valid_schedules if found_complete else decent_schedules
    best_schedules = get_best_schedules(candidates, num_results)
    new_df, count = make_df(best_schedules, descrip_dict)

    if found_complete:
        if count == 1:
            results = "Good news! I was able to make a schedule."
        else:
            results = "Good news! I was able to make multiple schedules."
    else:
        beginning = "Unfortunately, I wasn't able to make a complete schedule, but here"
        if count == 1:
            results = f"{beginning} is the best option."
        else:
            results = f"{beginning} are the best options."

    out_dir = os.path.join(os.path.abspath(os.getcwd()), "schedules")
    # to_csv does not create missing directories, so make sure it exists.
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "schedule.csv")
    new_df.to_csv(path, index=False)
    return results, new_df, path