open-notebook / open_notebook /services /study_plan_service.py
baveshraam's picture
FIX: SurrealDB 2.0 migration syntax and Frontend/CORS link
f871fed
"""
Study Plan Service.
AI-generated personalized study schedules based on content and deadlines.
"""
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any
import json
from loguru import logger
def _get_now_tz():
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
def _safe_days_diff(deadline, now):
    """Return the whole days from ``now`` until ``deadline``, never below 0.

    When exactly one of the two values is timezone-aware, the naive one is
    labelled as UTC so the subtraction is possible. Any failure while
    preparing or subtracting the values yields 0.
    """
    try:
        if getattr(deadline, "tzinfo", None) is None:
            # Naive (or tz-less) deadline: align it with an aware "now".
            if now.tzinfo is not None:
                deadline = deadline.replace(tzinfo=timezone.utc)
        elif now.tzinfo is None:
            # Aware deadline, naive "now": label "now" as UTC.
            now = now.replace(tzinfo=timezone.utc)
        whole_days = (deadline - now).days
    except Exception:
        return 0
    return max(0, whole_days)
from open_notebook.domain.study_plan import (
StudyPlan, StudyPlanCreate, StudyPlanUpdate, PlanStatus,
StudyTopic, StudyTopicCreate, StudyTopicUpdate, TopicStatus, TopicDifficulty,
StudySession, StudySessionCreate, StudySessionUpdate, SessionStatus, SessionType,
PlanAdjustment, AdjustmentType, AdjustmentStatus,
StudyPlanFull, StudyPlanWithTopics, StudyPlanStats,
DailySchedule, WeeklySchedule,
TopicAnalysis, PlanGenerationRequest, PlanGenerationResult
)
from open_notebook.database.repository import repo, ensure_record_id
from open_notebook.graphs.utils import provision_langchain_model
class StudyPlanService:
"""Service for managing study plans."""
# Difficulty multipliers for time estimation, keyed by TopicDifficulty.
# NOTE(review): not referenced by any method in the visible part of this
# class - confirm whether it is still used elsewhere.
DIFFICULTY_MULTIPLIERS = {
TopicDifficulty.EASY: 0.8,
TopicDifficulty.MEDIUM: 1.0,
TopicDifficulty.HARD: 1.3,
}
# Session type durations in minutes, keyed by SessionType.
# _generate_sessions reads the LEARN, REVIEW and PRACTICE entries.
SESSION_DURATIONS = {
SessionType.LEARN: 45,
SessionType.REVIEW: 30,
SessionType.PRACTICE: 45,
SessionType.QUIZ: 30,
}
async def create_plan(self, data: StudyPlanCreate) -> StudyPlan:
    """Insert a new ``study_plan`` record and return it parsed.

    A new plan starts in the active status with zero progress and zero total
    study hours; ``created_at`` and ``updated_at`` share one timestamp.
    """
    timestamp = datetime.now()
    # Datetime values are handed to the repository as-is, not as ISO strings.
    payload = {
        "notebook_id": data.notebook_id,
        "title": data.title,
        "description": data.description,
        "deadline": data.deadline,
        "available_hours_per_day": data.available_hours_per_day,
        "total_study_hours": 0.0,
        "status": PlanStatus.ACTIVE.value,
        "progress_percentage": 0.0,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    created = await repo.create("study_plan", payload)
    logger.debug(f"repo.create result type: {type(created)}, value: {created}")
    # A non-empty list result is unwrapped to its first element.
    if isinstance(created, list) and created:
        created = created[0]
    logger.debug(f"After list extraction type: {type(created)}, value: {created}")
    return self._parse_plan(created)
async def get_plan(self, plan_id: str) -> Optional[StudyPlan]:
    """Fetch one study plan, or ``None`` when the lookup returns nothing.

    An id without a ``:`` is prefixed with the ``study_plan`` table name.
    """
    record_id = plan_id
    if ":" not in record_id:
        record_id = f"study_plan:{record_id}"
    record = await repo.get(record_id)
    return self._parse_plan(record) if record else None
async def get_plan_full(self, plan_id: str) -> Optional[StudyPlanFull]:
    """Return a plan together with its topics, sessions and adjustments.

    Args:
        plan_id: Plan identifier; it is passed unchanged to the topic,
            session and adjustment lookups.

    Returns:
        A ``StudyPlanFull`` carrying the plan fields plus ``days_remaining``,
        ``completed_hours`` and at most ten ``upcoming_sessions``, or ``None``
        when the plan cannot be found.
    """
    plan = await self.get_plan(plan_id)
    if not plan:
        return None
    topics = await self.get_topics_for_plan(plan_id)
    sessions = await self.get_sessions_for_plan(plan_id)
    adjustments = await self.get_adjustments_for_plan(plan_id)

    now = datetime.now()
    days_remaining = _safe_days_diff(plan.deadline, now)
    # Hours actually spent: only completed sessions with both timestamps count.
    completed_hours = sum(
        (s.actual_end - s.actual_start).total_seconds() / 3600
        for s in sessions
        if s.status == SessionStatus.COMPLETED and s.actual_start and s.actual_end
    )

    def is_upcoming(session) -> bool:
        """Return True for a still-scheduled session dated after ``now``."""
        if session.status != SessionStatus.SCHEDULED:
            return False
        scheduled = session.scheduled_date
        reference = now
        try:
            # Aware and naive datetimes cannot be compared directly, so the
            # naive "now" is labelled UTC when the session date is aware.
            if scheduled.tzinfo is not None:
                reference = now.replace(tzinfo=timezone.utc)
            return scheduled > reference
        except (AttributeError, TypeError):
            # Missing or non-datetime scheduled_date: treat as not upcoming.
            return False

    # The first ten future sessions, in the order the session query returned.
    upcoming = [s for s in sessions if is_upcoming(s)][:10]
    return StudyPlanFull(
        **plan.model_dump(),
        topics=topics,
        sessions=sessions,
        adjustments=adjustments,
        days_remaining=days_remaining,
        completed_hours=completed_hours,
        upcoming_sessions=upcoming,
    )
async def get_plans_for_notebook(self, notebook_id: str) -> List[StudyPlan]:
    """List the study plans of one notebook, most recently created first."""
    rows = await repo.query(
        """
SELECT * FROM study_plan
WHERE notebook_id = $notebook_id
ORDER BY created_at DESC
""",
        {"notebook_id": notebook_id},
    )
    return list(map(self._parse_plan, rows))
async def get_active_plans(self) -> List[StudyPlan]:
    """List every plan whose status is ``'active'``, earliest deadline first."""
    active_query = """
SELECT * FROM study_plan
WHERE status = 'active'
ORDER BY deadline ASC
"""
    rows = await repo.query(active_query, {})
    plans = []
    for row in rows:
        plans.append(self._parse_plan(row))
    return plans
async def update_plan(self, plan_id: str, data: StudyPlanUpdate) -> Optional[StudyPlan]:
    """Apply the explicitly-set fields of ``data`` to a study plan.

    Args:
        plan_id: Plan identifier, with or without the ``study_plan:`` prefix.
        data: Update payload; only fields that were set are written.

    Returns:
        The updated plan, or ``None`` when the repository returns nothing.
    """
    update_data = data.model_dump(exclude_unset=True)
    # Datetime fields (deadline, updated_at) are passed through as datetime
    # objects, matching create_plan, which stores them directly rather than
    # as ISO strings. Mixing both representations in one field would make
    # ordering and comparisons on it unreliable.
    status = update_data.get("status")
    if status:
        update_data["status"] = status.value
    update_data["updated_at"] = datetime.now()
    full_plan_id = plan_id if ":" in plan_id else f"study_plan:{plan_id}"
    result = await repo.update(full_plan_id, update_data)
    if not result:
        return None
    return self._parse_plan(result)
async def delete_plan(self, plan_id: str) -> bool:
    """Remove a plan after deleting its sessions, topics and adjustments.

    Returns whatever the repository's delete call returns for the plan record.
    """
    # Dependent rows go first, in this order, then the plan itself.
    for table in ("study_session", "study_topic", "plan_adjustment"):
        await repo.query(
            f"DELETE FROM {table} WHERE plan_id = $plan_id",
            {"plan_id": plan_id},
        )
    record_id = plan_id
    if ":" not in record_id:
        record_id = f"study_plan:{record_id}"
    return await repo.delete(record_id)
# Topic methods
async def create_topic(self, data: StudyTopicCreate) -> StudyTopic:
    """Insert a study topic and refresh the owning plan's total hours.

    The topic starts as not-started with a mastery level of 0.
    """
    created = await repo.create(
        "study_topic",
        {
            "plan_id": data.plan_id,
            "name": data.name,
            "description": data.description,
            "difficulty": data.difficulty.value,
            "estimated_hours": data.estimated_hours,
            "priority": data.priority,
            "source_ids": data.source_ids,
            "prerequisites": data.prerequisites,
            "status": TopicStatus.NOT_STARTED.value,
            "mastery_level": 0.0,
            "created_at": datetime.now().isoformat(),
        },
    )
    # A non-empty list result is unwrapped to its first element.
    if isinstance(created, list) and created:
        created = created[0]
    # The plan total is the sum of its topics' estimates, so recompute it.
    await self._update_plan_total_hours(data.plan_id)
    return self._parse_topic(created)
async def get_topic(self, topic_id: str) -> Optional[StudyTopic]:
    """Fetch one topic, or ``None`` when the lookup returns nothing.

    An id without a ``:`` is prefixed with the ``study_topic`` table name.
    """
    record_id = topic_id
    if ":" not in record_id:
        record_id = f"study_topic:{record_id}"
    record = await repo.get(record_id)
    return self._parse_topic(record) if record else None
async def get_topics_for_plan(self, plan_id: str) -> List[StudyTopic]:
    """List a plan's topics, highest priority first, then oldest first."""
    rows = await repo.query(
        """
SELECT * FROM study_topic
WHERE plan_id = $plan_id
ORDER BY priority DESC, created_at ASC
""",
        {"plan_id": plan_id},
    )
    return list(map(self._parse_topic, rows))
async def update_topic(self, topic_id: str, data: StudyTopicUpdate) -> Optional[StudyTopic]:
    """Write the explicitly-set fields of ``data`` to a topic.

    Returns the updated topic, or ``None`` when the repository returns
    nothing. After a successful write the owning plan's progress is
    recalculated.
    """
    changes = data.model_dump(exclude_unset=True)
    # Enum-valued fields are stored by their ``.value``.
    for enum_field in ("difficulty", "status"):
        current = changes.get(enum_field)
        if current:
            changes[enum_field] = current.value
    record_id = topic_id
    if ":" not in record_id:
        record_id = f"study_topic:{record_id}"
    updated = await repo.update(record_id, changes)
    if not updated:
        return None
    topic = self._parse_topic(updated)
    await self._update_plan_progress(topic.plan_id)
    return topic
async def delete_topic(self, topic_id: str) -> bool:
    """Delete a topic together with its sessions.

    Returns ``False`` when the topic does not exist, otherwise ``True`` after
    the sessions and the topic are deleted and the plan total is refreshed.
    """
    existing = await self.get_topic(topic_id)
    if not existing:
        return False
    await repo.query(
        "DELETE FROM study_session WHERE topic_id = $topic_id",
        {"topic_id": topic_id},
    )
    record_id = topic_id
    if ":" not in record_id:
        record_id = f"study_topic:{record_id}"
    await repo.delete(record_id)
    # One topic fewer changes the plan's summed estimate.
    await self._update_plan_total_hours(existing.plan_id)
    return True
# Session methods
async def create_session(self, data: StudySessionCreate) -> StudySession:
    """Insert a study session in the scheduled status and return it parsed.

    ``scheduled_date`` and ``created_at`` are written as ISO-8601 strings.
    """
    created = await repo.create(
        "study_session",
        {
            "plan_id": data.plan_id,
            "topic_id": data.topic_id,
            "scheduled_date": data.scheduled_date.isoformat(),
            "scheduled_duration_minutes": data.scheduled_duration_minutes,
            "session_type": data.session_type.value,
            "status": SessionStatus.SCHEDULED.value,
            "created_at": datetime.now().isoformat(),
        },
    )
    # A non-empty list result is unwrapped to its first element.
    if isinstance(created, list) and created:
        created = created[0]
    return self._parse_session(created)
async def get_session(self, session_id: str) -> Optional[StudySession]:
    """Fetch one session, or ``None`` when the lookup returns nothing.

    An id without a ``:`` is prefixed with the ``study_session`` table name.
    """
    record_id = session_id
    if ":" not in record_id:
        record_id = f"study_session:{record_id}"
    record = await repo.get(record_id)
    return self._parse_session(record) if record else None
async def get_sessions_for_plan(self, plan_id: str) -> List[StudySession]:
    """List a plan's sessions ordered by scheduled date, earliest first."""
    rows = await repo.query(
        """
SELECT * FROM study_session
WHERE plan_id = $plan_id
ORDER BY scheduled_date ASC
""",
        {"plan_id": plan_id},
    )
    return list(map(self._parse_session, rows))
async def get_sessions_for_topic(self, topic_id: str) -> List[StudySession]:
    """List a topic's sessions ordered by scheduled date, earliest first."""
    topic_query = """
SELECT * FROM study_session
WHERE topic_id = $topic_id
ORDER BY scheduled_date ASC
"""
    rows = await repo.query(topic_query, {"topic_id": topic_id})
    parsed = []
    for row in rows:
        parsed.append(self._parse_session(row))
    return parsed
async def get_today_sessions(self, plan_id: Optional[str] = None) -> List[StudySession]:
    """List sessions scheduled within today's midnight-to-midnight window.

    When ``plan_id`` is given the result is limited to that plan; otherwise
    sessions of every plan are returned. The window bounds are sent as
    ISO-8601 strings.
    """
    window_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    window_end = window_start + timedelta(days=1)
    bounds = {"start": window_start.isoformat(), "end": window_end.isoformat()}
    if not plan_id:
        query = """
SELECT * FROM study_session
WHERE scheduled_date >= $start AND scheduled_date < $end
ORDER BY scheduled_date ASC
"""
        params = bounds
    else:
        query = """
SELECT * FROM study_session
WHERE plan_id = $plan_id
AND scheduled_date >= $start AND scheduled_date < $end
ORDER BY scheduled_date ASC
"""
        params = {"plan_id": plan_id, **bounds}
    rows = await repo.query(query, params)
    return list(map(self._parse_session, rows))
async def update_session(self, session_id: str, data: StudySessionUpdate) -> Optional[StudySession]:
    """Write the explicitly-set fields of ``data`` to a session.

    Date fields are serialised to ISO-8601 strings and enum fields to their
    ``.value``. Returns the updated session, or ``None`` when the repository
    returns nothing. Plan progress is recalculated when the update marks the
    session as completed.
    """
    changes = data.model_dump(exclude_unset=True)
    for date_field in ("scheduled_date", "actual_start", "actual_end"):
        moment = changes.get(date_field)
        if moment:
            changes[date_field] = moment.isoformat()
    for enum_field in ("session_type", "status"):
        member = changes.get(enum_field)
        if member:
            changes[enum_field] = member.value
    record_id = session_id
    if ":" not in record_id:
        record_id = f"study_session:{record_id}"
    updated = await repo.update(record_id, changes)
    if not updated:
        return None
    session = self._parse_session(updated)
    # Only a transition to COMPLETED affects the plan's progress figure.
    if data.status == SessionStatus.COMPLETED:
        await self._update_plan_progress(session.plan_id)
    return session
async def start_session(self, session_id: str) -> Optional[StudySession]:
    """Mark a session as in progress and stamp its actual start time."""
    started = StudySessionUpdate(
        status=SessionStatus.IN_PROGRESS,
        actual_start=datetime.now(),
    )
    return await self.update_session(session_id, started)
async def complete_session(self, session_id: str, rating: Optional[int] = None, notes: Optional[str] = None) -> Optional[StudySession]:
    """Mark a session as completed and refresh topic mastery and plan progress.

    Returns ``None`` when the session does not exist or the update returns
    nothing; otherwise the updated session.
    """
    logger.info(f"Completing session {session_id} with rating={rating}, notes={notes}")
    # Look the session up first so a missing id is reported before any write.
    existing_session = await self.get_session(session_id)
    if not existing_session:
        logger.warning(f"Session {session_id} not found")
        return None
    logger.info(f"Found existing session, topic_id={existing_session.topic_id}, plan_id={existing_session.plan_id}")
    completion = StudySessionUpdate(
        status=SessionStatus.COMPLETED,
        actual_end=datetime.now(),
        rating=rating,
        notes=notes,
    )
    session = await self.update_session(session_id, completion)
    if not session:
        logger.error(f"Failed to update session {session_id}")
        return session
    logger.info("Session updated to COMPLETED, now updating topic mastery and plan progress")
    # Topic mastery depends on the share of completed sessions.
    await self._update_topic_mastery(session.topic_id)
    # Recalculate plan progress once more after the mastery update.
    await self._update_plan_progress(session.plan_id)
    logger.info(f"Progress update complete for plan {session.plan_id}")
    return session
async def skip_session(self, session_id: str, reason: Optional[str] = None) -> Optional[StudySession]:
    """Mark a session as skipped, recording ``reason`` in its notes.

    An empty or missing reason is stored as ``"Session skipped"``.
    """
    note = reason if reason else "Session skipped"
    skipped = StudySessionUpdate(status=SessionStatus.SKIPPED, notes=note)
    return await self.update_session(session_id, skipped)
async def delete_session(self, session_id: str) -> bool:
    """Delete a single study session.

    Args:
        session_id: Session identifier, with or without the
            ``study_session:`` prefix.

    Returns:
        Whatever the repository's delete call returns for the record.
    """
    # Every other delete in this service passes one full record id
    # ("table:id") to repo.delete; the previous two-argument call did not
    # match that usage.
    full_session_id = session_id if ":" in session_id else f"study_session:{session_id}"
    return await repo.delete(full_session_id)
# Adjustment methods
async def get_adjustments_for_plan(self, plan_id: str) -> List[PlanAdjustment]:
    """List a plan's adjustments, most recently created first."""
    rows = await repo.query(
        """
SELECT * FROM plan_adjustment
WHERE plan_id = $plan_id
ORDER BY created_at DESC
""",
        {"plan_id": plan_id},
    )
    return list(map(self._parse_adjustment, rows))
async def respond_to_adjustment(self, adjustment_id: str, accepted: bool) -> bool:
    """Record the user's answer to an adjustment and apply it when accepted.

    Always returns ``True``.
    """
    if accepted:
        decision = AdjustmentStatus.ACCEPTED
    else:
        decision = AdjustmentStatus.REJECTED
    record_id = adjustment_id
    if ":" not in record_id:
        record_id = f"plan_adjustment:{record_id}"
    await repo.update(record_id, {"status": decision.value})
    if not accepted:
        return True
    # Re-read the stored adjustment and apply it to the plan if it exists.
    stored = await repo.get(record_id)
    if stored:
        await self._apply_adjustment(self._parse_adjustment(stored))
    return True
# Schedule methods
async def get_weekly_schedule(self, plan_id: str, week_start: Optional[datetime] = None) -> WeeklySchedule:
    """Build a seven-day schedule for a plan.

    When ``week_start`` is omitted the week begins on the Monday of the
    current week at midnight. Sessions are grouped by calendar day; planned
    hours come from scheduled durations and completed hours from the actual
    start/end of completed sessions.
    """

    def midnight_today():
        return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    if not week_start:
        current_day = midnight_today()
        # weekday() is 0 on Monday, so this steps back to the week's Monday.
        week_start = current_day - timedelta(days=current_day.weekday())
    week_end = week_start + timedelta(days=7)

    rows = await repo.query(
        """
SELECT * FROM study_session
WHERE plan_id = $plan_id
AND scheduled_date >= $start AND scheduled_date < $end
ORDER BY scheduled_date ASC
""",
        {
            "plan_id": plan_id,
            "start": week_start.isoformat(),
            "end": week_end.isoformat(),
        },
    )
    sessions = [self._parse_session(row) for row in rows]

    today = midnight_today()
    days = []
    total_planned = 0.0
    total_completed = 0.0
    for offset in range(7):
        day_date = week_start + timedelta(days=offset)
        calendar_day = day_date.date()
        day_sessions = [s for s in sessions if s.scheduled_date.date() == calendar_day]
        planned_hours = sum(s.scheduled_duration_minutes for s in day_sessions) / 60
        total_planned += planned_hours
        finished_hours = sum(
            (s.actual_end - s.actual_start).total_seconds() / 3600
            for s in day_sessions
            if s.status == SessionStatus.COMPLETED and s.actual_start and s.actual_end
        )
        total_completed += finished_hours
        days.append(
            DailySchedule(
                date=day_date,
                sessions=day_sessions,
                total_hours=planned_hours,
                is_today=calendar_day == today.date(),
            )
        )
    return WeeklySchedule(
        plan_id=plan_id,
        week_start=week_start,
        week_end=week_end,
        days=days,
        total_planned_hours=total_planned,
        total_completed_hours=total_completed,
    )
async def get_plan_stats(self, plan_id: str) -> StudyPlanStats:
    """Compute summary statistics for a study plan.

    Args:
        plan_id: Identifier of the plan to summarise.

    Returns:
        A ``StudyPlanStats``; when the plan does not exist, one built from
        ``plan_id`` alone.
    """
    plan = await self.get_plan(plan_id)
    if not plan:
        return StudyPlanStats(plan_id=plan_id)
    topics = await self.get_topics_for_plan(plan_id)
    sessions = await self.get_sessions_for_plan(plan_id)
    completed_topics = sum(1 for t in topics if t.status == TopicStatus.COMPLETED)
    completed_sessions = [s for s in sessions if s.status == SessionStatus.COMPLETED]
    total_planned_hours = plan.total_study_hours
    # Time actually spent: completed sessions with both timestamps recorded.
    total_completed_hours = sum(
        (s.actual_end - s.actual_start).total_seconds() / 3600
        for s in completed_sessions
        if s.actual_start and s.actual_end
    )
    ratings = [s.rating for s in completed_sessions if s.rating]
    average_rating = sum(ratings) / len(ratings) if ratings else None
    days_remaining = _safe_days_diff(plan.deadline, datetime.now())
    # Studying longer than planned must not produce a negative workload.
    remaining_hours = max(0.0, total_planned_hours - total_completed_hours)
    # With no full day left, everything remaining is due "today".
    hours_per_day_needed = remaining_hours / days_remaining if days_remaining > 0 else remaining_hours
    on_track = hours_per_day_needed <= plan.available_hours_per_day * 1.2  # 20% buffer
    return StudyPlanStats(
        plan_id=plan_id,
        total_topics=len(topics),
        completed_topics=completed_topics,
        total_sessions=len(sessions),
        completed_sessions=len(completed_sessions),
        total_planned_hours=total_planned_hours,
        total_completed_hours=total_completed_hours,
        average_rating=average_rating,
        on_track=on_track,
        days_remaining=days_remaining,
        hours_per_day_needed=hours_per_day_needed,
    )
# AI-powered plan generation
async def generate_plan(self, request: PlanGenerationRequest) -> PlanGenerationResult:
    """Generate a complete study plan (plan, topics, sessions) using AI.

    Steps: create the base plan, load the notebook's sources, extract topics
    from the source text, create the topics, check feasibility against the
    deadline, generate sessions and refresh the plan totals.

    Args:
        request: Generation parameters (notebook, title, deadline, daily
            hours, focus areas and review/practice switches).

    Returns:
        A ``PlanGenerationResult``. When the notebook has no sources the
        plan is still created and returned with empty topics and sessions
        and a warning.
    """
    warnings = []
    # 1. Create the base plan
    plan = await self.create_plan(StudyPlanCreate(
        notebook_id=request.notebook_id,
        title=request.title,
        description=request.description,
        deadline=request.deadline,
        available_hours_per_day=request.available_hours_per_day
    ))
    # 2. Get notebook sources (sources are connected via 'reference' relationship)
    logger.info(f"Generating plan for notebook: {request.notebook_id}")
    sources_query = """
SELECT in.* as source FROM reference
WHERE out = $notebook_id
FETCH source
"""
    notebook_id_value = ensure_record_id(f"notebook:{request.notebook_id.replace('notebook:', '')}")
    logger.debug(f"Querying sources with notebook_id: {notebook_id_value}")
    sources_result = await repo.query(sources_query, {"notebook_id": notebook_id_value})
    logger.info(f"Found {len(sources_result)} sources")
    # Extract the actual source data from the query result
    sources_data = [r["source"] for r in sources_result if r.get("source")]
    logger.info(f"Extracted {len(sources_data)} source objects")
    if not sources_data:
        warnings.append("No sources found in notebook. Add sources to generate topics.")
        logger.warning("No sources found in notebook - returning empty plan")
        return PlanGenerationResult(
            plan=plan,
            topics=[],
            sessions=[],
            total_hours=0,
            days_with_sessions=0,
            warnings=warnings
        )
    # 3. Extract topics using AI
    # A source may carry title/full_text keys whose value is None, so "or"
    # is used instead of a .get() default, which only covers a missing key.
    source_contents = "\n\n---\n\n".join([
        f"Source: {s.get('title') or 'Untitled'}\n{(s.get('full_text') or '')[:2000]}"
        for s in sources_data[:10]  # Limit to 10 sources
    ])
    logger.debug(f"Source content length: {len(source_contents)} chars")
    topic_analysis = await self._extract_topics_with_ai(source_contents, request.focus_areas)
    logger.info(f"AI extracted {len(topic_analysis)} topics")
    # 4. Create topics
    created_topics = []
    for analysis in topic_analysis:
        topic = await self.create_topic(StudyTopicCreate(
            plan_id=plan.id,
            name=analysis.name,
            description=analysis.description,
            difficulty=analysis.difficulty,
            estimated_hours=analysis.estimated_hours,
            priority=analysis.priority,
            source_ids=analysis.source_ids,
            prerequisites=analysis.prerequisites
        ))
        created_topics.append(topic)
    # 5. Calculate total hours and check feasibility
    total_hours = sum(t.estimated_hours for t in created_topics)
    days_until_deadline = max(1, _safe_days_diff(request.deadline, datetime.now()))
    available_hours = days_until_deadline * request.available_hours_per_day
    if total_hours > available_hours:
        warnings.append(
            f"Estimated {total_hours:.1f} hours needed but only {available_hours:.1f} hours available. "
            "Consider extending deadline or increasing daily study time."
        )
    # 6. Generate sessions
    created_sessions = await self._generate_sessions(
        plan=plan,
        topics=created_topics,
        include_reviews=request.include_reviews,
        include_practice=request.include_practice
    )
    # 7. Update plan total hours and re-read the plan; keep the in-memory
    # plan if the re-read unexpectedly returns nothing.
    await self._update_plan_total_hours(plan.id)
    refreshed_plan = await self.get_plan(plan.id)
    if refreshed_plan:
        plan = refreshed_plan
    # Count days with sessions
    session_dates = {s.scheduled_date.date() for s in created_sessions}
    return PlanGenerationResult(
        plan=plan,
        topics=created_topics,
        sessions=created_sessions,
        total_hours=total_hours,
        days_with_sessions=len(session_dates),
        warnings=warnings
    )
async def _extract_topics_with_ai(
    self,
    source_content: str,
    focus_areas: Optional[List[str]] = None
) -> List[TopicAnalysis]:
    """Use AI to extract study topics from source content.

    Args:
        source_content: Concatenated source text to analyse.
        focus_areas: Optional areas the model is asked to concentrate on.

    Returns:
        The topics parsed from the model's JSON answer. A generic three-step
        fallback list is returned when the content is shorter than 50
        characters or when the model call or JSON parsing fails.
    """

    def fallback(stages):
        """Build the three-step fallback: each step requires the previous one."""
        shape = (
            (TopicDifficulty.MEDIUM, 4.0, 10),
            (TopicDifficulty.MEDIUM, 3.0, 8),
            (TopicDifficulty.EASY, 2.0, 6),
        )
        built = []
        previous_name = None
        for (name, description), (difficulty, hours, priority) in zip(stages, shape):
            built.append(TopicAnalysis(
                name=name,
                description=description,
                difficulty=difficulty,
                estimated_hours=hours,
                priority=priority,
                source_ids=[],
                prerequisites=[previous_name] if previous_name else [],
                key_concepts=[]
            ))
            previous_name = name
        return built

    def to_analysis(entry):
        """Convert one JSON topic object, defaulting any unusable field."""
        try:
            difficulty = TopicDifficulty(str(entry.get("difficulty") or "medium").strip().lower())
        except ValueError:
            difficulty = TopicDifficulty.MEDIUM
        try:
            hours = float(entry.get("estimated_hours", 2.0))
        except (TypeError, ValueError):
            hours = 2.0
        try:
            priority = int(entry.get("priority", 5))
        except (TypeError, ValueError):
            priority = 5
        return TopicAnalysis(
            name=entry.get("name") or "Unknown Topic",
            description=entry.get("description") or "",
            difficulty=difficulty,
            estimated_hours=hours,
            priority=priority,
            source_ids=[],  # Would need to map back to actual sources
            prerequisites=entry.get("prerequisites") or [],
            key_concepts=entry.get("key_concepts") or []
        )

    logger.info(f"Starting AI topic extraction, content length: {len(source_content)}")
    if not source_content or len(source_content.strip()) < 50:
        logger.warning("Source content too short for topic extraction")
        return fallback((
            ("Core Concepts Review", "Review and understand the main concepts from your materials"),
            ("Practice & Application", "Apply learned concepts through practice exercises"),
            ("Review & Consolidation", "Final review and knowledge consolidation"),
        ))
    focus_instruction = ""
    if focus_areas:
        focus_instruction = f"\n\nFocus on these areas: {', '.join(focus_areas)}"
    prompt = f"""Analyze the following content and extract key study topics.
For each topic, provide:
1. name: A clear, concise topic name
2. description: Brief description of what the topic covers
3. difficulty: easy, medium, or hard
4. estimated_hours: How many hours to study this topic (0.5-8 hours)
5. priority: 1-10 (10 being most important/foundational)
6. key_concepts: List of key concepts within this topic
7. prerequisites: Names of other topics that should be studied first
Return as a JSON array of topics. Limit to 5-10 most important topics.
{focus_instruction}
Content:
{source_content}
Return ONLY valid JSON array, no markdown formatting."""
    try:
        # Use the LangChain model provisioning
        logger.debug("Provisioning LangChain model for topic extraction...")
        llm = await provision_langchain_model(prompt, None, "study_plan")
        logger.debug("Invoking LLM...")
        response = await llm.ainvoke(prompt)
        response_text = response.content if hasattr(response, 'content') else str(response)
        logger.debug(f"LLM response length: {len(response_text)}")
        # Parse JSON response - strip a surrounding markdown code fence if present
        response_text = response_text.strip()
        if response_text.startswith("```"):
            lines = response_text.split("\n")
            response_text = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
        topics_data = json.loads(response_text)
        if not isinstance(topics_data, list):
            raise ValueError(f"Expected a JSON array of topics, got {type(topics_data).__name__}")
        logger.info(f"Parsed {len(topics_data)} topics from AI response")
        # Entries that are not JSON objects are skipped rather than letting
        # one bad item discard the whole answer.
        topics = [to_analysis(t) for t in topics_data if isinstance(t, dict)]
        logger.info(f"Successfully created {len(topics)} TopicAnalysis objects")
        return topics
    except Exception as e:
        # loguru's exception() attaches the traceback; the error goes in as a
        # format argument so braces in its text are not read as placeholders.
        logger.exception("Error extracting topics with AI: {}", e)
        return fallback((
            ("Core Material Study", "Study the main content and concepts"),
            ("Deep Dive & Practice", "Practice and reinforce understanding"),
            ("Final Review", "Final review and preparation"),
        ))
async def _generate_sessions(
    self,
    plan: StudyPlan,
    topics: List[StudyTopic],
    include_reviews: bool = True,
    include_practice: bool = True
) -> List[StudySession]:
    """Generate and persist study sessions for the given topics.

    Learn sessions are placed in hourly slots starting at 09:00, one topic
    after another in descending priority, moving on to the next day once the
    plan's daily hour budget is used up. Nothing is scheduled at or after the
    plan deadline.

    Args:
        plan: Plan providing the id, deadline and daily hour budget.
        topics: Topics to schedule.
        include_reviews: Add a review session three days after a topic's
            last learn slot.
        include_practice: Add a practice session one day after the last
            learn slot for hard topics.

    Returns:
        All created sessions in creation order.
    """
    sessions = []
    current_date = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0)
    # Normalize deadline to naive datetime for comparison
    deadline = plan.deadline
    if hasattr(deadline, 'tzinfo') and deadline.tzinfo is not None:
        deadline = deadline.replace(tzinfo=None)
    logger.info(f"Generating sessions: current_date={current_date}, deadline={deadline}, available_hours={plan.available_hours_per_day}")
    # If current time is past 9am, start tomorrow
    if current_date < datetime.now():
        current_date += timedelta(days=1)
    # 09:00 of the day being filled; kept separately so a day rollover
    # restarts at 09:00 instead of inheriting the advanced slot time.
    day_start = current_date
    # Integer minutes avoid float residue that could yield 0-minute sessions.
    daily_minutes_used = 0
    daily_budget_minutes = plan.available_hours_per_day * 60
    learn_minutes = self.SESSION_DURATIONS[SessionType.LEARN]
    # Highest priority first, older topics first among equals
    # (prerequisites are not taken into account here).
    sorted_topics = sorted(topics, key=lambda t: (-t.priority, t.created_at))
    logger.info(f"Processing {len(sorted_topics)} topics for session generation")
    for topic in sorted_topics:
        logger.info(f"Topic: {topic.name}, estimated_hours={topic.estimated_hours}, difficulty={topic.difficulty}")
        minutes_remaining = int(round(topic.estimated_hours * 60))
        session_count = 0
        while minutes_remaining > 0:
            session_minutes = min(learn_minutes, minutes_remaining)
            # Check if we have time today; otherwise restart at 09:00 tomorrow
            if daily_minutes_used + session_minutes > daily_budget_minutes:
                day_start += timedelta(days=1)
                current_date = day_start
                daily_minutes_used = 0
            # Don't schedule past deadline
            if current_date >= deadline:
                break
            # Create learn session
            session = await self.create_session(StudySessionCreate(
                plan_id=plan.id,
                topic_id=topic.id,
                scheduled_date=current_date,
                scheduled_duration_minutes=session_minutes,
                session_type=SessionType.LEARN
            ))
            sessions.append(session)
            minutes_remaining -= session_minutes
            daily_minutes_used += session_minutes
            session_count += 1
            # Move to next time slot
            current_date += timedelta(hours=1)
        # Add review session (3 days after last learn session)
        if include_reviews and session_count > 0:
            review_date = current_date + timedelta(days=3)
            if review_date < deadline:
                review_session = await self.create_session(StudySessionCreate(
                    plan_id=plan.id,
                    topic_id=topic.id,
                    scheduled_date=review_date,
                    scheduled_duration_minutes=self.SESSION_DURATIONS[SessionType.REVIEW],
                    session_type=SessionType.REVIEW
                ))
                sessions.append(review_session)
        # Add practice session for hard topics
        if include_practice and topic.difficulty == TopicDifficulty.HARD:
            practice_date = current_date + timedelta(days=1)
            if practice_date < deadline:
                practice_session = await self.create_session(StudySessionCreate(
                    plan_id=plan.id,
                    topic_id=topic.id,
                    scheduled_date=practice_date,
                    scheduled_duration_minutes=self.SESSION_DURATIONS[SessionType.PRACTICE],
                    session_type=SessionType.PRACTICE
                ))
                sessions.append(practice_session)
    logger.info(f"Generated {len(sessions)} total sessions for plan")
    return sessions
