# course-recommendation-system / main_model_2.py
# (Hugging Face Spaces file header: uploaded by LvMAC,
#  commit 4ba7040 "Update main_model_2.py", verified)
# -*- coding: utf-8 -*-
"""
AI-Powered Course Recommendation System
Advanced recommendation system using FAISS and Behavioral Analysis
"""
import os
import pandas as pd
import numpy as np
import warnings
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
import requests
import json
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score

# Core ML imports with error handling: each optional dependency sets a
# module-level availability flag so the rest of the system can degrade
# gracefully when the package is missing.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    print("⚠️ PyTorch not available, using CPU-only mode")
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    print("⚠️ SentenceTransformers not available")
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("⚠️ FAISS not available")
try:
    import nltk
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize
    from nltk.stem import WordNetLemmatizer, PorterStemmer
    # Download required NLTK data (best effort: the corpora may already be
    # cached, or the machine may be offline -- either way we carry on).
    try:
        nltk.download('punkt', quiet=True)
        nltk.download('stopwords', quiet=True)
        nltk.download('wordnet', quiet=True)
        nltk.download('omw-1.4', quiet=True)
    except Exception:  # was a bare `except:`; don't swallow SystemExit/KeyboardInterrupt
        pass
    NLTK_AVAILABLE = True
except ImportError:
    NLTK_AVAILABLE = False
    print("⚠️ NLTK not available, using basic text processing")
