"""
Study Plan Service.

AI-generated personalized study schedules based on content and deadlines.
"""

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from loguru import logger

from open_notebook.database.repository import ensure_record_id, repo
from open_notebook.domain.study_plan import (
    StudyPlan, StudyPlanCreate, StudyPlanUpdate, PlanStatus,
    StudyTopic, StudyTopicCreate, StudyTopicUpdate, TopicStatus, TopicDifficulty,
    StudySession, StudySessionCreate, StudySessionUpdate, SessionStatus, SessionType,
    PlanAdjustment, AdjustmentType, AdjustmentStatus,
    StudyPlanFull, StudyPlanWithTopics, StudyPlanStats,
    DailySchedule, WeeklySchedule, TopicAnalysis,
    PlanGenerationRequest, PlanGenerationResult
)
from open_notebook.graphs.utils import provision_langchain_model


def _get_now_tz() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _align_tz(value: Any, reference: datetime) -> Tuple[Any, datetime]:
    """Make two datetimes comparable when exactly one of them is timezone-aware.

    The naive side is tagged as UTC (``replace``, not a conversion). Pairs that
    are both aware or both naive are returned unchanged.

    Raises:
        AttributeError: if ``reference`` is aware and ``value`` has no ``replace``
            method (e.g. ``None``).
    """
    value_aware = getattr(value, "tzinfo", None) is not None
    reference_aware = reference.tzinfo is not None
    if value_aware and not reference_aware:
        reference = reference.replace(tzinfo=timezone.utc)
    elif reference_aware and not value_aware:
        value = value.replace(tzinfo=timezone.utc)
    return value, reference


def _safe_days_diff(deadline: Any, now: datetime) -> int:
    """Return whole days from ``now`` until ``deadline``, never negative.

    Naive/aware mismatches are reconciled with ``_align_tz``. If the values cannot
    be compared (e.g. ``deadline`` is ``None``), 0 is returned.
    """
    try:
        aligned_deadline, aligned_now = _align_tz(deadline, now)
        return max(0, (aligned_deadline - aligned_now).days)
    except (TypeError, AttributeError):
        return 0