# Helper methods
async def _update_plan_total_hours(self, plan_id: str) -> None:
    """Recompute a plan's total study hours as the sum of its topic estimates.

    Args:
        plan_id: Plan identifier, with or without the ``study_plan:`` prefix
            for the update; it is passed unchanged to the topic lookup.
    """
    topics = await self.get_topics_for_plan(plan_id)
    total_hours = sum(t.estimated_hours for t in topics)
    full_plan_id = plan_id if ":" in plan_id else f"study_plan:{plan_id}"
    await repo.update(full_plan_id, {
        "total_study_hours": total_hours,
        # Stored as a datetime, like create_plan does for this field.
        "updated_at": datetime.now()
    })
async def _update_plan_progress(self, plan_id: str) -> None:
    """Update a plan's progress percentage from its completed sessions.

    Progress is the scheduled minutes of completed sessions divided by the
    scheduled minutes of all sessions. The plan status is written as
    completed at 100% and as active otherwise. Nothing is written when the
    plan has no sessions.

    Args:
        plan_id: Plan identifier, with or without the ``study_plan:`` prefix
            for the update; it is passed unchanged to the session lookup.
    """
    sessions = await self.get_sessions_for_plan(plan_id)
    if not sessions:
        return
    # Calculate progress based on completed session time vs total session time
    total_minutes = sum(s.scheduled_duration_minutes for s in sessions)
    completed_minutes = sum(
        s.scheduled_duration_minutes
        for s in sessions
        if s.status == SessionStatus.COMPLETED
    )
    progress = (completed_minutes / total_minutes * 100) if total_minutes > 0 else 0
    status = PlanStatus.COMPLETED if progress >= 100 else PlanStatus.ACTIVE
    full_plan_id = plan_id if ":" in plan_id else f"study_plan:{plan_id}"
    await repo.update(full_plan_id, {
        "progress_percentage": progress,
        "status": status.value,
        # Stored as a datetime, like create_plan does for this field.
        "updated_at": datetime.now()
    })
    logger.info(f"Updated plan {plan_id} progress to {progress:.1f}%")
