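# NOTE: the original first lines read "Spaces: / Sleeping / Sleeping". They appear to be
# page-scrape residue rather than program text, and are kept here only as a comment.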
import os
import re
from typing import Dict, List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

from database_connection import DatabaseConnection
class CourseRecommender:
    """Recommend courses from student feedback with a random-forest classifier.

    The model is trained on the frame returned by
    ``db_connection.get_student_feedback_counts()`` and predicts a ``course``
    label from stanine, GWA, strand and keyword counts derived from the
    free-text hobbies field. Fitted artifacts (model, label encoders, scaler)
    are persisted with joblib under ``models/``.

    NOTE(review): the ``rating`` column is cast to ``str`` during training but
    is never used as a feature or a filter, so feedback of any rating counts
    as a training example for its course -- confirm this is intended.
    """

    # Feature columns, in the order used for both training and prediction.
    _FEATURE_COLUMNS = [
        'stanine', 'gwa_normalized', 'strand', 'hobby_technical',
        'hobby_creative', 'hobby_academic', 'hobby_physical', 'hobby_social',
        'stanine_high', 'stanine_medium', 'stanine_low',
    ]

    # Columns that are label-encoded by preprocess_data().
    _CATEGORICAL_COLUMNS = ['strand', 'hobbies']

    # Keyword lists used to turn the free-text hobbies field into counts.
    _HOBBY_CATEGORIES = {
        'technical': ['programming', 'coding', 'computer', 'technology', 'software', 'gaming', 'electronics', 'math', 'mathematics'],
        'creative': ['art', 'music', 'writing', 'design', 'photography', 'dancing', 'drawing', 'literature'],
        'academic': ['reading', 'mathematics', 'science', 'research', 'studying', 'history', 'literature', 'books'],
        'physical': ['sports', 'fitness', 'exercise', 'running', 'swimming', 'basketball', 'football', 'gym'],
        'social': ['traveling', 'cooking', 'volunteering', 'community', 'leadership', 'social'],
    }

    # Persistence locations (relative to the current working directory).
    _MODEL_DIR = 'models'
    _MODEL_PATH = 'models/course_recommender_model.pkl'
    _ENCODERS_PATH = 'models/label_encoders.pkl'
    _SCALER_PATH = 'models/scaler.pkl'

    def __init__(self):
        """Create an untrained recommender and open the database connection."""
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.label_encoders = {}
        self.scaler = StandardScaler()
        self.db_connection = DatabaseConnection()
        self.is_trained = False
        self._available_courses: Optional[List] = None  # Cache for available courses
        self._last_data_count = 0  # Database feedback rows seen at last train/load
        self._last_local_feedback_count = 0  # Local feedback count at last auto-retrain
        self._auto_retrain_threshold = 5  # Retrain every 5 new feedbacks
        self._min_samples_for_training = 10  # Minimum samples needed to train
        self._local_feedback = []  # Feedback recorded through this instance

    def preprocess_data(self, df: pd.DataFrame, refit: bool = False) -> pd.DataFrame:
        """Label-encode the categorical columns of *df* and return a copy.

        ``strand`` is upper-cased first so matching is case-insensitive.

        Args:
            df: Frame containing some or all of the categorical columns.
                Columns that are absent are skipped.
            refit: When True, fit fresh encoders on *df* even if encoders
                already exist. Training passes True so that categories added
                since the previous fit are learned; prediction leaves it
                False so the stored encoders are reused.

        Returns:
            A copy of *df* with the categorical columns replaced by integer
            codes.
        """
        df_processed = df.copy()
        # Normalize strand to uppercase for case-insensitive matching
        if 'strand' in df_processed.columns:
            df_processed['strand'] = df_processed['strand'].astype(str).str.upper()

        for col in self._CATEGORICAL_COLUMNS:
            if col not in df_processed.columns:
                continue
            values = df_processed[col].astype(str)
            encoder = self.label_encoders.get(col)
            if refit or encoder is None:
                encoder = LabelEncoder()
                df_processed[col] = encoder.fit_transform(values)
                self.label_encoders[col] = encoder
                continue
            # Replace only the rows whose label was not seen at fit time.
            # The fallback is the first class in the encoder's sorted class
            # list, which is not necessarily the most frequent label.
            fallback = encoder.classes_[0]
            known_labels = set(encoder.classes_)
            values = values.where(values.isin(known_labels), fallback)
            df_processed[col] = encoder.transform(values)
        return df_processed

    def extract_hobbies_features(self, hobbies: str) -> Dict[str, int]:
        """Count hobby-category keywords present in *hobbies*.

        Matching is a case-insensitive substring test, so a keyword is also
        counted when it occurs inside a longer word (for example ``math``
        inside ``mathematics``).

        Args:
            hobbies: Free-text hobbies; falsy or NaN values are treated as "".

        Returns:
            Mapping ``hobby_<category>`` -> number of that category's
            keywords found in the text.
        """
        if not hobbies or pd.isna(hobbies):
            hobbies = ""
        hobbies_lower = str(hobbies).lower()
        return {
            f'hobby_{category}': sum(keyword in hobbies_lower for keyword in keywords)
            for category, keywords in self._HOBBY_CATEGORIES.items()
        }

    def prepare_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the derived feature columns to a copy of *df*.

        Adds the ``hobby_*`` keyword counts, ``gwa_normalized`` and the three
        stanine bin indicators. *df* must contain ``hobbies``, ``gwa`` and
        ``stanine`` columns.

        The hobby columns are assigned positionally rather than joined on the
        index, because callers pass frames whose index has gaps after
        filtering and an index-aligned join would produce NaN rows.
        """
        df_features = df.copy()

        # Extract hobby features
        hobby_rows = [self.extract_hobbies_features(hobbies) for hobbies in df['hobbies']]
        for category in self._HOBBY_CATEGORIES:
            column = f'hobby_{category}'
            df_features[column] = [row[column] for row in hobby_rows]

        # Normalize GWA to 0-1 scale (75-100 -> 0-1)
        df_features['gwa_normalized'] = (df_features['gwa'] - 75) / 25

        # Create stanine bins
        stanine = df_features['stanine']
        df_features['stanine_high'] = (stanine >= 7).astype(int)
        df_features['stanine_medium'] = ((stanine >= 4) & (stanine < 7)).astype(int)
        df_features['stanine_low'] = (stanine < 4).astype(int)
        return df_features

    def get_available_courses(self):
        """Return the available courses, fetching and caching them on first use.

        Courses come from ``db_connection.get_available_courses()``; if that
        returns nothing, the distinct ``course`` values in the student
        feedback data are used instead.

        Raises:
            ValueError: If neither source yields any course.
        """
        if self._available_courses is not None:
            return self._available_courses

        # Try to get courses from /courses endpoint first
        courses = self.db_connection.get_available_courses()
        if not courses:
            print("No courses found in /courses endpoint. Using courses from student feedback data...")
            # Get courses from student feedback data
            df_temp = self.db_connection.get_student_feedback_counts()
            if df_temp.empty:
                raise ValueError("No courses available in /courses endpoint and no student feedback data found.")
            courses = df_temp['course'].unique().tolist()
            print(f"Using courses from student feedback: {courses}")

        self._available_courses = courses
        print(f"Available courses cached: {len(courses)} courses")
        return self._available_courses

    def refresh_courses_cache(self):
        """Discard the cached course list and fetch it again."""
        self._available_courses = None
        return self.get_available_courses()

    def get_current_data_count(self):
        """Return the number of feedback rows in the database, or 0 on failure."""
        try:
            df = self.db_connection.get_student_feedback_counts()
            return len(df) if not df.empty else 0
        except Exception as e:
            # Best effort: callers treat 0 as "no data", but report the cause.
            print(f"Could not read feedback count: {e}")
            return 0

    def check_and_auto_retrain(self):
        """Retrain when enough feedback was added through this instance.

        Training is triggered once at least ``_min_samples_for_training``
        feedbacks are held locally and at least ``_auto_retrain_threshold``
        of them arrived since the previous auto-retrain. The local count is
        tracked separately from ``_last_data_count``, which holds a database
        row count.

        Returns:
            True if retraining ran successfully, otherwise False.
        """
        local_feedback_count = len(self._local_feedback)
        if local_feedback_count < self._min_samples_for_training:
            print(f"Not enough local feedback for training: {local_feedback_count} < {self._min_samples_for_training}")
            return False

        new_feedbacks = local_feedback_count - self._last_local_feedback_count
        if new_feedbacks < self._auto_retrain_threshold:
            return False

        print(f"Auto-retraining triggered: {new_feedbacks} new local feedbacks")
        try:
            accuracy = self.train_model(use_database=True)
        except Exception as e:
            print(f"Auto-retraining failed: {e}")
            return False
        self._last_local_feedback_count = local_feedback_count
        print(f"Auto-retraining completed with accuracy: {accuracy:.3f}")
        return True

    def add_feedback_with_learning(self, course: str, stanine: int, gwa: float, strand: str,
                                   rating: str, hobbies: str) -> bool:
        """Store feedback in the database and trigger auto-retraining if due.

        On success the feedback is also appended to the in-memory list that
        drives ``check_and_auto_retrain``.

        NOTE(review): ``rating`` is annotated ``str`` here but ``int`` in
        ``add_feedback``; confirm which type the database layer expects.

        Returns:
            The success flag returned by ``db_connection.add_feedback``.
        """
        # Add feedback to database
        success = self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies)
        if success:
            print(f"Feedback added for course: {course}")
            # Store feedback locally for learning (since API has issues)
            self._local_feedback.append({
                'course': course,
                'stanine': stanine,
                'gwa': gwa,
                'strand': strand,
                'rating': rating,
                'hobbies': hobbies,
                'count': 1,
            })
            print(f"Feedback stored locally for learning: {len(self._local_feedback)} total")
            # Check if we should auto-retrain
            self.check_and_auto_retrain()
        return success

    def configure_auto_learning(self, retrain_threshold=5, min_samples=10):
        """Set how many new feedbacks trigger a retrain and the minimum sample count."""
        self._auto_retrain_threshold = retrain_threshold
        self._min_samples_for_training = min_samples
        print(f"Auto-learning configured: retrain every {retrain_threshold} new feedbacks, minimum {min_samples} samples")

    def get_learning_status(self):
        """Return a dict describing database feedback growth since the last train/load.

        All counts in the result are database row counts; the local-feedback
        counter used by ``check_and_auto_retrain`` is not reported here.
        """
        current_count = self.get_current_data_count()
        new_feedbacks = current_count - self._last_data_count
        return {
            'current_data_count': current_count,
            'last_trained_count': self._last_data_count,
            'new_feedbacks': new_feedbacks,
            'retrain_threshold': self._auto_retrain_threshold,
            'min_samples': self._min_samples_for_training,
            'ready_for_retrain': new_feedbacks >= self._auto_retrain_threshold
        }

    @staticmethod
    def _stratify_target(y, test_size):
        """Return *y* if a stratified split of it is feasible, else None.

        A stratified split needs every class to have at least two members and
        both partitions to hold at least one sample per class.
        """
        n_samples = len(y)
        n_test = int(np.ceil(test_size * n_samples))
        n_train = n_samples - n_test
        class_counts = y.value_counts()
        if class_counts.min() >= 2 and min(n_train, n_test) >= len(class_counts):
            return y
        return None

    def train_model(self, use_database: bool = True):
        """Train the model on student feedback and persist it.

        Feedback rows are restricted to available courses, rows with
        non-numeric stanine/GWA are dropped, encoders are refit, and the data
        is split 80/20 (stratified by course when feasible).

        Args:
            use_database: Currently unused; data is always read from
                ``db_connection``.

        Returns:
            Accuracy on the held-out 20% split.

        Raises:
            ValueError: If there is no usable training data at any stage.
        """
        print("Loading training data from student feedback...")
        # Get available courses with caching
        available_courses = self.get_available_courses()

        # Get training data from student feedback
        df = self.db_connection.get_student_feedback_counts()
        if df.empty:
            raise ValueError("No student feedback data available for training")
        print(f"Student feedback data: {len(df)} samples")
        print(f"Feedback courses: {df['course'].unique().tolist()}")

        # Filter training data to only include courses that are available in /courses
        df_filtered = df[df['course'].isin(available_courses)]
        if df_filtered.empty:
            raise ValueError("No training data available for courses that exist in /courses endpoint")
        print(f"Training with {len(df_filtered)} samples (filtered to available courses)")

        # Clean and prepare data
        df_clean = df_filtered.copy()
        df_clean['stanine'] = pd.to_numeric(df_clean['stanine'], errors='coerce')
        df_clean['gwa'] = pd.to_numeric(df_clean['gwa'], errors='coerce')
        df_clean['rating'] = df_clean['rating'].astype(str)
        # Remove rows with invalid data
        df_clean = df_clean.dropna(subset=['stanine', 'gwa'])
        if df_clean.empty:
            raise ValueError("No valid training data after cleaning")
        if len(df_clean) < 2:
            raise ValueError("At least 2 clean samples are required to train and evaluate the model")
        print(f"Training with {len(df_clean)} clean samples")

        # Prepare features
        df_features = self.prepare_features(df_clean)

        # Encoders are refit below; keep the old ones so a failed run does not
        # leave them out of step with the previously fitted model.
        previous_encoders = dict(self.label_encoders)
        try:
            df_processed = self.preprocess_data(df_features, refit=True)
            X = df_processed[self._FEATURE_COLUMNS]
            y = df_processed['course']

            # Split data; stratify only when the class sizes allow it
            stratify = self._stratify_target(y, 0.2)
            if stratify is None:
                print("Too few samples per course for a stratified split; using a plain random split")
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42, stratify=stratify
            )

            # Scale features
            X_train_scaled = self.scaler.fit_transform(X_train)
            X_test_scaled = self.scaler.transform(X_test)

            # Train model
            self.model.fit(X_train_scaled, y_train)
        except Exception:
            self.label_encoders = previous_encoders
            raise

        # Evaluate
        y_pred = self.model.predict(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"Model accuracy: {accuracy:.3f}")
        self.is_trained = True

        # Save model
        self.save_model()

        # Track the raw database row count, the same quantity that
        # get_current_data_count() reports.
        self._last_data_count = len(df)
        return accuracy

    def predict_course(self, stanine: int, gwa: float, strand: str, hobbies: str) -> List[Tuple[str, float]]:
        """Return up to five ``(course, probability)`` pairs, most likely first.

        A saved model is loaded on demand if this instance is not trained.
        Only courses present in the available-course list are returned.

        Raises:
            ValueError: If no trained model is available.
        """
        if not self.is_trained:
            self.load_model()
        if not self.is_trained:
            raise ValueError("Model not trained. Please train the model first.")

        # Get available courses with caching
        available_courses = self.get_available_courses()

        # Create input data
        input_data = pd.DataFrame({
            'stanine': [stanine],
            'gwa': [gwa],
            'strand': [strand],
            'hobbies': [hobbies]
        })

        # Prepare features exactly as in training
        input_features = self.prepare_features(input_data)
        input_processed = self.preprocess_data(input_features)
        X = input_processed[self._FEATURE_COLUMNS]
        X_scaled = self.scaler.transform(X)

        # Get predictions with probabilities
        probabilities = self.model.predict_proba(X_scaled)[0]

        # Keep only courses available in /courses endpoint
        available_recommendations = [
            (course, float(probability))
            for course, probability in zip(self.model.classes_, probabilities)
            if course in available_courses
        ]

        # Sort by probability and get top 5
        available_recommendations.sort(key=lambda x: x[1], reverse=True)
        return available_recommendations[:5]

    def save_model(self):
        """Write the model, label encoders and scaler to the ``models`` directory."""
        os.makedirs(self._MODEL_DIR, exist_ok=True)
        joblib.dump(self.model, self._MODEL_PATH)
        joblib.dump(self.label_encoders, self._ENCODERS_PATH)
        joblib.dump(self.scaler, self._SCALER_PATH)
        print("Model saved successfully")

    def load_model(self):
        """Load the model, label encoders and scaler saved by ``save_model``.

        The three artifacts are loaded before any of them is assigned, so a
        missing file leaves the current model, encoders and scaler untouched.
        If any file is missing, ``is_trained`` is set to False.

        NOTE: joblib files are pickle-based; only load files from a trusted
        location.
        """
        try:
            model = joblib.load(self._MODEL_PATH)
            label_encoders = joblib.load(self._ENCODERS_PATH)
            scaler = joblib.load(self._SCALER_PATH)
        except FileNotFoundError:
            print("No saved model found. Please train the model first.")
            self.is_trained = False
            return

        self.model = model
        self.label_encoders = label_encoders
        self.scaler = scaler
        self.is_trained = True
        # Initialize data count tracking
        self._last_data_count = self.get_current_data_count()
        print("Model loaded successfully")

    def add_feedback(self, course: str, stanine: int, gwa: float, strand: str,
                     rating: int, hobbies: str) -> bool:
        """Add user feedback to the database without triggering retraining."""
        return self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies)