import pandas as pd
import datetime
from datetime import date, timedelta
import json
import os
# Default dates - will be overridden by optimization_config.py
START_DATE = pd.Timestamp(2025, 7, 7)
END_DATE = pd.Timestamp(2025, 7, 11)
def set_global_dates(start_date, end_date):
"""Update global START_DATE and END_DATE variables"""
global START_DATE, END_DATE
START_DATE = pd.Timestamp(start_date)
END_DATE = pd.Timestamp(end_date)
print(f"Updated global dates: {START_DATE} to {END_DATE}")
def read_excel(path: str) -> pd.DataFrame:
return pd.read_excel(path, dtype={"id": "Int64"})
def read_demand_data(
path="data/real_data_excel/converted_csv/COOIS_Planned_and_Released.csv",
start_date=None,
end_date=None,
) -> pd.DataFrame:
df = pd.read_csv(path)
df["Basic start date"] = pd.to_datetime(df["Basic start date"])
# df["Basic finish date"] = pd.to_datetime(df["Basic finish date"])
# Use provided dates or fall back to module defaults
filter_start_date = start_date if start_date is not None else START_DATE
filter_end_date = end_date if end_date is not None else END_DATE
df = df[(df["Basic start date"] == filter_start_date)]
return df
def read_kit_line_match_data(
path="data/real_data_excel/converted_csv/Kit_Composition_and_relation_cleaned_with_line_type.csv",
) -> pd.DataFrame:
return pd.read_csv(path)
def read_employee_data(
path="data/real_data_excel/converted_csv/WH_Workforce_Hourly_Pay_Scale_processed.csv",
) -> pd.DataFrame:
return pd.read_csv(path)
def get_shift_info(
path = "data/real_data_excel/converted_csv/work_shift.csv"
) -> pd.DataFrame:
df = pd.read_csv(path)
return df
def read_shift_cost_data(
path="data/real_data_excel/converted_csv/WH_Workforce_Hourly_Pay_Scale_processed.csv",
) -> pd.DataFrame:
return pd.read_csv(path)
def read_work_center_capacity(
path="data/real_data_excel/converted_csv/Work_Centre_Capacity.csv",
) -> pd.DataFrame:
return pd.read_csv(path)
def read_material_master(
path="data/real_data_excel/converted_csv/Material_Master_WMS.csv",
) -> pd.DataFrame:
return pd.read_csv(path)
def read_packaging_line_data(
path="data/real_data_excel/converted_csv/Work_Centre_Capacity_processed.csv",
) -> pd.DataFrame:
df = pd.read_csv(path)
# Filter for packaging lines only
df = df[df["line_for_packaging"] == True]
return df
def read_orders_data(
path="data/real_data_excel/converted_csv/COOIS_Planned_and_Released.csv",
start_date=None,
# end_date=None,
) -> pd.DataFrame:
"""
COOIS_Released_Prod_Orders.csv
Args:
path: path to the csv file
start_date: start date (pd.Timestamp or datetime)
Returns:
pd.DataFrame: filtered dataframe by date
"""
df = pd.read_csv(path)
assert len(df) > 0, "No data found in the file"
# convert date column to datetime
df["Basic start date"] = pd.to_datetime(df["Basic start date"])
# df["Basic finish date"] = pd.to_datetime(df["Basic finish date"])
# filter by date
if start_date is not None: # Filter for exact start date only
df = df[df["Basic start date"] == pd.to_datetime(start_date)]
else:
raise ValueError("start_date is required")
return df
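# Example call (illustrative only):
#   read_orders_data(start_date="2025-07-07")
# returns only the rows whose "Basic start date" equals 2025-07-07; omitting
# start_date raises ValueError.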
def read_package_speed_data(
path="data/real_data_excel/converted_csv/Kits__Calculation.csv",
):
    df = pd.read_csv(path, usecols=["Kit", "Kit per day", "Paid work hours per day"])
df["Kit per day"] = df["Kit per day"].astype(float)
df["Paid work hours per day"] = df["Paid work hours per day"].astype(float)
df["Kit"] = df["Kit"].astype(str)
    df["kits_per_hour"] = df["Kit per day"] / df["Paid work hours per day"]
speeds_per_hour = dict(zip(df["Kit"], df["kits_per_hour"]))
return speeds_per_hour
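# The returned mapping is {kit id (str): kits per hour (float)}; e.g. a kit with
# "Kit per day" = 120 and "Paid work hours per day" = 8 yields 15.0 kits/hour.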
def read_personnel_requirement_data(
path="data/real_data_excel/converted_csv/Kits__Calculation.csv",
):
df = pd.read_csv(path, usecols=["Kit", "Humanizer", "UNICEF staff"])
# Clean the data by handling special whitespace characters like \xa0 (non-breaking space)
def clean_and_convert_to_float(value):
if pd.isna(value):
return 0.0
# Convert to string and strip all kinds of whitespace (including \xa0)
clean_value = str(value).strip()
# If empty after stripping, return 0
if clean_value == '' or clean_value == 'nan':
return 0.0
try:
return float(clean_value)
except ValueError as e:
print(f"Warning: Could not convert '{repr(value)}' to float, setting to 0. Error: {e}")
return 0.0
df["Humanizer"] = df["Humanizer"].apply(clean_and_convert_to_float)
df["UNICEF staff"] = df["UNICEF staff"].apply(clean_and_convert_to_float)
df["Kit"] = df["Kit"].astype(str)
return df
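# Illustrative behaviour of the cleaning step above: a cell containing "\xa0"
# (non-breaking space) or an empty string is mapped to 0.0, while " 2 " is
# stripped and converted to 2.0.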
def get_production_order_data():
"""
Extract production order information from hierarchy.
Returns:
- kit_levels: {kit_id: level} where level 0=prepack, 1=subkit, 2=master
- dependencies: {kit_id: [dependency_list]}
- priority_order: [kit_ids] sorted by production priority
"""
path = "data/hierarchy_exports/kit_hierarchy.json"
with open(path, 'r', encoding='utf-8') as f:
hierarchy = json.load(f)
kit_levels = {}
dependencies = {}
# Process hierarchy to extract levels and dependencies
for master_id, master_data in hierarchy.items():
# Master kits are level 2
kit_levels[master_id] = 2
dependencies[master_id] = master_data.get('dependencies', [])
# Process subkits (level 1)
for subkit_id, subkit_data in master_data.get('subkits', {}).items():
kit_levels[subkit_id] = 1
dependencies[subkit_id] = subkit_data.get('dependencies', [])
# Process prepacks under subkits (level 0)
for prepack_id in subkit_data.get('prepacks', []):
if prepack_id not in kit_levels: # Avoid overwriting if already exists
kit_levels[prepack_id] = 0
dependencies[prepack_id] = []
# Process direct prepacks under master (level 0)
for prepack_id in master_data.get('direct_prepacks', []):
if prepack_id not in kit_levels: # Avoid overwriting if already exists
kit_levels[prepack_id] = 0
dependencies[prepack_id] = []
# Create priority order: prepacks first, then subkits, then masters
priority_order = []
# Level 0: Prepacks (highest priority)
prepacks = [kit for kit, level in kit_levels.items() if level == 0]
priority_order.extend(sorted(prepacks))
# Level 1: Subkits (medium priority)
subkits = [kit for kit, level in kit_levels.items() if level == 1]
priority_order.extend(sorted(subkits))
# Level 2: Masters (lowest priority)
masters = [kit for kit, level in kit_levels.items() if level == 2]
priority_order.extend(sorted(masters))
return kit_levels, dependencies, priority_order
if __name__ == "__main__":
employee_data = read_employee_data()
print("employee data")
print(employee_data)
print("line speed data",read_package_speed_data())