class StudyPlanService:
    """Service for managing study plans, topics, sessions and adjustments."""

    # Difficulty multipliers for time estimation
    DIFFICULTY_MULTIPLIERS = {
        TopicDifficulty.EASY: 0.8,
        TopicDifficulty.MEDIUM: 1.0,
        TopicDifficulty.HARD: 1.3,
    }

    # Session type durations in minutes
    SESSION_DURATIONS = {
        SessionType.LEARN: 45,
        SessionType.REVIEW: 30,
        SessionType.PRACTICE: 45,
        SessionType.QUIZ: 30,
    }

    # Hour of day (local time) at which generated study days begin.
    DAY_START_HOUR = 9

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _full_id(table: str, record_id: str) -> str:
        """Return ``record_id`` prefixed with ``table:`` unless it already contains ``:``."""
        return record_id if ":" in record_id else f"{table}:{record_id}"

    @staticmethod
    def _first_record(result: Any) -> Any:
        """Unwrap the first element when ``repo.create`` returns a non-empty list."""
        if isinstance(result, list) and result:
            return result[0]
        return result

    # ------------------------------------------------------------------
    # Plan methods
    # ------------------------------------------------------------------

    async def create_plan(self, data: StudyPlanCreate) -> StudyPlan:
        """Create a new study plan in ACTIVE status with zero hours and progress."""
        now = datetime.now()
        plan_data = {
            "notebook_id": data.notebook_id,
            "title": data.title,
            "description": data.description,
            "deadline": data.deadline,  # Pass datetime directly, not isoformat
            "available_hours_per_day": data.available_hours_per_day,
            "total_study_hours": 0.0,
            "status": PlanStatus.ACTIVE.value,
            "progress_percentage": 0.0,
            "created_at": now,  # Pass datetime directly
            "updated_at": now,  # Pass datetime directly
        }
        result = await repo.create("study_plan", plan_data)
        logger.debug(f"repo.create result type: {type(result)}, value: {result}")
        # repo.create returns a list, extract first element
        result = self._first_record(result)
        logger.debug(f"After list extraction type: {type(result)}, value: {result}")
        return self._parse_plan(result)

    async def get_plan(self, plan_id: str) -> Optional[StudyPlan]:
        """Get a study plan by short or ``study_plan:``-prefixed id; ``None`` if missing."""
        result = await repo.get(self._full_id("study_plan", plan_id))
        if not result:
            return None
        return self._parse_plan(result)

    async def get_plan_full(self, plan_id: str) -> Optional[StudyPlanFull]:
        """Get a plan with its topics, sessions, adjustments and derived fields.

        Derived fields: ``days_remaining`` until the deadline, ``completed_hours``
        (actual time of completed sessions) and ``upcoming_sessions``.
        """
        plan = await self.get_plan(plan_id)
        if not plan:
            return None

        topics = await self.get_topics_for_plan(plan_id)
        sessions = await self.get_sessions_for_plan(plan_id)
        adjustments = await self.get_adjustments_for_plan(plan_id)

        now = datetime.now()
        days_remaining = _safe_days_diff(plan.deadline, now)

        completed_hours = sum(
            (s.actual_end - s.actual_start).total_seconds() / 3600
            for s in sessions
            if s.status == SessionStatus.COMPLETED and s.actual_start and s.actual_end
        )

        def is_upcoming(session: StudySession) -> bool:
            """True for SCHEDULED sessions dated after ``now`` (timezone-safe)."""
            if session.status != SessionStatus.SCHEDULED:
                return False
            try:
                scheduled, reference = _align_tz(session.scheduled_date, now)
                return scheduled > reference
            except (TypeError, AttributeError):
                return False

        # Up to 10 future scheduled sessions. ``sessions`` comes back ordered by
        # scheduled_date ASC, so these are the soonest ones.
        upcoming = [s for s in sessions if is_upcoming(s)][:10]

        return StudyPlanFull(
            **plan.model_dump(),
            topics=topics,
            sessions=sessions,
            adjustments=adjustments,
            days_remaining=days_remaining,
            completed_hours=completed_hours,
            upcoming_sessions=upcoming,
        )

    async def get_plans_for_notebook(self, notebook_id: str) -> List[StudyPlan]:
        """Get all study plans for a notebook, newest first."""
        query = """
            SELECT * FROM study_plan
            WHERE notebook_id = $notebook_id
            ORDER BY created_at DESC
        """
        results = await repo.query(query, {"notebook_id": notebook_id})
        return [self._parse_plan(r) for r in results]

    async def get_active_plans(self) -> List[StudyPlan]:
        """Get all active study plans, earliest deadline first."""
        query = """
            SELECT * FROM study_plan
            WHERE status = 'active'
            ORDER BY deadline ASC
        """
        results = await repo.query(query, {})
        return [self._parse_plan(r) for r in results]

    async def update_plan(self, plan_id: str, data: StudyPlanUpdate) -> Optional[StudyPlan]:
        """Apply the explicitly set fields of ``data`` to a plan; ``None`` if not found."""
        update_data = data.model_dump(exclude_unset=True)
        if "deadline" in update_data and update_data["deadline"]:
            update_data["deadline"] = update_data["deadline"].isoformat()
        if "status" in update_data and update_data["status"]:
            update_data["status"] = update_data["status"].value
        update_data["updated_at"] = datetime.now().isoformat()

        result = await repo.update(self._full_id("study_plan", plan_id), update_data)
        if not result:
            return None
        return self._parse_plan(result)

    async def delete_plan(self, plan_id: str) -> bool:
        """Delete a study plan together with its sessions, topics and adjustments.

        Child rows store ``plan_id`` exactly as it was passed when they were
        created. The plan id may have been passed short or prefixed, so both
        forms are matched to avoid leaving orphans behind.
        """
        full_plan_id = self._full_id("study_plan", plan_id)
        id_variants = list(dict.fromkeys([plan_id, full_plan_id]))
        # Delete related data first
        for table in ("study_session", "study_topic", "plan_adjustment"):
            for variant in id_variants:
                await repo.query(
                    f"DELETE FROM {table} WHERE plan_id = $plan_id", {"plan_id": variant}
                )
        return await repo.delete(full_plan_id)

    # ------------------------------------------------------------------
    # Topic methods
    # ------------------------------------------------------------------

    async def create_topic(self, data: StudyTopicCreate) -> StudyTopic:
        """Create a NOT_STARTED topic and refresh the parent plan's total hours."""
        topic_data = {
            "plan_id": data.plan_id,
            "name": data.name,
            "description": data.description,
            "difficulty": data.difficulty.value,
            "estimated_hours": data.estimated_hours,
            "priority": data.priority,
            "source_ids": data.source_ids,
            "prerequisites": data.prerequisites,
            "status": TopicStatus.NOT_STARTED.value,
            "mastery_level": 0.0,
            "created_at": datetime.now().isoformat(),
        }
        result = self._first_record(await repo.create("study_topic", topic_data))

        # Update plan total hours
        await self._update_plan_total_hours(data.plan_id)

        return self._parse_topic(result)

    async def get_topic(self, topic_id: str) -> Optional[StudyTopic]:
        """Get a topic by short or ``study_topic:``-prefixed id; ``None`` if missing."""
        result = await repo.get(self._full_id("study_topic", topic_id))
        if not result:
            return None
        return self._parse_topic(result)

    async def get_topics_for_plan(self, plan_id: str) -> List[StudyTopic]:
        """Get all topics for a plan, highest priority first, then oldest first."""
        query = """
            SELECT * FROM study_topic
            WHERE plan_id = $plan_id
            ORDER BY priority DESC, created_at ASC
        """
        results = await repo.query(query, {"plan_id": plan_id})
        return [self._parse_topic(r) for r in results]

    async def update_topic(self, topic_id: str, data: StudyTopicUpdate) -> Optional[StudyTopic]:
        """Apply set fields to a topic and recompute its plan's progress."""
        update_data = data.model_dump(exclude_unset=True)
        if "difficulty" in update_data and update_data["difficulty"]:
            update_data["difficulty"] = update_data["difficulty"].value
        if "status" in update_data and update_data["status"]:
            update_data["status"] = update_data["status"].value

        result = await repo.update(self._full_id("study_topic", topic_id), update_data)
        if not result:
            return None

        topic = self._parse_topic(result)
        await self._update_plan_progress(topic.plan_id)
        return topic

    async def delete_topic(self, topic_id: str) -> bool:
        """Delete a topic and its sessions; return ``False`` if the topic is missing.

        Sessions are matched on both the id as passed and the parsed (prefixed)
        topic id, since sessions store ``topic_id`` as given at creation.
        """
        topic = await self.get_topic(topic_id)
        if not topic:
            return False

        for variant in dict.fromkeys([topic_id, topic.id]):
            await repo.query(
                "DELETE FROM study_session WHERE topic_id = $topic_id", {"topic_id": variant}
            )
        await repo.delete(self._full_id("study_topic", topic_id))
        await self._update_plan_total_hours(topic.plan_id)
        return True

    # ------------------------------------------------------------------
    # Session methods
    # ------------------------------------------------------------------

    async def create_session(self, data: StudySessionCreate) -> StudySession:
        """Create a SCHEDULED study session."""
        session_data = {
            "plan_id": data.plan_id,
            "topic_id": data.topic_id,
            "scheduled_date": data.scheduled_date.isoformat(),
            "scheduled_duration_minutes": data.scheduled_duration_minutes,
            "session_type": data.session_type.value,
            "status": SessionStatus.SCHEDULED.value,
            "created_at": datetime.now().isoformat(),
        }
        result = self._first_record(await repo.create("study_session", session_data))
        return self._parse_session(result)

    async def get_session(self, session_id: str) -> Optional[StudySession]:
        """Get a session by short or ``study_session:``-prefixed id; ``None`` if missing."""
        result = await repo.get(self._full_id("study_session", session_id))
        if not result:
            return None
        return self._parse_session(result)

    async def get_sessions_for_plan(self, plan_id: str) -> List[StudySession]:
        """Get all sessions for a plan ordered by scheduled date."""
        query = """
            SELECT * FROM study_session
            WHERE plan_id = $plan_id
            ORDER BY scheduled_date ASC
        """
        results = await repo.query(query, {"plan_id": plan_id})
        return [self._parse_session(r) for r in results]

    async def get_sessions_for_topic(self, topic_id: str) -> List[StudySession]:
        """Get all sessions for a topic ordered by scheduled date."""
        query = """
            SELECT * FROM study_session
            WHERE topic_id = $topic_id
            ORDER BY scheduled_date ASC
        """
        results = await repo.query(query, {"topic_id": topic_id})
        return [self._parse_session(r) for r in results]

    async def get_today_sessions(self, plan_id: Optional[str] = None) -> List[StudySession]:
        """Get sessions scheduled for today (local midnight to midnight).

        Args:
            plan_id: when given, restrict to that plan; otherwise all plans.
        """
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_end = today_start + timedelta(days=1)
        params: Dict[str, Any] = {"start": today_start.isoformat(), "end": today_end.isoformat()}

        if plan_id:
            query = """
                SELECT * FROM study_session
                WHERE plan_id = $plan_id
                AND scheduled_date >= $start
                AND scheduled_date < $end
                ORDER BY scheduled_date ASC
            """
            params["plan_id"] = plan_id
        else:
            query = """
                SELECT * FROM study_session
                WHERE scheduled_date >= $start
                AND scheduled_date < $end
                ORDER BY scheduled_date ASC
            """

        results = await repo.query(query, params)
        return [self._parse_session(r) for r in results]

    async def update_session(self, session_id: str, data: StudySessionUpdate) -> Optional[StudySession]:
        """Apply set fields to a session; recompute plan progress when it becomes COMPLETED."""
        update_data = data.model_dump(exclude_unset=True)
        for date_field in ("scheduled_date", "actual_start", "actual_end"):
            if date_field in update_data and update_data[date_field]:
                update_data[date_field] = update_data[date_field].isoformat()
        if "session_type" in update_data and update_data["session_type"]:
            update_data["session_type"] = update_data["session_type"].value
        if "status" in update_data and update_data["status"]:
            update_data["status"] = update_data["status"].value

        result = await repo.update(self._full_id("study_session", session_id), update_data)
        if not result:
            return None

        session = self._parse_session(result)

        # Update plan progress if session completed
        if data.status == SessionStatus.COMPLETED:
            await self._update_plan_progress(session.plan_id)

        return session

    async def start_session(self, session_id: str) -> Optional[StudySession]:
        """Mark a session IN_PROGRESS and stamp ``actual_start`` with now."""
        return await self.update_session(
            session_id,
            StudySessionUpdate(
                status=SessionStatus.IN_PROGRESS,
                actual_start=datetime.now()
            )
        )

    async def complete_session(
        self, session_id: str, rating: Optional[int] = None, notes: Optional[str] = None
    ) -> Optional[StudySession]:
        """Mark a session COMPLETED, then refresh topic mastery and plan progress.

        Returns ``None`` if the session does not exist or the update fails.
        """
        logger.info(f"Completing session {session_id} with rating={rating}, notes={notes}")

        # Get the session first to know its topic
        existing_session = await self.get_session(session_id)
        if not existing_session:
            logger.warning(f"Session {session_id} not found")
            return None

        logger.info(f"Found existing session, topic_id={existing_session.topic_id}, plan_id={existing_session.plan_id}")

        session = await self.update_session(
            session_id,
            StudySessionUpdate(
                status=SessionStatus.COMPLETED,
                actual_end=datetime.now(),
                rating=rating,
                notes=notes
            )
        )

        if session:
            logger.info(f"Session updated to COMPLETED, now updating topic mastery and plan progress")
            # Update topic mastery when session completes
            await self._update_topic_mastery(session.topic_id)
            # update_session already recomputed progress; this repeat is kept
            # deliberately so progress is recalculated after the mastery write.
            await self._update_plan_progress(session.plan_id)
            logger.info(f"Progress update complete for plan {session.plan_id}")
        else:
            logger.error(f"Failed to update session {session_id}")

        return session

    async def skip_session(self, session_id: str, reason: Optional[str] = None) -> Optional[StudySession]:
        """Mark a session SKIPPED, storing ``reason`` (or a default note) in ``notes``."""
        return await self.update_session(
            session_id,
            StudySessionUpdate(
                status=SessionStatus.SKIPPED,
                notes=reason or "Session skipped"
            )
        )

    async def delete_session(self, session_id: str) -> bool:
        """Delete a session by short or ``study_session:``-prefixed id."""
        # repo.delete takes a single fully-qualified record id, as at every other call site.
        return await repo.delete(self._full_id("study_session", session_id))

    # ------------------------------------------------------------------
    # Adjustment methods
    # ------------------------------------------------------------------

    async def get_adjustments_for_plan(self, plan_id: str) -> List[PlanAdjustment]:
        """Get all adjustments for a plan, newest first."""
        query = """
            SELECT * FROM plan_adjustment
            WHERE plan_id = $plan_id
            ORDER BY created_at DESC
        """
        results = await repo.query(query, {"plan_id": plan_id})
        return [self._parse_adjustment(r) for r in results]

    async def respond_to_adjustment(self, adjustment_id: str, accepted: bool) -> bool:
        """Accept or reject a plan adjustment; accepted ones are applied to the plan."""
        status = AdjustmentStatus.ACCEPTED if accepted else AdjustmentStatus.REJECTED
        full_adjustment_id = self._full_id("plan_adjustment", adjustment_id)
        await repo.update(full_adjustment_id, {"status": status.value})

        if accepted:
            # Apply the adjustment
            adjustment = await repo.get(full_adjustment_id)
            if adjustment:
                await self._apply_adjustment(self._parse_adjustment(adjustment))

        return True

    # ------------------------------------------------------------------
    # Schedule methods
    # ------------------------------------------------------------------

    async def get_weekly_schedule(self, plan_id: str, week_start: Optional[datetime] = None) -> WeeklySchedule:
        """Get a 7-day schedule for a plan starting at ``week_start``.

        Args:
            week_start: first day of the week; defaults to Monday of the current
                week at local midnight.
        """
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        if not week_start:
            week_start = today - timedelta(days=today.weekday())  # Monday
        week_end = week_start + timedelta(days=7)

        # Get sessions for the week
        query = """
            SELECT * FROM study_session
            WHERE plan_id = $plan_id
            AND scheduled_date >= $start
            AND scheduled_date < $end
            ORDER BY scheduled_date ASC
        """
        results = await repo.query(query, {
            "plan_id": plan_id,
            "start": week_start.isoformat(),
            "end": week_end.isoformat()
        })
        sessions = [self._parse_session(r) for r in results]

        # Group by day
        days = []
        total_planned = 0.0
        total_completed = 0.0

        for offset in range(7):
            day_date = week_start + timedelta(days=offset)
            day_sessions = [s for s in sessions if s.scheduled_date.date() == day_date.date()]
            day_hours = sum(s.scheduled_duration_minutes for s in day_sessions) / 60
            total_planned += day_hours

            total_completed += sum(
                (s.actual_end - s.actual_start).total_seconds() / 3600
                for s in day_sessions
                if s.status == SessionStatus.COMPLETED and s.actual_start and s.actual_end
            )

            days.append(DailySchedule(
                date=day_date,
                sessions=day_sessions,
                total_hours=day_hours,
                is_today=day_date.date() == today.date()
            ))

        return WeeklySchedule(
            plan_id=plan_id,
            week_start=week_start,
            week_end=week_end,
            days=days,
            total_planned_hours=total_planned,
            total_completed_hours=total_completed
        )

    async def get_plan_stats(self, plan_id: str) -> StudyPlanStats:
        """Compute progress statistics for a plan.

        Returns a default ``StudyPlanStats(plan_id=...)`` when the plan is missing.
        ``hours_per_day_needed`` is clamped at 0 once completed hours exceed the plan.
        """
        plan = await self.get_plan(plan_id)
        if not plan:
            return StudyPlanStats(plan_id=plan_id)

        topics = await self.get_topics_for_plan(plan_id)
        sessions = await self.get_sessions_for_plan(plan_id)

        completed_topics = sum(1 for t in topics if t.status == TopicStatus.COMPLETED)
        completed_sessions = [s for s in sessions if s.status == SessionStatus.COMPLETED]

        total_planned_hours = plan.total_study_hours
        total_completed_hours = sum(
            (s.actual_end - s.actual_start).total_seconds() / 3600
            for s in completed_sessions
            if s.actual_start and s.actual_end
        )

        ratings = [s.rating for s in completed_sessions if s.rating]
        average_rating = sum(ratings) / len(ratings) if ratings else None

        days_remaining = _safe_days_diff(plan.deadline, datetime.now())
        remaining_hours = max(0.0, total_planned_hours - total_completed_hours)
        hours_per_day_needed = remaining_hours / days_remaining if days_remaining > 0 else remaining_hours

        on_track = hours_per_day_needed <= plan.available_hours_per_day * 1.2  # 20% buffer

        return StudyPlanStats(
            plan_id=plan_id,
            total_topics=len(topics),
            completed_topics=completed_topics,
            total_sessions=len(sessions),
            completed_sessions=len(completed_sessions),
            total_planned_hours=total_planned_hours,
            total_completed_hours=total_completed_hours,
            average_rating=average_rating,
            on_track=on_track,
            days_remaining=days_remaining,
            hours_per_day_needed=hours_per_day_needed
        )

    # ------------------------------------------------------------------
    # AI-powered plan generation
    # ------------------------------------------------------------------

    async def generate_plan(self, request: PlanGenerationRequest) -> PlanGenerationResult:
        """Generate a complete study plan (plan, topics, sessions) using AI.

        Steps: create the plan, load up to 10 notebook sources, extract topics
        with the LLM, create topics, check feasibility against the deadline,
        generate sessions, then refresh the plan's total hours.
        """
        warnings: List[str] = []

        # 1. Create the base plan
        plan = await self.create_plan(StudyPlanCreate(
            notebook_id=request.notebook_id,
            title=request.title,
            description=request.description,
            deadline=request.deadline,
            available_hours_per_day=request.available_hours_per_day
        ))

        # 2. Get notebook sources (sources are connected via 'reference' relationship)
        logger.info(f"Generating plan for notebook: {request.notebook_id}")
        sources_query = """
            SELECT in.* as source FROM reference WHERE out = $notebook_id FETCH source
        """
        notebook_id_value = ensure_record_id(f"notebook:{request.notebook_id.replace('notebook:', '')}")
        logger.debug(f"Querying sources with notebook_id: {notebook_id_value}")
        sources_result = await repo.query(sources_query, {"notebook_id": notebook_id_value})
        logger.info(f"Found {len(sources_result)} sources")

        # Extract the actual source data from the query result
        sources_data = [r["source"] for r in sources_result if r.get("source")]
        logger.info(f"Extracted {len(sources_data)} source objects")

        if not sources_data:
            warnings.append("No sources found in notebook. Add sources to generate topics.")
            logger.warning("No sources found in notebook - returning empty plan")
            return PlanGenerationResult(
                plan=plan,
                topics=[],
                sessions=[],
                total_hours=0,
                days_with_sessions=0,
                warnings=warnings
            )

        # 3. Extract topics using AI. ``or`` guards against keys present with None values.
        source_contents = "\n\n---\n\n".join(
            f"Source: {s.get('title') or 'Untitled'}\n{(s.get('full_text') or '')[:2000]}"
            for s in sources_data[:10]  # Limit to 10 sources
        )
        logger.debug(f"Source content length: {len(source_contents)} chars")

        topic_analysis = await self._extract_topics_with_ai(source_contents, request.focus_areas)
        logger.info(f"AI extracted {len(topic_analysis)} topics")

        # 4. Create topics
        created_topics = []
        for analysis in topic_analysis:
            topic = await self.create_topic(StudyTopicCreate(
                plan_id=plan.id,
                name=analysis.name,
                description=analysis.description,
                difficulty=analysis.difficulty,
                estimated_hours=analysis.estimated_hours,
                priority=analysis.priority,
                source_ids=analysis.source_ids,
                prerequisites=analysis.prerequisites
            ))
            created_topics.append(topic)

        # 5. Calculate total hours and check feasibility
        total_hours = sum(t.estimated_hours for t in created_topics)
        days_until_deadline = max(1, _safe_days_diff(request.deadline, datetime.now()))
        available_hours = days_until_deadline * request.available_hours_per_day

        if total_hours > available_hours:
            warnings.append(
                f"Estimated {total_hours:.1f} hours needed but only {available_hours:.1f} hours available. "
                "Consider extending deadline or increasing daily study time."
            )

        # 6. Generate sessions
        created_sessions = await self._generate_sessions(
            plan=plan,
            topics=created_topics,
            include_reviews=request.include_reviews,
            include_practice=request.include_practice
        )

        # 7. Update plan total hours; keep the original plan object if the re-fetch fails.
        await self._update_plan_total_hours(plan.id)
        plan = await self.get_plan(plan.id) or plan

        # Count days with sessions
        session_dates = {s.scheduled_date.date() for s in created_sessions}

        return PlanGenerationResult(
            plan=plan,
            topics=created_topics,
            sessions=created_sessions,
            total_hours=total_hours,
            days_with_sessions=len(session_dates),
            warnings=warnings
        )

    @staticmethod
    def _fallback_topics(specs: List[Tuple[str, str, TopicDifficulty, float, int, List[str]]]) -> List[TopicAnalysis]:
        """Build ``TopicAnalysis`` objects from (name, description, difficulty, hours, priority, prerequisites)."""
        return [
            TopicAnalysis(
                name=name,
                description=description,
                difficulty=difficulty,
                estimated_hours=hours,
                priority=priority,
                source_ids=[],
                prerequisites=list(prerequisites),
                key_concepts=[]
            )
            for name, description, difficulty, hours, priority, prerequisites in specs
        ]

    @staticmethod
    def _coerce_difficulty(value: Any) -> TopicDifficulty:
        """Map an LLM-provided difficulty to ``TopicDifficulty``, defaulting to MEDIUM."""
        try:
            return TopicDifficulty(str(value or "medium").strip().lower())
        except ValueError:
            return TopicDifficulty.MEDIUM

    async def _extract_topics_with_ai(
        self,
        source_content: str,
        focus_areas: Optional[List[str]] = None
    ) -> List[TopicAnalysis]:
        """Use the LLM to extract study topics from concatenated source content.

        Falls back to a fixed three-topic outline when the content is too short
        (< 50 non-whitespace-trimmed chars), when the LLM call or JSON parsing
        fails, or when no usable topics are returned. Malformed individual
        topic entries are skipped instead of discarding the whole response.
        """
        logger.info(f"Starting AI topic extraction, content length: {len(source_content)}")

        if not source_content or len(source_content.strip()) < 50:
            logger.warning("Source content too short for topic extraction")
            # Generate better fallback topics based on available time
            return self._fallback_topics([
                ("Core Concepts Review", "Review and understand the main concepts from your materials",
                 TopicDifficulty.MEDIUM, 4.0, 10, []),
                ("Practice & Application", "Apply learned concepts through practice exercises",
                 TopicDifficulty.MEDIUM, 3.0, 8, ["Core Concepts Review"]),
                ("Review & Consolidation", "Final review and knowledge consolidation",
                 TopicDifficulty.EASY, 2.0, 6, ["Practice & Application"]),
            ])

        focus_instruction = ""
        if focus_areas:
            focus_instruction = f"\n\nFocus on these areas: {', '.join(focus_areas)}"

        prompt = f"""Analyze the following content and extract key study topics. For each topic, provide:
1. name: A clear, concise topic name
2. description: Brief description of what the topic covers
3. difficulty: easy, medium, or hard
4. estimated_hours: How many hours to study this topic (0.5-8 hours)
5. priority: 1-10 (10 being most important/foundational)
6. key_concepts: List of key concepts within this topic
7. prerequisites: Names of other topics that should be studied first

Return as a JSON array of topics. Limit to 5-10 most important topics.
{focus_instruction}

Content:
{source_content}

Return ONLY valid JSON array, no markdown formatting."""

        try:
            # Use the LangChain model provisioning
            logger.debug("Provisioning LangChain model for topic extraction...")
            llm = await provision_langchain_model(prompt, None, "study_plan")
            logger.debug("Invoking LLM...")
            response = await llm.ainvoke(prompt)
            response_text = response.content if hasattr(response, 'content') else str(response)
            logger.debug(f"LLM response length: {len(response_text)}")

            # Parse JSON response - strip a surrounding markdown code fence if present
            response_text = response_text.strip()
            if response_text.startswith("```"):
                lines = response_text.split("\n")
                response_text = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])

            topics_data = json.loads(response_text)
            # Some models wrap the array in an object, e.g. {"topics": [...]}.
            if isinstance(topics_data, dict):
                topics_data = topics_data.get("topics", [])
            if not isinstance(topics_data, list):
                raise ValueError(f"Expected a JSON array of topics, got {type(topics_data).__name__}")
            logger.info(f"Parsed {len(topics_data)} topics from AI response")

            topics: List[TopicAnalysis] = []
            for raw in topics_data:
                if not isinstance(raw, dict):
                    logger.warning(f"Skipping non-object topic entry: {raw!r}")
                    continue
                try:
                    topics.append(TopicAnalysis(
                        name=raw.get("name", "Unknown Topic"),
                        description=raw.get("description", ""),
                        difficulty=self._coerce_difficulty(raw.get("difficulty")),
                        estimated_hours=float(raw.get("estimated_hours", 2.0)),
                        priority=int(raw.get("priority", 5)),
                        source_ids=[],  # Would need to map back to actual sources
                        prerequisites=raw.get("prerequisites") or [],
                        key_concepts=raw.get("key_concepts") or []
                    ))
                except (TypeError, ValueError) as exc:
                    # pydantic ValidationError subclasses ValueError.
                    logger.warning(f"Skipping malformed topic entry {raw!r}: {exc}")

            if not topics:
                raise ValueError("AI response contained no usable topics")

            logger.info(f"Successfully created {len(topics)} TopicAnalysis objects")
            return topics

        except Exception as e:
            logger.error(f"Error extracting topics with AI: {e}", exc_info=True)
            # Return better fallback topics
            return self._fallback_topics([
                ("Core Material Study", "Study the main content and concepts",
                 TopicDifficulty.MEDIUM, 4.0, 10, []),
                ("Deep Dive & Practice", "Practice and reinforce understanding",
                 TopicDifficulty.MEDIUM, 3.0, 8, ["Core Material Study"]),
                ("Final Review", "Final review and preparation",
                 TopicDifficulty.EASY, 2.0, 6, ["Deep Dive & Practice"]),
            ])

    async def _generate_sessions(
        self,
        plan: StudyPlan,
        topics: List[StudyTopic],
        include_reviews: bool = True,
        include_practice: bool = True
    ) -> List[StudySession]:
        """Create LEARN/REVIEW/PRACTICE sessions for ``topics`` before the deadline.

        Scheduling rules:

        - Topics are processed by descending priority, ties broken by creation
          time. Prerequisites are not considered.
        - Each topic's estimated hours are split into LEARN sessions of at most
          ``SESSION_DURATIONS[LEARN]`` minutes, one per hourly slot.
        - Each study day starts at ``DAY_START_HOUR``. A new day begins when the
          next session would exceed ``plan.available_hours_per_day``. The first
          session of a day is always allowed, so budgets below one session do
          not skip days.
        - After a topic's LEARN sessions, a REVIEW session is placed 3 days out.
          HARD topics also get a PRACTICE session 1 day out. Either is created
          only if it falls before the deadline.
        """
        sessions: List[StudySession] = []
        now = datetime.now()
        day_start = now.replace(hour=self.DAY_START_HOUR, minute=0, second=0, microsecond=0)

        # Normalize deadline to naive datetime for comparison (tzinfo is dropped, not converted)
        deadline = plan.deadline
        if getattr(deadline, "tzinfo", None) is not None:
            deadline = deadline.replace(tzinfo=None)

        logger.info(f"Generating sessions: current_date={day_start}, deadline={deadline}, available_hours={plan.available_hours_per_day}")

        # If the day's start hour has already passed, start tomorrow
        if day_start < now:
            day_start += timedelta(days=1)
        slot = day_start

        daily_hours_used = 0.0
        learn_minutes = self.SESSION_DURATIONS[SessionType.LEARN]

        sorted_topics = sorted(topics, key=lambda t: (-t.priority, t.created_at))
        logger.info(f"Processing {len(sorted_topics)} topics for session generation")

        for topic in sorted_topics:
            logger.info(f"Topic: {topic.name}, estimated_hours={topic.estimated_hours}, difficulty={topic.difficulty}")

            # Work in whole minutes so float residue never yields 0-minute sessions.
            minutes_remaining = max(0, round(topic.estimated_hours * 60))
            session_count = 0

            while minutes_remaining > 0:
                duration = min(learn_minutes, minutes_remaining)
                duration_hours = duration / 60

                # Move to the next day's start hour when today's budget would be exceeded.
                # Resetting to day_start (not slot + 1 day) keeps days from drifting later.
                if daily_hours_used > 0 and daily_hours_used + duration_hours > plan.available_hours_per_day:
                    day_start += timedelta(days=1)
                    slot = day_start
                    daily_hours_used = 0.0

                # Don't schedule past deadline
                if slot >= deadline:
                    break

                # Create learn session
                session = await self.create_session(StudySessionCreate(
                    plan_id=plan.id,
                    topic_id=topic.id,
                    scheduled_date=slot,
                    scheduled_duration_minutes=duration,
                    session_type=SessionType.LEARN
                ))
                sessions.append(session)

                minutes_remaining -= duration
                daily_hours_used += duration_hours
                session_count += 1

                # Move to next time slot
                slot += timedelta(hours=1)

            # Add review session (3 days after last learn session)
            if include_reviews and session_count > 0:
                review_date = slot + timedelta(days=3)
                if review_date < deadline:
                    review_session = await self.create_session(StudySessionCreate(
                        plan_id=plan.id,
                        topic_id=topic.id,
                        scheduled_date=review_date,
                        scheduled_duration_minutes=self.SESSION_DURATIONS[SessionType.REVIEW],
                        session_type=SessionType.REVIEW
                    ))
                    sessions.append(review_session)

            # Add practice session for hard topics
            if include_practice and topic.difficulty == TopicDifficulty.HARD:
                practice_date = slot + timedelta(days=1)
                if practice_date < deadline:
                    practice_session = await self.create_session(StudySessionCreate(
                        plan_id=plan.id,
                        topic_id=topic.id,
                        scheduled_date=practice_date,
                        scheduled_duration_minutes=self.SESSION_DURATIONS[SessionType.PRACTICE],
                        session_type=SessionType.PRACTICE
                    ))
                    sessions.append(practice_session)

        logger.info(f"Generated {len(sessions)} total sessions for plan")
        return sessions

    # ------------------------------------------------------------------
    # Helper methods
    # ------------------------------------------------------------------

    async def _update_plan_total_hours(self, plan_id: str) -> None:
        """Set the plan's ``total_study_hours`` to the sum of its topics' estimates."""
        topics = await self.get_topics_for_plan(plan_id)
        total_hours = sum(t.estimated_hours for t in topics)

        await repo.update(self._full_id("study_plan", plan_id), {
            "total_study_hours": total_hours,
            "updated_at": datetime.now().isoformat()
        })

    async def _update_plan_progress(self, plan_id: str) -> None:
        """Recompute progress as completed / total scheduled session minutes.

        The plan status is set to COMPLETED at >= 100% and to ACTIVE otherwise.
        Any other stored status is overwritten. Plans without sessions are
        left untouched.
        """
        sessions = await self.get_sessions_for_plan(plan_id)
        if not sessions:
            return

        total_minutes = sum(s.scheduled_duration_minutes for s in sessions)
        completed_minutes = sum(
            s.scheduled_duration_minutes for s in sessions
            if s.status == SessionStatus.COMPLETED
        )

        progress = (completed_minutes / total_minutes * 100) if total_minutes > 0 else 0
        status = PlanStatus.COMPLETED if progress >= 100 else PlanStatus.ACTIVE

        await repo.update(self._full_id("study_plan", plan_id), {
            "progress_percentage": progress,
            "status": status.value,
            "updated_at": datetime.now().isoformat()
        })
        logger.info(f"Updated plan {plan_id} progress to {progress:.1f}%")

    async def _update_topic_mastery(self, topic_id: str) -> None:
        """Recompute a topic's mastery level and status from its sessions.

        Mastery is 70% session-completion ratio plus 30% average rating (divided
        by 5; 0.5 when unrated), on a 0-100 scale. Mastery >= 80 marks the topic
        COMPLETED; any positive mastery marks it IN_PROGRESS.
        """
        topic = await self.get_topic(topic_id)
        if not topic:
            return

        sessions = await self.get_sessions_for_topic(topic_id)
        if not sessions:
            return

        total_sessions = len(sessions)
        completed_sessions = [s for s in sessions if s.status == SessionStatus.COMPLETED]
        completion_ratio = len(completed_sessions) / total_sessions

        # Average rating contribution (0-1 scale)
        ratings = [s.rating for s in completed_sessions if s.rating]
        avg_rating = sum(ratings) / len(ratings) / 5 if ratings else 0.5

        # Mastery = 70% completion + 30% quality (rating)
        mastery = (completion_ratio * 0.7 + avg_rating * 0.3) * 100

        new_status = topic.status
        if mastery >= 80:
            new_status = TopicStatus.COMPLETED
        elif mastery > 0:
            new_status = TopicStatus.IN_PROGRESS

        await repo.update(self._full_id("study_topic", topic_id), {
            "mastery_level": mastery,
            "status": new_status.value
        })
        logger.info(f"Updated topic {topic_id} mastery to {mastery:.1f}%, status: {new_status.value}")

    async def _apply_adjustment(self, adjustment: PlanAdjustment) -> None:
        """Apply an accepted adjustment to its plan.

        Supports EXTEND_DEADLINE (stores ``suggested_value`` as the deadline) and
        INCREASE_HOURS (stores ``float(suggested_value)`` as daily hours). Other
        types are ignored.
        """
        if not adjustment.suggested_value:
            return
        # repo.update takes a single fully-qualified record id, as at every other call site.
        full_plan_id = self._full_id("study_plan", adjustment.plan_id)

        if adjustment.adjustment_type == AdjustmentType.EXTEND_DEADLINE:
            await repo.update(full_plan_id, {
                "deadline": adjustment.suggested_value
            })
        elif adjustment.adjustment_type == AdjustmentType.INCREASE_HOURS:
            await repo.update(full_plan_id, {
                "available_hours_per_day": float(adjustment.suggested_value)
            })
        # Add more adjustment types as needed

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _record_id_to_str(raw_id: Any) -> str:
        """Render a record id as ``table:key``.

        Handles the ``{"tb": ..., "id": {"String": ...}}`` dict shape, a dict
        whose ``id`` is a plain value, and anything else via ``str()``.
        """
        if isinstance(raw_id, dict):
            inner = raw_id.get("id", {})
            key = inner.get("String", "") if isinstance(inner, dict) else inner
            return f"{raw_id.get('tb', '')}:{key}"
        return str(raw_id)

    @staticmethod
    def _unwrap_record(data: Any, kind: str) -> Dict[str, Any]:
        """Return the record dict, unwrapping a single-element list result.

        Raises:
            ValueError: on an empty list or a non-dict record.
        """
        if isinstance(data, list):
            if not data:
                raise ValueError("Empty list returned from database")
            data = data[0]
        if not isinstance(data, dict):
            raise ValueError(f"Invalid {kind} data format: expected dict, got {type(data)}")
        return data

    def _parse_plan(self, data: Dict[str, Any]) -> StudyPlan:
        """Build a ``StudyPlan`` from a database record (dict or list-wrapped dict)."""
        if isinstance(data, str):
            # A bare string (e.g. a record id) is not a usable record.
            logger.warning(f"_parse_plan received string instead of dict: {data}")
            raise ValueError(f"Invalid plan data format: expected dict, got string '{data}'")
        data = self._unwrap_record(data, "plan")

        return StudyPlan(
            id=self._record_id_to_str(data.get("id", "")),
            notebook_id=data.get("notebook_id", ""),
            title=data.get("title", ""),
            description=data.get("description"),
            deadline=self._parse_datetime(data.get("deadline")),
            available_hours_per_day=float(data.get("available_hours_per_day", 2.0)),
            total_study_hours=float(data.get("total_study_hours", 0.0)),
            status=PlanStatus(data.get("status", "active")),
            progress_percentage=float(data.get("progress_percentage", 0.0)),
            created_at=self._parse_datetime(data.get("created_at")),
            updated_at=self._parse_datetime(data.get("updated_at")),
        )

    def _parse_topic(self, data: Dict[str, Any]) -> StudyTopic:
        """Build a ``StudyTopic`` from a database record (dict or list-wrapped dict)."""
        data = self._unwrap_record(data, "topic")

        return StudyTopic(
            id=self._record_id_to_str(data.get("id", "")),
            plan_id=data.get("plan_id", ""),
            name=data.get("name", ""),
            description=data.get("description"),
            difficulty=TopicDifficulty(data.get("difficulty", "medium")),
            estimated_hours=float(data.get("estimated_hours", 1.0)),
            priority=int(data.get("priority", 5)),
            source_ids=data.get("source_ids", []),
            prerequisites=data.get("prerequisites", []),
            status=TopicStatus(data.get("status", "not_started")),
            mastery_level=float(data.get("mastery_level", 0.0)),
            created_at=self._parse_datetime(data.get("created_at")),
        )

    def _parse_session(self, data: Dict[str, Any]) -> StudySession:
        """Build a ``StudySession`` from a database record (dict or list-wrapped dict)."""
        data = self._unwrap_record(data, "session")

        return StudySession(
            id=self._record_id_to_str(data.get("id", "")),
            plan_id=data.get("plan_id", ""),
            topic_id=data.get("topic_id", ""),
            scheduled_date=self._parse_datetime(data.get("scheduled_date")),
            scheduled_duration_minutes=int(data.get("scheduled_duration_minutes", 45)),
            actual_start=self._parse_datetime(data.get("actual_start")) if data.get("actual_start") else None,
            actual_end=self._parse_datetime(data.get("actual_end")) if data.get("actual_end") else None,
            session_type=SessionType(data.get("session_type", "learn")),
            status=SessionStatus(data.get("status", "scheduled")),
            notes=data.get("notes"),
            rating=data.get("rating"),
            created_at=self._parse_datetime(data.get("created_at")),
        )

    def _parse_adjustment(self, data: Dict[str, Any]) -> PlanAdjustment:
        """Build a ``PlanAdjustment`` from a database record (dict or list-wrapped dict)."""
        data = self._unwrap_record(data, "adjustment")

        return PlanAdjustment(
            id=self._record_id_to_str(data.get("id", "")),
            plan_id=data.get("plan_id", ""),
            adjustment_type=AdjustmentType(data.get("adjustment_type", "reschedule")),
            reason=data.get("reason", ""),
            original_value=data.get("original_value"),
            suggested_value=data.get("suggested_value"),
            status=AdjustmentStatus(data.get("status", "pending")),
            created_at=self._parse_datetime(data.get("created_at")),
        )

    def _parse_datetime(self, value: Any) -> datetime:
        """Parse a datetime from a ``datetime`` or ISO-8601 string.

        A trailing ``Z`` is treated as UTC. Unparseable strings and any other
        value (including ``None``) fall back to naive local ``datetime.now()``.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace("Z", "+00:00"))
            except ValueError:
                logger.debug(f"Could not parse datetime {value!r}; falling back to now()")
        return datetime.now()


# Singleton instance
study_plan_service = StudyPlanService()