async def _update_topic_mastery(self, topic_id: str) -> None:
    """Recompute a topic's mastery level and status from its sessions.

    Mastery (0-100) weights the share of completed sessions at 70% and the
    average rating of completed sessions (divided by 5) at 30%; without any
    rating the rating part counts as 0.5. Nothing is written when the topic
    or its sessions cannot be found.
    """
    topic = await self.get_topic(topic_id)
    if not topic:
        return
    sessions = await self.get_sessions_for_topic(topic_id)
    if not sessions:
        return

    session_total = len(sessions)
    finished = [s for s in sessions if s.status == SessionStatus.COMPLETED]
    if session_total > 0:
        completion_ratio = len(finished) / session_total
    else:
        completion_ratio = 0

    # Rating contribution on a 0-1 scale; falsy ratings are ignored.
    ratings = [s.rating for s in finished if s.rating]
    if ratings:
        avg_rating = sum(ratings) / len(ratings) / 5
    else:
        avg_rating = 0.5

    mastery = (completion_ratio * 0.7 + avg_rating * 0.3) * 100

    # 80+ completes the topic, anything above 0 marks it in progress,
    # otherwise the current status is kept.
    if mastery >= 80:
        new_status = TopicStatus.COMPLETED
    elif mastery > 0:
        new_status = TopicStatus.IN_PROGRESS
    else:
        new_status = topic.status

    record_id = topic_id
    if ":" not in record_id:
        record_id = f"study_topic:{record_id}"
    await repo.update(record_id, {
        "mastery_level": mastery,
        "status": new_status.value
    })
    logger.info(f"Updated topic {topic_id} mastery to {mastery:.1f}%, status: {new_status.value}")
