# Hugging Face Spaces page status at capture time: Build error
import json
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

import anthropic
import streamlit as st
from huggingface_hub import HfApi
class LearningPaths:
    """Streamlit component that shows learning paths and LLM-generated modules.

    Curriculum, generated module content and user progress are stored as JSON
    files, either in a Hugging Face dataset repository or, when no token is
    configured or the hub is unreachable, under the local ``data/`` directory.
    """

    # Claude model used for content generation.
    MODEL_NAME = "claude-3-opus-20240229"
    # A full lesson plus a five-question quiz rarely fits in 1024 tokens, and
    # a truncated reply is unparseable JSON, so request a larger budget.
    MAX_OUTPUT_TOKENS = 4096
    # Minimum fraction of correct quiz answers needed to complete a module.
    PASSING_SCORE = 0.8

    # JSON structure the model is asked to return.
    _CONTENT_SCHEMA = (
        '{\n'
        '  "introduction": "text",\n'
        '  "sections": [\n'
        '    {"title": "string", "content": "text", "examples": []}\n'
        '  ],\n'
        '  "exercises": [\n'
        '    {"title": "string", "description": "text", "solution": "text"}\n'
        '  ],\n'
        '  "quiz": {\n'
        '    "questions": [\n'
        '      {"question": "text", "options": ["A", "B", "C", "D"], "correct": 0}\n'
        '    ]\n'
        '  }\n'
        '}'
    )

    def __init__(self):
        # Create the cache first so every later step can rely on it.
        self.content_generation_cache = {}
        self.initialize_storage()
        self.initialize_llm()
        self.load_curriculum()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _get_setting(name: str) -> Optional[str]:
        """Return a setting from Streamlit secrets, else from the environment."""
        try:
            value = st.secrets.get(name)
        except Exception:
            # st.secrets can raise when no secrets file is configured; that
            # must not prevent the environment-variable fallback.
            value = None
        return value or os.getenv(name)

    @staticmethod
    def _rerun():
        """Restart the script run, preferring ``st.rerun`` when it exists."""
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None)
        if rerun is not None:
            rerun()

    @staticmethod
    def _completed_modules() -> list:
        """Return the session's completed-module list, creating it if needed."""
        if 'completed_modules' not in st.session_state:
            st.session_state.completed_modules = []
        return st.session_state.completed_modules

    @staticmethod
    def _parse_llm_json(text: str) -> Optional[dict]:
        """Extract a JSON object from a model reply.

        The reply may wrap the object in a code fence or surrounding prose, so
        only the span from the first ``{`` to the last ``}`` is parsed.

        Returns:
            The decoded dict, or None when no valid JSON object is present.
        """
        if not text:
            return None
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            return None
        try:
            parsed = json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _score_quiz(questions: list, answers: list) -> float:
        """Return the fraction of questions answered correctly (0.0 if none).

        A question whose ``correct`` index is not a valid position in its
        ``options`` list is counted as wrong instead of raising.
        """
        if not questions:
            return 0.0
        correct = 0
        for question, answer in zip(questions, answers):
            options = question.get('options', [])
            index = question.get('correct')
            if isinstance(index, int) and 0 <= index < len(options) and answer == options[index]:
                correct += 1
        return correct / len(questions)

    def _find_module(self, module_id: str) -> Optional[dict]:
        """Return the curriculum module with the given id, or None."""
        for path in self.curriculum.values():
            for module in path.get('modules', []):
                if module.get('id') == module_id:
                    return module
        return None

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------
    def initialize_storage(self):
        """Initialize HuggingFace storage connection and create repository if needed"""
        # Safe defaults so later code never hits a missing attribute.
        self.hf_token = None
        self.api = None
        self.repo_id = None
        self.use_local_storage = True
        try:
            self.hf_token = self._get_setting("HF_TOKEN")
            if not self.hf_token:
                return
            self.api = HfApi(token=self.hf_token)
            self.repo_id = (self._get_setting("HF_REPO_ID") or
                            f'{self.api.whoami()["name"]}/eduai-content')
            try:
                self.api.repo_info(repo_id=self.repo_id, repo_type="dataset")
            except Exception:
                st.info(f"Creating new HuggingFace dataset repository: {self.repo_id}")
                self.api.create_repo(
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    private=True
                )
            self.use_local_storage = False
        except Exception as e:
            st.error(f"Error initializing HuggingFace storage: {e}")
            st.info("Falling back to local storage")
            self.use_local_storage = True

    def initialize_llm(self):
        """Initialize Anthropic/Claude client"""
        # Always define the attribute; None means "generation unavailable".
        self.client = None
        try:
            api_key = self._get_setting("ANTHROPIC_API_KEY")
            if not api_key:
                st.error("ANTHROPIC_API_KEY not found in secrets or environment variables")
                return
            self.client = anthropic.Anthropic(api_key=api_key)
        except Exception as e:
            st.error(f"Error initializing LLM: {e}")

    # ------------------------------------------------------------------
    # Content generation
    # ------------------------------------------------------------------
    def generate_module_content_async(self, module_id: str):
        """Render the "Generate Content" button and generate content on click.

        Returns:
            Cached content when it exists; otherwise None (on a successful
            click the script is rerun so the stored content is displayed).
        """
        if module_id in self.content_generation_cache:
            return self.content_generation_cache[module_id]
        module = self._find_module(module_id)
        if not module:
            return None
        if not st.button(f"Generate Content for {module['name']}", key=f"generate_{module['id']}"):
            return None
        with st.spinner("Generating learning content..."):
            try:
                content = self.generate_module_content(module)
            except Exception as e:
                st.error(f"Error generating content: {e}")
                return None
        if content is None:
            # Do not cache failures, otherwise the button would never
            # be offered again for this module.
            return None
        self.content_generation_cache[module_id] = content
        st.session_state[f"module_content_{module_id}"] = content
        missing_key = f"module_content_missing_{module_id}"
        if missing_key in st.session_state:
            del st.session_state[missing_key]
        self._rerun()
        return content

    def generate_module_content(self, module):
        """Generates and returns the content for a module.

        Args:
            module: Curriculum module dict with ``id``, ``name``, ``concepts``
                and ``difficulty`` keys.

        Returns:
            The parsed content dict (also saved under ``content/<id>.json``),
            or None when the client is unavailable or the reply is not JSON.
        """
        if getattr(self, 'client', None) is None:
            st.error("LLM client is not initialized; cannot generate content.")
            return None
        prompt = (
            f"Create comprehensive learning content for: {module['name']}\n"
            "Include:\n"
            "1. A clear introduction\n"
            f"2. Key concepts: {', '.join(module['concepts'])}\n"
            "3. Detailed explanations with examples\n"
            "4. Practice exercises\n"
            "5. A 5-question multiple choice quiz\n"
            f"Target difficulty level: {module['difficulty']}\n"
            "Format the response as JSON with the following structure:\n"
            f"{self._CONTENT_SCHEMA}"
        )
        message = self.client.messages.create(
            model=self.MODEL_NAME,
            max_tokens=self.MAX_OUTPUT_TOKENS,
            temperature=0.7,
            system="You are an expert curriculum designer and educator. Create detailed, engaging learning content with clear examples and practice exercises.",
            messages=[{"role": "user", "content": prompt}]
        )
        reply = message.content[0].text if message.content else ""
        content = self._parse_llm_json(reply)
        if content is None:
            st.error("Error decoding the response from the Anthropic API. Please try again later.")
            return None
        self.save_to_storage(content, f"content/{module['id']}.json")
        return content

    def get_module_content(self, module_id: str) -> Optional[dict]:
        """Get module content from session state if available, otherwise load from storage"""
        state_key = f"module_content_{module_id}"
        if state_key in st.session_state:
            return st.session_state[state_key]
        # Remember misses for the session so each rerun does not repeat a
        # (possibly remote) lookup for content that has not been generated.
        missing_key = f"module_content_missing_{module_id}"
        if missing_key in st.session_state:
            return None
        content = self.load_from_storage(f"content/{module_id}.json")
        if not content:
            st.session_state[missing_key] = True
            return None
        st.session_state[state_key] = content
        return content

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------
    def display(self):
        """Display learning paths interface"""
        st.header("Learning Paths")
        selected_path = st.selectbox(
            "Select Learning Path",
            options=list(self.curriculum.keys()),
            format_func=lambda x: self.curriculum[x]['name']
        )
        if selected_path:
            self.display_path_content(selected_path)

    def display_path_content(self, path_id: str):
        """Display content for selected path"""
        path = self.curriculum[path_id]
        st.subheader(path['name'])
        st.write(path['description'])
        prerequisites = path.get('prerequisites') or []
        if prerequisites and not self.check_prerequisites(prerequisites):
            # Fall back to the raw id when a prerequisite is not in the curriculum.
            names = [self.curriculum.get(p, {}).get('name', p) for p in prerequisites]
            st.warning("⚠️ Please complete the prerequisite paths first: " +
                       ", ".join(names))
            return
        progress = self.get_path_progress(path_id)
        st.progress(progress, f"Progress: {int(progress * 100)}%")
        self.display_modules(path['modules'])

    def display_modules(self, modules: list):
        """Render one expander per module with its status and content."""
        for module in modules:
            with st.expander(f"📚 {module['name']} ({module['difficulty']})"):
                if self.is_module_complete(module['id']):
                    st.success("✅ Module completed!")
                content = self.get_module_content(module['id'])
                if not content:
                    # Offer generation; without this call the button that the
                    # "Content not generated" hint refers to never appears.
                    content = self.generate_module_content_async(module['id'])
                self.display_module_content(module, content)

    def display_module_content(self, module: dict, content: dict):
        """Display module content"""
        if not content:
            st.info("Content not generated. Click 'Generate Content' to create learning materials.")
            return
        # The content comes from an LLM, so tolerate missing keys.
        st.write(content.get('introduction', ''))
        for section in content.get('sections', []):
            st.markdown(f"### {section.get('title', '')}")
            st.write(section.get('content', ''))
            for example in section.get('examples') or []:
                st.code(str(example))
        st.markdown("### Practice Exercises")
        for index, exercise in enumerate(content.get('exercises', [])):
            # This already runs inside the module's expander; Streamlit has
            # documented nested expanders as unsupported, so use a plain
            # heading here.
            title = exercise.get('title') or f"Exercise {index + 1}"
            st.markdown(f"**{title}**")
            st.write(exercise.get('description', ''))
            # Index-based key: exercise titles are not guaranteed unique.
            if st.button("Show Solution", key=f"sol_{module['id']}_{index}"):
                st.code(str(exercise.get('solution', '')))
        st.markdown("### Quiz")
        self.display_quiz(content.get('quiz') or {'questions': []}, module['id'])

    def display_quiz(self, quiz: dict, module_id: str):
        """Display quiz for a module"""
        questions = quiz.get('questions') or []
        if not questions:
            st.info("No quiz available for this module.")
            return
        answers = []
        for i, q in enumerate(questions):
            st.write(f"\n**Question {i+1}:** {q['question']}")
            answers.append(st.radio(
                "Select your answer:",
                q['options'],
                key=f"quiz_{module_id}_{i}"
            ))
        if st.button("Submit Quiz", key=f"submit_{module_id}"):
            score = self._score_quiz(questions, answers)
            if score >= self.PASSING_SCORE:
                st.success(f"🎉 Congratulations! Score: {score*100:.0f}%")
                self.complete_module(module_id)
            else:
                st.warning(f"Score: {score*100:.0f}%. You need {self.PASSING_SCORE*100:.0f}% to complete the module.")

    # ------------------------------------------------------------------
    # Curriculum
    # ------------------------------------------------------------------
    def load_curriculum(self):
        """Load or initialize curriculum structure"""
        curriculum = self.load_from_storage("curriculum.json")
        if curriculum:
            self.curriculum = curriculum
            return
        self.curriculum = {
            'python_basics': {
                'name': 'Python Programming Basics',
                'description': 'Master the fundamental concepts of Python programming.',
                'prerequisites': [],
                'modules': [
                    {
                        'id': 'intro_python',
                        'name': 'Introduction to Python',
                        'concepts': ['programming_basics', 'python_environment', 'basic_syntax'],
                        'difficulty': 'beginner',
                        'estimated_hours': 2
                    },
                    {
                        'id': 'variables_types',
                        'name': 'Variables and Data Types',
                        'concepts': ['variables', 'numbers', 'strings', 'type_conversion'],
                        'difficulty': 'beginner',
                        'estimated_hours': 3
                    }
                ]
            },
            'data_structures': {
                'name': 'Data Structures',
                'description': 'Learn essential Python data structures and their operations.',
                'prerequisites': ['python_basics'],
                'modules': [
                    {
                        'id': 'lists_tuples',
                        'name': 'Lists and Tuples',
                        'concepts': ['list_operations', 'tuple_basics', 'sequence_types'],
                        'difficulty': 'intermediate',
                        'estimated_hours': 4
                    },
                    {
                        'id': 'dictionaries',
                        'name': 'Dictionaries',
                        'concepts': ['dict_operations', 'key_value_pairs', 'dict_methods'],
                        'difficulty': 'intermediate',
                        'estimated_hours': 3
                    }
                ]
            }
        }
        self.save_to_storage(self.curriculum, "curriculum.json")

    # ------------------------------------------------------------------
    # Storage
    # ------------------------------------------------------------------
    def save_to_storage(self, content: dict, path: str):
        """Save content to storage (HuggingFace or local)"""
        if self.use_local_storage:
            return self.save_locally(content, path)
        try:
            # Upload the serialized bytes directly; no temporary file is
            # created, so nothing is left behind on disk.
            payload = json.dumps(content, indent=2).encode("utf-8")
            self.api.upload_file(
                path_or_fileobj=payload,
                path_in_repo=path,
                repo_id=self.repo_id,
                repo_type="dataset"
            )
        except Exception as e:
            st.error(f"Error saving to HuggingFace: {e}")
            return self.save_locally(content, path)

    def load_from_storage(self, path: str) -> Optional[dict]:
        """Load content from storage (HuggingFace or local)"""
        if self.use_local_storage:
            return self.load_locally(path)
        try:
            # hf_hub_download returns the path of the locally cached file.
            cached_file = self.api.hf_hub_download(
                repo_id=self.repo_id,
                filename=path,
                repo_type="dataset"
            )
            with open(cached_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            # Best effort: a missing file or hub error falls back to local data.
            return self.load_locally(path)

    def save_locally(self, content: dict, path: str):
        """Save content to local storage"""
        try:
            file_path = Path("data") / path
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(content, f, indent=2)
        except Exception as e:
            st.error(f"Error saving locally: {e}")

    def load_locally(self, path: str) -> Optional[dict]:
        """Load content from local storage"""
        file_path = Path("data") / path
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file or invalid JSON: treat as "no content".
            return None

    # ------------------------------------------------------------------
    # Progress tracking
    # ------------------------------------------------------------------
    def check_prerequisites(self, prerequisites: list) -> bool:
        """Check if prerequisites are met"""
        completed = set(self._completed_modules())
        for prereq in prerequisites:
            prereq_path = self.curriculum.get(prereq)
            if prereq_path is None:
                # An unknown prerequisite cannot have been completed.
                return False
            prereq_modules = {m['id'] for m in prereq_path['modules']}
            if not prereq_modules.issubset(completed):
                return False
        return True

    def get_path_progress(self, path_id: str) -> float:
        """Calculate progress for a path as a fraction between 0.0 and 1.0"""
        completed = set(self._completed_modules())
        path_modules = {m['id'] for m in self.curriculum[path_id]['modules']}
        if not path_modules:
            # A path without modules has no progress to report.
            return 0.0
        return len(path_modules & completed) / len(path_modules)

    def is_module_complete(self, module_id: str) -> bool:
        """Check if a module is completed"""
        return module_id in self._completed_modules()

    def complete_module(self, module_id: str):
        """Mark a module as complete and update progress"""
        completed = self._completed_modules()
        if module_id not in completed:
            completed.append(module_id)
        username = st.session_state.get('username')
        if not username:
            # No known user: keep the progress in the session only.
            return
        progress = {
            'completed_modules': completed,
            'last_active': datetime.now().isoformat()
        }
        self.save_to_storage(progress, f"progress/{username}.json")

    def load_user_progress(self, username: str):
        """Load user progress from storage"""
        progress = self.load_from_storage(f"progress/{username}.json")
        if progress:
            st.session_state.completed_modules = progress.get('completed_modules', [])