"""
Studio Manager - Handles Notebook, Flashcards, and Quiz storage and operations
"""

import json
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

from models.studio_models import (
    NotebookEntry, NotebookEntryCreate, NotebookEntryUpdate,
    Flashcard, FlashcardCreate, FlashcardUpdate, FlashcardReview,
    Quiz, QuizCreate, QuizResult, QuizHistory, QuizAnswer,
    MasteryLevel, DifficultyLevel
)
import config


class StudioManager:
    """Manages all Studio features: Notebook, Flashcards, Quiz.

    Every record is persisted as a single JSON file named ``<id>.json``
    inside a per-feature directory under ``config.DATA_DIR / "studio"``:

    * ``notebooks/``     - one metadata file per space notebook
    * ``notebook/``      - one file per notebook entry
    * ``flashcards/``    - one file per flashcard
    * ``quizzes/``       - one file per quiz
    * ``quiz_results/``  - one file per quiz submission
    """

    def __init__(self):
        """Initialize studio manager with data directories"""
        self.studio_dir = config.DATA_DIR / "studio"
        self.notebooks_dir = self.studio_dir / "notebooks"
        self.notebook_dir = self.studio_dir / "notebook"
        self.flashcards_dir = self.studio_dir / "flashcards"
        self.quizzes_dir = self.studio_dir / "quizzes"
        self.quiz_results_dir = self.studio_dir / "quiz_results"

        # Create directories
        for directory in (
            self.notebooks_dir,
            self.notebook_dir,
            self.flashcards_dir,
            self.quizzes_dir,
            self.quiz_results_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    # ========================================================================
    # STORAGE HELPERS
    # ========================================================================

    @staticmethod
    def _record_path(directory: Path, record_id: str) -> Path:
        """Return ``directory / "<record_id>.json"`` for a safe identifier.

        Raises:
            ValueError: If the identifier contains a NUL byte or is not a
                single path component (e.g. it contains a path separator),
                since joining it would address a file outside ``directory``.
        """
        filename = f"{record_id}.json"
        # Path(...).name differs from the input whenever the input has more
        # than one path component on the current platform.
        if "\x00" in filename or Path(filename).name != filename:
            raise ValueError(f"Invalid identifier: {record_id!r}")
        return directory / filename

    @staticmethod
    def _read_json(file_path: Path) -> Optional[Any]:
        """Return the parsed JSON content of ``file_path``, or None if missing.

        Malformed JSON still raises ``json.JSONDecodeError``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    @staticmethod
    def _write_json(file_path: Path, payload: Any) -> None:
        """Write ``payload`` to ``file_path`` as indented JSON.

        ``default=str`` stringifies values json cannot encode natively
        (e.g. the datetime fields set on records in this class).
        """
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, default=str)

    def _load_record(self, directory: Path, record_id: str) -> Optional[Any]:
        """Load the JSON stored for ``record_id``; None if absent or unsafe id."""
        try:
            file_path = self._record_path(directory, record_id)
        except ValueError:
            # An identifier that cannot name a file in `directory` cannot
            # refer to a stored record, so report it as "not found".
            return None
        return self._read_json(file_path)

    def _delete_record(self, directory: Path, record_id: str) -> bool:
        """Delete the file stored for ``record_id``; True if one was removed."""
        try:
            self._record_path(directory, record_id).unlink()
        except (ValueError, FileNotFoundError):
            return False
        return True

    def _load_all(self, directory: Path, model_cls: Any, label: str) -> List[Any]:
        """Load every ``*.json`` file in ``directory`` as a ``model_cls``.

        Loading is best-effort: a file that cannot be read or validated is
        reported on stdout (using ``label`` in the message) and skipped.
        """
        records = []
        for file_path in directory.glob("*.json"):
            try:
                data = self._read_json(file_path)
                if data is None:
                    # File vanished between the glob and the read.
                    continue
                records.append(model_cls(**data))
            except Exception as e:
                print(f"Error loading {label} {file_path}: {e}")
        return records

    def _get_notebook_file_path(self, space_id: str) -> Path:
        """Get the metadata file path for a space notebook.

        Raises:
            ValueError: If ``space_id`` is not usable as a file name.
        """
        return self._record_path(self.notebooks_dir, space_id)

    def ensure_space_notebook(self, space_id: str, space_name: str = "") -> Dict[str, Any]:
        """Create notebook metadata for a space if it does not exist.

        Args:
            space_id: Identifier of the space; also used as the notebook id.
            space_name: Display name for a newly created notebook. Falls back
                to ``space_id`` when empty or whitespace-only. Ignored when
                the notebook already exists.

        Returns:
            The stored notebook metadata (existing or newly created).

        Raises:
            ValueError: If ``space_id`` is not usable as a file name.
        """
        file_path = self._get_notebook_file_path(space_id)

        existing = self._read_json(file_path)
        if existing is not None:
            return existing

        now = datetime.now().isoformat()
        notebook_data = {
            "id": space_id,
            "space_id": space_id,
            "name": (space_name or "").strip() or space_id,
            "created_at": now,
            "updated_at": now
        }
        self._write_json(file_path, notebook_data)
        return notebook_data

    def get_space_notebook(self, space_id: str) -> Optional[Dict[str, Any]]:
        """Get notebook metadata for a specific space, or None if not stored."""
        return self._load_record(self.notebooks_dir, space_id)

    def _derive_title_from_question(self, question: str) -> str:
        """Generate a readable title from a chat question.

        Returns "Chat Note" for an empty/None question; otherwise the
        stripped question with newlines replaced by spaces, truncated to 80
        characters plus "..." when longer.
        """
        question = (question or "").strip()
        if not question:
            return "Chat Note"

        title = question.replace('\n', ' ')
        return title[:80] + "..." if len(title) > 80 else title

    # ========================================================================
    # NOTEBOOK OPERATIONS
    # ========================================================================

    def create_notebook_entry(self, entry_data: NotebookEntryCreate) -> NotebookEntry:
        """Create a new notebook entry and persist it under a fresh UUID."""
        # Ensure a notebook record exists for this space.
        self.ensure_space_notebook(entry_data.space_id)

        entry = NotebookEntry(
            id=str(uuid.uuid4()),
            **entry_data.dict()
        )

        # Save to file
        self._write_json(self._record_path(self.notebook_dir, entry.id), entry.dict())
        return entry

    def create_notebook_entry_from_chat(
        self,
        space_id: str,
        question: str,
        answer: str,
        chat_id: Optional[str] = None,
        assistant_timestamp: Optional[str] = None,
        tags: Optional[List[str]] = None,
        space_name: str = ""
    ) -> NotebookEntry:
        """Create a notebook entry from a chat Q/A pair.

        The entry content is ``"Q: <question>\\n\\nA: <answer>"`` (both
        stripped), the title is derived from the question, and the space
        notebook's ``updated_at`` timestamp is refreshed afterwards.

        Args:
            space_id: Space the entry belongs to.
            question: The user's question; None is treated as empty.
            answer: The assistant's answer; None is treated as empty.
            chat_id: Optional chat identifier, stored as ``source_id`` and in
                the entry metadata.
            assistant_timestamp: Optional timestamp stored in the metadata.
            tags: Tags for the entry; defaults to ``["chat"]`` when empty.
            space_name: Name used if the space notebook has to be created.

        Returns:
            The newly created notebook entry.
        """
        notebook_data = self.ensure_space_notebook(space_id, space_name=space_name)

        # Normalise once so a missing question/answer cannot crash .strip(),
        # matching the None-tolerance of _derive_title_from_question.
        question_text = (question or "").strip()
        answer_text = (answer or "").strip()

        metadata: Dict[str, Any] = {
            "question": question,
            "assistant_timestamp": assistant_timestamp,
        }
        if chat_id:
            metadata["chat_id"] = chat_id

        entry_data = NotebookEntryCreate(
            space_id=space_id,
            title=self._derive_title_from_question(question),
            content=f"Q: {question_text}\n\nA: {answer_text}",
            source_type="chat",
            source_id=chat_id,
            tags=tags or ["chat"],
            metadata=metadata
        )
        entry = self.create_notebook_entry(entry_data)

        # Update notebook metadata timestamp.
        notebook_data["updated_at"] = datetime.now().isoformat()
        self._write_json(self._get_notebook_file_path(space_id), notebook_data)

        return entry

    def get_notebook_entry(self, entry_id: str) -> Optional[NotebookEntry]:
        """Get a single notebook entry by ID, or None if it does not exist."""
        data = self._load_record(self.notebook_dir, entry_id)
        if data is None:
            return None
        return NotebookEntry(**data)

    def list_notebook_entries(self, space_id: Optional[str] = None) -> List[NotebookEntry]:
        """List all notebook entries, optionally filtered by space.

        Entries are sorted by ``updated_at``, most recent first. Files that
        fail to load are reported and skipped.
        """
        entries = [
            entry
            for entry in self._load_all(self.notebook_dir, NotebookEntry, "notebook entry")
            # Filter by space if specified
            if space_id is None or entry.space_id == space_id
        ]

        # Sort by updated_at descending
        entries.sort(key=lambda x: x.updated_at, reverse=True)
        return entries

    def update_notebook_entry(self, entry_id: str, update_data: NotebookEntryUpdate) -> Optional[NotebookEntry]:
        """Update an existing notebook entry.

        Only fields explicitly set on ``update_data`` are applied;
        ``updated_at`` is set to the current time. Returns None when the
        entry does not exist.
        """
        entry = self.get_notebook_entry(entry_id)
        if not entry:
            return None

        # Update fields
        for key, value in update_data.dict(exclude_unset=True).items():
            setattr(entry, key, value)
        entry.updated_at = datetime.now()

        # Save
        self._write_json(self._record_path(self.notebook_dir, entry_id), entry.dict())
        return entry

    def delete_notebook_entry(self, entry_id: str) -> bool:
        """Delete a notebook entry; return True if a file was removed."""
        return self._delete_record(self.notebook_dir, entry_id)

    # ========================================================================
    # FLASHCARD OPERATIONS
    # ========================================================================

    def create_flashcard(self, card_data: FlashcardCreate) -> Flashcard:
        """Create a new flashcard and persist it under a fresh UUID."""
        card = Flashcard(
            id=str(uuid.uuid4()),
            **card_data.dict()
        )

        # Save to file
        self._write_json(self._record_path(self.flashcards_dir, card.id), card.dict())
        return card

    def get_flashcard(self, card_id: str) -> Optional[Flashcard]:
        """Get a single flashcard by ID, or None if it does not exist."""
        data = self._load_record(self.flashcards_dir, card_id)
        if data is None:
            return None
        return Flashcard(**data)

    def list_flashcards(self, space_id: Optional[str] = None, mastery: Optional[MasteryLevel] = None) -> List[Flashcard]:
        """List all flashcards, optionally filtered by space and/or mastery.

        A falsy ``space_id`` or ``mastery`` disables that filter. Cards are
        sorted by ``next_review`` ascending; cards without a ``next_review``
        are ordered as if due now. Files that fail to load are reported and
        skipped.
        """
        cards = []
        for card in self._load_all(self.flashcards_dir, Flashcard, "flashcard"):
            # Apply filters
            if space_id and card.space_id != space_id:
                continue
            if mastery and card.mastery != mastery:
                continue
            cards.append(card)

        # Sort by next_review date (cards due for review first). A single
        # timestamp gives every unscheduled card the same sort key.
        now = datetime.now()
        cards.sort(key=lambda x: x.next_review or now)
        return cards

    def update_flashcard(self, card_id: str, update_data: FlashcardUpdate) -> Optional[Flashcard]:
        """Update a flashcard.

        Only fields explicitly set on ``update_data`` are applied. Returns
        None when the card does not exist.
        """
        card = self.get_flashcard(card_id)
        if not card:
            return None

        # Update fields
        for key, value in update_data.dict(exclude_unset=True).items():
            setattr(card, key, value)

        # Save
        self._write_json(self._record_path(self.flashcards_dir, card_id), card.dict())
        return card

    def review_flashcard(self, card_id: str, review: FlashcardReview) -> Optional[Flashcard]:
        """Record a flashcard review and update mastery level.

        Increments the review counters, then reschedules the card from its
        lifetime accuracy (``correct_count / review_count``):

        * accuracy >= 0.9 and >= 5 reviews -> MASTERED, next review in 30 days
        * accuracy >= 0.7 and >= 3 reviews -> REVIEWING, next review in 7 days
        * at least 1 review                -> LEARNING, next review in 1 day
        * otherwise                        -> NEW, due immediately

        Returns None when the card does not exist.
        """
        card = self.get_flashcard(card_id)
        if not card:
            return None

        # One timestamp for the whole review so last_reviewed and next_review
        # are derived from the same instant.
        now = datetime.now()

        # Update review stats
        card.review_count += 1
        if review.correct:
            card.correct_count += 1
        card.last_reviewed = now

        # Update mastery level based on performance
        accuracy = card.correct_count / card.review_count if card.review_count > 0 else 0

        if accuracy >= 0.9 and card.review_count >= 5:
            card.mastery = MasteryLevel.MASTERED
            card.next_review = now + timedelta(days=30)
        elif accuracy >= 0.7 and card.review_count >= 3:
            card.mastery = MasteryLevel.REVIEWING
            card.next_review = now + timedelta(days=7)
        elif card.review_count >= 1:
            card.mastery = MasteryLevel.LEARNING
            card.next_review = now + timedelta(days=1)
        else:
            card.mastery = MasteryLevel.NEW
            card.next_review = now

        # Save
        self._write_json(self._record_path(self.flashcards_dir, card_id), card.dict())
        return card

    def delete_flashcard(self, card_id: str) -> bool:
        """Delete a flashcard; return True if a file was removed."""
        return self._delete_record(self.flashcards_dir, card_id)

    # ========================================================================
    # QUIZ OPERATIONS
    # ========================================================================

    def create_quiz(self, quiz_data: QuizCreate) -> Quiz:
        """Create a new quiz and persist it under a fresh UUID."""
        quiz = Quiz(
            id=str(uuid.uuid4()),
            **quiz_data.dict()
        )

        # Save to file
        self._write_json(self._record_path(self.quizzes_dir, quiz.id), quiz.dict())
        return quiz

    def get_quiz(self, quiz_id: str) -> Optional[Quiz]:
        """Get a quiz by ID, or None if it does not exist."""
        data = self._load_record(self.quizzes_dir, quiz_id)
        if data is None:
            return None
        return Quiz(**data)

    def list_quizzes(self, space_id: Optional[str] = None) -> List[Quiz]:
        """List all quizzes, optionally filtered by space.

        Quizzes are sorted by ``created_at``, most recent first. Files that
        fail to load are reported and skipped.
        """
        quizzes = [
            quiz
            for quiz in self._load_all(self.quizzes_dir, Quiz, "quiz")
            if space_id is None or quiz.space_id == space_id
        ]

        # Sort by created_at descending
        quizzes.sort(key=lambda x: x.created_at, reverse=True)
        return quizzes

    def delete_quiz(self, quiz_id: str) -> bool:
        """Delete a quiz; return True if a file was removed."""
        return self._delete_record(self.quizzes_dir, quiz_id)

    def submit_quiz(self, quiz_id: str, answers: List[QuizAnswer]) -> Optional[QuizResult]:
        """Submit quiz answers and calculate results.

        Answers are matched to questions by ``question_id`` and compared to
        the correct answer after stripping whitespace and lowercasing. An
        unanswered question counts as incorrect. The result is saved under a
        fresh submission UUID.

        Returns:
            The graded result, or None when the quiz does not exist.
        """
        quiz = self.get_quiz(quiz_id)
        if not quiz:
            return None

        # Create answer lookup (a later answer for the same question wins)
        answer_dict = {ans.question_id: ans for ans in answers}

        # Calculate results
        total_points = sum(q.points for q in quiz.questions)
        correct_count = 0
        earned_points = 0
        detailed_answers = []

        for question in quiz.questions:
            user_answer = answer_dict.get(question.id)

            is_correct = False
            if user_answer:
                # Normalize answers for comparison
                correct_ans = question.correct_answer.strip().lower()
                user_ans = user_answer.answer.strip().lower()
                is_correct = correct_ans == user_ans

            if is_correct:
                correct_count += 1
                earned_points += question.points

            detailed_answers.append({
                "question_id": question.id,
                "question": question.question,
                "user_answer": user_answer.answer if user_answer else None,
                "correct_answer": question.correct_answer,
                "is_correct": is_correct,
                "explanation": question.explanation,
                "points": question.points if is_correct else 0
            })

        question_count = len(quiz.questions)

        # Create result
        result = QuizResult(
            quiz_id=quiz_id,
            submission_id=str(uuid.uuid4()),
            total_questions=question_count,
            correct_answers=correct_count,
            # Every question that is not correct (wrong or unanswered).
            incorrect_answers=question_count - correct_count,
            score_percentage=(correct_count / question_count * 100) if quiz.questions else 0,
            total_points=total_points,
            earned_points=earned_points,
            answers=detailed_answers
        )

        # Save result
        self._write_json(
            self._record_path(self.quiz_results_dir, result.submission_id),
            result.dict()
        )
        return result

    def get_quiz_history(self, quiz_id: str) -> Optional[QuizHistory]:
        """Get quiz attempt history.

        Returns:
            A history with every stored result for the quiz plus best and
            average score (both 0 when there are no attempts), or None when
            the quiz does not exist.
        """
        quiz = self.get_quiz(quiz_id)
        if not quiz:
            return None

        # Load all results for this quiz
        results = [
            result
            for result in self._load_all(self.quiz_results_dir, QuizResult, "quiz result")
            if result.quiz_id == quiz_id
        ]

        # Calculate statistics
        scores = [r.score_percentage for r in results]

        history = QuizHistory(
            quiz_id=quiz_id,
            space_id=quiz.space_id,
            quiz_title=quiz.title,
            results=results,
            best_score=max(scores, default=0),
            average_score=sum(scores) / len(scores) if scores else 0.0,
            attempts_count=len(results)
        )
        return history