async def _apply_adjustment(self, adjustment: PlanAdjustment) -> None:
    """Apply an accepted adjustment to its plan.

    Handles deadline extensions and daily-hour increases; other adjustment
    types, and adjustments without a suggested value, change nothing.

    Args:
        adjustment: The accepted adjustment, carrying the plan id, the
            adjustment type and the suggested new value.
    """
    if not adjustment.suggested_value:
        return
    # NOTE(review): every other update in this service calls
    # repo.update(record_id, data); the previous three-argument form
    # ("study_plan", plan_id, data) did not match. Confirm against the
    # repository API.
    plan_id = adjustment.plan_id
    full_plan_id = plan_id if ":" in plan_id else f"study_plan:{plan_id}"
    if adjustment.adjustment_type == AdjustmentType.EXTEND_DEADLINE:
        await repo.update(full_plan_id, {
            "deadline": adjustment.suggested_value
        })
    elif adjustment.adjustment_type == AdjustmentType.INCREASE_HOURS:
        await repo.update(full_plan_id, {
            "available_hours_per_day": float(adjustment.suggested_value)
        })
    # Add more adjustment types as needed
def _parse_plan(self, data: Dict[str, Any]) -> StudyPlan:
    """Convert a database result into a ``StudyPlan``.

    A list input is unwrapped to its first element. Raises ``ValueError``
    for a string, an empty list or any other non-dict input. Missing fields
    fall back to defaults.
    """
    if isinstance(data, str):
        # A bare string (e.g. just a record id) cannot be parsed into a plan.
        logger.warning(f"_parse_plan received string instead of dict: {data}")
        raise ValueError(f"Invalid plan data format: expected dict, got string '{data}'")
    if isinstance(data, list):
        if not data:
            raise ValueError("Empty list returned from database")
        data = data[0]
    if not isinstance(data, dict):
        raise ValueError(f"Invalid plan data format: expected dict, got {type(data)}")

    raw_id = data.get("id", "")
    if isinstance(raw_id, dict):
        # Structured id: rebuild it as "<table>:<key>".
        table = raw_id.get("tb", "")
        key = raw_id.get("id", {}).get("String", "")
        raw_id = f"{table}:{key}"

    parse_dt = self._parse_datetime
    return StudyPlan(
        id=str(raw_id),
        notebook_id=data.get("notebook_id", ""),
        title=data.get("title", ""),
        description=data.get("description"),
        deadline=parse_dt(data.get("deadline")),
        available_hours_per_day=float(data.get("available_hours_per_day", 2.0)),
        total_study_hours=float(data.get("total_study_hours", 0.0)),
        status=PlanStatus(data.get("status", "active")),
        progress_percentage=float(data.get("progress_percentage", 0.0)),
        created_at=parse_dt(data.get("created_at")),
        updated_at=parse_dt(data.get("updated_at")),
    )
