Spaces:
Runtime error
Runtime error
Added the update JSON heuristics
Browse files
app.py
CHANGED
|
@@ -3,60 +3,136 @@ import streamlit as st
|
|
| 3 |
import json
|
| 4 |
import clipboard
|
| 5 |
|
| 6 |
-
from main import genetic_algorithm, polish_errors, calculate_errors
|
| 7 |
|
| 8 |
# Initialize session state
|
| 9 |
if 'services' not in st.session_state:
|
| 10 |
st.session_state.services = {}
|
| 11 |
if 'users' not in st.session_state:
|
| 12 |
st.session_state.users = {}
|
|
|
|
|
|
|
| 13 |
|
| 14 |
# App title
|
| 15 |
-
st.title('Services and Users
|
| 16 |
|
| 17 |
# Add sliders for population_size, num_generations, and mutation_rate
|
| 18 |
st.subheader('Genetic Algorithm Parameters')
|
| 19 |
-
population_size = st.slider('Population Size', min_value=500, max_value=5000, value=
|
| 20 |
-
num_generations = st.slider('Number of Generations', min_value=1000, max_value=10000, value=
|
| 21 |
-
mutation_rate = st.slider('Mutation Rate', min_value=0.0, max_value=1.0, value=0.01, step=0.
|
| 22 |
|
| 23 |
# Button to run the genetic algorithm
|
| 24 |
-
|
|
|
|
|
|
|
|
|
|
| 25 |
# Call the genetic_algorithm function and get the best_solution
|
| 26 |
-
best_solution = genetic_algorithm(
|
| 27 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 28 |
|
| 29 |
# Convert the best_solution to JSON
|
| 30 |
-
best_solution_json = json.dumps(
|
| 31 |
-
best_solution_errors = calculate_errors(
|
|
|
|
| 32 |
best_solution_errors = polish_errors(best_solution_errors)
|
| 33 |
-
|
| 34 |
|
| 35 |
# Display the output JSON in a read-only form
|
| 36 |
-
st.subheader('Best
|
| 37 |
-
st.text_area('Best
|
| 38 |
-
|
|
|
|
|
|
|
| 39 |
|
| 40 |
if st.button('Copy solution to Clipboard'):
|
| 41 |
clipboard.copy(best_solution_json)
|
| 42 |
-
st.success('JSON copied to clipboard!')
|
| 43 |
if st.button('Copy unmet constraints to Clipboard'):
|
| 44 |
-
clipboard.copy(
|
| 45 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 46 |
|
| 47 |
# Sidebar for uploading previously generated JSON
|
| 48 |
-
with st.sidebar.expander('
|
| 49 |
-
|
| 50 |
-
|
| 51 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 52 |
|
| 53 |
if reset_json:
|
| 54 |
st.session_state.services = {}
|
| 55 |
st.session_state.users = {}
|
| 56 |
|
| 57 |
-
if merge_json and
|
| 58 |
try:
|
| 59 |
-
loaded_data = json.loads(
|
| 60 |
st.session_state.services.update(loaded_data.get('services', {}))
|
| 61 |
st.session_state.users.update(loaded_data.get('users', {}))
|
| 62 |
st.success('JSON loaded successfully')
|
|
@@ -69,20 +145,26 @@ with st.sidebar.expander('Update existing user or service'):
|
|
| 69 |
|
| 70 |
if object_type == 'Service':
|
| 71 |
service_key = st.selectbox('Select a service', list(st.session_state.services.keys()), key='update_service_key')
|
| 72 |
-
if service_key
|
| 73 |
-
st.
|
| 74 |
-
|
| 75 |
-
|
| 76 |
-
|
| 77 |
-
|
|
|
|
|
|
|
|
|
|
| 78 |
|
| 79 |
elif object_type == 'User':
|
| 80 |
user_key = st.selectbox('Select a user', list(st.session_state.users.keys()), key='update_user_key')
|
| 81 |
-
if user_key
|
| 82 |
-
st.
|
| 83 |
-
|
| 84 |
-
|
| 85 |
-
|
|
|
|
|
|
|
|
|
|
| 86 |
|
| 87 |
# Add a service form
|
| 88 |
with st.form(key='service_form'):
|
|
@@ -133,7 +215,7 @@ combined_data = {
|
|
| 133 |
json_data = json.dumps(combined_data, indent=4)
|
| 134 |
|
| 135 |
# Display the generated JSON
|
| 136 |
-
st.subheader('Generated JSON')
|
| 137 |
st.code(json_data, language='json')
|
| 138 |
|
| 139 |
# Button to copy JSON to clipboard
|
|
|
|
| 3 |
import json
|
| 4 |
import clipboard
|
| 5 |
|
| 6 |
+
from main import genetic_algorithm, polish_errors, calculate_errors, update_genetic_algorithm, calculate_diff
|
| 7 |
|
| 8 |
# Initialize session state
|
| 9 |
if 'services' not in st.session_state:
|
| 10 |
st.session_state.services = {}
|
| 11 |
if 'users' not in st.session_state:
|
| 12 |
st.session_state.users = {}
|
| 13 |
+
if 'solution' not in st.session_state:
|
| 14 |
+
st.session_state.solution = {}
|
| 15 |
|
| 16 |
# App title
|
| 17 |
+
st.title('Services and Users Assignment Center')
|
| 18 |
|
| 19 |
# Add sliders for population_size, num_generations, and mutation_rate
|
| 20 |
st.subheader('Genetic Algorithm Parameters')
|
| 21 |
+
population_size = st.slider('Population Size', min_value=500, max_value=5000, value=1500, step=100)
|
| 22 |
+
num_generations = st.slider('Number of Generations', min_value=1000, max_value=10000, value=2500, step=250)
|
| 23 |
+
mutation_rate = st.slider('Mutation Rate', min_value=0.0, max_value=1.0, value=0.01, step=0.05)
|
| 24 |
|
| 25 |
# Button to run the genetic algorithm
|
| 26 |
+
new_generation_run = st.button('Run new solution')
|
| 27 |
+
update_generation_run = st.button('Update solution')
|
| 28 |
+
|
| 29 |
+
if new_generation_run:
|
| 30 |
# Call the genetic_algorithm function and get the best_solution
|
| 31 |
+
best_solution = genetic_algorithm(
|
| 32 |
+
services=st.session_state.services,
|
| 33 |
+
users=st.session_state.users,
|
| 34 |
+
population_size=population_size,
|
| 35 |
+
num_generations=num_generations,
|
| 36 |
+
mutation_rate=mutation_rate
|
| 37 |
+
)
|
| 38 |
+
|
| 39 |
+
# Save the state of the current best solution
|
| 40 |
+
st.session_state.solution = best_solution
|
| 41 |
|
| 42 |
# Convert the best_solution to JSON
|
| 43 |
+
best_solution_json = json.dumps(st.session_state.solution, indent=4)
|
| 44 |
+
best_solution_errors = calculate_errors(st.session_state.solution, st.session_state.services,
|
| 45 |
+
st.session_state.users)
|
| 46 |
best_solution_errors = polish_errors(best_solution_errors)
|
| 47 |
+
best_solution_errors_json = json.dumps(best_solution_errors, indent=4)
|
| 48 |
|
| 49 |
# Display the output JSON in a read-only form
|
| 50 |
+
st.subheader('Best solution JSON')
|
| 51 |
+
st.text_area('Best solution',
|
| 52 |
+
value=best_solution_json, height=400, max_chars=None, key=None, disabled=True)
|
| 53 |
+
st.text_area('Unmet constraints',
|
| 54 |
+
value=best_solution_errors_json, height=200, max_chars=None, key=None, disabled=True)
|
| 55 |
|
| 56 |
if st.button('Copy solution to Clipboard'):
|
| 57 |
clipboard.copy(best_solution_json)
|
|
|
|
| 58 |
if st.button('Copy unmet constraints to Clipboard'):
|
| 59 |
+
clipboard.copy(best_solution_errors_json)
|
| 60 |
+
|
| 61 |
+
if update_generation_run:
|
| 62 |
+
# Call the genetic_algorithm function and get the best_solution
|
| 63 |
+
best_updated_solution = update_genetic_algorithm(
|
| 64 |
+
prev_solution=st.session_state.solution,
|
| 65 |
+
updated_services=st.session_state.services,
|
| 66 |
+
updated_users=st.session_state.users,
|
| 67 |
+
population_size=population_size,
|
| 68 |
+
num_generations=num_generations,
|
| 69 |
+
mutation_rate=mutation_rate
|
| 70 |
+
)
|
| 71 |
+
|
| 72 |
+
change_report = calculate_diff(best_updated_solution, st.session_state.solution)
|
| 73 |
+
change_report_json = json.dumps(change_report, indent=4)
|
| 74 |
+
|
| 75 |
+
# Convert the best_solution to JSON
|
| 76 |
+
best_updated_solution_json = json.dumps(st.session_state.solution, indent=4)
|
| 77 |
+
best_updated_solution_errors = calculate_errors(
|
| 78 |
+
st.session_state.solution, st.session_state.services, st.session_state.users
|
| 79 |
+
)
|
| 80 |
+
best_updated_solution_errors = polish_errors(best_updated_solution_errors)
|
| 81 |
+
best_updated_solution_errors_json = json.dumps(best_updated_solution_errors, indent=4)
|
| 82 |
+
|
| 83 |
+
# Display the output JSON in a read-only form
|
| 84 |
+
st.subheader('Best updated solution JSON')
|
| 85 |
+
st.text_area('Best updated solution',
|
| 86 |
+
value=best_updated_solution_json, height=600, max_chars=None, key=None, disabled=True)
|
| 87 |
+
st.text_area('Updated unmet constraints',
|
| 88 |
+
value=best_updated_solution_errors_json, height=300, max_chars=None, key=None, disabled=True)
|
| 89 |
+
st.text_area('Change report',
|
| 90 |
+
value=change_report_json, height=200, max_chars=None, key=None, disabled=True)
|
| 91 |
+
|
| 92 |
+
if st.button('Copy updated solution to Clipboard'):
|
| 93 |
+
clipboard.copy(best_updated_solution_json)
|
| 94 |
+
if st.button('Copy updated unmet constraints to Clipboard'):
|
| 95 |
+
clipboard.copy(best_updated_solution_errors_json)
|
| 96 |
+
if st.button('Save this updated solution over the last fully generated one'):
|
| 97 |
+
# Save the state of the current best solution
|
| 98 |
+
st.session_state.solution = best_updated_solution
|
| 99 |
|
| 100 |
# Sidebar for uploading previously generated JSON
|
| 101 |
+
with st.sidebar.expander('Previously generated solution JSON'):
|
| 102 |
+
uploaded_solution_json = st.text_area('Paste your previously generated JSON here',
|
| 103 |
+
value=json.dumps(st.session_state.get('solution', ''), indent=4))
|
| 104 |
+
merge_json = st.button('Upload previously generated JSON')
|
| 105 |
+
reset_json = st.button('Reset previously generated JSON')
|
| 106 |
+
|
| 107 |
+
if reset_json:
|
| 108 |
+
st.session_state.solution = {}
|
| 109 |
+
|
| 110 |
+
if merge_json and uploaded_solution_json:
|
| 111 |
+
try:
|
| 112 |
+
st.session_state.solution = json.loads(uploaded_solution_json)
|
| 113 |
+
st.success('JSON loaded successfully')
|
| 114 |
+
except json.JSONDecodeError:
|
| 115 |
+
st.error('Invalid JSON format')
|
| 116 |
+
|
| 117 |
+
# Sidebar for uploading previously user and service description JSON
|
| 118 |
+
with st.sidebar.expander('Previously generated user and service description JSON'):
|
| 119 |
+
previously_generated_user_service_json = {
|
| 120 |
+
'services': st.session_state.services,
|
| 121 |
+
'users': st.session_state.users
|
| 122 |
+
}
|
| 123 |
+
uploaded_user_service_json = st.text_area('Paste your user and service JSON here',
|
| 124 |
+
value=json.dumps(previously_generated_user_service_json, indent=4)
|
| 125 |
+
)
|
| 126 |
+
merge_json = st.button('Merge user and service description JSON')
|
| 127 |
+
reset_json = st.button('Reset user and service description JSON')
|
| 128 |
|
| 129 |
if reset_json:
|
| 130 |
st.session_state.services = {}
|
| 131 |
st.session_state.users = {}
|
| 132 |
|
| 133 |
+
if merge_json and uploaded_user_service_json:
|
| 134 |
try:
|
| 135 |
+
loaded_data = json.loads(uploaded_user_service_json)
|
| 136 |
st.session_state.services.update(loaded_data.get('services', {}))
|
| 137 |
st.session_state.users.update(loaded_data.get('users', {}))
|
| 138 |
st.success('JSON loaded successfully')
|
|
|
|
| 145 |
|
| 146 |
if object_type == 'Service':
|
| 147 |
service_key = st.selectbox('Select a service', list(st.session_state.services.keys()), key='update_service_key')
|
| 148 |
+
if service_key:
|
| 149 |
+
if st.button('Load Service'):
|
| 150 |
+
st.session_state.service_name = service_key
|
| 151 |
+
st.session_state.min_val = st.session_state.services[service_key]['min']
|
| 152 |
+
st.session_state.rec_val = st.session_state.services[service_key]['rec']
|
| 153 |
+
st.session_state.max_val = st.session_state.services[service_key]['max']
|
| 154 |
+
st.session_state.priority = st.session_state.services[service_key]['priority']
|
| 155 |
+
if st.button('Drop Service'):
|
| 156 |
+
del st.session_state.services[service_key]
|
| 157 |
|
| 158 |
elif object_type == 'User':
|
| 159 |
user_key = st.selectbox('Select a user', list(st.session_state.users.keys()), key='update_user_key')
|
| 160 |
+
if user_key:
|
| 161 |
+
if st.button('Load User'):
|
| 162 |
+
st.session_state.user_name = user_key
|
| 163 |
+
st.session_state.max_assignments = st.session_state.users[user_key]['max_assignments']
|
| 164 |
+
st.session_state.preferences = st.session_state.users[user_key]['preferences']
|
| 165 |
+
st.session_state.cannot_assign = st.session_state.users[user_key]['cannot_assign']
|
| 166 |
+
if st.button('Drop User'):
|
| 167 |
+
del st.session_state.users[user_key]
|
| 168 |
|
| 169 |
# Add a service form
|
| 170 |
with st.form(key='service_form'):
|
|
|
|
| 215 |
json_data = json.dumps(combined_data, indent=4)
|
| 216 |
|
| 217 |
# Display the generated JSON
|
| 218 |
+
st.subheader('Generated user and services JSON')
|
| 219 |
st.code(json_data, language='json')
|
| 220 |
|
| 221 |
# Button to copy JSON to clipboard
|
main.py
CHANGED
|
@@ -1,6 +1,7 @@
|
|
| 1 |
import copy
|
| 2 |
import random
|
| 3 |
-
from
|
|
|
|
| 4 |
|
| 5 |
|
| 6 |
def initialize_population(services: dict, users: dict, population_size: int) -> list:
|
|
@@ -83,35 +84,101 @@ def default_fitness_function(assignment_solution: dict, services: dict, users: d
|
|
| 83 |
Returns:
|
| 84 |
float: The fitness score of the given assignment solution.
|
| 85 |
"""
|
| 86 |
-
fitness =
|
| 87 |
|
| 88 |
for service, assigned_users in assignment_solution.items():
|
| 89 |
service_info = services[service]
|
| 90 |
num_assigned_users = len(assigned_users)
|
| 91 |
|
| 92 |
-
#
|
|
|
|
| 93 |
if service_info["min"] <= num_assigned_users <= service_info["max"]:
|
| 94 |
-
fitness += abs(num_assigned_users - service_info["rec"])
|
| 95 |
|
| 96 |
# Punish solutions that assign users below the minimum value
|
|
|
|
| 97 |
elif num_assigned_users < service_info["min"]:
|
| 98 |
-
fitness
|
| 99 |
|
| 100 |
# Punish solutions that assign users above the maximum value
|
|
|
|
| 101 |
else: # num_assigned_users > service_info["max"]:
|
| 102 |
-
fitness
|
| 103 |
|
| 104 |
# Punish solutions that assign users to their cannot_assign services
|
|
|
|
| 105 |
for user in assigned_users:
|
| 106 |
if service in users[user]["cannot_assign"]:
|
| 107 |
-
fitness
|
| 108 |
|
| 109 |
# Bonus solutions that assign users to their preferred services
|
|
|
|
| 110 |
for user, user_info in users.items():
|
| 111 |
if service in user_info["preferences"] and user in assigned_users:
|
| 112 |
-
fitness
|
| 113 |
|
| 114 |
-
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 115 |
|
| 116 |
|
| 117 |
def selection(fitness_scores: list) -> Tuple[int, int]:
|
|
@@ -332,8 +399,51 @@ def polish_errors(errors: dict) -> dict:
|
|
| 332 |
return polished_errors
|
| 333 |
|
| 334 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 335 |
def genetic_algorithm(services: dict, users: dict, population_size: int = 100, num_generations: int = 100,
|
| 336 |
-
mutation_rate: float = 0.01, fitness_fn: Optional[Callable] = None
|
|
|
|
| 337 |
"""
|
| 338 |
Run the genetic algorithm to find an optimal assignment solution based on user preferences and constraints.
|
| 339 |
|
|
@@ -344,12 +454,13 @@ def genetic_algorithm(services: dict, users: dict, population_size: int = 100, n
|
|
| 344 |
num_generations (int): The number of generations for the genetic algorithm to run (default: 100).
|
| 345 |
mutation_rate (float): The probability of mutation for each individual in the population (default: 0.01).
|
| 346 |
fitness_fn (Callable, optional): An optional custom fitness function.
|
|
|
|
| 347 |
|
| 348 |
Returns:
|
| 349 |
dict: The best assignment solution found by the genetic algorithm.
|
| 350 |
"""
|
| 351 |
# Initialize the population
|
| 352 |
-
population = initialize_population(services, users, population_size)
|
| 353 |
|
| 354 |
# If no custom fitness function is provided, use the default fitness function
|
| 355 |
if fitness_fn is None:
|
|
@@ -389,3 +500,38 @@ def genetic_algorithm(services: dict, users: dict, population_size: int = 100, n
|
|
| 389 |
report_generation(generation, fitness_scores, best_solution, services, users)
|
| 390 |
|
| 391 |
return best_solution
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import copy
|
| 2 |
import random
|
| 3 |
+
from functools import partial
|
| 4 |
+
from typing import Callable, Optional, Tuple, List
|
| 5 |
|
| 6 |
|
| 7 |
def initialize_population(services: dict, users: dict, population_size: int) -> list:
|
|
|
|
| 84 |
Returns:
|
| 85 |
float: The fitness score of the given assignment solution.
|
| 86 |
"""
|
| 87 |
+
fitness = -100
|
| 88 |
|
| 89 |
for service, assigned_users in assignment_solution.items():
|
| 90 |
service_info = services[service]
|
| 91 |
num_assigned_users = len(assigned_users)
|
| 92 |
|
| 93 |
+
# Prefers for solutions that assign users near the recommended value
|
| 94 |
+
# (positive or negative value is punishment, score 0 for the best fit)
|
| 95 |
if service_info["min"] <= num_assigned_users <= service_info["max"]:
|
| 96 |
+
fitness += abs(num_assigned_users - service_info["rec"]) * service_info["priority"]
|
| 97 |
|
| 98 |
# Punish solutions that assign users below the minimum value
|
| 99 |
+
# (positive value is punishment)
|
| 100 |
elif num_assigned_users < service_info["min"]:
|
| 101 |
+
fitness += (service_info["min"] - num_assigned_users) * service_info["priority"]
|
| 102 |
|
| 103 |
# Punish solutions that assign users above the maximum value
|
| 104 |
+
# (positive value is punishment)
|
| 105 |
else: # num_assigned_users > service_info["max"]:
|
| 106 |
+
fitness += (num_assigned_users - service_info["max"]) * service_info["priority"]
|
| 107 |
|
| 108 |
# Punish solutions that assign users to their cannot_assign services
|
| 109 |
+
# (positive value is punishment)
|
| 110 |
for user in assigned_users:
|
| 111 |
if service in users[user]["cannot_assign"]:
|
| 112 |
+
fitness += 100 * service_info["priority"]
|
| 113 |
|
| 114 |
# Bonus solutions that assign users to their preferred services
|
| 115 |
+
# (negative value is bonus)
|
| 116 |
for user, user_info in users.items():
|
| 117 |
if service in user_info["preferences"] and user in assigned_users:
|
| 118 |
+
fitness -= 20
|
| 119 |
|
| 120 |
+
return fitness
|
| 121 |
+
|
| 122 |
+
|
| 123 |
+
def least_changed_fitness_function(prev_solution: dict, solution: dict, services: dict, users: dict) -> float:
|
| 124 |
+
"""
|
| 125 |
+
A fitness function that favors solutions with the least changes from a previous solution.
|
| 126 |
+
|
| 127 |
+
Args:
|
| 128 |
+
solution (dict): The assignment solution to evaluate.
|
| 129 |
+
services (dict): The input services dictionary.
|
| 130 |
+
users (dict): The input users dictionary.
|
| 131 |
+
prev_solution (dict): The previous assignment solution to compare against.
|
| 132 |
+
|
| 133 |
+
Returns:
|
| 134 |
+
float: A fitness score for the assignment solution.
|
| 135 |
+
"""
|
| 136 |
+
# Add the default fitness function score
|
| 137 |
+
fitness = default_fitness_function(solution, services, users)
|
| 138 |
+
|
| 139 |
+
# Bonus for users assigned to the same services as in the previous solution
|
| 140 |
+
same_user_assignments = 0
|
| 141 |
+
for service, assigned_users in solution.items():
|
| 142 |
+
if service in prev_solution:
|
| 143 |
+
prev_assigned_users = prev_solution[service]
|
| 144 |
+
same_users = set(assigned_users).intersection(set(prev_assigned_users))
|
| 145 |
+
same_user_assignments += len(same_users)
|
| 146 |
+
|
| 147 |
+
user_bonus = same_user_assignments * 100 # / sum(len(user_data["preferences"]) for user_data in users.values())
|
| 148 |
+
# (positive value is punishment)
|
| 149 |
+
fitness -= user_bonus
|
| 150 |
+
|
| 151 |
+
# Bonus for services having the same users assigned as in the previous solution
|
| 152 |
+
same_service_assignments = 0
|
| 153 |
+
for service, assigned_users in solution.items():
|
| 154 |
+
if service in prev_solution:
|
| 155 |
+
prev_assigned_users = prev_solution[service]
|
| 156 |
+
same_users = set(assigned_users).intersection(set(prev_assigned_users))
|
| 157 |
+
same_service_assignments += len(same_users)
|
| 158 |
+
|
| 159 |
+
service_bonus = same_service_assignments * 100
|
| 160 |
+
# (positive value is punishment)
|
| 161 |
+
fitness -= service_bonus
|
| 162 |
+
|
| 163 |
+
# Malus for user and service changes
|
| 164 |
+
user_changes = 0
|
| 165 |
+
service_changes = 0
|
| 166 |
+
|
| 167 |
+
for service, service_data in services.items():
|
| 168 |
+
if service in prev_solution:
|
| 169 |
+
prev_service_data = prev_solution[service]
|
| 170 |
+
if service_data != prev_service_data:
|
| 171 |
+
service_changes += 1
|
| 172 |
+
for user, user_data in users.items():
|
| 173 |
+
if user in prev_service_data and user not in service_data:
|
| 174 |
+
user_changes += 1
|
| 175 |
+
|
| 176 |
+
user_malus = user_changes
|
| 177 |
+
service_malus = service_changes
|
| 178 |
+
# (positive value is punishment)
|
| 179 |
+
fitness += (user_malus + service_malus)
|
| 180 |
+
|
| 181 |
+
return fitness
|
| 182 |
|
| 183 |
|
| 184 |
def selection(fitness_scores: list) -> Tuple[int, int]:
|
|
|
|
| 399 |
return polished_errors
|
| 400 |
|
| 401 |
|
| 402 |
+
def calculate_diff(solution1: dict, solution2: dict) -> dict:
|
| 403 |
+
"""
|
| 404 |
+
Calculate the differences between two solution JSON objects and return the differences categorized into
|
| 405 |
+
"added" and "removed" attributes for each service.
|
| 406 |
+
|
| 407 |
+
Args:
|
| 408 |
+
solution1 (dict): The first solution JSON object.
|
| 409 |
+
solution2 (dict): The second solution JSON object.
|
| 410 |
+
|
| 411 |
+
Returns:
|
| 412 |
+
dict: A dictionary with the differences between the two solutions, categorized into "added" and
|
| 413 |
+
"removed" attributes for each service.
|
| 414 |
+
"""
|
| 415 |
+
diff = {}
|
| 416 |
+
|
| 417 |
+
all_services = set(solution1.keys()).union(set(solution2.keys()))
|
| 418 |
+
|
| 419 |
+
for service in all_services:
|
| 420 |
+
service_diff = {
|
| 421 |
+
"added": [],
|
| 422 |
+
"removed": []
|
| 423 |
+
}
|
| 424 |
+
|
| 425 |
+
if service not in solution1:
|
| 426 |
+
service_diff["added"] = solution2[service]
|
| 427 |
+
elif service not in solution2:
|
| 428 |
+
service_diff["removed"] = solution1[service]
|
| 429 |
+
else:
|
| 430 |
+
added_users = set(solution2[service]) - set(solution1[service])
|
| 431 |
+
removed_users = set(solution1[service]) - set(solution2[service])
|
| 432 |
+
|
| 433 |
+
if added_users:
|
| 434 |
+
service_diff["added"] = list(added_users)
|
| 435 |
+
if removed_users:
|
| 436 |
+
service_diff["removed"] = list(removed_users)
|
| 437 |
+
|
| 438 |
+
if service_diff["added"] or service_diff["removed"]:
|
| 439 |
+
diff[service] = service_diff
|
| 440 |
+
|
| 441 |
+
return diff
|
| 442 |
+
|
| 443 |
+
|
| 444 |
def genetic_algorithm(services: dict, users: dict, population_size: int = 100, num_generations: int = 100,
|
| 445 |
+
mutation_rate: float = 0.01, fitness_fn: Optional[Callable] = None,
|
| 446 |
+
initial_population: Optional[List[dict]] = None) -> dict:
|
| 447 |
"""
|
| 448 |
Run the genetic algorithm to find an optimal assignment solution based on user preferences and constraints.
|
| 449 |
|
|
|
|
| 454 |
num_generations (int): The number of generations for the genetic algorithm to run (default: 100).
|
| 455 |
mutation_rate (float): The probability of mutation for each individual in the population (default: 0.01).
|
| 456 |
fitness_fn (Callable, optional): An optional custom fitness function.
|
| 457 |
+
initial_population: An optional previous solution to be used as starting point
|
| 458 |
|
| 459 |
Returns:
|
| 460 |
dict: The best assignment solution found by the genetic algorithm.
|
| 461 |
"""
|
| 462 |
# Initialize the population
|
| 463 |
+
population = initial_population or initialize_population(services, users, population_size)
|
| 464 |
|
| 465 |
# If no custom fitness function is provided, use the default fitness function
|
| 466 |
if fitness_fn is None:
|
|
|
|
| 500 |
report_generation(generation, fitness_scores, best_solution, services, users)
|
| 501 |
|
| 502 |
return best_solution
|
| 503 |
+
|
| 504 |
+
|
| 505 |
+
def update_genetic_algorithm(prev_solution: dict, updated_services: dict, updated_users: dict,
|
| 506 |
+
population_size: int = 100, num_generations: int = 100, mutation_rate: float = 0.01,
|
| 507 |
+
fitness_fn: Optional[Callable] = None) -> dict:
|
| 508 |
+
"""
|
| 509 |
+
Update the previous assignment solution with updated services and users using a genetic algorithm.
|
| 510 |
+
|
| 511 |
+
Args:
|
| 512 |
+
prev_solution (dict): The previous assignment solution.
|
| 513 |
+
updated_services (dict): The updated services dictionary.
|
| 514 |
+
updated_users (dict): The updated users dictionary.
|
| 515 |
+
population_size (int): The size of the population for each generation in the genetic algorithm (default: 100).
|
| 516 |
+
num_generations (int): The number of generations for the genetic algorithm to run (default: 100).
|
| 517 |
+
mutation_rate (float): The probability of mutation for each individual in the population (default: 0.01).
|
| 518 |
+
fitness_fn (Optional[Callable]): An optional fitness function to use in the genetic algorithm.
|
| 519 |
+
|
| 520 |
+
Returns:
|
| 521 |
+
dict: The updated assignment solution.
|
| 522 |
+
"""
|
| 523 |
+
|
| 524 |
+
# Create the initial population with the given previous solution
|
| 525 |
+
population = initialize_population(updated_services, updated_users, population_size - 1)
|
| 526 |
+
population.append(prev_solution)
|
| 527 |
+
|
| 528 |
+
if fitness_fn is None:
|
| 529 |
+
fitness_fn = partial(least_changed_fitness_function, prev_solution)
|
| 530 |
+
|
| 531 |
+
# Run the genetic algorithm using the initial population
|
| 532 |
+
updated_solution = genetic_algorithm(updated_services, updated_users, population_size, num_generations,
|
| 533 |
+
mutation_rate,
|
| 534 |
+
fitness_fn,
|
| 535 |
+
population)
|
| 536 |
+
|
| 537 |
+
return updated_solution
|