chatbot2 / course_recommender.py
markobinario's picture
Update course_recommender.py
bcd37b2 verified
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import joblib
import re
from typing import List, Dict, Tuple
from database_connection import DatabaseConnection
import os
class CourseRecommender:
    def __init__(self):
        """Initialize an untrained recommender with empty caches and default auto-learning settings."""
        # Fixed random_state so repeated trainings are reproducible
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.label_encoders = {}  # per-column LabelEncoder instances, keyed by column name
        self.scaler = StandardScaler()
        self.db_connection = DatabaseConnection()
        self.is_trained = False  # flipped by train_model() / load_model()
        self._available_courses = None  # Cache for available courses
        self._last_data_count = 0  # Track data count for auto-retraining
        self._auto_retrain_threshold = 5  # Retrain every 5 new feedbacks
        self._min_samples_for_training = 10  # Minimum samples needed to train
        self._local_feedback = []  # Store feedback locally for learning
def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Preprocess the data for training"""
df_processed = df.copy()
# Normalize strand to uppercase for case-insensitive matching
if 'strand' in df_processed.columns:
df_processed['strand'] = df_processed['strand'].astype(str).str.upper()
# Encode categorical variables
categorical_columns = ['strand', 'hobbies']
for col in categorical_columns:
if col not in self.label_encoders:
self.label_encoders[col] = LabelEncoder()
df_processed[col] = self.label_encoders[col].fit_transform(df_processed[col].astype(str))
else:
# Handle unseen labels by using a default value
try:
df_processed[col] = self.label_encoders[col].transform(df_processed[col].astype(str))
except ValueError:
# For unseen labels, use the most common label from training
most_common = self.label_encoders[col].classes_[0]
df_processed[col] = self.label_encoders[col].transform([most_common] * len(df_processed))
return df_processed
def extract_hobbies_features(self, hobbies: str) -> Dict[str, int]:
"""Extract features from hobbies string"""
if not hobbies or pd.isna(hobbies):
hobbies = ""
hobbies_lower = str(hobbies).lower()
# Define hobby categories
hobby_categories = {
'technical': ['programming', 'coding', 'computer', 'technology', 'software', 'gaming', 'electronics', 'math', 'mathematics'],
'creative': ['art', 'music', 'writing', 'design', 'photography', 'dancing', 'drawing', 'literature'],
'academic': ['reading', 'mathematics', 'science', 'research', 'studying', 'history', 'literature', 'books'],
'physical': ['sports', 'fitness', 'exercise', 'running', 'swimming', 'basketball', 'football', 'gym'],
'social': ['traveling', 'cooking', 'volunteering', 'community', 'leadership', 'social']
}
features = {}
for category, keywords in hobby_categories.items():
features[f'hobby_{category}'] = sum(1 for keyword in keywords if keyword in hobbies_lower)
return features
def prepare_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare features for the model"""
df_features = df.copy()
# Extract hobby features
hobby_features = []
for hobbies in df['hobbies']:
features = self.extract_hobbies_features(hobbies)
hobby_features.append(features)
hobby_df = pd.DataFrame(hobby_features)
df_features = pd.concat([df_features, hobby_df], axis=1)
# Normalize GWA to 0-1 scale (75-100 -> 0-1)
df_features['gwa_normalized'] = (df_features['gwa'] - 75) / 25
# Create stanine bins
df_features['stanine_high'] = (df_features['stanine'] >= 7).astype(int)
df_features['stanine_medium'] = ((df_features['stanine'] >= 4) & (df_features['stanine'] < 7)).astype(int)
df_features['stanine_low'] = (df_features['stanine'] < 4).astype(int)
return df_features
def get_available_courses(self):
"""Get available courses with caching"""
if self._available_courses is None:
# Try to get courses from /courses endpoint first
courses = self.db_connection.get_available_courses()
if not courses:
print("No courses found in /courses endpoint. Using courses from student feedback data...")
# Get courses from student feedback data
df_temp = self.db_connection.get_student_feedback_counts()
if df_temp.empty:
raise ValueError("No courses available in /courses endpoint and no student feedback data found.")
courses = df_temp['course'].unique().tolist()
print(f"Using courses from student feedback: {courses}")
self._available_courses = courses
print(f"Available courses cached: {len(courses)} courses")
return self._available_courses
    def refresh_courses_cache(self):
        """Invalidate the cached course list and immediately re-fetch it.

        Returns:
            The freshly fetched list of available courses.
        """
        # Dropping the cache forces get_available_courses() to hit the source again
        self._available_courses = None
        return self.get_available_courses()