warnings.filterwarnings('ignore')
######## Data Preprocessing #########
class DataPreprocess:
    """Load and clean the raw course/student CSV files.

    Cleaning is column-by-column: whitespace/punctuation normalisation,
    optional NLTK tokenisation + lemmatisation, stress-level
    standardisation and duplicate removal.  Every fix applied is recorded
    in ``preprocessing_log`` for auditability.
    """

    def __init__(self, device='auto'):
        """Set up NLP helpers, the stop-word list and the quality log.

        ``device`` is accepted for interface symmetry with the main
        recommendation system; preprocessing itself is CPU-only.
        """
        print("Initializing Data Preprocessing")
        # Initialize NLP components if available
        if NLTK_AVAILABLE:
            self.lemmatizer = WordNetLemmatizer()
            self.stemmer = PorterStemmer()
        else:
            self.lemmatizer = None
            self.stemmer = None
        # Small hand-rolled stop-word list (note it deliberately includes
        # the word 'course', which carries no signal in this corpus).
        self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by','this','these','that','course'])
        self.faiss_index = None
        self.student_profile = {}
        # Data quality tracking
        self.preprocessing_log = {
            'timestamp': datetime.now().isoformat(),
            'issues_found': [],
            'issues_fixed': [],
            'statistics': {}
        }

    def load_and_preprocess_data(self):
        """Load and preprocess datasets.

        Reads the two cleaned CSVs from the working directory (the layout
        used on HF Spaces); on FileNotFoundError falls back to empty
        DataFrames and returns early without cleaning.
        """
        try:
            # Try to load from current directory (HF Spaces)
            self.raw_course_data = pd.read_csv("course_data_cleaned.csv")
            self.raw_student_data = pd.read_csv("student_data_cleaned.csv")
        except FileNotFoundError:
            # Fallback: create empty DataFrames
            print("⚠️ Data files not found, creating empty datasets")
            self.raw_course_data = pd.DataFrame()
            self.raw_student_data = pd.DataFrame()
            return
        self.preprocessing_log['statistics']['course_rows'] = len(self.raw_course_data)
        self.preprocessing_log['statistics']['student_rows'] = len(self.raw_student_data)
        # Clean data
        self._clean_course_data_comprehensive()
        self._clean_student_data_comprehensive()
        print("βœ… Data preprocessing completed successfully!")

    def _clean_course_data_comprehensive(self):
        """Clean course data column by column.

        Works on a copy of ``raw_course_data``: drops exact-duplicate rows,
        normalises each text column (with optional tokenise/lemmatise
        round-trip), and maps 'Stress Level' to a 1-3 ``stress_numeric``.
        """
        self.course_data = self.raw_course_data.copy()
        # Remove duplicates
        initial_rows = len(self.course_data)
        self.course_data = self.course_data.drop_duplicates()
        removed_duplicates = initial_rows - len(self.course_data)
        if removed_duplicates > 0:
            self.preprocessing_log['issues_fixed'].append(f"Removed {removed_duplicates} duplicate course rows")
        # Process text columns
        text_columns = ['Department', 'Description', 'Type', 'Skill Required', 'Field Interest','Career Paths', 'Industry Sectors']
        for col in text_columns:
            if col in self.course_data.columns:
                # Clean text
                self.course_data[col] = self.course_data[col].apply(self._clean_individual_column)
                # Tokenize and lemmatize if NLTK available, then re-join the
                # lemmas back into a plain-text column and drop the helpers.
                if NLTK_AVAILABLE:
                    self.course_data[f'{col}_tokens'] = self.course_data[col].apply(self._tokenize_individual_column)
                    self.course_data[f'{col}_lemmatized'] = self.course_data[f'{col}_tokens'].apply(self._lemmatize_tokens)
                    self.course_data[col] = self.course_data[f'{col}_lemmatized'].apply(lambda x: ' '.join(x) if x else '')
                    self.course_data.drop([f'{col}_tokens', f'{col}_lemmatized'], axis=1, inplace=True)
        # Handle Stress Level
        if 'Stress Level' in self.course_data.columns:
            self.course_data['Stress Level'] = self.course_data['Stress Level'].apply(self._clean_individual_column)
            self.course_data['Stress Level'] = self.course_data['Stress Level'].apply(self._standardize_stress_level)
            # Generate stress_numeric values (Low=1 .. High=3)
            stress_mapping = {'Low': 1, 'Medium': 2, 'High': 3}
            self.course_data['stress_numeric'] = self.course_data['Stress Level'].map(stress_mapping)
            self.preprocessing_log['issues_fixed'].append("Generated stress_numeric values")
        print(f"βœ… Course data cleaning completed: {len(self.course_data)} rows")

    def _clean_student_data_comprehensive(self):
        """Clean student data column by column.

        Drops duplicates and NaN rows, buckets Q1 study hours into
        low/medium/high, then applies the same text-cleaning pipeline to
        the remaining Q columns.
        """
        self.student_data = self.raw_student_data.copy()
        # Remove duplicates and NaN
        initial_rows = len(self.student_data)
        self.student_data = self.student_data.drop_duplicates()
        self.student_data = self.student_data.dropna()
        # NOTE: this count includes rows removed by dropna() as well as
        # actual duplicates, even though the log message says "duplicate".
        removed_duplicates = initial_rows - len(self.student_data)
        if removed_duplicates > 0:
            self.preprocessing_log['issues_fixed'].append(f"Removed {removed_duplicates} duplicate student rows")
        # Process Q1_Study_Hours column specially
        if 'Q1_Study_Hours' in self.student_data.columns:
            def categorize_study_hours(value):
                """Categorize study hours into high, medium, low."""
                value_str = str(value).strip().lower()
                numbers = re.findall(r'\d+(?:\.\d+)?', value_str)
                try:
                    hours = float(numbers[0])
                    if hours <= 2:
                        return 'low'
                    elif hours <= 6:
                        return 'medium'
                    else:
                        return 'high'
                except (ValueError, IndexError):
                    # No parsable number -> assume a middling commitment
                    return 'medium'
            self.student_data['Q1_Study_Hours'] = self.student_data['Q1_Study_Hours'].apply(categorize_study_hours)
        # Process Q columns
        q_columns = [col for col in self.student_data.columns if col.startswith('Q') and col != 'Q1_Study_Hours']
        for col in q_columns:
            # Clean text
            self.student_data[col] = self.student_data[col].apply(self._clean_individual_column)
            # Tokenize and lemmatize if NLTK available
            if NLTK_AVAILABLE:
                self.student_data[f'{col}_tokens'] = self.student_data[col].apply(self._tokenize_individual_column)
                self.student_data[f'{col}_lemmatized'] = self.student_data[f'{col}_tokens'].apply(self._lemmatize_tokens)
                self.student_data[col] = self.student_data[f'{col}_lemmatized'].apply(lambda x: ' '.join(x) if x else '')
                self.student_data.drop([f'{col}_tokens', f'{col}_lemmatized'], axis=1, inplace=True)
        print(f"βœ… Student data cleaning completed: {len(self.student_data)} rows")

    def _clean_individual_column(self, text):
        """Clean individual column text thoroughly.

        Normalises newlines/whitespace, collapses repeated punctuation,
        converts mixed separators to ', ' and trims stray delimiters.
        Returns '' for NaN/empty input.
        """
        if pd.isna(text) or text == "":
            return ""
        text = str(text)
        # Remove newlines and replace with spaces
        text = re.sub(r'\n+', ' ', text)
        text = re.sub(r'\r+', ' ', text)
        # Remove leading/trailing whitespace
        text = text.strip()
        # Replace multiple spaces with single space
        text = re.sub(r'\s+', ' ', text)
        # Remove trailing periods and commas
        text = text.rstrip('.,;')
        # Remove excessive punctuation
        text = re.sub(r'[.]{2,}', '.', text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[,]{2,}', ',', text)
        # Clean up mixed separators
        text = re.sub(r'[,;\n\\]+', ', ', text)
        text = text.strip(', ')
        return text

    def _tokenize_individual_column(self, text):
        """Tokenize individual column text and remove stopwords.

        Prefers NLTK word_tokenize; any failure (or NLTK being absent)
        falls back to a regex split.  Output is lowercase, alphabetic-only,
        stop-word-free, length > 2, de-duplicated preserving order.
        """
        if pd.isna(text) or text == "":
            return []
        try:
            if NLTK_AVAILABLE:
                # Convert to lowercase and tokenize
                tokens = word_tokenize(str(text).lower())
                # Remove punctuation and non-alphabetic tokens
                tokens = [token for token in tokens if token.isalpha()]
                # Remove stopwords
                tokens = [token for token in tokens if token not in self.stop_words]
                # Remove short tokens
                tokens = [token for token in tokens if len(token) > 2]
                # Remove duplicates while preserving order
                seen = set()
                unique_tokens = []
                for token in tokens:
                    if token not in seen:
                        seen.add(token)
                        unique_tokens.append(token)
                return unique_tokens
            else:
                # Fallback tokenization
                text = re.sub(r'[^\w\s]', ' ', str(text).lower())
                tokens = text.split()
                tokens = [token for token in tokens if len(token) > 2 and token not in self.stop_words]
                return list(dict.fromkeys(tokens))
        except Exception:  # was a bare `except:`; narrowed so Ctrl-C still works
            # Fallback tokenization
            text = re.sub(r'[^\w\s]', ' ', str(text).lower())
            tokens = text.split()
            tokens = [token for token in tokens if len(token) > 2 and token not in self.stop_words]
            return list(dict.fromkeys(tokens))

    def _lemmatize_tokens(self, tokens):
        """Lemmatize tokens; pass them through unchanged if no lemmatizer."""
        if not tokens:
            return []
        try:
            if self.lemmatizer:
                return [self.lemmatizer.lemmatize(token) for token in tokens]
            else:
                return tokens
        except Exception:  # was a bare `except:`; narrowed so Ctrl-C still works
            return tokens

    def _standardize_stress_level(self, stress):
        """Standardize free-text stress levels to 'Low'/'Medium'/'High'.

        Keyword-based: anything unrecognised (including NaN) maps to
        'Medium'.
        """
        if pd.isna(stress):
            return "Medium"
        stress_str = str(stress).lower().strip()
        if any(word in stress_str for word in ['high', 'difficult', 'challenging', 'intense', 'very high','hard', 'harder','strong']):
            return "High"
        elif any(word in stress_str for word in ['low', 'easy', 'light', 'minimal','easier','very low','weak','lighter']):
            return "Low"
        else:
            return "Medium"
######## MAIN MODEL #########
class ProductionCourseRecommendationSystem:
    """End-to-end course recommender.

    Pipeline: embed course descriptions with SentenceTransformers (TF-IDF
    fallback), index them with FAISS when available, embed the student's
    survey answers, run a similarity search, then blend the result with
    rule-based behavioral metrics to rank the top courses.
    """

    def __init__(self, device='auto'):
        """Initialize the system with production-grade components."""
        self.device = self._setup_device(device)
        print(f"Using device: {self.device}")
        # Initialize embedding model (optional dependency)
        self.embedding_model = None
        if SENTENCE_TRANSFORMERS_AVAILABLE:
            try:
                self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L12-v2', device=self.device)
                print("βœ… Embedding model loaded successfully")
            except Exception as e:
                print(f"⚠️ Error loading embedding model: {e}")
                self.embedding_model = None
        else:
            print("⚠️ SentenceTransformers not available, using TF-IDF fallback")
        # Initialize NLP components
        if NLTK_AVAILABLE:
            self.lemmatizer = WordNetLemmatizer()
            self.stemmer = PorterStemmer()
        else:
            self.lemmatizer = None
            self.stemmer = None
        self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by','this','these','that'])
        # API key for Mistral (optional)
        self.mistral_api_key = os.getenv("MISTRAL_API_KEY", "")
        self.model_loaded = False
        # Data components: cleaned CSVs expected in the working directory
        try:
            self.course_data = pd.read_csv("course_data_cleaned.csv")
            self.student_data = pd.read_csv("student_data_cleaned.csv")
            print(f"βœ… Loaded {len(self.course_data)} courses and {len(self.student_data)} student records")
        except FileNotFoundError:
            print("⚠️ Data files not found")
            self.course_data = pd.DataFrame()
            self.student_data = pd.DataFrame()
        self.course_embeddings = None
        self.faiss_index = None
        self.student_profile = {}
        # Survey questions (answers land in student_profile as Q1..Q10)
        self.survey_questions = [
            "How many hours can you dedicate to studying?",
            "From Your previous semesters which course was your favorite?",
            "If you had unlimited resources, what project topic would you work on?",
            "What strategies do you naturally use to find solutions to a problem?",
            "What profession do you want to be in the next five years?",
            "List some of your strongest soft/technical skills?",
            "List some of your weakest points about yourself?",
            "What research areas do you find most motivating outside of your academic discipline?",
            "What kind of course would you like the most?",
            "How do you typically respond when you are under stress?"
        ]

    def _setup_device(self, device):
        """Resolve 'auto' to 'cuda' when torch sees a GPU, else 'cpu'."""
        if device == 'auto':
            if TORCH_AVAILABLE and torch.cuda.is_available():
                return 'cuda'
            else:
                return 'cpu'
        return device

    def conduct_enhanced_survey(self):
        """Interactively collect the 10 survey answers (min 5 chars each).

        Stores the answers in ``self.student_profile`` keyed Q1..Q10 and
        returns the same dict.
        """
        print("\n" + "="*80)
        print("πŸŽ“ ADVANCED COURSE RECOMMENDATION SYSTEM")
        print("="*80)
        print("Please provide detailed answers for better recommendations.")
        responses = {}
        for i, question in enumerate(self.survey_questions, 1):
            print(f"\nπŸ“ Q{i}: {question}")
            # Show example answers for the questions that need them most
            if i == 1:
                print(" πŸ’‘ Example: '8-10 hours per day' or '40 hours per week'")
            elif i == 6:
                print(" πŸ’‘ Example: 'Python, Machine Learning, Problem Solving, Communication'")
            elif i == 7:
                print(" πŸ’‘ Example: 'Perfectionism, Time management, Public speaking'")
            response = input(" Your answer: ").strip()
            while len(response) < 5:
                print(" ⚠️ Please provide a more detailed answer (minimum 5 characters)")
                response = input(" Your answer: ").strip()
            responses[f'Q{i}'] = response
        self.student_profile = responses
        return responses

    def _create_enhanced_embeddings_and_faiss_index(self):
        """Embed every course as one descriptive sentence and index it.

        Prefers SentenceTransformers + a FAISS inner-product index
        (embeddings are L2-normalized, so inner product == cosine);
        degrades to TF-IDF when either piece is unavailable or fails.
        """
        if len(self.course_data) == 0:
            print("⚠️ Cannot create embeddings: no course data")
            return
        print("\n🎯 Creating optimized embeddings")
        print("-" * 50)
        combined_texts = []
        for _, row in self.course_data.iterrows():
            sentence = f"This course is {row['Course Name'].lower()}. {row['Description'].lower()}. It is a {row['Type'].lower()} course. This course require skill like {row['Skill Required'].lower()}. A student should have interest on {row['Field Interest'].lower()}. The stress level of this course is {row['Stress Level'].lower()}."
            combined_texts.append(sentence)
        print(f"πŸ“š Encoding {len(combined_texts)} course descriptions...")
        if self.embedding_model and SENTENCE_TRANSFORMERS_AVAILABLE:
            try:
                self.course_embeddings = self.embedding_model.encode(
                    combined_texts,
                    batch_size=8,
                    show_progress_bar=True,
                    convert_to_numpy=True,
                    normalize_embeddings=True
                )
                # Build FAISS index if available
                if FAISS_AVAILABLE:
                    dimension = self.course_embeddings.shape[1]
                    self.faiss_index = faiss.IndexFlatIP(dimension)
                    self.faiss_index.add(self.course_embeddings.astype('float32'))
                    print(f"βœ… FAISS index created with {self.faiss_index.ntotal} courses")
                    print(f"πŸ“ Embedding dimension: {dimension}")
                else:
                    print("⚠️ FAISS not available, using similarity search fallback")
            except Exception as e:
                print(f"⚠️ Error creating embeddings: {e}")
                self._create_tfidf_fallback(combined_texts)
        else:
            print("⚠️ SentenceTransformers not available, using TF-IDF fallback")
            self._create_tfidf_fallback(combined_texts)

    def _create_tfidf_fallback(self, texts):
        """Create TF-IDF based similarity system as fallback."""
        try:
            from sklearn.feature_extraction.text import TfidfVectorizer
            self.tfidf_vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
            self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(texts)
            print("βœ… TF-IDF fallback system created")
        except Exception as e:
            print(f"⚠️ Error creating TF-IDF fallback: {e}")

    def create_enhanced_student_profile(self):
        """Build one narrative sentence from the survey and embed it.

        Returns (embedding, [sentence]) when the embedding model works,
        or (sentence, [sentence]) so the TF-IDF path can use raw text.
        Returns (None, []) when no profile has been collected.
        """
        if not self.student_profile:
            return None, []
        stress_tolerance = self._assess_enhanced_stress_tolerance(self.student_profile['Q10'].lower())
        study_dedication = self._extract_enhanced_hours_preference(self.student_profile['Q1'].lower())
        sentence = f"I can dedicate {study_dedication} study hours. I loved the {self.student_profile['Q2'].lower()} course from previous semester and want to build projects on {self.student_profile['Q3'].lower()} that can help in practical applications. To solve problems, I {self.student_profile['Q4'].lower()}. I want to work as a {self.student_profile['Q5'].lower()}. I have skills in {self.student_profile['Q6'].lower()}. My other field interests includes {self.student_profile['Q8'].lower()}. I prefer courses that are {self.student_profile['Q9'].lower()}. My stress management and ability to handle high workload is {stress_tolerance}."
        if self.embedding_model and SENTENCE_TRANSFORMERS_AVAILABLE:
            try:
                profile_embedding = self.embedding_model.encode([sentence], normalize_embeddings=True)
                return profile_embedding[0], [sentence]
            except Exception as e:
                print(f"⚠️ Error creating student profile: {e}")
        # Fallback: return text for TF-IDF processing
        return sentence, [sentence]

    def advanced_similarity_search(self, student_embedding, k=None):
        """Rank courses against the student profile.

        Tries FAISS (vector input), then TF-IDF (string input), then a
        deliberate random 25-75% fallback so the demo still produces output.
        Returns (scores_0_to_100, course_indices).
        """
        if k is None:
            k = len(self.course_data)
        if self.faiss_index is not None and isinstance(student_embedding, np.ndarray):
            try:
                # FAISS search; inner products scaled to a 0-100 band
                similarities, indices = self.faiss_index.search(student_embedding.reshape(1, -1).astype('float32'), k)
                base_similarities = (similarities[0] * 100).clip(0, 100)
                return base_similarities, indices[0]
            except Exception as e:
                print(f"⚠️ FAISS search error: {e}")
        # TF-IDF fallback
        if hasattr(self, 'tfidf_vectorizer') and hasattr(self, 'tfidf_matrix'):
            try:
                if isinstance(student_embedding, str):
                    query_vector = self.tfidf_vectorizer.transform([student_embedding])
                    similarities = cosine_similarity(query_vector, self.tfidf_matrix)[0]
                    top_indices = np.argsort(similarities)[::-1][:k]
                    top_similarities = similarities[top_indices] * 100
                    return top_similarities, top_indices
            except Exception as e:
                print(f"⚠️ TF-IDF search error: {e}")
        # Final fallback: random selection (keeps the pipeline alive,
        # but scores carry no information)
        indices = np.arange(min(k, len(self.course_data)))
        similarities = np.random.rand(len(indices)) * 50 + 25  # Random 25-75%
        return similarities, indices

    def calculate_advanced_behavioral_metrics(self):
        """Compute five rule-based per-course score lists.

        Returns {'stress_matching': [...], 'type_matching': [...],
        'description_matching': [...], 'skill_matching': [...],
        'field_matching': [...]}, each list aligned with course_data rows.
        """
        if not self.student_profile or len(self.course_data) == 0:
            return {}
        metrics = {}
        # Extract key information from the survey answers
        study_hours = self.student_profile['Q1']
        favourite_course = self.student_profile['Q2']
        project_topic = self.student_profile['Q3']
        career_goals = self.student_profile['Q5']
        strengths = self.student_profile['Q6']
        weaknesses = self.student_profile['Q7']
        research_interests = self.student_profile['Q8']
        course_preference = self.student_profile['Q9']
        stress_response = self.student_profile['Q10']
        # Enhanced assessments (computed once, reused for every course)
        stress_tolerance = self._assess_enhanced_stress_tolerance(stress_response)
        study_dedication = self._extract_enhanced_hours_preference(study_hours)
        # Calculate metrics for each course
        for metric_name, calculator in [
            ('stress_matching', self._calculate_stress_compatibility),
            ('type_matching', self._calculate_type_compatibility),
            ('description_matching', self._calculate_description_compatibility),
            ('skill_matching', self._calculate_skill_compatibility),
            ('field_matching', self._calculate_field_compatibility)
        ]:
            matches = []
            for _, course in self.course_data.iterrows():
                if metric_name == 'stress_matching':
                    match = calculator(stress_tolerance, study_dedication, course)
                elif metric_name == 'type_matching':
                    match = calculator(course_preference, course)
                elif metric_name == 'description_matching':
                    match = calculator(favourite_course, project_topic, career_goals, course)
                elif metric_name == 'skill_matching':
                    match = calculator(strengths, weaknesses, course)
                else:  # field_matching
                    match = calculator(research_interests, career_goals, course)
                matches.append(match)
            metrics[metric_name] = matches
        return metrics

    def _calculate_stress_compatibility(self, stress_tolerance, study_dedication, course):
        """Score (0-100) how the student's stress tolerance fits the course.

        Lookup table keyed by (tolerance, course stress 1-3), nudged by
        study dedication; unknown combinations score 50.
        """
        course_stress = course.get('stress_numeric', 2)
        compatibility_matrix = {
            ('high', 3): 95, ('high', 2): 85, ('high', 1): 70,
            ('medium', 3): 60, ('medium', 2): 90, ('medium', 1): 85,
            ('low', 3): 25, ('low', 2): 70, ('low', 1): 95
        }
        base_score = compatibility_matrix.get((stress_tolerance, course_stress), 50)
        if study_dedication == 'high':
            base_score += 5
        elif study_dedication == 'low' and course_stress >= 2:
            base_score -= 10
        return min(100, max(0, base_score))

    def _calculate_type_compatibility(self, course_preference, course):
        """Enhanced type compatibility using semantic matching - ORIGINAL VERSION.

        Text similarity between the stated preference and the course Type,
        plus up to +20 for keyword hits per course-type word.
        """
        course_type = course['Type']
        course_type_array = course['Type'].split()
        base_similarity = self._calculate_enhanced_text_similarity(course_preference, course_type)
        # Type-specific boost based on keywords - ORIGINAL KEYWORDS
        type_keywords = {
            'technical': ['test','code', 'program', 'technical', 'algorithm', 'system', 'software','application','hands-on','hands on'],
            'practical': ['hands-on', 'practical', 'build', 'create', 'implement', 'project'],
            'analytical': ['analytical','design','analyze', 'data', 'research', 'statistical', 'study', 'investigate','hands-on','hands on'],
            'creative': ['creative', 'design', 'innovative', 'artistic', 'visual', 'original'],
            'theoretical': ['theory', 'concept', 'abstract', 'academic', 'principle', 'framework'],
            'research': ['research', 'investigate', 'explore', 'discover', 'academic', 'scholarly']
        }
        strategy_lower = course_preference.lower()
        keyword_matches = 0
        for course_types in course_type_array:
            if course_types in type_keywords:
                for keyword in type_keywords[course_types]:
                    if keyword in strategy_lower:
                        keyword_matches += 1
        keyword_boost = min(20, keyword_matches * 5)
        base_similarity += keyword_boost
        return min(100, max(0, int(base_similarity)))

    def _calculate_description_compatibility(self, favourite_course, project_topic, career_goals, course):
        """ORIGINAL VERSION - with weighted calculations.

        NOTE(review): the weighted-average machinery below is computed but
        deliberately unused -- the returned value is the original simple
        sum of the three component similarities (kept to match the
        original model's behavior).
        """
        # Extract course information
        course_desc = course['Description']
        # Initialize total similarity score
        total_similarity = 0
        weight_sum = 0
        # Question 1: Favorite course from previous semesters (Weight: 30)
        fav_course_similarity = self._calculate_enhanced_text_similarity(favourite_course, course_desc)
        total_similarity += fav_course_similarity * 30
        weight_sum += 30
        # Question 2: Dream project topic (Weight: 40 - Highest weight)
        project_similarity = self._calculate_enhanced_text_similarity(project_topic, course_desc)
        # Also check against field interest for better matching
        field_similarity = self._calculate_enhanced_text_similarity(project_topic, course['Field Interest'])
        combined_similarity = max(project_similarity, field_similarity)
        total_similarity += combined_similarity * 40
        weight_sum += 40
        # Question 3: Career goals (Weight: 30)
        career_similarity = self._calculate_enhanced_text_similarity(career_goals, course_desc)
        career_boost = self._calculate_career_alignment(career_goals, course_desc)
        total_similarity += (career_similarity + career_boost) * 30
        weight_sum += 30
        # Calculate weighted average (computed but unused; see NOTE above)
        if weight_sum > 0:
            base_similarity = total_similarity / weight_sum
        else:
            base_similarity = 0
        # ORIGINAL CALCULATION - Simple sum without weights
        total = fav_course_similarity + combined_similarity + career_similarity
        return min(100, max(0, int(total)))

    def _calculate_career_alignment(self, career_goals, course_desc):
        """Return +3 per matching keyword for each career named in the goals."""
        career_keywords = {
            'data scientist': ['data science', 'machine learning', 'analytics', 'statistical', 'python', 'data analysis'],
            'software engineer': ['software development', 'programming', 'coding', 'software engineering', 'system design'],
            'cybersecurity': ['security', 'cryptography', 'network security', 'ethical hacking', 'cybersecurity'],
            'ai researcher': ['artificial intelligence', 'machine learning', 'neural networks', 'deep learning', 'AI'],
            'web developer': ['web development', 'frontend', 'backend', 'javascript', 'html', 'css'],
            'mobile developer': ['mobile', 'android', 'ios', 'app development', 'mobile computing'],
            'game developer': ['game development', 'computer graphics', 'gaming', 'unity', 'unreal'],
            'database administrator': ['database', 'sql', 'data management', 'database design'],
            'cloud engineer': ['cloud computing', 'aws', 'azure', 'devops', 'cloud architecture'],
            'robotics engineer': ['robotics', 'embedded systems', 'automation', 'sensors', 'control systems']
        }
        career_lower = career_goals.lower()
        course_lower = course_desc.lower()
        boost = 0
        for career, keywords in career_keywords.items():
            if career in career_lower:
                matching_keywords = sum(1 for keyword in keywords if keyword in course_lower)
                boost += matching_keywords * 3
        return boost

    def _calculate_skill_compatibility(self, strengths, weaknesses, course):
        """Score (0-100) strengths vs required skills, minus -5 per
        weakness that conflicts with a required-skill area."""
        skills_required = course['Skill Required']
        strength_match = self._calculate_enhanced_text_similarity(strengths, skills_required)
        # Check for weakness conflicts
        weakness_penalty = 0
        weakness_lower = weaknesses.lower()
        skills_lower = skills_required.lower()
        conflict_terms = {
            'math': ['mathematics', 'statistical', 'analytics'],
            'programming': ['python', 'javascript', 'coding', 'software'],
            'communication': ['presentation', 'writing', 'teamwork'],
            'time': ['deadline', 'project management', 'organization']
        }
        for weakness_key, skill_terms in conflict_terms.items():
            if weakness_key in weakness_lower:
                if any(term in skills_lower for term in skill_terms):
                    weakness_penalty += 5
        final_score = strength_match - weakness_penalty
        return min(100, max(0, int(final_score)))

    def _calculate_field_compatibility(self, research_interests, career_goals, course):
        """Enhanced field compatibility calculation - ORIGINAL VERSION.

        Sum of three text similarities, then rescaled back into 0-100 with
        the original piecewise damping factors when the sum exceeds 100.
        """
        # NOTE(review): assumes these columns hold cleaned strings (NaN
        # would break .lower()) -- they come from the preprocessing step.
        career_interest = course.get('Career Paths', '').lower()
        industry = course.get('Industry Sectors', '').lower()
        field_interest = course['Field Interest'].lower()
        base_similarity = self._calculate_enhanced_text_similarity(research_interests, field_interest)
        career_similarity = self._calculate_enhanced_text_similarity(career_goals, career_interest)
        industry_similarity = self._calculate_enhanced_text_similarity(career_goals, industry)
        total = base_similarity + career_similarity + industry_similarity
        # ORIGINAL SCALING LOGIC
        if total <= 100:
            return total
        else:
            if 100 < total <= 110:
                new_total = (total * 0.85)
            elif 110 < total <= 120:
                new_total = (total * 0.80)
            elif 120 < total <= 130:
                new_total = (total * 0.75)
            else:
                new_total = (total * 0.70)
            return min(100, new_total)

    def _assess_enhanced_stress_tolerance(self, stress_response):
        """Classify the free-text stress answer as 'high'/'medium'/'low'
        by counting indicator keywords."""
        response_lower = stress_response.lower()
        high_indicators = ['calm', 'organized', 'handle', 'manage', 'control', 'systematic', 'planned', 'structured', 'methodical', 'efficient']
        medium_indicators = ['break', 'pause', 'time', 'step back', 'breathe', 'moderate']
        low_indicators = ['overwhelmed', 'panic', 'stressed', 'anxious', 'difficult', 'struggle', 'freeze', 'shutdown']
        high_score = sum(1 for indicator in high_indicators if indicator in response_lower)
        medium_score = sum(1 for indicator in medium_indicators if indicator in response_lower)
        low_score = sum(1 for indicator in low_indicators if indicator in response_lower)
        if high_score >= 2 or (high_score > 0 and medium_score == 0 and low_score == 0):
            return 'high'
        elif low_score >= 2 or (low_score > 0 and high_score == 0):
            return 'low'
        else:
            return 'medium'

    def _extract_enhanced_hours_preference(self, hours_text):
        """Bucket the study-hours answer into 'high'/'medium'/'low'.

        Uses the largest number found (>=7 high, 3-6 medium, else low);
        falls back to keyword analysis when no number is present.
        """
        hours_lower = hours_text.lower()
        numbers = re.findall(r'\d+', hours_text)
        if numbers:
            max_hours = max(int(num) for num in numbers)
            if max_hours >= 7:
                return 'high'
            elif 3 <= max_hours <= 6:
                return 'medium'
            else:
                return 'low'
        # Fallback to keyword analysis
        if any(word in hours_lower for word in ['many', 'lot', 'intensive', 'dedicated','six', 'seven', 'eight', 'nine', 'very']):
            return 'high'
        elif any(word in hours_lower for word in ['moderate', 'average', 'three', 'four','five', 'not much','about']):
            return 'medium'
        else:
            return 'low'

    def _calculate_enhanced_text_similarity(self, text1, text2):
        """Ultra-optimized text similarity for your specific domain - ORIGINAL VERSION.

        Blends four signals (embedding cosine, domain-synonym overlap,
        weighted keyword overlap, TF-IDF n-gram cosine) and returns the
        more optimistic of two combinations as an int in 0-100.
        Empty input scores a neutral 30; identical text scores 100.
        """
        if not text1 or not text2:
            return 30
        text1 = str(text1).lower().strip()
        text2 = str(text2).lower().strip()
        if not text1 or not text2:
            return 30
        if text1 == text2:
            return 100
        # Method 1: Semantic similarity using embeddings.  Explicitly skip
        # when no model is loaded instead of relying on an AttributeError
        # being swallowed by a bare except (original bug).
        semantic_score = 0
        if self.embedding_model is not None:
            try:
                embeddings = self.embedding_model.encode([text1, text2])
                semantic_similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
                semantic_score = semantic_similarity * 100
            except Exception:
                semantic_score = 0
        # Method 2: Enhanced domain-specific matching
        # Create comprehensive synonym groups for your domain
        domain_synonyms = {
            'data_analysis': ['data analytics', 'data analysis', 'data science', 'analytics', 'data mining', 'business intelligence'],
            'programming': ['programming', 'coding', 'development', 'software', 'python', 'sql'],
            'prediction': ['prediction', 'forecasting', 'stock market', 'machine learning', 'modeling'],
            'practical': ['practical', 'hands-on', 'applied', 'real-world', 'implementation'],
            'quantum': ['quantum computing', 'quantum', 'architecture', 'nanotechnology', 'advanced computing'],
            'corporate': ['corporate', 'business', 'professional', 'industry', 'enterprise'],
            'technical': ['technical', 'programming', 'software', 'system', 'computer']
        }
        # Calculate domain-specific similarity boost
        domain_boost = 0
        for category, synonyms in domain_synonyms.items():
            text1_has = any(syn in text1 for syn in synonyms)
            text2_has = any(syn in text2 for syn in synonyms)
            if text1_has and text2_has:
                domain_boost += 20  # Significant boost for domain matches
        # Method 3: Direct keyword matching with weights
        # High-value keywords for your student
        high_value_keywords = {
            'python': 25, 'sql': 25, 'data': 20, 'analytics': 20, 'practical': 20,
            'machine learning': 25, 'prediction': 20, 'quantum': 25, 'programming': 15,
            'hands-on': 20, 'corporate': 15, 'development': 15
        }
        keyword_boost = 0
        for keyword, weight in high_value_keywords.items():
            if keyword in text1 and keyword in text2:
                keyword_boost += weight
        # Method 4: TF-IDF with n-grams
        try:
            vectorizer = TfidfVectorizer(
                stop_words='english',
                lowercase=True,
                min_df=1,
                ngram_range=(1, 3),
                analyzer='word'
            )
            tfidf_matrix = vectorizer.fit_transform([text1, text2])
            if tfidf_matrix.shape[0] >= 2:
                tfidf_similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
                tfidf_score = tfidf_similarity * 100
            else:
                tfidf_score = 0
        except Exception:  # was a bare `except:`; narrowed
            tfidf_score = 0
        # Intelligent score combination with emphasis on domain relevance - ORIGINAL FORMULA
        final_score = max(
            semantic_score * 0.3 + domain_boost * 0.3 + keyword_boost * 0.2 + tfidf_score * 0.2,
            max(semantic_score, tfidf_score) + domain_boost * 0.5 + keyword_boost * 0.3
        )
        return min(100, max(0, int(final_score)))

    def _generate_fallback_recommendations(self, top_course_indices, similarity_scores, behavioral_metrics):
        """Generate enhanced recommendations without Mistral-7B - ORIGINAL VERSION.

        Takes up to the first 3 candidate courses, blends base similarity
        (40%) with the mean behavioral score (60%), and returns the
        entries sorted by the behavioral score (original behavior), not
        by the blended confidence.
        """
        recommendations = []
        for i, course_idx in enumerate(top_course_indices[:3]):
            if course_idx < len(self.course_data):
                course = self.course_data.iloc[course_idx]
                base_confidence = similarity_scores[i] if i < len(similarity_scores) else 70
                print(f"confidence {i}:{base_confidence}")
                # Calculate enhanced confidence using behavioral metrics
                behavior_scores = []
                for metric_values in behavioral_metrics.values():
                    if course_idx < len(metric_values):
                        behavior_scores.append(metric_values[course_idx])
                avg_behavior_score = np.mean(behavior_scores) if behavior_scores else 60
                enhanced_confidence = (base_confidence * 0.4 + avg_behavior_score * 0.6)
                # Generate basic analysis
                avg_bhvr_score = self._generate_basic_analysis(course, behavioral_metrics, course_idx)
                recommendations.append({
                    'course': course,
                    'confidence': enhanced_confidence,
                    'index': course_idx,
                    'avg_bhvr_score': avg_bhvr_score,
                    'base_confidence': base_confidence
                })
        # ORIGINAL SORTING - by avg_bhvr_score, not confidence
        sorted_recommendations = sorted(recommendations, key=lambda x: x['avg_bhvr_score'], reverse=True)
        return sorted_recommendations

    def _generate_basic_analysis(self, course, behavioral_metrics, course_idx):
        """Generate basic analysis without AI model - ORIGINAL VERSION.

        Returns the plain mean of the five behavioral metric scores for
        the given course index.
        """
        stress_score = behavioral_metrics['stress_matching'][course_idx]
        type_score = behavioral_metrics['type_matching'][course_idx]
        desc_score = behavioral_metrics['description_matching'][course_idx]
        skill_score = behavioral_metrics['skill_matching'][course_idx]
        field_score = behavioral_metrics['field_matching'][course_idx]
        avg_score = np.mean([stress_score, type_score, desc_score, skill_score, field_score])
        return avg_score

    def _display_production_results(self, recommendations, metrics, all_similarity_scores):
        """Pretty-print the top-3 recommendations and system statistics."""
        print("\n" + "="*80)
        print("🎯 COURSE RECOMMENDATION RESULTS")
        print("="*80)
        print(f"\nπŸ† TOP 3 RECOMMENDATIONS:")
        for i, rec in enumerate(recommendations, 1):
            confidence_icon = "πŸ₯‡" if i == 1 else "πŸ₯ˆ" if i == 2 else "πŸ₯‰"
            print(f"\n{confidence_icon} {i}. {rec['course']['Course Name']}")
            print(f" Overall Confidence: {rec['confidence']:.1f}%")
            print(f" Behavioral Score: {rec['avg_bhvr_score']:.1f}%")
            print(f" Base Similarity: {rec['base_confidence']:.1f}%")
            if metrics:
                print(f" Stress Compatibility: {metrics['stress_matching'][rec['index']]:.1f}%")
                print(f" Learning Style Match: {metrics['type_matching'][rec['index']]:.1f}%")
                print(f" Interest Alignment: {metrics['description_matching'][rec['index']]:.1f}%")
                print(f" Skill Compatibility: {metrics['skill_matching'][rec['index']]:.1f}%")
                print(f" Field Match: {metrics['field_matching'][rec['index']]:.1f}%")
        avg_confidence = np.mean([rec['confidence'] for rec in recommendations])
        print(f"\nπŸ“ˆ SYSTEM STATISTICS:")
        print(f" Average Top-3 Confidence: {avg_confidence:.1f}%")
        print(f" Total Courses Analyzed: {len(self.course_data)}")
        print(f" Embedding Model: {'βœ… SentenceTransformers' if self.embedding_model else '⚠️ TF-IDF Fallback'}")
        print(f" FAISS Index: {'βœ… Available' if self.faiss_index else '⚠️ Similarity Fallback'}")
# Example usage function
def run_production_demo():
    """Run production demo"""
    print("πŸš€ Initializing Course Recommendation System")
    # Build the recommender on CPU (demo is deterministic enough there)
    engine = ProductionCourseRecommendationSystem(device='cpu')
    # Canned student profile standing in for a live survey session
    engine.student_profile = {
        'Q1': '8-10 hours daily',
        'Q2': 'Introduction to Biology',
        'Q3': 'AI-powered medical diagnosis system that can help doctors',
        'Q4': 'I research to understand requirements first, then design solutions systematically',
        'Q5': 'Biological researcher',
        'Q6': 'python programming, molecular biology, analytical skills',
        'Q7': 'Sometimes perfectionist, need to improve time management',
        'Q8': 'Artificial Intelligence in healthcare, computer vision, natural language processing',
        'Q9': 'Practical courses with theoretical foundation and complex concepts',
        'Q10': 'I stay organized, break tasks into manageable pieces, maintain work-life balance'
    }
    # Guard clause: refuse to run on an incomplete profile
    if any(answer == '' for answer in engine.student_profile.values()):
        print("❌ Incomplete student profile")
        return None, None, None
    # Build embeddings / index, then run the full recommendation pipeline
    engine._create_enhanced_embeddings_and_faiss_index()
    profile_vec, _profile_sections = engine.create_enhanced_student_profile()
    sim_scores, course_idx = engine.advanced_similarity_search(profile_vec)
    behaviour = engine.calculate_advanced_behavioral_metrics()
    recs = engine._generate_fallback_recommendations(course_idx[:5], sim_scores[:5], behaviour)
    # Show the ranked results
    engine._display_production_results(recs, behaviour, sim_scores)
    print("\n🎯 Demo completed successfully!")
    return recs, behaviour, profile_vec
if __name__ == "__main__":
    # Script entry point: run the demo, but never let a failure here take
    # down the hosting web interface.
    try:
        recommendations, metrics, student_embedding = run_production_demo()
    except Exception as demo_error:
        print(f"❌ Error running demo: {demo_error}")
        print("System is still functional for web interface")