# classquota / data_processor.py
# muhalwan's picture
# 'data'
# bd4c80f
import logging
from typing import Dict, Optional, Set, Tuple
import numpy as np
import pandas as pd
from config import Config
logger = logging.getLogger(__name__)


class DataProcessor:
    """Load the enrollment workbook and build a per-course, per-semester table.

    ``load_and_process`` reads the workbook named by the configuration,
    validates the required columns, and returns the cleaned enrollment table
    together with the set of elective course codes. The raw sheets stay in
    ``raw_data`` so ``get_actual_classes_opened`` can query the offerings
    sheet afterwards.
    """

    # Columns each raw sheet must contain before preprocessing starts.
    REQUIRED_COLUMNS = {
        "courses": ["kode_mk", "kategori_mk"],
        "students_ind": ["kode_mk", "thn", "smt", "kode_mhs"],
        "students_yearly": ["thn", "smt", "jumlah_aktif"],
    }
    # Columns the offerings sheet must contain (after renaming) to be queried.
    OFFERINGS_KEY_COLUMNS = ("kode_mk", "thn", "smt")
    # Column names tried, in order, as the per-class identifier.
    CLASS_ID_CANDIDATES = (
        "kelas_id",
        "id_kelas",
        "kode_kelas",
        "class_id",
        "kelas",
        "section_id",
        "section",
    )
    # Names that must never be picked as a class identifier by position.
    # "kode_mk" is included because counting distinct course codes inside a
    # single course always yields 1.
    NON_ID_COLUMNS = (
        "kode_mk",
        "nama_mk",
        "smt",
        "thn",
        "semester",
        "tahun",
        "kuota",
        "kapasitas",
    )
    # Substrings that mark a column as holding lecturer information.
    LECTURER_KEYWORDS = ("dosen", "pengajar", "teacher")
    VALID_CATEGORIES = ("P", "W")
    VALID_SEMESTERS = (1, 2)
    MIN_YEAR = 2000
    # Population used only when no semester at all has a usable value.
    DEFAULT_POPULATION = 500
    LOW_POPULATION_THRESHOLD = 50

    def __init__(self, config: "Config"):
        """Store the configuration and start with empty data containers."""
        self.config = config
        self.raw_data: Dict[str, pd.DataFrame] = {}
        self.processed_data: pd.DataFrame = pd.DataFrame()
        self.elective_codes: Set[str] = set()

    def load_and_process(self) -> Tuple[pd.DataFrame, Set[str]]:
        """Load the workbook, validate it and return the preprocessed result.

        Returns:
            The processed enrollment table and the set of elective course
            codes, exactly as produced by ``_preprocess``.

        Raises:
            ValueError: If a required column is missing from a sheet.
            Exception: Whatever reading the workbook raised (re-raised after
                being logged).
        """
        self._load_excel()
        self._validate_raw_data()
        return self._preprocess()

    def _load_excel(self) -> None:
        """Read every sheet of the workbook and keep the four configured ones."""
        data_cfg = self.config.data
        try:
            # sheet_name=None makes pandas return a dict of all sheets.
            sheets = pd.read_excel(data_cfg.FILE_PATH, sheet_name=None)
            self.raw_data = {
                "courses": sheets[data_cfg.SHEET_COURSES],
                "offerings": sheets[data_cfg.SHEET_OFFERINGS],
                "students_yearly": sheets[data_cfg.SHEET_STUDENTS_YEARLY],
                "students_ind": sheets[data_cfg.SHEET_STUDENTS_INDIVIDUAL],
            }
        except Exception as e:
            # Log for context, then let the caller decide how to handle it.
            logger.error(f"Failed to load Excel: {e}")
            raise

    def _validate_raw_data(self) -> None:
        """Check that every required column is present in the raw sheets.

        Raises:
            ValueError: Naming the sheet and the columns that are missing.
        """
        for key, required in self.REQUIRED_COLUMNS.items():
            present = self.raw_data[key].columns
            missing = [col for col in required if col not in present]
            if missing:
                raise ValueError(f"Missing columns in {key}: {missing}")

    def get_actual_classes_opened(
        self, year: int, semester: int, course_code: Optional[str] = None
    ) -> Dict[str, int]:
        """Count the classes opened per course in one year and semester.

        Args:
            year: Value matched against the ``thn`` column.
            semester: Value matched against the ``smt`` column.
            course_code: When given, restrict the result to this ``kode_mk``.

        Returns:
            Mapping of course code to class count (at least 1 per course
            found). Empty when there is no offerings data, when the key
            columns are missing, or when nothing matches the filter.
        """
        offerings = self.raw_data.get("offerings")
        if offerings is None or len(offerings) == 0:
            logger.warning("No offerings data (tabel2) available")
            return {}

        # Renames are applied one at a time so a rename is skipped as soon as
        # its target name exists, including when an earlier rename created it.
        # ``rename`` returns a new frame, so the raw sheet is never modified.
        for old_col, new_col in self.config.data.OFFERINGS_RENAME.items():
            if old_col in offerings.columns and new_col not in offerings.columns:
                offerings = offerings.rename(columns={old_col: new_col})
        logger.debug(f"Offerings columns: {offerings.columns.tolist()}")

        missing = [
            col for col in self.OFFERINGS_KEY_COLUMNS if col not in offerings.columns
        ]
        if missing:
            logger.warning(f"Offerings data is missing required columns: {missing}")
            return {}

        mask = (offerings["thn"] == year) & (offerings["smt"] == semester)
        if course_code:
            mask = mask & (offerings["kode_mk"] == course_code)
        filtered = offerings[mask]
        if len(filtered) == 0:
            logger.info(f"No class offerings found for {year} semester {semester}")
            return {}

        class_id_col = self._find_class_id_column(filtered)

        result: Dict[str, int] = {}
        # groupby skips rows whose course code is missing; sort=False keeps
        # the courses in order of first appearance.
        for kode_mk, course_data in filtered.groupby("kode_mk", sort=False):
            if class_id_col is not None:
                unique_classes = course_data[class_id_col].nunique()
                logger.debug(
                    f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (by {class_id_col})"
                )
            else:
                unique_classes = self._count_classes_without_id(course_data)
                logger.debug(
                    f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (fallback method)"
                )
            # A course that appears in the sheet has at least one class, even
            # if its identifier column is entirely empty.
            result[kode_mk] = max(1, unique_classes)

        logger.info(
            f"Found {len(result)} courses with {sum(result.values())} total classes for {year} sem {semester}"
        )
        return result

    def _find_class_id_column(self, frame: pd.DataFrame) -> Optional[str]:
        """Return the column that identifies a class, or None if there is none."""
        for candidate in self.CLASS_ID_CANDIDATES:
            if candidate in frame.columns:
                logger.debug(f"Using class ID column: {candidate}")
                return candidate

        # Heuristic: with no known name, try the third column unless it is
        # clearly something else. str() guards against non-string labels.
        cols = frame.columns.tolist()
        if len(cols) > 2:
            potential_id_col = cols[2]
            if str(potential_id_col).lower() not in self.NON_ID_COLUMNS:
                logger.debug(
                    f"Using positional class ID column (index 2): {potential_id_col}"
                )
                return potential_id_col
        return None

    def _count_classes_without_id(self, course_data: pd.DataFrame) -> int:
        """Count classes of one course by de-duplicating on non-lecturer columns."""
        all_cols = course_data.columns.tolist()
        dosen_cols = [
            col
            for col in all_cols
            if any(keyword in str(col).lower() for keyword in self.LECTURER_KEYWORDS)
        ]

        # If rows that agree on every other column still differ in the last
        # column, treat the last column as lecturer-like: several values of it
        # then belong to the same class.
        if all_cols:
            last_col = all_cols[-1]
            non_last_cols = [c for c in all_cols if c != last_col]
            if last_col not in dosen_cols and non_last_cols:
                per_group = course_data.groupby(non_last_cols)[last_col].nunique()
                if (per_group > 1).any():
                    dosen_cols.append(last_col)

        non_dosen_cols = [col for col in all_cols if col not in dosen_cols]
        if non_dosen_cols:
            return len(course_data.drop_duplicates(subset=non_dosen_cols))
        return len(course_data.drop_duplicates())

    def get_class_count_for_validation(self, year: int, semester: int) -> pd.DataFrame:
        """Return ``get_actual_classes_opened`` as a two-column DataFrame.

        Returns:
            Frame with columns ``kode_mk`` and ``actual_classes``; empty (but
            with both columns) when no classes were found.
        """
        columns = ["kode_mk", "actual_classes"]
        actual_classes = self.get_actual_classes_opened(year, semester)
        if not actual_classes:
            return pd.DataFrame(columns=columns)
        return pd.DataFrame(list(actual_classes.items()), columns=columns)

    def _clean_courses_data(self, courses: pd.DataFrame) -> pd.DataFrame:
        """Clean the course catalog.

        Drops exact duplicates, normalizes ``kategori_mk`` (trimmed,
        upper-case), removes rows lacking a course code or category, keeps
        only the valid categories and finally keeps the first row per
        ``kode_mk``.
        """
        initial_count = len(courses)
        # .copy() so the column assignment below acts on an independent frame.
        courses = courses.drop_duplicates().copy()
        if len(courses) < initial_count:
            logger.info(
                f" Removed {initial_count - len(courses)} exact duplicate rows"
            )

        # Standardize kategori_mk. astype(str) would turn a missing value into
        # the text "nan", so missing and blank entries are masked back to NaN
        # and handled by the dropna below instead of being reported as an
        # invalid category.
        raw_category = courses["kategori_mk"]
        category = raw_category.astype(str).str.strip().str.upper()
        courses["kategori_mk"] = category.where(
            raw_category.notna() & (category != "")
        )

        before_dropna = len(courses)
        courses = courses.dropna(subset=["kode_mk", "kategori_mk"])
        if len(courses) < before_dropna:
            logger.info(
                f" Removed {before_dropna - len(courses)} rows with missing kode_mk or kategori_mk"
            )

        invalid_mask = ~courses["kategori_mk"].isin(self.VALID_CATEGORIES)
        if invalid_mask.any():
            invalid_cats = courses[invalid_mask]["kategori_mk"].unique()
            logger.warning(
                f" Found {invalid_mask.sum()} courses with invalid categories: {invalid_cats}"
            )
            logger.warning(" Keeping only valid categories (P, W)")
            courses = courses[~invalid_mask]

        before_dedup = len(courses)
        courses = courses.drop_duplicates(subset="kode_mk", keep="first")
        if len(courses) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(courses)} duplicate course codes (kept first occurrence)"
            )

        logger.info(f" Final course catalog: {len(courses)} unique courses")
        return courses

    def _clean_students_data(self, students: pd.DataFrame) -> pd.DataFrame:
        """Clean the individual enrollment records.

        Removes rows with missing key fields, converts ``thn`` and ``smt`` to
        numbers, drops rows whose year or semester is invalid, and removes
        repeated (student, course, year, semester) records.
        """
        key_cols = ["kode_mk", "thn", "smt", "kode_mhs"]

        initial_count = len(students)
        students = students.dropna(subset=key_cols).copy()
        if len(students) < initial_count:
            logger.info(
                f" Removed {initial_count - len(students)} rows with missing critical data"
            )

        # Plain column assignment replaces the column and its dtype. Writing
        # through ``.loc[:, col]`` sets values in place on pandas >= 2.0 and
        # would leave a text column as object dtype.
        for col in ("thn", "smt"):
            students[col] = pd.to_numeric(students[col], errors="coerce")

        before_invalid = len(students)
        students = students.dropna(subset=["thn", "smt"])
        if len(students) < before_invalid:
            logger.info(
                f" Removed {before_invalid - len(students)} rows with invalid year/semester values"
            )

        invalid_sem = ~students["smt"].isin(self.VALID_SEMESTERS)
        if invalid_sem.any():
            logger.warning(
                f" Found {invalid_sem.sum()} records with invalid semester values"
            )
            students = students[~invalid_sem]

        # Allow one year ahead of the current calendar year.
        current_year = pd.Timestamp.now().year
        invalid_year = (students["thn"] < self.MIN_YEAR) | (
            students["thn"] > current_year + 1
        )
        if invalid_year.any():
            logger.warning(
                f" Found {invalid_year.sum()} records with unreasonable year values"
            )
            students = students[~invalid_year]

        # Same student, course, year and semester counts once.
        before_dedup = len(students)
        students = students.drop_duplicates(
            subset=["kode_mhs", "kode_mk", "thn", "smt"], keep="first"
        )
        if len(students) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(students)} duplicate enrollment records"
            )

        logger.info(f" Final enrollment records: {len(students)}")
        return students

    def _clean_yearly_population(self, yearly_pop: pd.DataFrame) -> pd.DataFrame:
        """Clean the per-semester population table.

        Returns one row per (``thn``, ``smt``) sorted chronologically, with
        ``jumlah_aktif`` numeric and non-positive or unparsable values set to
        NaN.
        """
        yearly_pop = yearly_pop.copy()

        # The enrollment keys are converted to numbers, so the population keys
        # must be as well or the later merge cannot line them up.
        for key in ("thn", "smt"):
            yearly_pop[key] = pd.to_numeric(yearly_pop[key], errors="coerce")
        before_keys = len(yearly_pop)
        yearly_pop = yearly_pop.dropna(subset=["thn", "smt"])
        if len(yearly_pop) < before_keys:
            logger.info(
                f" Removed {before_keys - len(yearly_pop)} population rows with invalid year/semester values"
            )

        before_dedup = len(yearly_pop)
        yearly_pop = yearly_pop.drop_duplicates(
            subset=["thn", "smt"], keep="first"
        ).copy()
        if len(yearly_pop) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(yearly_pop)} duplicate year-semester records"
            )

        # ``where`` yields NaN for zero/negative/unparsable values without
        # writing NaN into an integer column in place.
        population = pd.to_numeric(yearly_pop["jumlah_aktif"], errors="coerce")
        yearly_pop["jumlah_aktif"] = population.where(population > 0)

        yearly_pop = yearly_pop.sort_values(["thn", "smt"]).reset_index(drop=True)
        logger.info(f" Yearly population records: {len(yearly_pop)}")
        return yearly_pop

    def _merge_population(
        self, enrollment: pd.DataFrame, yearly_pop: pd.DataFrame
    ) -> pd.DataFrame:
        """Attach ``jumlah_aktif`` to every enrollment row.

        Semesters without a usable population take the value of the nearest
        earlier semester, else the nearest later one. ``DEFAULT_POPULATION``
        is used only when no semester has a value at all.
        """
        keys = ["thn", "smt"]
        merged = enrollment.merge(yearly_pop, on=keys, how="left")
        missing_rows = int(merged["jumlah_aktif"].isna().sum())
        if missing_rows == 0:
            return merged

        # Fill along a chronological timeline of all known semesters. Filling
        # the merged table directly would follow its course-major row order
        # and could borrow a value from an unrelated course or year.
        timeline = (
            enrollment[keys]
            .drop_duplicates()
            .merge(yearly_pop, on=keys, how="outer")
            .sort_values(keys)
            .reset_index(drop=True)
        )
        timeline["jumlah_aktif"] = timeline["jumlah_aktif"].ffill().bfill()
        if timeline["jumlah_aktif"].isna().any():
            logger.warning(
                f" No usable population data; using default of {self.DEFAULT_POPULATION}"
            )
            timeline["jumlah_aktif"] = timeline["jumlah_aktif"].fillna(
                self.DEFAULT_POPULATION
            )
        else:
            logger.warning(
                f" Filled missing population for {missing_rows} course-semester rows from the nearest semester"
            )
        return enrollment.merge(timeline, on=keys, how="left")

    def _preprocess(self) -> Tuple[pd.DataFrame, Set[str]]:
        """Build the processed enrollment table from the raw sheets.

        Returns:
            A frame with one row per elective course and semester (columns
            ``kode_mk``, ``thn``, ``smt``, ``enrollment``, ``jumlah_aktif``)
            and the set of elective course codes. The frame is empty when no
            elective courses or no elective enrollments exist.
        """
        courses = self._clean_courses_data(self.raw_data["courses"].copy())

        elective_category = self.config.data.ELECTIVE_CATEGORY
        self.elective_codes = set(
            courses[courses["kategori_mk"] == elective_category]["kode_mk"]
        )
        if len(self.elective_codes) == 0:
            logger.warning(
                f"No elective courses found! Check if kategori_mk = '{elective_category}' exists in data."
            )
            logger.warning(
                f"Elective identification rule: {self.config.get_elective_filter_description()}"
            )
            return pd.DataFrame(), set()

        students = self._clean_students_data(self.raw_data["students_ind"].copy())
        students = students[students["kode_mk"].isin(self.elective_codes)]
        if len(students) == 0:
            logger.warning("No enrollment data found for elective courses!")
            return pd.DataFrame(), self.elective_codes

        # Enrollment = number of distinct students per course and semester.
        enrollment = (
            students.groupby(["kode_mk", "thn", "smt"])["kode_mhs"]
            .nunique()
            .reset_index(name="enrollment")
        )

        yearly_pop = self._clean_yearly_population(
            self.raw_data["students_yearly"][["thn", "smt", "jumlah_aktif"]].copy()
        )
        df = self._merge_population(enrollment, yearly_pop)

        df = self._validate_enrollment_data(df)
        df = df.sort_values(["kode_mk", "thn", "smt"]).reset_index(drop=True)
        self.processed_data = df
        return df, self.elective_codes

    def _validate_enrollment_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop zero-enrollment rows and log suspicious values.

        Outliers (above Q3 + 3 * IQR within a course) and low populations are
        only reported; no row is removed because of them.
        """
        df = df[df["enrollment"] > 0]

        for course, counts in df.groupby("kode_mk", sort=False)["enrollment"]:
            if len(counts) <= 1:
                continue
            q25 = counts.quantile(0.25)
            q75 = counts.quantile(0.75)
            upper_bound = q75 + 3 * (q75 - q25)
            outliers = counts > upper_bound
            if outliers.any():
                logger.debug(
                    f" Course {course} has {outliers.sum()} potential outliers (keeping them)"
                )

        if (df["jumlah_aktif"] < self.LOW_POPULATION_THRESHOLD).any():
            logger.warning(" Some semesters have very low student population (<50)")
        return df