def _parse_topic(self, data: Dict[str, Any]) -> StudyTopic:
    """Build a ``StudyTopic`` from a raw database result.

    Args:
        data: Normally a dict of topic fields. A non-empty list is also
            accepted (``repo.update`` returns a list), in which case its
            first element is used.

    Returns:
        A ``StudyTopic`` populated from the record; absent keys fall back
        to the defaults spelled out below.

    Raises:
        ValueError: If ``data`` is an empty list or does not resolve to a
            dict.
    """
    record = data
    if isinstance(record, list):
        if not record:
            raise ValueError("Empty list returned from database")
        record = record[0]
    if not isinstance(record, dict):
        raise ValueError(f"Invalid topic data format: expected dict, got {type(record)}")

    # The id may arrive as a structured {'tb': ..., 'id': {'String': ...}}
    # mapping; flatten it to the "table:key" form.
    raw_id = record.get("id", "")
    if isinstance(raw_id, dict):
        table = raw_id.get('tb', '')
        key = raw_id.get('id', {}).get('String', '')
        raw_id = f"{table}:{key}"

    fields = {
        "id": str(raw_id),
        "plan_id": record.get("plan_id", ""),
        "name": record.get("name", ""),
        "description": record.get("description"),
        "difficulty": TopicDifficulty(record.get("difficulty", "medium")),
        "estimated_hours": float(record.get("estimated_hours", 1.0)),
        "priority": int(record.get("priority", 5)),
        "source_ids": record.get("source_ids", []),
        "prerequisites": record.get("prerequisites", []),
        "status": TopicStatus(record.get("status", "not_started")),
        "mastery_level": float(record.get("mastery_level", 0.0)),
        "created_at": self._parse_datetime(record.get("created_at")),
    }
    return StudyTopic(**fields)