def get_current_data_count(self):
"""Get current number of feedback records in database"""
try:
df = self.db_connection.get_student_feedback_counts()
return len(df) if not df.empty else 0
except:
return 0
def check_and_auto_retrain(self):
"""Check if enough new data exists and auto-retrain if needed"""
# Use local feedback count for auto-retraining
local_feedback_count = len(self._local_feedback)
if local_feedback_count < self._min_samples_for_training:
print(f"Not enough local feedback for training: {local_feedback_count} < {self._min_samples_for_training}")
return False
if local_feedback_count - self._last_data_count >= self._auto_retrain_threshold:
print(f"Auto-retraining triggered: {local_feedback_count - self._last_data_count} new local feedbacks")
try:
accuracy = self.train_model(use_database=True)
self._last_data_count = local_feedback_count
print(f"Auto-retraining completed with accuracy: {accuracy:.3f}")
return True
except Exception as e:
print(f"Auto-retraining failed: {e}")
return False
return False
    def add_feedback_with_learning(self, course: str, stanine: int, gwa: float, strand: str,
                                   rating: str, hobbies: str) -> bool:
        """Add feedback to database and trigger auto-learning if needed.

        On a successful database write the record is also appended to the
        in-memory feedback list (which drives auto-retraining), then the
        retrain check runs.

        Args:
            course: Course the feedback refers to.
            stanine: Student stanine score.
            gwa: Student general weighted average.
            strand: Senior-high strand.
            rating: Feedback rating (str here; sibling add_feedback annotates
                int — see NOTE there).
            hobbies: Free-text hobbies description.

        Returns:
            True when the database accepted the feedback, False otherwise.
        """
        # Add feedback to database
        success = self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies)
        if success:
            print(f"Feedback added for course: {course}")
            # Store feedback locally for learning (since API has issues)
            feedback_record = {
                'course': course,
                'stanine': stanine,
                'gwa': gwa,
                'strand': strand,
                'rating': rating,
                'hobbies': hobbies,
                'count': 1
            }
            self._local_feedback.append(feedback_record)
            print(f"Feedback stored locally for learning: {len(self._local_feedback)} total")
            # Check if we should auto-retrain
            self.check_and_auto_retrain()
        return success
def configure_auto_learning(self, retrain_threshold=5, min_samples=10):
"""Configure auto-learning parameters"""
self._auto_retrain_threshold = retrain_threshold
self._min_samples_for_training = min_samples
print(f"Auto-learning configured: retrain every {retrain_threshold} new feedbacks, minimum {min_samples} samples")
def get_learning_status(self):
"""Get current learning status"""
current_count = self.get_current_data_count()
return {
'current_data_count': current_count,
'last_trained_count': self._last_data_count,
'new_feedbacks': current_count - self._last_data_count,
'retrain_threshold': self._auto_retrain_threshold,
'min_samples': self._min_samples_for_training,
'ready_for_retrain': (current_count - self._last_data_count) >= self._auto_retrain_threshold
}
    def train_model(self, use_database: bool = True):
        """Train the recommendation model using student feedback data.

        Pipeline: fetch feedback records, keep only rows whose course exists
        in the /courses endpoint, coerce/clean numeric columns, engineer
        features, fit the RandomForest on a stratified 80/20 split, persist
        the artifacts, and record the sample count for auto-retrain tracking.

        Args:
            use_database: currently unused — data always comes from the
                database connection. TODO(review): honor or remove this flag.

        Returns:
            Hold-out accuracy of the freshly trained model.

        Raises:
            ValueError: when no (valid) training data remains at any stage.
        """
        print("Loading training data from student feedback...")
        # Get available courses with caching
        available_courses = self.get_available_courses()
        # Get training data from student feedback
        df = self.db_connection.get_student_feedback_counts()
        if df.empty:
            raise ValueError("No student feedback data available for training")
        print(f"Student feedback data: {len(df)} samples")
        print(f"Feedback courses: {df['course'].unique().tolist()}")
        # Filter training data to only include courses that are available in /courses
        df_filtered = df[df['course'].isin(available_courses)]
        if df_filtered.empty:
            raise ValueError("No training data available for courses that exist in /courses endpoint")
        print(f"Training with {len(df_filtered)} samples (filtered to available courses)")
        # Clean and prepare data
        df_clean = df_filtered.copy()
        # Convert data types; non-numeric entries become NaN and are dropped below
        df_clean['stanine'] = pd.to_numeric(df_clean['stanine'], errors='coerce')
        df_clean['gwa'] = pd.to_numeric(df_clean['gwa'], errors='coerce')
        df_clean['rating'] = df_clean['rating'].astype(str)
        # Remove rows with invalid data
        df_clean = df_clean.dropna(subset=['stanine', 'gwa'])
        if df_clean.empty:
            raise ValueError("No valid training data after cleaning")
        print(f"Training with {len(df_clean)} clean samples")
        # Prepare features
        df_features = self.prepare_features(df_clean)
        df_processed = self.preprocess_data(df_features)
        # Select features for training — must stay in sync with predict_course
        feature_columns = [
            'stanine', 'gwa_normalized', 'strand', 'hobby_technical',
            'hobby_creative', 'hobby_academic', 'hobby_physical', 'hobby_social',
            'stanine_high', 'stanine_medium', 'stanine_low'
        ]
        X = df_processed[feature_columns]
        y = df_processed['course']
        # Split data; stratify keeps per-course class proportions in both halves.
        # NOTE(review): train_test_split(stratify=...) raises when any course
        # has fewer than 2 samples — confirm minimum per-course counts upstream.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        # Scale features (scaler fitted on the training split only)
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        # Train model
        self.model.fit(X_train_scaled, y_train)
        # Evaluate on the held-out split
        y_pred = self.model.predict(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"Model accuracy: {accuracy:.3f}")
        self.is_trained = True
        # Save model
        self.save_model()
        # Update data count tracking
        self._last_data_count = len(df_clean)
        return accuracy
