import json
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

import anthropic
import streamlit as st
from huggingface_hub import HfApi

# JSON skeleton the model is asked to follow when generating module content.
_CONTENT_JSON_FORMAT = """{
    "introduction": "text",
    "sections": [
        {"title": "string", "content": "text", "examples": []}
    ],
    "exercises": [
        {"title": "string", "description": "text", "solution": "text"}
    ],
    "quiz": {
        "questions": [
            {"question": "text", "options": ["A", "B", "C", "D"], "correct": 0}
        ]
    }
}"""


class LearningPaths:
    """Streamlit component for browsing learning paths, module content and quizzes.

    Curriculum, generated content and user progress are stored as JSON files,
    either in a HuggingFace dataset repository or under a local ``data``
    directory. Module content is generated on demand through the Anthropic API.
    """

    LOCAL_DATA_DIR = Path("data")
    MODEL_NAME = "claude-3-opus-20240229"
    # NOTE(review): 1024 tokens may truncate the JSON reply for larger modules;
    # a truncated reply is reported as a decode error. Override if needed.
    MAX_TOKENS = 1024
    # Minimum quiz score (fraction of correct answers) needed to complete a module.
    PASSING_SCORE = 0.8

    def __init__(self):
        self.content_generation_cache = {}
        self.initialize_storage()
        self.initialize_llm()
        self.load_curriculum()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    @staticmethod
    def _get_setting(name: str) -> Optional[str]:
        """Return a setting from Streamlit secrets, else from the environment."""
        try:
            value = st.secrets.get(name)
        except Exception:
            # Accessing st.secrets can raise when no secrets file is present;
            # the environment variable must still be consulted in that case.
            value = None
        return value or os.getenv(name)

    def initialize_storage(self):
        """Initialize HuggingFace storage connection and create repository if needed.

        Sets ``use_local_storage`` to True when no token is configured or when
        the hub cannot be reached, so every later read/write goes to disk.
        """
        # Defined up front so the attributes exist on every exit path.
        self.hf_token = None
        self.api = None
        self.repo_id = None
        self.use_local_storage = True
        try:
            self.hf_token = self._get_setting("HF_TOKEN")
            if not self.hf_token:
                # A missing token is a supported setup: fall back silently.
                return

            self.api = HfApi(token=self.hf_token)
            self.repo_id = (
                self._get_setting("HF_REPO_ID")
                or f'{self.api.whoami()["name"]}/eduai-content'
            )

            try:
                self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
            except Exception:
                st.info(f"Creating new HuggingFace dataset repository: {self.repo_id}")
                self.api.create_repo(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    private=True
                )

            self.use_local_storage = False
        except Exception as e:
            st.error(f"Error initializing HuggingFace storage: {e}")
            st.info("Falling back to local storage")
            self.use_local_storage = True

    def initialize_llm(self):
        """Initialize Anthropic/Claude client.

        ``self.client`` is left as None when no API key is configured or the
        client cannot be constructed.
        """
        self.client = None
        try:
            api_key = self._get_setting("ANTHROPIC_API_KEY")
            if not api_key:
                st.error("ANTHROPIC_API_KEY not found in secrets or environment variables")
                return
            self.client = anthropic.Anthropic(api_key=api_key)
        except Exception as e:
            st.error(f"Error initializing LLM: {e}")

    # ------------------------------------------------------------------
    # Content generation
    # ------------------------------------------------------------------
    def _find_module(self, module_id: str) -> Optional[dict]:
        """Return the curriculum module with the given id, or None."""
        for path in self.curriculum.values():
            for module in path.get('modules', []):
                if module['id'] == module_id:
                    return module
        return None

    @staticmethod
    def _rerun():
        """Trigger a script rerun using whichever API this Streamlit provides."""
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
        rerun()

    def generate_module_content_async(self, module_id: str):
        """Render a "Generate Content" button and generate content on click.

        Args:
            module_id: Id of the module to generate content for.

        Returns:
            The cached content if this instance already generated it,
            otherwise None (unknown module, button not clicked, or the
            generation failed).
        """
        cached = self.content_generation_cache.get(module_id)
        if cached is not None:
            return cached

        module = self._find_module(module_id)
        if not module:
            return None

        if not st.button(f"Generate Content for {module['name']}",
                         key=f"generate_{module['id']}"):
            return None

        with st.spinner("Generating learning content..."):
            try:
                content = self.generate_module_content(module)
            except Exception as e:
                st.error(f"Error generating content: {e}")
                return None

        if content is None:
            # Failures are not cached, so the button stays usable for a retry.
            return None

        self.content_generation_cache[module_id] = content
        st.session_state[f"module_content_{module_id}"] = content
        # Kept outside the try block so the rerun is never reported as an error.
        self._rerun()
        return content

    @staticmethod
    def _parse_json_response(text: str) -> Any:
        """Parse a model reply as JSON, tolerating Markdown fences and stray prose.

        Raises:
            json.JSONDecodeError: If no JSON document can be extracted.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closing fence.
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            cleaned = cleaned.rsplit("```", 1)[0].strip()
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            start, end = cleaned.find("{"), cleaned.rfind("}")
            if start == -1 or end <= start:
                raise
            return json.loads(cleaned[start:end + 1])

    def generate_module_content(self, module):
        """Generates and returns the content for a module.

        Asks the model for JSON matching ``_CONTENT_JSON_FORMAT``, stores the
        parsed result at ``content/<module id>.json`` and returns it.

        Args:
            module: Curriculum module dict with ``id``, ``name``, ``concepts``
                and ``difficulty`` keys.

        Returns:
            The parsed content dict, or None when no LLM client is available
            or the reply is not a JSON object.

        Raises:
            Exception: Errors raised by the Anthropic client call propagate
                to the caller.
        """
        if getattr(self, "client", None) is None:
            st.error("LLM client is not initialized; cannot generate content.")
            return None

        prompt = (
            f"Create comprehensive learning content for: {module['name']}\n"
            "\n"
            "Include:\n"
            "1. A clear introduction\n"
            f"2. Key concepts: {', '.join(module['concepts'])}\n"
            "3. Detailed explanations with examples\n"
            "4. Practice exercises\n"
            "5. A 5-question multiple choice quiz\n"
            "\n"
            f"Target difficulty level: {module['difficulty']}\n"
            "\n"
            "Format the response as JSON with the following structure:\n"
            f"{_CONTENT_JSON_FORMAT}"
        )

        message = self.client.messages.create(
            model=self.MODEL_NAME,
            max_tokens=self.MAX_TOKENS,
            temperature=0.7,
            system="You are an expert curriculum designer and educator. Create detailed, engaging learning content with clear examples and practice exercises.",
            messages=[{"role": "user", "content": prompt}]
        )

        # Join every text block so an empty or multi-part reply does not raise.
        reply = "".join(getattr(block, "text", "") or "" for block in message.content)

        try:
            content = self._parse_json_response(reply)
        except json.JSONDecodeError:
            content = None

        if not isinstance(content, dict):
            st.error("Error decoding the response from the Anthropic API. Please try again later.")
            return None

        self.save_to_storage(content, f"content/{module['id']}.json")
        return content

    def get_module_content(self, module_id: str) -> Optional[dict]:
        """Get module content from session state if available, otherwise load from storage."""
        state_key = f"module_content_{module_id}"
        cached = st.session_state.get(state_key)
        if cached:
            return cached

        content = self.load_from_storage(f"content/{module_id}.json")
        if not content:
            return None

        st.session_state[state_key] = content
        return content

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------
    def display(self):
        """Display learning paths interface."""
        st.header("Learning Paths")

        selected_path = st.selectbox(
            "Select Learning Path",
            options=list(self.curriculum.keys()),
            format_func=lambda x: self.curriculum[x]['name']
        )

        if selected_path:
            self.display_path_content(selected_path)

    def display_path_content(self, path_id: str):
        """Display content for selected path, or a warning if prerequisites are unmet."""
        path = self.curriculum[path_id]
        st.subheader(path['name'])
        st.write(path['description'])

        prerequisites = path.get('prerequisites') or []
        if prerequisites and not self.check_prerequisites(prerequisites):
            # Unknown prerequisite ids are shown as-is instead of raising.
            names = [self.curriculum.get(p, {}).get('name', p) for p in prerequisites]
            st.warning("⚠️ Please complete the prerequisite paths first: " + ", ".join(names))
            return

        progress = self.get_path_progress(path_id)
        st.progress(progress, text=f"Progress: {int(progress * 100)}%")

        self.display_modules(path['modules'])

    def display_modules(self, modules: list):
        """Render one expander per module with its content or a generate button."""
        for module in modules:
            with st.expander(f"📚 {module['name']} ({module['difficulty']})"):
                if self.is_module_complete(module['id']):
                    st.success("✅ Module completed!")

                content = self.get_module_content(module['id'])
                if not content:
                    # Renders the "Generate Content" button the info message
                    # refers to (or returns content cached on this instance).
                    content = self.generate_module_content_async(module['id'])

                self.display_module_content(module, content)

    def display_module_content(self, module: dict, content: dict):
        """Display module content.

        Args:
            module: Curriculum module dict; its ``id`` namespaces widget keys.
            content: Generated content dict, or a falsy value when none exists.
        """
        if not content:
            st.info("Content not generated. Click 'Generate Content' to create learning materials.")
            return

        # Content comes from a model reply, so missing keys are tolerated.
        st.write(content.get('introduction', ''))

        for section in content.get('sections', []):
            st.markdown(f"### {section.get('title', '')}")
            st.write(section.get('content', ''))
            for example in section.get('examples') or []:
                st.code(str(example))

        st.markdown("### Practice Exercises")
        for index, exercise in enumerate(content.get('exercises', [])):
            # This runs inside the module expander and Streamlit does not
            # support nested expanders, so exercises are rendered flat.
            st.markdown(f"**{exercise.get('title', f'Exercise {index + 1}')}**")
            st.write(exercise.get('description', ''))
            # Keyed by position: titles are model-generated and may repeat.
            if st.button("Show Solution", key=f"sol_{module['id']}_{index}"):
                st.code(str(exercise.get('solution', '')))

        st.markdown("### Quiz")
        quiz = content.get('quiz')
        if isinstance(quiz, dict):
            self.display_quiz(quiz, module['id'])

    def display_quiz(self, quiz: dict, module_id: str):
        """Display quiz for a module and mark it complete on a passing score."""
        questions = quiz.get('questions') or []
        if not questions:
            st.info("No quiz questions available for this module.")
            return

        correct_answers = 0
        for i, q in enumerate(questions):
            st.write(f"\n**Question {i+1}:** {q['question']}")
            options = q['options']
            answer = st.radio(
                "Select your answer:",
                options,
                key=f"quiz_{module_id}_{i}"
            )
            correct_index = q.get('correct')
            # Guard against an out-of-range answer index in generated content.
            if (isinstance(correct_index, int)
                    and 0 <= correct_index < len(options)
                    and answer == options[correct_index]):
                correct_answers += 1

        if st.button("Submit Quiz", key=f"submit_{module_id}"):
            score = correct_answers / len(questions)
            if score >= self.PASSING_SCORE:
                st.success(f"🎉 Congratulations! Score: {score*100:.0f}%")
                self.complete_module(module_id)
            else:
                st.warning(f"Score: {score*100:.0f}%. You need 80% to complete the module.")

    # ------------------------------------------------------------------
    # Curriculum
    # ------------------------------------------------------------------
    @staticmethod
    def _default_curriculum() -> dict:
        """Return the built-in curriculum used when none is stored yet."""
        return {
            'python_basics': {
                'name': 'Python Programming Basics',
                'description': 'Master the fundamental concepts of Python programming.',
                'prerequisites': [],
                'modules': [
                    {
                        'id': 'intro_python',
                        'name': 'Introduction to Python',
                        'concepts': ['programming_basics', 'python_environment', 'basic_syntax'],
                        'difficulty': 'beginner',
                        'estimated_hours': 2
                    },
                    {
                        'id': 'variables_types',
                        'name': 'Variables and Data Types',
                        'concepts': ['variables', 'numbers', 'strings', 'type_conversion'],
                        'difficulty': 'beginner',
                        'estimated_hours': 3
                    }
                ]
            },
            'data_structures': {
                'name': 'Data Structures',
                'description': 'Learn essential Python data structures and their operations.',
                'prerequisites': ['python_basics'],
                'modules': [
                    {
                        'id': 'lists_tuples',
                        'name': 'Lists and Tuples',
                        'concepts': ['list_operations', 'tuple_basics', 'sequence_types'],
                        'difficulty': 'intermediate',
                        'estimated_hours': 4
                    },
                    {
                        'id': 'dictionaries',
                        'name': 'Dictionaries',
                        'concepts': ['dict_operations', 'key_value_pairs', 'dict_methods'],
                        'difficulty': 'intermediate',
                        'estimated_hours': 3
                    }
                ]
            }
        }

    def load_curriculum(self):
        """Load the stored curriculum, or create and store the default one."""
        curriculum = self.load_from_storage("curriculum.json")
        if curriculum:
            self.curriculum = curriculum
            return

        self.curriculum = self._default_curriculum()
        self.save_to_storage(self.curriculum, "curriculum.json")

    # ------------------------------------------------------------------
    # Storage
    # ------------------------------------------------------------------
    def save_to_storage(self, content: dict, path: str):
        """Save content to storage (HuggingFace or local).

        A failed upload is reported and the content is written locally instead.
        """
        if self.use_local_storage:
            return self.save_locally(content, path)

        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.json', delete=False, encoding='utf-8'
            ) as tmp:
                tmp_path = tmp.name
                json.dump(content, tmp, indent=2)
            # Upload after the file is closed so all data is flushed to disk.
            self.api.upload_file(
                path_or_fileobj=tmp_path,
                path_in_repo=path,
                repo_id=self.repo_id,
                repo_type="dataset"
            )
        except Exception as e:
            st.error(f"Error saving to HuggingFace: {e}")
            return self.save_locally(content, path)
        finally:
            # delete=False leaves the file behind; remove it explicitly.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    def load_from_storage(self, path: str) -> Optional[dict]:
        """Load content from storage (HuggingFace or local).

        Returns:
            The parsed JSON document, or None when it is found in neither the
            hub repository nor the local data directory.
        """
        if self.use_local_storage:
            return self.load_locally(path)

        try:
            # hf_hub_download returns the path of the locally cached file.
            downloaded = self.api.hf_hub_download(
                repo_id=self.repo_id,
                filename=path,
                repo_type="dataset"
            )
            with open(downloaded, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            # Best effort: a missing file or hub failure falls back to disk.
            return self.load_locally(path)

    def save_locally(self, content: dict, path: str):
        """Save content as JSON under the local data directory."""
        try:
            file_path = self.LOCAL_DATA_DIR / path
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(content, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            st.error(f"Error saving locally: {e}")

    def load_locally(self, path: str) -> Optional[dict]:
        """Load JSON from the local data directory; None if missing or unreadable."""
        file_path = self.LOCAL_DATA_DIR / path
        try:
            if not file_path.exists():
                return None
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            return None

    # ------------------------------------------------------------------
    # Progress tracking
    # ------------------------------------------------------------------
    @staticmethod
    def _completed_modules() -> list:
        """Return the session's completed-module list, creating it if absent."""
        if 'completed_modules' not in st.session_state:
            st.session_state.completed_modules = []
        return st.session_state.completed_modules

    def check_prerequisites(self, prerequisites: list) -> bool:
        """Check if every module of every prerequisite path is completed.

        A prerequisite id that is not in the curriculum counts as unmet.
        """
        done = set(self._completed_modules())
        for prereq in prerequisites:
            prereq_path = self.curriculum.get(prereq)
            if prereq_path is None:
                return False
            required = {m['id'] for m in prereq_path['modules']}
            if not required.issubset(done):
                return False
        return True

    def get_path_progress(self, path_id: str) -> float:
        """Return the completed fraction (0.0-1.0) of a path's modules."""
        path_modules = {m['id'] for m in self.curriculum[path_id]['modules']}
        if not path_modules:
            # A path without modules has nothing to complete; avoid dividing by zero.
            return 0.0
        completed = path_modules.intersection(self._completed_modules())
        return len(completed) / len(path_modules)

    def is_module_complete(self, module_id: str) -> bool:
        """Check if a module is completed."""
        return module_id in self._completed_modules()

    def complete_module(self, module_id: str):
        """Mark a module as complete and persist the user's progress.

        Progress is written to ``progress/<username>.json``. When the session
        has no ``username``, the completion is kept in session state only.
        """
        completed = self._completed_modules()
        if module_id in completed:
            return

        completed.append(module_id)

        username = st.session_state.get("username")
        if not username:
            # Without a user there is no file to attribute the progress to.
            return

        progress = {
            'completed_modules': completed,
            'last_active': datetime.now().isoformat()
        }
        self.save_to_storage(progress, f"progress/{username}.json")

    def load_user_progress(self, username: str):
        """Load user progress from storage into session state."""
        progress = self.load_from_storage(f"progress/{username}.json")
        if progress:
            st.session_state.completed_modules = progress.get('completed_modules', [])