def _parse_session(self, data: Dict[str, Any]) -> StudySession:
    """Build a ``StudySession`` from a raw database result.

    Args:
        data: Normally a dict of session fields. A non-empty list is also
            accepted (``repo.update`` returns a list), in which case its
            first element is used.

    Returns:
        A ``StudySession`` populated from the record. ``actual_start`` and
        ``actual_end`` are ``None`` when the stored value is missing or
        falsy; other absent keys fall back to the defaults spelled out
        below.

    Raises:
        ValueError: If ``data`` is an empty list or does not resolve to a
            dict.
    """
    record = data
    if isinstance(record, list):
        if not record:
            raise ValueError("Empty list returned from database")
        record = record[0]
    if not isinstance(record, dict):
        raise ValueError(f"Invalid session data format: expected dict, got {type(record)}")

    # The id may arrive as a structured {'tb': ..., 'id': {'String': ...}}
    # mapping; flatten it to the "table:key" form.
    raw_id = record.get("id", "")
    if isinstance(raw_id, dict):
        table = raw_id.get('tb', '')
        key = raw_id.get('id', {}).get('String', '')
        raw_id = f"{table}:{key}"

    # Optional timestamps: only parsed when a truthy value is stored.
    started = record.get("actual_start")
    ended = record.get("actual_end")

    fields = {
        "id": str(raw_id),
        "plan_id": record.get("plan_id", ""),
        "topic_id": record.get("topic_id", ""),
        "scheduled_date": self._parse_datetime(record.get("scheduled_date")),
        "scheduled_duration_minutes": int(record.get("scheduled_duration_minutes", 45)),
        "actual_start": self._parse_datetime(started) if started else None,
        "actual_end": self._parse_datetime(ended) if ended else None,
        "session_type": SessionType(record.get("session_type", "learn")),
        "status": SessionStatus(record.get("status", "scheduled")),
        "notes": record.get("notes"),
        "rating": record.get("rating"),
        "created_at": self._parse_datetime(record.get("created_at")),
    }
    return StudySession(**fields)