def predict_course(self, stanine: int, gwa: float, strand: str, hobbies: str) -> List[Tuple[str, float]]:
"""Predict course recommendations using student feedback data and available courses"""
if not self.is_trained:
self.load_model()
if not self.is_trained:
raise ValueError("Model not trained. Please train the model first.")
# Get available courses with caching
available_courses = self.get_available_courses()
# Create input data
input_data = pd.DataFrame({
'stanine': [stanine],
'gwa': [gwa],
'strand': [strand],
'hobbies': [hobbies]
})
# Prepare features
input_features = self.prepare_features(input_data)
input_processed = self.preprocess_data(input_features)
# Select same features as training
feature_columns = [
'stanine', 'gwa_normalized', 'strand', 'hobby_technical',
'hobby_creative', 'hobby_academic', 'hobby_physical', 'hobby_social',
'stanine_high', 'stanine_medium', 'stanine_low'
]
X = input_processed[feature_columns]
X_scaled = self.scaler.transform(X)
# Get predictions with probabilities
probabilities = self.model.predict_proba(X_scaled)[0]
classes = self.model.classes_
# Filter recommendations to only include courses available in /courses endpoint
available_recommendations = []
for i, course in enumerate(classes):
if course in available_courses:
available_recommendations.append((course, probabilities[i]))
# Sort by probability and get top 5
available_recommendations.sort(key=lambda x: x[1], reverse=True)
recommendations = available_recommendations[:5]
return recommendations
def save_model(self):
"""Save the trained model and encoders"""
os.makedirs('models', exist_ok=True)
joblib.dump(self.model, 'models/course_recommender_model.pkl')
joblib.dump(self.label_encoders, 'models/label_encoders.pkl')
joblib.dump(self.scaler, 'models/scaler.pkl')
print("Model saved successfully")
def load_model(self):
"""Load the trained model and encoders"""
try:
self.model = joblib.load('models/course_recommender_model.pkl')
self.label_encoders = joblib.load('models/label_encoders.pkl')
self.scaler = joblib.load('models/scaler.pkl')
self.is_trained = True
# Initialize data count tracking
self._last_data_count = self.get_current_data_count()
print("Model loaded successfully")
except FileNotFoundError:
print("No saved model found. Please train the model first.")
self.is_trained = False
    def add_feedback(self, course: str, stanine: int, gwa: float, strand: str,
                     rating: int, hobbies: str) -> bool:
        """Add user feedback to the database.

        Thin pass-through to DatabaseConnection.add_feedback; returns its
        success flag.

        NOTE(review): `rating` is annotated int here but str in
        add_feedback_with_learning — confirm which type the backend expects.
        """
        return self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies)