"""Load, validate, and preprocess course/enrollment data from the configured Excel workbook."""

import logging
from typing import Dict, Optional, Set, Tuple

import numpy as np
import pandas as pd

from config import Config

logger = logging.getLogger(__name__)


class DataProcessor:
    """Pipeline that turns the raw Excel sheets into a per-course-semester
    enrollment DataFrame restricted to elective courses.

    Typical use: ``df, elective_codes = DataProcessor(config).load_and_process()``.
    """

    def __init__(self, config: Config):
        self.config = config
        # Raw sheets keyed by logical name ("courses", "offerings", ...).
        self.raw_data: Dict[str, pd.DataFrame] = {}
        # Final merged enrollment table, populated by _preprocess().
        self.processed_data: pd.DataFrame = pd.DataFrame()
        # Course codes whose kategori_mk matches the configured elective category.
        self.elective_codes: Set[str] = set()

    def load_and_process(self) -> Tuple[pd.DataFrame, Set[str]]:
        """Run the full pipeline: load Excel, validate schemas, preprocess.

        Returns:
            (processed enrollment DataFrame, set of elective course codes).

        Raises:
            ValueError: if a required column is missing from a sheet.
            Exception: re-raised from pandas if the Excel file cannot be read.
        """
        self._load_excel()
        self._validate_raw_data()
        return self._preprocess()

    def _load_excel(self):
        """Read all sheets from the configured workbook into ``self.raw_data``.

        ``sheet_name=None`` makes ``read_excel`` return a dict of all sheets;
        the configured sheet names select the four we need.
        """
        try:
            sheets = pd.read_excel(self.config.data.FILE_PATH, sheet_name=None)
            self.raw_data = {
                "courses": sheets[self.config.data.SHEET_COURSES],
                "offerings": sheets[self.config.data.SHEET_OFFERINGS],
                "students_yearly": sheets[self.config.data.SHEET_STUDENTS_YEARLY],
                "students_ind": sheets[self.config.data.SHEET_STUDENTS_INDIVIDUAL],
            }
        except Exception as e:
            logger.error(f"Failed to load Excel: {e}")
            raise

    def _validate_raw_data(self):
        """Check that each required sheet carries its required columns.

        Raises:
            ValueError: naming the sheet and the columns actually missing.
        """
        req_cols = {
            "courses": ["kode_mk", "kategori_mk"],
            "students_ind": ["kode_mk", "thn", "smt", "kode_mhs"],
            "students_yearly": ["thn", "smt", "jumlah_aktif"],
        }
        for key, cols in req_cols.items():
            # Report only the columns that are truly absent, not the whole
            # requirement list (the old message was misleading).
            missing = [col for col in cols if col not in self.raw_data[key].columns]
            if missing:
                raise ValueError(f"Missing columns in {key}: {missing}")

    def get_actual_classes_opened(
        self, year: int, semester: int, course_code: Optional[str] = None
    ) -> Dict[str, int]:
        """Count how many class sections were actually opened per course.

        Args:
            year: academic year to filter on (``thn`` column).
            semester: semester to filter on (``smt`` column).
            course_code: optional single course code to restrict to.

        Returns:
            Mapping ``kode_mk -> number of classes`` (at least 1 per course
            present in the filtered offerings); empty dict when no offerings
            data or no rows match.
        """
        offerings = self.raw_data.get("offerings")
        if offerings is None or len(offerings) == 0:
            logger.warning("No offerings data (tabel2) available")
            return {}

        # Standardize column names via the configured rename map; only rename
        # when the target name is not already present to avoid collisions.
        offerings = offerings.copy()
        for old_col, new_col in self.config.data.OFFERINGS_RENAME.items():
            if old_col in offerings.columns and new_col not in offerings.columns:
                offerings = offerings.rename(columns={old_col: new_col})

        # Log column names for debugging
        logger.debug(f"Offerings columns: {offerings.columns.tolist()}")

        # Filter by year and semester (and optionally a single course).
        mask = (offerings["thn"] == year) & (offerings["smt"] == semester)
        if course_code:
            mask = mask & (offerings["kode_mk"] == course_code)
        filtered = offerings[mask]
        if len(filtered) == 0:
            logger.info(f"No class offerings found for {year} semester {semester}")
            return {}

        # Try to locate an explicit class-identifier column by common names.
        class_id_candidates = [
            "kelas_id",
            "id_kelas",
            "kode_kelas",
            "class_id",
            "kelas",
            "section_id",
            "section",
        ]
        class_id_col = None
        for col in class_id_candidates:
            if col in filtered.columns:
                class_id_col = col
                logger.debug(f"Using class ID column: {col}")
                break

        if class_id_col is None:
            # Heuristic fallback: assume the third column holds the class ID,
            # unless its name marks it as clearly something else.
            cols = filtered.columns.tolist()
            if len(cols) > 2:
                potential_id_col = cols[2]
                non_id_cols = [
                    "nama_mk",
                    "smt",
                    "thn",
                    "semester",
                    "tahun",
                    "kuota",
                    "kapasitas",
                ]
                if potential_id_col.lower() not in non_id_cols:
                    class_id_col = potential_id_col
                    logger.debug(
                        f"Using positional class ID column (index 2): {potential_id_col}"
                    )

        result = {}
        for kode_mk in filtered["kode_mk"].unique():
            course_data = filtered[filtered["kode_mk"] == kode_mk]
            if class_id_col and class_id_col in course_data.columns:
                # Direct count: number of distinct class IDs for this course.
                unique_classes = course_data[class_id_col].nunique()
                logger.debug(
                    f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (by {class_id_col})"
                )
            else:
                # Last-resort heuristic: drop lecturer-like columns (a class
                # can span multiple rows when it has several lecturers) and
                # count distinct remaining rows as distinct classes.
                all_cols = course_data.columns.tolist()
                dosen_cols = [
                    col
                    for col in all_cols
                    if "dosen" in col.lower()
                    or "pengajar" in col.lower()
                    or "teacher" in col.lower()
                ]
                if len(all_cols) > 0:
                    # If the last column varies within otherwise-identical
                    # rows, treat it as a lecturer column too.
                    last_col = all_cols[-1]
                    if last_col not in dosen_cols:
                        non_last_cols = [c for c in all_cols if c != last_col]
                        if len(non_last_cols) > 0:
                            grouped = course_data.groupby(non_last_cols)[
                                last_col
                            ].nunique()
                            if (grouped > 1).any():
                                dosen_cols.append(last_col)
                non_dosen_cols = [col for col in all_cols if col not in dosen_cols]
                if non_dosen_cols:
                    unique_classes = len(
                        course_data.drop_duplicates(subset=non_dosen_cols)
                    )
                else:
                    unique_classes = len(course_data.drop_duplicates())
                logger.debug(
                    f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (fallback method)"
                )
            # Every course present in the offerings has at least one class.
            result[kode_mk] = max(1, unique_classes)

        logger.info(
            f"Found {len(result)} courses with {sum(result.values())} total classes for {year} sem {semester}"
        )
        return result

    def get_class_count_for_validation(self, year: int, semester: int) -> pd.DataFrame:
        """Return actual class counts as a two-column DataFrame
        (``kode_mk``, ``actual_classes``); empty frame when no data."""
        actual_classes = self.get_actual_classes_opened(year, semester)
        if not actual_classes:
            return pd.DataFrame(columns=["kode_mk", "actual_classes"])
        return pd.DataFrame(
            [
                {"kode_mk": kode, "actual_classes": count}
                for kode, count in actual_classes.items()
            ]
        )

    def _clean_courses_data(self, courses: pd.DataFrame) -> pd.DataFrame:
        """Clean the course catalog: dedupe, normalize categories, drop invalid rows.

        Returns a catalog with one row per ``kode_mk`` and only valid
        categories (P, W).
        """
        initial_count = len(courses)

        # Remove duplicate
        courses = courses.drop_duplicates()
        if len(courses) < initial_count:
            logger.info(
                f" Removed {initial_count - len(courses)} exact duplicate rows"
            )

        # Standardize kategori_mk. astype(str) turns NaN into the literal
        # string "nan" ("NAN" after upper()), so map both "" and "NAN" back
        # to np.nan; otherwise missing categories would survive the dropna
        # below and show up later as spurious "invalid category" warnings.
        courses["kategori_mk"] = (
            courses["kategori_mk"]
            .astype(str)
            .str.upper()
            .str.strip()
            .replace({"": np.nan, "NAN": np.nan})
        )

        # Remove rows with missing critical data
        before_dropna = len(courses)
        courses = courses.dropna(subset=["kode_mk", "kategori_mk"]).copy()
        if len(courses) < before_dropna:
            logger.info(
                f" Removed {before_dropna - len(courses)} rows with missing kode_mk or kategori_mk"
            )

        # Validate kategori_mk values
        valid_categories = {"P", "W"}
        invalid_mask = ~courses["kategori_mk"].isin(valid_categories)
        if invalid_mask.any():
            invalid_cats = courses[invalid_mask]["kategori_mk"].unique()
            logger.warning(
                f" Found {invalid_mask.sum()} courses with invalid categories: {invalid_cats}"
            )
            logger.warning(" Keeping only valid categories (P, W)")
            courses = courses[~invalid_mask]

        # Remove duplicate course codes (keep first)
        before_dedup = len(courses)
        courses = courses.drop_duplicates(subset="kode_mk", keep="first")
        if len(courses) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(courses)} duplicate course codes (kept first occurrence)"
            )

        logger.info(f" Final course catalog: {len(courses)} unique courses")
        return courses

    def _clean_students_data(self, students: pd.DataFrame) -> pd.DataFrame:
        """Clean individual enrollment records: drop incomplete/invalid rows,
        coerce year/semester to numeric, and dedupe enrollments."""
        initial_count = len(students)

        # Remove rows with missing critical data
        students = students.dropna(subset=["kode_mk", "thn", "smt", "kode_mhs"]).copy()
        if len(students) < initial_count:
            logger.info(
                f" Removed {initial_count - len(students)} rows with missing critical data"
            )

        # Ensure correct data types. Plain column assignment on the copied
        # frame replaces the column (and its dtype) outright; .loc-based
        # assignment can emit FutureWarning on dtype-incompatible values in
        # pandas >= 2.x.
        students["thn"] = pd.to_numeric(students["thn"], errors="coerce")
        students["smt"] = pd.to_numeric(students["smt"], errors="coerce")

        # Remove rows with invalid year/semester after conversion
        before_invalid = len(students)
        students = students.dropna(subset=["thn", "smt"]).copy()
        if len(students) < before_invalid:
            logger.info(
                f" Removed {before_invalid - len(students)} rows with invalid year/semester values"
            )

        # Validate semester values
        valid_semesters = {1, 2}
        invalid_sem = ~students["smt"].isin(valid_semesters)
        if invalid_sem.any():
            logger.warning(
                f" Found {invalid_sem.sum()} records with invalid semester values"
            )
            students = students[~invalid_sem].copy()

        # Validate year range (allow next year for pre-registered terms).
        current_year = pd.Timestamp.now().year
        invalid_year = (students["thn"] < 2000) | (students["thn"] > current_year + 1)
        if invalid_year.any():
            logger.warning(
                f" Found {invalid_year.sum()} records with unreasonable year values"
            )
            students = students[~invalid_year].copy()

        # Remove exact duplicate enrollments (same student, course, semester)
        before_dedup = len(students)
        students = students.drop_duplicates(
            subset=["kode_mhs", "kode_mk", "thn", "smt"], keep="first"
        )
        if len(students) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(students)} duplicate enrollment records"
            )

        logger.info(f" Final enrollment records: {len(students)}")
        return students

    def _clean_yearly_population(self, yearly_pop: pd.DataFrame) -> pd.DataFrame:
        """Clean the per-semester active-population table: dedupe by
        (thn, smt), coerce counts to numeric, null out non-positive counts,
        and sort chronologically."""
        # Remove duplicate year-semester combinations
        before_dedup = len(yearly_pop)
        yearly_pop = yearly_pop.drop_duplicates(subset=["thn", "smt"], keep="first")
        if len(yearly_pop) < before_dedup:
            logger.info(
                f" Removed {before_dedup - len(yearly_pop)} duplicate year-semester records"
            )

        # Ensure jumlah_aktif is numeric and positive
        yearly_pop["jumlah_aktif"] = pd.to_numeric(
            yearly_pop["jumlah_aktif"], errors="coerce"
        )
        # Replace zero or negative values with NaN so downstream fill logic
        # can treat them as missing rather than as real populations.
        yearly_pop.loc[yearly_pop["jumlah_aktif"] <= 0, "jumlah_aktif"] = np.nan

        # Sort by year and semester
        yearly_pop = yearly_pop.sort_values(["thn", "smt"]).reset_index(drop=True)
        logger.info(f" Yearly population records: {len(yearly_pop)}")
        return yearly_pop

    def _preprocess(self) -> Tuple[pd.DataFrame, Set[str]]:
        """Build the final per-(course, year, semester) enrollment table
        restricted to elective courses, merged with population counts.

        Returns:
            (enrollment DataFrame, elective course codes). The DataFrame is
            empty when no electives or no elective enrollments exist.
        """
        # Clean course catalog
        courses = self._clean_courses_data(self.raw_data["courses"].copy())

        # Identify elective courses
        elective_category = self.config.data.ELECTIVE_CATEGORY
        self.elective_codes = set(
            courses[courses["kategori_mk"] == elective_category]["kode_mk"]
        )
        if len(self.elective_codes) == 0:
            logger.warning(
                f"No elective courses found! Check if kategori_mk = '{elective_category}' exists in data."
            )
            logger.warning(
                f"Elective identification rule: {self.config.get_elective_filter_description()}"
            )
            return pd.DataFrame(), set()

        # Clean student enrollment data
        students = self._clean_students_data(self.raw_data["students_ind"].copy())

        # Filter for elective courses only
        students = students[students["kode_mk"].isin(self.elective_codes)]
        if len(students) == 0:
            logger.warning("No enrollment data found for elective courses!")
            return pd.DataFrame(), self.elective_codes

        # Aggregate enrollment by course-semester; nunique() guards against
        # any duplicate student-course rows that survived cleaning.
        enrollment = (
            students.groupby(["kode_mk", "thn", "smt"])["kode_mhs"]
            .nunique()
            .reset_index(name="enrollment")
        )

        # Clean yearly population data
        yearly_pop = self._clean_yearly_population(
            self.raw_data["students_yearly"][["thn", "smt", "jumlah_aktif"]].copy()
        )

        # Merge enrollment with population data
        df = enrollment.merge(yearly_pop, on=["thn", "smt"], how="left")

        # Handle missing population data: forward/backward fill from adjacent
        # semesters, then fall back to a configurable default (500 when the
        # config does not define DEFAULT_POPULATION).
        missing_pop = df["jumlah_aktif"].isna().sum()
        if missing_pop > 0:
            df["jumlah_aktif"] = df["jumlah_aktif"].ffill().bfill()
            if df["jumlah_aktif"].isna().any():
                default_pop = getattr(self.config.data, "DEFAULT_POPULATION", 500)
                df["jumlah_aktif"] = df["jumlah_aktif"].fillna(default_pop)

        # Validate enrollment data
        df = self._validate_enrollment_data(df)

        # Sort and finalize
        df = df.sort_values(["kode_mk", "thn", "smt"]).reset_index(drop=True)
        self.processed_data = df
        return df, self.elective_codes

    def _validate_enrollment_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop zero-enrollment rows and log (but keep) outliers and
        suspiciously low populations."""
        # Remove zero enrollments
        df = df[df["enrollment"] > 0]

        # Check for extreme outliers in enrollment (IQR * 3 upper fence);
        # they are only logged, never removed.
        for course in df["kode_mk"].unique():
            course_data = df[df["kode_mk"] == course]["enrollment"]
            if len(course_data) > 1:
                q75, q25 = course_data.quantile([0.75, 0.25])
                iqr = q75 - q25
                upper_bound = q75 + (3 * iqr)
                outliers = course_data > upper_bound
                if outliers.any():
                    logger.debug(
                        f" Course {course} has {outliers.sum()} potential outliers (keeping them)"
                    )

        # Ensure population is reasonable
        if (df["jumlah_aktif"] < 50).any():
            logger.warning(" Some semesters have very low student population (<50)")

        return df