notebook-backend / utils /studio_manager.py
mohhhhhit's picture
first init
3736c33 verified
"""
Studio Manager - Handles Notebook, Flashcards, and Quiz storage and operations
"""
import json
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
import uuid
from models.studio_models import (
NotebookEntry, NotebookEntryCreate, NotebookEntryUpdate,
Flashcard, FlashcardCreate, FlashcardUpdate, FlashcardReview,
Quiz, QuizCreate, QuizResult, QuizHistory, QuizAnswer,
MasteryLevel, DifficultyLevel
)
import config
class StudioManager:
"""Manages all Studio features: Notebook, Flashcards, Quiz"""
def __init__(self):
"""Initialize studio manager with data directories"""
self.studio_dir = config.DATA_DIR / "studio"
self.notebooks_dir = self.studio_dir / "notebooks"
self.notebook_dir = self.studio_dir / "notebook"
self.flashcards_dir = self.studio_dir / "flashcards"
self.quizzes_dir = self.studio_dir / "quizzes"
self.quiz_results_dir = self.studio_dir / "quiz_results"
# Create directories
for directory in [self.notebooks_dir, self.notebook_dir, self.flashcards_dir,
self.quizzes_dir, self.quiz_results_dir]:
directory.mkdir(parents=True, exist_ok=True)
def _get_notebook_file_path(self, space_id: str) -> Path:
"""Get the metadata file path for a space notebook."""
return self.notebooks_dir / f"{space_id}.json"
def ensure_space_notebook(self, space_id: str, space_name: str = "") -> Dict[str, Any]:
"""Create notebook metadata for a space if it does not exist."""
file_path = self._get_notebook_file_path(space_id)
if file_path.exists():
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
now = datetime.now().isoformat()
notebook_name = space_name.strip() if space_name and space_name.strip() else space_id
notebook_data = {
"id": space_id,
"space_id": space_id,
"name": notebook_name,
"created_at": now,
"updated_at": now
}
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(notebook_data, f, indent=2)
return notebook_data
def get_space_notebook(self, space_id: str) -> Optional[Dict[str, Any]]:
"""Get notebook metadata for a specific space."""
file_path = self._get_notebook_file_path(space_id)
if not file_path.exists():
return None
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def _derive_title_from_question(self, question: str) -> str:
"""Generate a readable title from a chat question."""
question = (question or "").strip()
if not question:
return "Chat Note"
title = question.replace('\n', ' ')
return title[:80] + "..." if len(title) > 80 else title
# ========================================================================
# NOTEBOOK OPERATIONS
# ========================================================================
def create_notebook_entry(self, entry_data: NotebookEntryCreate) -> NotebookEntry:
"""Create a new notebook entry"""
# Ensure a notebook record exists for this space.
self.ensure_space_notebook(entry_data.space_id)
entry = NotebookEntry(
id=str(uuid.uuid4()),
**entry_data.dict()
)
# Save to file
file_path = self.notebook_dir / f"{entry.id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(entry.dict(), f, indent=2, default=str)
return entry
def create_notebook_entry_from_chat(
self,
space_id: str,
question: str,
answer: str,
chat_id: Optional[str] = None,
assistant_timestamp: Optional[str] = None,
tags: Optional[List[str]] = None,
space_name: str = ""
) -> NotebookEntry:
"""Create a notebook entry from a chat Q/A pair."""
self.ensure_space_notebook(space_id, space_name=space_name)
metadata: Dict[str, Any] = {
"question": question,
"assistant_timestamp": assistant_timestamp,
}
if chat_id:
metadata["chat_id"] = chat_id
entry_data = NotebookEntryCreate(
space_id=space_id,
title=self._derive_title_from_question(question),
content=f"Q: {question.strip()}\n\nA: {answer.strip()}",
source_type="chat",
source_id=chat_id,
tags=tags or ["chat"],
metadata=metadata
)
entry = self.create_notebook_entry(entry_data)
# Update notebook metadata timestamp.
notebook_data = self.ensure_space_notebook(space_id, space_name=space_name)
notebook_data["updated_at"] = datetime.now().isoformat()
with open(self._get_notebook_file_path(space_id), 'w', encoding='utf-8') as f:
json.dump(notebook_data, f, indent=2)
return entry
def get_notebook_entry(self, entry_id: str) -> Optional[NotebookEntry]:
"""Get a single notebook entry by ID"""
file_path = self.notebook_dir / f"{entry_id}.json"
if not file_path.exists():
return None
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return NotebookEntry(**data)
def list_notebook_entries(self, space_id: Optional[str] = None) -> List[NotebookEntry]:
"""List all notebook entries, optionally filtered by space"""
entries = []
for file_path in self.notebook_dir.glob("*.json"):
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
entry = NotebookEntry(**data)
# Filter by space if specified
if space_id is None or entry.space_id == space_id:
entries.append(entry)
except Exception as e:
print(f"Error loading notebook entry {file_path}: {e}")
# Sort by updated_at descending
entries.sort(key=lambda x: x.updated_at, reverse=True)
return entries
def update_notebook_entry(self, entry_id: str, update_data: NotebookEntryUpdate) -> Optional[NotebookEntry]:
"""Update an existing notebook entry"""
entry = self.get_notebook_entry(entry_id)
if not entry:
return None
# Update fields
update_dict = update_data.dict(exclude_unset=True)
for key, value in update_dict.items():
setattr(entry, key, value)
entry.updated_at = datetime.now()
# Save
file_path = self.notebook_dir / f"{entry_id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(entry.dict(), f, indent=2, default=str)
return entry
def delete_notebook_entry(self, entry_id: str) -> bool:
"""Delete a notebook entry"""
file_path = self.notebook_dir / f"{entry_id}.json"
if file_path.exists():
file_path.unlink()
return True
return False
# ========================================================================
# FLASHCARD OPERATIONS
# ========================================================================
def create_flashcard(self, card_data: FlashcardCreate) -> Flashcard:
"""Create a new flashcard"""
card = Flashcard(
id=str(uuid.uuid4()),
**card_data.dict()
)
# Save to file
file_path = self.flashcards_dir / f"{card.id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(card.dict(), f, indent=2, default=str)
return card
def get_flashcard(self, card_id: str) -> Optional[Flashcard]:
"""Get a single flashcard by ID"""
file_path = self.flashcards_dir / f"{card_id}.json"
if not file_path.exists():
return None
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return Flashcard(**data)
def list_flashcards(self, space_id: Optional[str] = None,
mastery: Optional[MasteryLevel] = None) -> List[Flashcard]:
"""List all flashcards, optionally filtered"""
cards = []
for file_path in self.flashcards_dir.glob("*.json"):
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
card = Flashcard(**data)
# Apply filters
if space_id and card.space_id != space_id:
continue
if mastery and card.mastery != mastery:
continue
cards.append(card)
except Exception as e:
print(f"Error loading flashcard {file_path}: {e}")
# Sort by next_review date (cards due for review first)
cards.sort(key=lambda x: x.next_review or datetime.now())
return cards
def update_flashcard(self, card_id: str, update_data: FlashcardUpdate) -> Optional[Flashcard]:
"""Update a flashcard"""
card = self.get_flashcard(card_id)
if not card:
return None
# Update fields
update_dict = update_data.dict(exclude_unset=True)
for key, value in update_dict.items():
setattr(card, key, value)
# Save
file_path = self.flashcards_dir / f"{card_id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(card.dict(), f, indent=2, default=str)
return card
def review_flashcard(self, card_id: str, review: FlashcardReview) -> Optional[Flashcard]:
"""Record a flashcard review and update mastery level"""
card = self.get_flashcard(card_id)
if not card:
return None
# Update review stats
card.review_count += 1
if review.correct:
card.correct_count += 1
card.last_reviewed = datetime.now()
# Update mastery level based on performance
accuracy = card.correct_count / card.review_count if card.review_count > 0 else 0
if accuracy >= 0.9 and card.review_count >= 5:
card.mastery = MasteryLevel.MASTERED
card.next_review = datetime.now() + timedelta(days=30)
elif accuracy >= 0.7 and card.review_count >= 3:
card.mastery = MasteryLevel.REVIEWING
card.next_review = datetime.now() + timedelta(days=7)
elif card.review_count >= 1:
card.mastery = MasteryLevel.LEARNING
card.next_review = datetime.now() + timedelta(days=1)
else:
card.mastery = MasteryLevel.NEW
card.next_review = datetime.now()
# Save
file_path = self.flashcards_dir / f"{card_id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(card.dict(), f, indent=2, default=str)
return card
def delete_flashcard(self, card_id: str) -> bool:
"""Delete a flashcard"""
file_path = self.flashcards_dir / f"{card_id}.json"
if file_path.exists():
file_path.unlink()
return True
return False
# ========================================================================
# QUIZ OPERATIONS
# ========================================================================
def create_quiz(self, quiz_data: QuizCreate) -> Quiz:
"""Create a new quiz"""
quiz = Quiz(
id=str(uuid.uuid4()),
**quiz_data.dict()
)
# Save to file
file_path = self.quizzes_dir / f"{quiz.id}.json"
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(quiz.dict(), f, indent=2, default=str)
return quiz
def get_quiz(self, quiz_id: str) -> Optional[Quiz]:
"""Get a quiz by ID"""
file_path = self.quizzes_dir / f"{quiz_id}.json"
if not file_path.exists():
return None
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return Quiz(**data)
def list_quizzes(self, space_id: Optional[str] = None) -> List[Quiz]:
"""List all quizzes, optionally filtered by space"""
quizzes = []
for file_path in self.quizzes_dir.glob("*.json"):
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
quiz = Quiz(**data)
if space_id is None or quiz.space_id == space_id:
quizzes.append(quiz)
except Exception as e:
print(f"Error loading quiz {file_path}: {e}")
# Sort by created_at descending
quizzes.sort(key=lambda x: x.created_at, reverse=True)
return quizzes
def delete_quiz(self, quiz_id: str) -> bool:
"""Delete a quiz"""
file_path = self.quizzes_dir / f"{quiz_id}.json"
if file_path.exists():
file_path.unlink()
return True
return False
def submit_quiz(self, quiz_id: str, answers: List[QuizAnswer]) -> Optional[QuizResult]:
"""Submit quiz answers and calculate results"""
quiz = self.get_quiz(quiz_id)
if not quiz:
return None
# Create answer lookup
answer_dict = {ans.question_id: ans for ans in answers}
# Calculate results
total_points = sum(q.points for q in quiz.questions)
correct_count = 0
incorrect_count = 0
earned_points = 0
detailed_answers = []
for question in quiz.questions:
user_answer = answer_dict.get(question.id)
is_correct = False
if user_answer:
# Normalize answers for comparison
correct_ans = question.correct_answer.strip().lower()
user_ans = user_answer.answer.strip().lower()
is_correct = correct_ans == user_ans
if is_correct:
correct_count += 1
earned_points += question.points
else:
incorrect_count += 1
else:
incorrect_count += 1
detailed_answers.append({
"question_id": question.id,
"question": question.question,
"user_answer": user_answer.answer if user_answer else None,
"correct_answer": question.correct_answer,
"is_correct": is_correct,
"explanation": question.explanation,
"points": question.points if is_correct else 0
})
# Create result
result = QuizResult(
quiz_id=quiz_id,
submission_id=str(uuid.uuid4()),
total_questions=len(quiz.questions),
correct_answers=correct_count,
incorrect_answers=incorrect_count,
score_percentage=(correct_count / len(quiz.questions) * 100) if quiz.questions else 0,
total_points=total_points,
earned_points=earned_points,
answers=detailed_answers
)
# Save result
result_file = self.quiz_results_dir / f"{result.submission_id}.json"
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(result.dict(), f, indent=2, default=str)
return result
def get_quiz_history(self, quiz_id: str) -> QuizHistory:
"""Get quiz attempt history"""
quiz = self.get_quiz(quiz_id)
if not quiz:
return None
# Load all results for this quiz
results = []
for file_path in self.quiz_results_dir.glob("*.json"):
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
result = QuizResult(**data)
if result.quiz_id == quiz_id:
results.append(result)
except Exception as e:
print(f"Error loading quiz result {file_path}: {e}")
# Calculate statistics
scores = [r.score_percentage for r in results] if results else [0]
history = QuizHistory(
quiz_id=quiz_id,
space_id=quiz.space_id,
quiz_title=quiz.title,
results=results,
best_score=max(scores),
average_score=sum(scores) / len(scores) if scores else 0,
attempts_count=len(results)
)
return history