Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """Flask AI Tutor App with Adaptive Learning - FIXED VERSION""" | |
| import os | |
| import json | |
| import time | |
| from flask import Flask, render_template, request, jsonify, session, send_from_directory | |
| from flask_cors import CORS | |
| from transformers import pipeline | |
| import time | |
| import groq | |
| from dotenv import load_dotenv | |
| import uuid | |
| import numpy as np | |
| import re | |
| import tempfile | |
| import json | |
| from datetime import datetime | |
| # LangChain imports | |
| from langchain_groq import ChatGroq | |
| from langchain_core.messages import HumanMessage | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_chroma import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_core.documents import Document | |
| # Document processing imports | |
| import chardet | |
| import fitz # PyMuPDF | |
| import docx | |
| import gtts | |
| from pptx import Presentation | |
| # Load environment variables | |
| load_dotenv() | |
| app = Flask(__name__) | |
| app.secret_key = 'adaptive_learning_secret_key_2024' | |
| # Allow all origins and methods for the API to ensure mobile browsers work correctly | |
| CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=False) | |
| # Initialize components | |
| transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base.en") | |
| # Initialize Groq client | |
| groq_api_key = os.getenv("GROQ_API_KEY") | |
| if not groq_api_key: | |
| raise ValueError("GROQ_API_KEY environment variable not found!") | |
| client = groq.Groq(api_key=groq_api_key) | |
| chat_model = ChatGroq(model_name="llama-3.3-70b-versatile", api_key=groq_api_key) | |
| # Initialize ChromaDB | |
| os.makedirs("chroma_db", exist_ok=True) | |
| embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
| vectorstore = Chroma( | |
| embedding_function=embedding_model, | |
| persist_directory="chroma_db" | |
| ) | |
| from diagnostic import diagnostic_engine, CompetencyLevel | |
| from module_manager import module_manager | |
| def generate_assessment_content(topic, level='advanced'): | |
| """Classify topic and generate dynamic puzzle/game content in one call""" | |
| prompt = f""" | |
| You are an AI Tutor creating an assessment for the topic: "{topic}" at the "{level}" level. | |
| First, classify the topic into one of two categories: | |
| CATEGORY A: Principle-Based / Simulation (e.g., Physics, Circuits, Math, Coding). | |
| CATEGORY B: Memorization-Based / Puzzle (e.g., Biology, Anatomy, History, Geography). | |
| Second, generate the content for the assessment. | |
| If CATEGORY A: We will use a simulation game. Provide a description of the principle being tested. | |
| If CATEGORY B: We will build an interactive Crossword Puzzle. Provide 5-8 words and their clues. | |
| Return ONLY a valid JSON object with this exact structure (no markdown formatting, no comments): | |
| {{ | |
| "category": "A" or "B", | |
| "title": "Assessment Title", | |
| "game_config": {{ | |
| // IF CATEGORY A: | |
| "engine": "circuit_simulation", | |
| "principle": "Description of the principle to test", | |
| "target_value": "e.g., 2.50A", | |
| // IF CATEGORY B: | |
| "engine": "crossword_puzzle", | |
| "words": [ | |
| {{"word": "XEROPHTHALMIA", "clue": "A medical condition in which the eye fails to produce tears"}} | |
| ] | |
| }} | |
| }} | |
| Ensure X and Y are coordinates (0-100) on a 2D plane. | |
| """ | |
| try: | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| content_str = clean_response(response.content) | |
| # Try to extract JSON if it's wrapped in markdown | |
| if "```json" in content_str: | |
| content_str = content_str.split("```json\n")[1].split("\n```")[0] | |
| elif "```" in content_str: | |
| content_str = content_str.split("```\n")[1].split("\n```")[0] | |
| return json.loads(content_str) | |
| except Exception as e: | |
| print(f"Error generating content: {e}") | |
| # Fallback to Category B Crossword | |
| return { | |
| "category": "B", | |
| "title": f"Vocabulary: {topic}", | |
| "game_config": { | |
| "engine": "crossword_puzzle", | |
| "words": [ | |
| {"word": "ASSESSMENT", "clue": "The evaluation or estimation of the nature, quality, or ability of someone or something"}, | |
| {"word": "LEARNING", "clue": "The acquisition of knowledge or skills through experience, study, or by being taught"} | |
| ] | |
| } | |
| } | |
| def assessment_hub(): | |
| """Centralized Universal Assessment Hub""" | |
| return render_template('assessment_hub.html') | |
| def get_available_modules(): | |
| """Return all installed modules""" | |
| return jsonify({ | |
| 'success': True, | |
| 'modules': module_manager.get_all_modules() | |
| }) | |
| def start_module(): | |
| """Initialize a session for a specific module""" | |
| data = request.json | |
| module_id = data.get('module_id') | |
| if not module_id: | |
| return jsonify({'success': False, 'error': 'No module_id provided'}) | |
| if module_id == 'dynamic_generated': | |
| learning_state = session.get('learning_state') | |
| if learning_state and 'active_module' in learning_state: | |
| return jsonify({ | |
| 'success': True, | |
| 'session_data': learning_state['active_module'] | |
| }) | |
| return jsonify({'success': False, 'error': 'No dynamic module active'}) | |
| session_data = module_manager.create_module_session(module_id) | |
| if not session_data: | |
| return jsonify({'success': False, 'error': 'Module not found'}) | |
| # Store the active module session in Flask session | |
| learning_state = session.get('learning_state', {}) | |
| learning_state['active_module'] = session_data | |
| learning_state['category'] = session_data['category'] | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| return jsonify({ | |
| 'success': True, | |
| 'session_data': session_data | |
| }) | |
| def get_game_content(): | |
| """Retrieve the generated or configured content for the current session""" | |
| learning_state = session.get('learning_state') | |
| # Check for active modular session first | |
| if learning_state and 'active_module' in learning_state: | |
| return jsonify({ | |
| 'success': True, | |
| 'content': { | |
| 'title': learning_state['active_module']['title'], | |
| 'category': learning_state['active_module']['category'], | |
| 'game_config': learning_state['active_module']['config'], | |
| 'session_id': learning_state['active_module']['session_id'] | |
| } | |
| }) | |
| # Fallback to older generative flow | |
| if not learning_state or 'game_content' not in learning_state: | |
| return jsonify({'success': False, 'error': 'No active game session'}) | |
| return jsonify({ | |
| 'success': True, | |
| 'content': learning_state['game_content'] | |
| }) | |
| # Storage for session telemetry | |
| session_telemetry = {} | |
| def receive_telemetry(): | |
| """Receive granular interaction data from assessment games""" | |
| try: | |
| data = request.json | |
| session_id = data.get('session_id') | |
| if not session_id: | |
| return jsonify({'success': False, 'error': 'No session_id'}) | |
| if session_id not in session_telemetry: | |
| session_telemetry[session_id] = [] | |
| session_telemetry[session_id].append(data) | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def finalize_assessment(): | |
| """Finalize assessment and determine level based on telemetry""" | |
| try: | |
| data = request.json | |
| session_id = data.get('session_id') | |
| category = data.get('category', 'A') # Default to circuit sim | |
| telemetry = session_telemetry.get(session_id, []) | |
| if not telemetry: | |
| return jsonify({'success': False, 'error': 'No telemetry data found for session'}) | |
| if category == 'A': | |
| metrics = diagnostic_engine.calculate_category_a_metrics(telemetry) | |
| else: | |
| metrics = diagnostic_engine.calculate_category_b_metrics(telemetry) | |
| p_score = diagnostic_engine.compute_composite_score(category, metrics) | |
| # Get previous score from session if available | |
| learning_state = session.get('learning_state', {}) | |
| previous_p = learning_state.get('p_score') | |
| level = diagnostic_engine.determine_level(p_score, previous_p) | |
| # Update session learning state | |
| if not learning_state: | |
| learning_state = {} | |
| learning_state['determined_level'] = level.value.lower() | |
| learning_state['p_score'] = p_score | |
| learning_state['assessment_complete'] = True | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| return jsonify({ | |
| 'success': True, | |
| 'level': level.value, | |
| 'p_score': p_score, | |
| 'metrics': metrics | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| # Learning level definitions | |
| LEARNING_LEVELS = { | |
| "beginner": { | |
| "description": "Basic understanding, needs fundamental concepts", | |
| "complexity": "simple", | |
| "question_types": ["basic definitions", "simple examples", "fundamental concepts"] | |
| }, | |
| "intermediate": { | |
| "description": "Understands basics, needs practical applications", | |
| "complexity": "moderate", | |
| "question_types": ["applications", "moderate problems", "connecting concepts"] | |
| }, | |
| "advanced": { | |
| "description": "Strong understanding, needs advanced topics", | |
| "complexity": "complex", | |
| "question_types": ["complex problems", "critical thinking", "advanced applications"] | |
| } | |
| } | |
| def clean_response(response): | |
| """Remove unwanted formatting from AI responses""" | |
| cleaned_text = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL) | |
| cleaned_text = re.sub(r"(\*\*|\*|\[|\]|###|##|#)", "", cleaned_text) | |
| cleaned_text = re.sub(r"\\", "", cleaned_text) | |
| cleaned_text = re.sub(r"---", "", cleaned_text) | |
| return cleaned_text.strip() | |
| def generate_mastery_test(): | |
| """Generate a mastery test for current topic""" | |
| try: | |
| learning_state = session.get('learning_state') | |
| if not learning_state or not learning_state.get('tutorial'): | |
| return jsonify({'success': False, 'error': 'No tutorial available'}) | |
| topic = learning_state['current_topic'] | |
| tutorial = learning_state['tutorial'] | |
| prompt = f"""Based on this tutorial about {topic}, create ONE challenging question to test mastery. | |
| Tutorial: | |
| {tutorial[:1000]}... | |
| Format: | |
| Q. [Question] | |
| A) [Option] | |
| B) [Option] | |
| C) [Option] | |
| D) [Option] | |
| Correct: [Letter] | |
| Explanation: [Why this is correct]""" | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| questions = parse_assessment_questions(clean_response(response.content)) | |
| return jsonify({ | |
| 'success': True, | |
| 'question': questions[0] if questions else None | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def submit_mastery_test(): | |
| """Check mastery test answer""" | |
| try: | |
| answer = request.json.get('answer') | |
| learning_state = session.get('learning_state') | |
| if not learning_state: | |
| return jsonify({'success': False, 'error': 'No session'}) | |
| # Store the question that was asked | |
| current_question = learning_state.get('mastery_question') | |
| if not current_question: | |
| return jsonify({'success': False, 'error': 'No test question'}) | |
| is_correct = answer.strip().lower() == current_question['correct'].strip().lower() | |
| # Generate suggested topics if passed | |
| suggested_topics = [] | |
| if is_correct: | |
| topic = learning_state['current_topic'] | |
| level = learning_state['determined_level'] | |
| prompt = f"""Suggest 3 related topics to learn after mastering {topic} at {level} level. | |
| Format: | |
| 1. Topic Title | Brief description | Suggested question | |
| 2. Topic Title | Brief description | Suggested question | |
| 3. Topic Title | Brief description | Suggested question""" | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| lines = clean_response(response.content).split('\n') | |
| for line in lines: | |
| if '|' in line: | |
| parts = line.split('|') | |
| if len(parts) >= 3: | |
| suggested_topics.append({ | |
| 'title': parts[0].strip().lstrip('123. '), | |
| 'description': parts[1].strip(), | |
| 'question': parts[2].strip() | |
| }) | |
| return jsonify({ | |
| 'success': True, | |
| 'is_correct': is_correct, | |
| 'explanation': current_question['explanation'], | |
| 'suggested_topics': suggested_topics | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def extract_topic_from_question(question): | |
| """Extract the main topic from user's question""" | |
| prompt = f""" | |
| Analyze the following question and extract the main learning topic or subject. | |
| Return ONLY the topic name in 1-3 words. | |
| Question: {question} | |
| Topic:""" | |
| try: | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| topic = clean_response(response.content).strip() | |
| return topic | |
| except Exception as e: | |
| return "general knowledge" | |
| def generate_assessment_questions(topic, level, count=4): | |
| """Generate assessment questions for a specific level""" | |
| level_info = LEARNING_LEVELS[level] | |
| prompt = f""" | |
| Generate {count} {level}-level assessment questions about {topic}. | |
| Level: {level} - {level_info['description']} | |
| Question types: {', '.join(level_info['question_types'])} | |
| Format EXACTLY as follows for each question: | |
| Q1. [Question text] | |
| A) [Option A] | |
| B) [Option B] | |
| C) [Option C] | |
| D) [Option D] | |
| Correct: A | |
| Explanation: [Brief explanation] | |
| IMPORTANT: For "Correct:", put ONLY the letter (A, B, C, or D), nothing else. | |
| Make questions appropriate for {level} level understanding of {topic}. | |
| """ | |
| try: | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| return clean_response(response.content) | |
| except Exception as e: | |
| return f"Error generating questions: {str(e)}" | |
| def assess_user_level(topic, user_answers, current_level): | |
| """Assess user's level based on their answers""" | |
| correct_count = sum(1 for answer in user_answers if answer["is_correct"]) | |
| total_questions = len(user_answers) | |
| score = correct_count / total_questions if total_questions > 0 else 0 | |
| if current_level == "advanced": | |
| if score >= 0.75: | |
| return "advanced", "excellent" | |
| elif score >= 0.5: | |
| return "intermediate", "good" | |
| else: | |
| return "beginner", "needs_fundamentals" | |
| elif current_level == "intermediate": | |
| if score >= 0.75: | |
| return "advanced", "ready_for_advanced" | |
| elif score >= 0.5: | |
| return "intermediate", "solid" | |
| else: | |
| return "beginner", "needs_basics" | |
| else: # beginner | |
| if score >= 0.75: | |
| return "intermediate", "ready_for_intermediate" | |
| elif score >= 0.5: | |
| return "beginner", "progressing" | |
| else: | |
| return "beginner", "needs_more_basics" | |
| def generate_tutorial(topic, level, assessment_feedback): | |
| """Generate personalized tutorial based on user's level""" | |
| level_info = LEARNING_LEVELS[level] | |
| prompt = f""" | |
| Create a personalized tutorial for {topic} at {level} level. | |
| User Level: {level} | |
| Assessment Feedback: {assessment_feedback} | |
| Learning Needs: {level_info['description']} | |
| IMPORTANT: The user was assessed using a gamified diagnostic. They scored {assessment_feedback} in areas like Solution Efficiency and Recall Accuracy. | |
| Structure the tutorial with clear sections: | |
| Key Concepts | |
| Explain 3-5 fundamental ideas about {topic} | |
| Practical Examples | |
| Provide 2-3 relevant real-world examples | |
| Common Mistakes to Avoid | |
| List common errors beginners make | |
| Practice Exercise | |
| Include 1-2 problems with solutions | |
| Next Steps | |
| Suggest what to learn next | |
| Make it engaging, concise, and tailored to {level} understanding. | |
| Focus on helping the user progress from their current level. | |
| Use proper formatting but avoid special characters like ====== or excessive asterisks. | |
| """ | |
| try: | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| return clean_response(response.content) | |
| except Exception as e: | |
| return f"Error generating tutorial: {str(e)}" | |
| def handle_followup_question(question, learning_context): | |
| """Handle follow-up questions after tutorial""" | |
| topic = learning_context.get('current_topic', 'the topic') | |
| level = learning_context.get('determined_level', 'intermediate') | |
| tutorial = learning_context.get('tutorial', '') | |
| prompt = f""" | |
| You are an AI tutor helping a student learn about {topic} at {level} level. | |
| Previous Tutorial Context: | |
| {tutorial[:500]}... | |
| Student's Follow-up Question: {question} | |
| Provide a clear, concise answer appropriate for their {level} level understanding. | |
| If they need clarification, provide it with relevant examples. | |
| If they want to go deeper, offer more advanced insights. | |
| If they want practice problems, generate appropriate exercises. | |
| """ | |
| try: | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| return clean_response(response.content) | |
| except Exception as e: | |
| return f"Error answering follow-up: {str(e)}" | |
| def parse_assessment_questions(assessment_text): | |
| """Parse the generated assessment questions into structured format - FIXED VERSION""" | |
| questions = [] | |
| lines = [line.strip() for line in assessment_text.split('\n') if line.strip()] | |
| i = 0 | |
| while i < len(lines): | |
| line = lines[i] | |
| if line.startswith('Q') and '.' in line: | |
| question = { | |
| "text": line.split('.', 1)[1].strip(), | |
| "options": [], | |
| "correct": "", | |
| "explanation": "" | |
| } | |
| # Get options (A, B, C, D) | |
| for j in range(1, 5): | |
| if i + j < len(lines) and any(lines[i + j].startswith(prefix) for prefix in ['A)', 'B)', 'C)', 'D)']): | |
| question["options"].append(lines[i + j]) | |
| else: | |
| break | |
| # Get correct answer and explanation | |
| for j in range(i + len(question["options"]) + 1, min(i + len(question["options"]) + 5, len(lines))): | |
| if j >= len(lines): | |
| break | |
| line_j = lines[j] | |
| if line_j.startswith('Correct:'): | |
| # Extract ONLY the letter (A, B, C, or D) | |
| correct_answer = line_j.split(':', 1)[1].strip() | |
| # Get just the first letter | |
| question["correct"] = correct_answer[0].upper() if correct_answer else "" | |
| elif line_j.startswith('Explanation:'): | |
| question["explanation"] = line_j.split(':', 1)[1].strip() | |
| if question["options"] and question["correct"]: | |
| questions.append(question) | |
| i += len(question["options"]) + 3 | |
| else: | |
| i += 1 | |
| return questions if questions else [{ | |
| "text": "What is the basic concept of this topic?", | |
| "options": ["A) Fundamental idea", "B) Advanced concept", "C) Complex theory", "D) Basic principle"], | |
| "correct": "A", | |
| "explanation": "This assesses basic understanding of the topic." | |
| }] | |
| def speech_playback(text): | |
| """Convert text to speech - FIXED VERSION""" | |
| try: | |
| unique_id = str(uuid.uuid4()) | |
| audio_dir = "static/audio" | |
| os.makedirs(audio_dir, exist_ok=True) | |
| audio_file = f"audio/output_audio_{unique_id}.mp3" # Relative path for URL | |
| full_path = f"static/{audio_file}" # Full path for saving | |
| # Limit text length for TTS | |
| text_for_speech = text[:500] if len(text) > 500 else text | |
| tts = gtts.gTTS(text_for_speech, lang='en') | |
| tts.save(full_path) | |
| return audio_file # Return relative path | |
| except Exception as e: | |
| print(f"Error in speech_playback: {e}") | |
| return None | |
| # Document processing functions | |
| def extract_text_from_pdf(pdf_path): | |
| try: | |
| doc = fitz.open(pdf_path) | |
| text = "\n".join([page.get_text("text") for page in doc]) | |
| return text if text.strip() else "No extractable text found." | |
| except Exception as e: | |
| return f"Error extracting text from PDF: {str(e)}" | |
| def extract_text_from_docx(docx_path): | |
| try: | |
| doc = docx.Document(docx_path) | |
| text = "\n".join([para.text for para in doc.paragraphs]) | |
| return text if text.strip() else "No extractable text found." | |
| except Exception as e: | |
| return f"Error extracting text from Word document: {str(e)}" | |
| def extract_text_from_pptx(pptx_path): | |
| try: | |
| presentation = Presentation(pptx_path) | |
| text = "" | |
| for slide in presentation.slides: | |
| for shape in slide.shapes: | |
| if hasattr(shape, "text"): | |
| text += shape.text + "\n" | |
| return text if text.strip() else "No extractable text found." | |
| except Exception as e: | |
| return f"Error extracting text from PowerPoint: {str(e)}" | |
| def retrieve_documents(query): | |
| """Retrieve relevant documents from vectorstore""" | |
| try: | |
| results = vectorstore.similarity_search(query, k=3) | |
| return [doc.page_content for doc in results] | |
| except: | |
| return [] | |
| def chat_with_groq(user_input, chat_history): | |
| """Original chat function for normal conversations""" | |
| try: | |
| relevant_docs = retrieve_documents(user_input) | |
| context = "\n".join(relevant_docs) if relevant_docs else "No relevant documents found." | |
| system_prompt = "You are a helpful AI tutor assistant. Answer questions accurately, clearly, and engagingly." | |
| conversation_history = "\n".join([f"User: {msg['content']}" if msg['role'] == 'user' else f"AI: {msg['content']}" | |
| for msg in chat_history[-6:]]) | |
| prompt = f"{system_prompt}\n\nContext:\n{context}\n\n{conversation_history}\n\nUser: {user_input}\n\nAI:" | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| cleaned_response = clean_response(response.content) | |
| chat_history.append({"role": "user", "content": user_input}) | |
| chat_history.append({"role": "assistant", "content": cleaned_response}) | |
| audio_file = speech_playback(cleaned_response) | |
| return cleaned_response, audio_file | |
| except Exception as e: | |
| return f"Error: {str(e)}", None | |
| # Flask Routes | |
| def index(): | |
| if 'chat_history' not in session: | |
| session['chat_history'] = [] | |
| if 'learning_state' not in session: | |
| session['learning_state'] = None | |
| return render_template('index.html') | |
| def crossword(): | |
| return render_template('crossword_puzzle.html') | |
| # Serve static audio files | |
| def serve_static(filename): | |
| return send_from_directory('static', filename) | |
| def chat(): | |
| """Handle normal chat messages""" | |
| try: | |
| user_input = request.json.get('message') | |
| if not user_input: | |
| return jsonify({'success': False, 'error': 'No message provided'}) | |
| if any(keyword in user_input.lower() for keyword in ['assess my level', 'test my knowledge', 'adaptive learning', 'personalized tutorial']): | |
| return jsonify({ | |
| 'success': True, | |
| 'adaptive_learning': True, | |
| 'message': "I see you want a personalized learning experience! Click the 'Adaptive Learning' tab to start with an assessment of your knowledge level." | |
| }) | |
| # Get history from request (stateless) or session (stateful) | |
| req_data = request.json | |
| chat_history = req_data.get('history', session.get('chat_history', [])) | |
| response, audio_file = chat_with_groq(user_input, chat_history) | |
| # Save back to session if we're using sessions | |
| session['chat_history'] = chat_history | |
| session.modified = True | |
| audio_url = f"/static/{audio_file}" if audio_file else None | |
| return jsonify({ | |
| 'success': True, | |
| 'response': response, | |
| 'audio_url': audio_url, | |
| 'adaptive_learning': False | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def start_adaptive_learning(): | |
| """Start the adaptive learning process - FIXED VERSION""" | |
| try: | |
| user_question = request.json.get('question') | |
| if not user_question: | |
| return jsonify({'success': False, 'error': 'No question provided'}) | |
| # Extract topic from question | |
| topic = extract_topic_from_question(user_question) | |
| print(f"Extracted topic: {topic}") # Debug | |
| learning_state = { | |
| 'current_topic': topic, | |
| 'current_level': 'advanced', | |
| 'assessment_questions': [], | |
| 'user_answers': [], | |
| 'current_question_index': 0, | |
| 'assessment_complete': False, | |
| 'determined_level': None, | |
| 'tutorial_generated': False, | |
| 'original_question': user_question, | |
| 'tutorial': '', | |
| 'followup_history': [] | |
| } | |
| # Determine if we have a pre-built module or need to generate | |
| assigned_module = None | |
| all_modules = module_manager.get_all_modules() | |
| # Simple keyword matching against installed modules | |
| for mod in all_modules: | |
| if mod.get('subject', '').lower() in topic.lower() or topic.lower() in mod.get('title', '').lower(): | |
| assigned_module = mod | |
| break | |
| if assigned_module: | |
| category = assigned_module['category'] | |
| message = f"I see you're interested in {topic}! Let's head to the Hub to load the [{assigned_module['title']}] module." | |
| # Auto-start the session | |
| sess_data = module_manager.create_module_session(assigned_module['id']) | |
| learning_state['category'] = category | |
| learning_state['active_module'] = sess_data | |
| game_url = f"/hub?auto_load={assigned_module['id']}" | |
| title = assigned_module['title'] | |
| else: | |
| # Dynamic topic classification and content generation (Fallback) | |
| generated_assessment = generate_assessment_content(topic, 'advanced') | |
| category = generated_assessment.get('category', 'B') | |
| learning_state['category'] = category | |
| learning_state['game_content'] = generated_assessment | |
| if category == 'A': | |
| game_url = '/games' | |
| message = f"I see you're interested in {topic}! I've prepared a simulation challenge to test your understanding." | |
| else: | |
| # Instead of hardcoding /puzzles, let the Hub inject the dynamic Crossword engine | |
| # Actually, we can use the Hub mechanism for generated content too. | |
| # Just mock a session_data object for the Hub. | |
| sess_data = { | |
| 'session_id': f"session_{int(time.time())}", | |
| 'module_id': 'dynamic_generated', | |
| 'category': category, | |
| 'title': generated_assessment.get('title', f"Assessment: {topic}"), | |
| 'config': generated_assessment.get('game_config', {}) | |
| } | |
| learning_state['active_module'] = sess_data | |
| game_url = '/hub?auto_load=dynamic_generated' | |
| message = f"I see you're interested in {topic}! Let's assess your knowledge with a dynamically generated crossword puzzle." | |
| title = generated_assessment.get('title', f"Assessment: {topic}") | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| return jsonify({ | |
| 'success': True, | |
| 'topic': topic, | |
| 'use_game': True, | |
| 'game_url': game_url, | |
| 'category': category, | |
| 'message': message, | |
| 'title': title | |
| }) | |
| # Generate questions for the topic (OLD MCQ FLOW - keep as fallback if needed) | |
| assessment_text = generate_assessment_questions(topic, 'advanced', 4) | |
| print(f"Generated assessment: {assessment_text[:200]}...") # Debug | |
| questions = parse_assessment_questions(assessment_text) | |
| print(f"Parsed {len(questions)} questions") # Debug | |
| learning_state['assessment_questions'] = questions | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| first_question = questions[0] | |
| print(f"First question correct answer: {first_question['correct']}") # Debug | |
| return jsonify({ | |
| 'success': True, | |
| 'topic': topic, | |
| 'message': f"I'll help you learn about {topic}. Let me assess your current level starting with some challenging questions.", | |
| 'question': first_question, | |
| 'question_number': 1, | |
| 'total_questions': len(questions), | |
| 'current_level': 'advanced' | |
| }) | |
| except Exception as e: | |
| print(f"Error in start_adaptive_learning: {str(e)}") # Debug | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def submit_assessment_answer(): | |
| """Process user's answer in adaptive learning - FIXED VERSION""" | |
| try: | |
| user_answer = request.json.get('answer') | |
| learning_state = session.get('learning_state') | |
| if not learning_state: | |
| return jsonify({'success': False, 'error': 'No active learning session'}) | |
| question_index = learning_state['current_question_index'] | |
| questions = learning_state['assessment_questions'] | |
| if question_index >= len(questions): | |
| return jsonify({'success': False, 'error': 'No more questions'}) | |
| current_question = questions[question_index] | |
| # Extract just the letter from user answer (A, B, C, or D) | |
| user_letter = user_answer.strip().upper() | |
| if user_letter.startswith(('A)', 'B)', 'C)', 'D)')): | |
| user_letter = user_letter[0] | |
| elif len(user_letter) > 1: | |
| user_letter = user_letter[0] | |
| correct_letter = current_question['correct'].strip().upper() | |
| if correct_letter.startswith(('A)', 'B)', 'C)', 'D)')): | |
| correct_letter = correct_letter[0] | |
| is_correct = user_letter == correct_letter | |
| print(f"User answer: {user_answer} -> {user_letter}, Correct: {correct_letter}, Match: {is_correct}") # Debug | |
| learning_state['user_answers'].append({ | |
| 'question': current_question['text'], | |
| 'user_answer': user_answer, | |
| 'correct_answer': current_question['correct'], | |
| 'is_correct': is_correct, | |
| 'explanation': current_question['explanation'] | |
| }) | |
| learning_state['current_question_index'] += 1 | |
| next_question_index = learning_state['current_question_index'] | |
| response_data = { | |
| 'success': True, | |
| 'is_correct': is_correct, | |
| 'explanation': current_question['explanation'], | |
| 'correct_answer': current_question['correct'], | |
| 'auto_advance': True # Signal to auto-advance | |
| } | |
| # Check if assessment is complete | |
| if next_question_index >= len(questions): | |
| topic = learning_state['current_topic'] | |
| current_level = learning_state['current_level'] | |
| user_answers = learning_state['user_answers'] | |
| determined_level, feedback = assess_user_level(topic, user_answers, current_level) | |
| learning_state['determined_level'] = determined_level | |
| learning_state['assessment_complete'] = True | |
| # Generate tutorial | |
| tutorial = generate_tutorial(topic, determined_level, feedback) | |
| learning_state['tutorial'] = tutorial | |
| learning_state['tutorial_generated'] = True | |
| response_data.update({ | |
| 'assessment_complete': True, | |
| 'determined_level': determined_level, | |
| 'feedback': feedback, | |
| 'tutorial': tutorial, | |
| 'message': f"Assessment complete! Your level: {determined_level}. Here's your personalized tutorial:" | |
| }) | |
| else: | |
| # Send next question | |
| next_question = questions[next_question_index] | |
| response_data.update({ | |
| 'assessment_complete': False, | |
| 'next_question': next_question, | |
| 'question_number': next_question_index + 1, | |
| 'total_questions': len(questions), | |
| 'current_level': learning_state['current_level'] | |
| }) | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| return jsonify(response_data) | |
| except Exception as e: | |
| print(f"Error in submit_assessment_answer: {str(e)}") # Debug | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def followup_question(): | |
| """Handle follow-up questions after tutorial""" | |
| try: | |
| question = request.json.get('question') | |
| learning_state = session.get('learning_state') | |
| if not learning_state or not learning_state.get('tutorial_generated'): | |
| return jsonify({'success': False, 'error': 'No active tutorial session'}) | |
| answer = handle_followup_question(question, learning_state) | |
| learning_state['followup_history'].append({ | |
| 'question': question, | |
| 'answer': answer, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| session['learning_state'] = learning_state | |
| session.modified = True | |
| audio_file = speech_playback(answer) | |
| audio_url = f"/static/{audio_file}" if audio_file else None | |
| return jsonify({ | |
| 'success': True, | |
| 'answer': answer, | |
| 'audio_url': audio_url | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def get_tutorial_audio(): | |
| """Generate audio for tutorial""" | |
| try: | |
| learning_state = session.get('learning_state') | |
| if not learning_state or not learning_state.get('tutorial_generated'): | |
| return jsonify({'success': False, 'error': 'No tutorial available'}) | |
| tutorial = learning_state.get('tutorial', '') | |
| audio_file = speech_playback(tutorial) | |
| audio_url = f"/static/{audio_file}" if audio_file else None | |
| return jsonify({ | |
| 'success': True, | |
| 'audio_url': audio_url | |
| }) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}) | |
| def upload_document(): | |
| """Handle document uploads""" | |
| try: | |
| if 'file' not in request.files: | |
| return jsonify({'success': False, 'error': 'No file uploaded'}) | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'success': False, 'error': 'No file selected'}) | |
| temp_path = os.path.join(tempfile.gettempdir(), file.filename) | |
| file.save(temp_path) | |
| file_extension = os.path.splitext(file.filename)[-1].lower() | |
| if file_extension == ".pdf": | |
| content = extract_text_from_pdf(temp_path) | |
| elif file_extension == ".docx": | |
| content = extract_text_from_docx(temp_path) | |
| elif file_extension == ".pptx": | |
| content = extract_text_from_pptx(temp_path) | |
| else: | |
| with open(temp_path, "r", encoding="utf-8", errors="replace") as f: | |
| content = f.read() | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50) | |
| documents = [Document(page_content=chunk) for chunk in text_splitter.split_text(content)] | |
| vectorstore.add_documents(documents) | |
| quiz_prompt = "Generate a 5-question quiz based on the following content. Include multiple choice questions with answers." | |
| prompt = f"{quiz_prompt}\n\nContent:\n{content}" | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| quiz = clean_response(response.content) | |
| os.remove(temp_path) | |
| return jsonify({ | |
| 'success': True, | |
| 'quiz': quiz, | |
| 'file_type': file_extension | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'success': False, | |
| 'error': str(e) | |
| }) | |
| def clear_chat(): | |
| """Clear chat history""" | |
| session['chat_history'] = [] | |
| session.modified = True | |
| return jsonify({'success': True}) | |
| def reset_learning(): | |
| """Reset adaptive learning session""" | |
| session['learning_state'] = None | |
| session.modified = True | |
| return jsonify({'success': True}) | |
| def games(): | |
| #return html file for games | |
| return render_template('circuit_simulation.html') | |
| def puzzles(): | |
| #return html file for games | |
| return render_template('anatomy_puzzle.html') | |
| def generate_trivia(): | |
| try: | |
| data = request.json | |
| prompt = data.get('prompt') | |
| if not prompt: | |
| return jsonify({'success': False, 'error': 'No prompt provided'}), 400 | |
| response = chat_model.invoke([HumanMessage(content=prompt)]) | |
| return jsonify({'success': True, 'response': response.content}) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| # Use the port assigned by Hugging Face/Vercel or default to 7860 | |
| port = int(os.environ.get('PORT', 7860)) | |
| app.run(debug=True, host='0.0.0.0', port=port) |