def _parse_adjustment(self, data: Dict[str, Any]) -> PlanAdjustment:
    """Build a ``PlanAdjustment`` from a raw database result.

    Args:
        data: Normally a dict of adjustment fields. A non-empty list is
            also accepted (``repo.update`` returns a list), in which case
            its first element is used.

    Returns:
        A ``PlanAdjustment`` populated from the record; absent keys fall
        back to the defaults spelled out below.

    Raises:
        ValueError: If ``data`` is an empty list or does not resolve to a
            dict.
    """
    record = data
    if isinstance(record, list):
        if not record:
            raise ValueError("Empty list returned from database")
        record = record[0]
    if not isinstance(record, dict):
        raise ValueError(f"Invalid adjustment data format: expected dict, got {type(record)}")

    # The id may arrive as a structured {'tb': ..., 'id': {'String': ...}}
    # mapping; flatten it to the "table:key" form.
    raw_id = record.get("id", "")
    if isinstance(raw_id, dict):
        table = raw_id.get('tb', '')
        key = raw_id.get('id', {}).get('String', '')
        raw_id = f"{table}:{key}"

    fields = {
        "id": str(raw_id),
        "plan_id": record.get("plan_id", ""),
        "adjustment_type": AdjustmentType(record.get("adjustment_type", "reschedule")),
        "reason": record.get("reason", ""),
        "original_value": record.get("original_value"),
        "suggested_value": record.get("suggested_value"),
        "status": AdjustmentStatus(record.get("status", "pending")),
        "created_at": self._parse_datetime(record.get("created_at")),
    }
    return PlanAdjustment(**fields)
def _parse_datetime(self, value: Any) -> datetime:
    """Coerce a database value into a ``datetime``.

    Args:
        value: A ``datetime`` (returned unchanged), an ISO-8601 string (a
            ``Z`` suffix is interpreted as UTC), or any other value.

    Returns:
        The parsed datetime. When ``value`` is missing, of an unsupported
        type, or not a valid ISO-8601 string, the current time is returned
        as a timezone-aware UTC value, so the fallback can be compared and
        subtracted against the aware datetimes produced from ``Z``-suffixed
        strings without raising ``TypeError``.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            # Deliberate best-effort: an unparseable string is treated like
            # a missing value and falls through to the "now" fallback.
            pass
    return datetime.now(timezone.utc)
# Singleton instance: created once when this module is imported and exposed
# as a module-level attribute.
study_plan_service = StudyPlanService()