Spaces:
Sleeping
Sleeping
| import logging | |
| from typing import Dict, Optional, Set, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from config import Config | |
| logger = logging.getLogger(__name__) | |
| class DataProcessor: | |
| def __init__(self, config: Config): | |
| self.config = config | |
| self.raw_data: Dict[str, pd.DataFrame] = {} | |
| self.processed_data: pd.DataFrame = pd.DataFrame() | |
| self.elective_codes: Set[str] = set() | |
| def load_and_process(self) -> Tuple[pd.DataFrame, Set[str]]: | |
| self._load_excel() | |
| self._validate_raw_data() | |
| return self._preprocess() | |
| def _load_excel(self): | |
| try: | |
| sheets = pd.read_excel(self.config.data.FILE_PATH, sheet_name=None) | |
| self.raw_data = { | |
| "courses": sheets[self.config.data.SHEET_COURSES], | |
| "offerings": sheets[self.config.data.SHEET_OFFERINGS], | |
| "students_yearly": sheets[self.config.data.SHEET_STUDENTS_YEARLY], | |
| "students_ind": sheets[self.config.data.SHEET_STUDENTS_INDIVIDUAL], | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to load Excel: {e}") | |
| raise | |
| def _validate_raw_data(self): | |
| req_cols = { | |
| "courses": ["kode_mk", "kategori_mk"], | |
| "students_ind": ["kode_mk", "thn", "smt", "kode_mhs"], | |
| "students_yearly": ["thn", "smt", "jumlah_aktif"], | |
| } | |
| for key, cols in req_cols.items(): | |
| if not all(col in self.raw_data[key].columns for col in cols): | |
| raise ValueError(f"Missing columns in {key}: {cols}") | |
| def get_actual_classes_opened( | |
| self, year: int, semester: int, course_code: Optional[str] = None | |
| ) -> Dict[str, int]: | |
| offerings = self.raw_data.get("offerings") | |
| if offerings is None or len(offerings) == 0: | |
| logger.warning("No offerings data (tabel2) available") | |
| return {} | |
| # Standardize column names | |
| offerings = offerings.copy() | |
| for old_col, new_col in self.config.data.OFFERINGS_RENAME.items(): | |
| if old_col in offerings.columns and new_col not in offerings.columns: | |
| offerings = offerings.rename(columns={old_col: new_col}) | |
| # Log column names for debugging | |
| logger.debug(f"Offerings columns: {offerings.columns.tolist()}") | |
| # Filter by year and semester | |
| mask = (offerings["thn"] == year) & (offerings["smt"] == semester) | |
| if course_code: | |
| mask = mask & (offerings["kode_mk"] == course_code) | |
| filtered = offerings[mask] | |
| if len(filtered) == 0: | |
| logger.info(f"No class offerings found for {year} semester {semester}") | |
| return {} | |
| class_id_candidates = [ | |
| "kelas_id", | |
| "id_kelas", | |
| "kode_kelas", | |
| "class_id", | |
| "kelas", | |
| "section_id", | |
| "section", | |
| ] | |
| class_id_col = None | |
| for col in class_id_candidates: | |
| if col in filtered.columns: | |
| class_id_col = col | |
| logger.debug(f"Using class ID column: {col}") | |
| break | |
| if class_id_col is None: | |
| cols = filtered.columns.tolist() | |
| if len(cols) > 2: | |
| potential_id_col = cols[2] | |
| non_id_cols = [ | |
| "nama_mk", | |
| "smt", | |
| "thn", | |
| "semester", | |
| "tahun", | |
| "kuota", | |
| "kapasitas", | |
| ] | |
| if potential_id_col.lower() not in non_id_cols: | |
| class_id_col = potential_id_col | |
| logger.debug( | |
| f"Using positional class ID column (index 2): {potential_id_col}" | |
| ) | |
| result = {} | |
| for kode_mk in filtered["kode_mk"].unique(): | |
| course_data = filtered[filtered["kode_mk"] == kode_mk] | |
| if class_id_col and class_id_col in course_data.columns: | |
| unique_classes = course_data[class_id_col].nunique() | |
| logger.debug( | |
| f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (by {class_id_col})" | |
| ) | |
| else: | |
| all_cols = course_data.columns.tolist() | |
| dosen_cols = [ | |
| col | |
| for col in all_cols | |
| if "dosen" in col.lower() | |
| or "pengajar" in col.lower() | |
| or "teacher" in col.lower() | |
| ] | |
| if len(all_cols) > 0: | |
| last_col = all_cols[-1] | |
| if last_col not in dosen_cols: | |
| non_last_cols = [c for c in all_cols if c != last_col] | |
| if len(non_last_cols) > 0: | |
| grouped = course_data.groupby(non_last_cols)[ | |
| last_col | |
| ].nunique() | |
| if (grouped > 1).any(): | |
| dosen_cols.append(last_col) | |
| non_dosen_cols = [col for col in all_cols if col not in dosen_cols] | |
| if non_dosen_cols: | |
| unique_classes = len( | |
| course_data.drop_duplicates(subset=non_dosen_cols) | |
| ) | |
| else: | |
| unique_classes = len(course_data.drop_duplicates()) | |
| logger.debug( | |
| f"Course {kode_mk}: {len(course_data)} rows, {unique_classes} unique classes (fallback method)" | |
| ) | |
| result[kode_mk] = max(1, unique_classes) | |
| logger.info( | |
| f"Found {len(result)} courses with {sum(result.values())} total classes for {year} sem {semester}" | |
| ) | |
| return result | |
| def get_class_count_for_validation(self, year: int, semester: int) -> pd.DataFrame: | |
| actual_classes = self.get_actual_classes_opened(year, semester) | |
| if not actual_classes: | |
| return pd.DataFrame(columns=["kode_mk", "actual_classes"]) | |
| return pd.DataFrame( | |
| [ | |
| {"kode_mk": kode, "actual_classes": count} | |
| for kode, count in actual_classes.items() | |
| ] | |
| ) | |
| def _clean_courses_data(self, courses: pd.DataFrame) -> pd.DataFrame: | |
| initial_count = len(courses) | |
| # Remove duplicate | |
| courses = courses.drop_duplicates() | |
| if len(courses) < initial_count: | |
| logger.info( | |
| f" Removed {initial_count - len(courses)} exact duplicate rows" | |
| ) | |
| # Standardize kategori_mk | |
| courses["kategori_mk"] = ( | |
| courses["kategori_mk"] | |
| .astype(str) | |
| .str.upper() | |
| .str.strip() | |
| .replace("", np.nan) | |
| ) | |
| # Remove rows with missing critical data | |
| before_dropna = len(courses) | |
| courses = courses.dropna(subset=["kode_mk", "kategori_mk"]).copy() | |
| if len(courses) < before_dropna: | |
| logger.info( | |
| f" Removed {before_dropna - len(courses)} rows with missing kode_mk or kategori_mk" | |
| ) | |
| # Validate kategori_mk values | |
| valid_categories = {"P", "W"} | |
| invalid_mask = ~courses["kategori_mk"].isin(valid_categories) | |
| if invalid_mask.any(): | |
| invalid_cats = courses[invalid_mask]["kategori_mk"].unique() | |
| logger.warning( | |
| f" Found {invalid_mask.sum()} courses with invalid categories: {invalid_cats}" | |
| ) | |
| logger.warning(" Keeping only valid categories (P, W)") | |
| courses = courses[~invalid_mask] | |
| # Remove duplicate course codes (keep first) | |
| before_dedup = len(courses) | |
| courses = courses.drop_duplicates(subset="kode_mk", keep="first") | |
| if len(courses) < before_dedup: | |
| logger.info( | |
| f" Removed {before_dedup - len(courses)} duplicate course codes (kept first occurrence)" | |
| ) | |
| logger.info(f" Final course catalog: {len(courses)} unique courses") | |
| return courses | |
| def _clean_students_data(self, students: pd.DataFrame) -> pd.DataFrame: | |
| initial_count = len(students) | |
| # Remove rows with missing critical data | |
| students = students.dropna(subset=["kode_mk", "thn", "smt", "kode_mhs"]).copy() | |
| if len(students) < initial_count: | |
| logger.info( | |
| f" Removed {initial_count - len(students)} rows with missing critical data" | |
| ) | |
| # Ensure correct data types | |
| students.loc[:, "thn"] = pd.to_numeric(students["thn"], errors="coerce") | |
| students.loc[:, "smt"] = pd.to_numeric(students["smt"], errors="coerce") | |
| # Remove rows with invalid year/semester after conversion | |
| before_invalid = len(students) | |
| students = students.dropna(subset=["thn", "smt"]).copy() | |
| if len(students) < before_invalid: | |
| logger.info( | |
| f" Removed {before_invalid - len(students)} rows with invalid year/semester values" | |
| ) | |
| # Validate semester values | |
| valid_semesters = {1, 2} | |
| invalid_sem = ~students["smt"].isin(valid_semesters) | |
| if invalid_sem.any(): | |
| logger.warning( | |
| f" Found {invalid_sem.sum()} records with invalid semester values" | |
| ) | |
| students = students[~invalid_sem].copy() | |
| # Validate year range | |
| current_year = pd.Timestamp.now().year | |
| invalid_year = (students["thn"] < 2000) | (students["thn"] > current_year + 1) | |
| if invalid_year.any(): | |
| logger.warning( | |
| f" Found {invalid_year.sum()} records with unreasonable year values" | |
| ) | |
| students = students[~invalid_year].copy() | |
| # Remove exact duplicate enrollments (same student, course, semester) | |
| before_dedup = len(students) | |
| students = students.drop_duplicates( | |
| subset=["kode_mhs", "kode_mk", "thn", "smt"], keep="first" | |
| ) | |
| if len(students) < before_dedup: | |
| logger.info( | |
| f" Removed {before_dedup - len(students)} duplicate enrollment records" | |
| ) | |
| logger.info(f" Final enrollment records: {len(students)}") | |
| return students | |
| def _clean_yearly_population(self, yearly_pop: pd.DataFrame) -> pd.DataFrame: | |
| # Remove duplicate year-semester combinations | |
| before_dedup = len(yearly_pop) | |
| yearly_pop = yearly_pop.drop_duplicates(subset=["thn", "smt"], keep="first") | |
| if len(yearly_pop) < before_dedup: | |
| logger.info( | |
| f" Removed {before_dedup - len(yearly_pop)} duplicate year-semester records" | |
| ) | |
| # Ensure jumlah_aktif is numeric and positive | |
| yearly_pop["jumlah_aktif"] = pd.to_numeric( | |
| yearly_pop["jumlah_aktif"], errors="coerce" | |
| ) | |
| # Replace zero or negative values with NaN | |
| yearly_pop.loc[yearly_pop["jumlah_aktif"] <= 0, "jumlah_aktif"] = np.nan | |
| # Sort by year and semester | |
| yearly_pop = yearly_pop.sort_values(["thn", "smt"]).reset_index(drop=True) | |
| logger.info(f" Yearly population records: {len(yearly_pop)}") | |
| return yearly_pop | |
| def _preprocess(self) -> Tuple[pd.DataFrame, Set[str]]: | |
| # Clean course catalog | |
| courses = self._clean_courses_data(self.raw_data["courses"].copy()) | |
| # Identify elective courses | |
| elective_category = self.config.data.ELECTIVE_CATEGORY | |
| self.elective_codes = set( | |
| courses[courses["kategori_mk"] == elective_category]["kode_mk"] | |
| ) | |
| if len(self.elective_codes) == 0: | |
| logger.warning( | |
| f"No elective courses found! Check if kategori_mk = '{elective_category}' exists in data." | |
| ) | |
| logger.warning( | |
| f"Elective identification rule: {self.config.get_elective_filter_description()}" | |
| ) | |
| return pd.DataFrame(), set() | |
| # Clean student enrollment data | |
| students = self._clean_students_data(self.raw_data["students_ind"].copy()) | |
| # Filter for elective courses only | |
| students = students[students["kode_mk"].isin(self.elective_codes)] | |
| if len(students) == 0: | |
| logger.warning("No enrollment data found for elective courses!") | |
| return pd.DataFrame(), self.elective_codes | |
| # Aggregate enrollment by course-semester | |
| enrollment = ( | |
| students.groupby(["kode_mk", "thn", "smt"])["kode_mhs"] | |
| .nunique() | |
| .reset_index(name="enrollment") | |
| ) | |
| # Clean yearly population data | |
| yearly_pop = self._clean_yearly_population( | |
| self.raw_data["students_yearly"][["thn", "smt", "jumlah_aktif"]].copy() | |
| ) | |
| # Merge enrollment with population data | |
| df = enrollment.merge(yearly_pop, on=["thn", "smt"], how="left") | |
| # Handle missing population data | |
| missing_pop = df["jumlah_aktif"].isna().sum() | |
| if missing_pop > 0: | |
| df["jumlah_aktif"] = df["jumlah_aktif"].ffill().bfill() | |
| if df["jumlah_aktif"].isna().any(): | |
| default_pop = 500 | |
| df["jumlah_aktif"] = df["jumlah_aktif"].fillna(default_pop) | |
| # Validate enrollment data | |
| df = self._validate_enrollment_data(df) | |
| # Sort and finalize | |
| df = df.sort_values(["kode_mk", "thn", "smt"]).reset_index(drop=True) | |
| self.processed_data = df | |
| return df, self.elective_codes | |
| def _validate_enrollment_data(self, df: pd.DataFrame) -> pd.DataFrame: | |
| # Remove zero enrollments | |
| df = df[df["enrollment"] > 0] | |
| # Check for extreme outliers in enrollment | |
| for course in df["kode_mk"].unique(): | |
| course_data = df[df["kode_mk"] == course]["enrollment"] | |
| if len(course_data) > 1: | |
| q75, q25 = course_data.quantile([0.75, 0.25]) | |
| iqr = q75 - q25 | |
| upper_bound = q75 + (3 * iqr) | |
| outliers = course_data > upper_bound | |
| if outliers.any(): | |
| logger.debug( | |
| f" Course {course} has {outliers.sum()} potential outliers (keeping them)" | |
| ) | |
| # Ensure population is reasonable | |
| if (df["jumlah_aktif"] < 50).any(): | |
| logger.warning(" Some semesters have very low student population (<50)") | |
| return df | |