import copy
import json
import operator
import os
import sqlite3
from collections import defaultdict
from datetime import datetime, timedelta
from math import ceil
from typing import Annotated, List

import streamlit as st
from dateutil import parser
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.constants import Send
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from typing_extensions import TypedDict

# Page configuration
st.set_page_config(layout="wide", page_title="JEE Roadmap Planner")

# Initialize session state variables.
# "dependencies" is included so that reading it before load_initial_data()
# has run yields None instead of raising AttributeError.
for _state_key in (
    "data",
    "full_roadmap",
    "dependencies",
    "report_data",
    "final_report",
    "updated_roadmap",
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None

# Navigation sidebar setup
st.sidebar.title("JEE Roadmap Planner")
page = st.sidebar.radio("Navigation", ["Home", "Roadmap Manager", "Task Analysis","Roadmap Chatbot"])


# AGENT 1
def load_initial_data():
    """Load the three roadmap JSON files into ``st.session_state``.

    Reads ``fourdayRoadmap.json`` into ``data``, ``full_roadmap.json`` into
    ``full_roadmap`` and ``dependencies.json`` into ``dependencies``.  The
    session state is only updated once all three files were read, so a
    failure never leaves a half-loaded state behind.

    Returns:
        True when all files were loaded, False when reading or parsing any
        of them failed (the error is shown with ``st.error``).
    """
    sources = (
        ("data", "fourdayRoadmap.json"),
        ("full_roadmap", "full_roadmap.json"),
        ("dependencies", "dependencies.json"),
    )
    with st.spinner("Loading roadmap data..."):
        loaded = {}
        try:
            for state_key, path in sources:
                with open(path, "r", encoding="utf-8") as file:
                    loaded[state_key] = json.load(file)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            st.error(f"Error loading data: {e}")
            return False

        for state_key, value in loaded.items():
            st.session_state[state_key] = value
        st.success("Data loaded successfully!")
        return True


# Function to mark tasks as incomplete
def process_task_completion_data():
    """Reset the completion bookkeeping of every task in ``session_state.data``.

    Each task gets ``task_completed = False``, ``completion_timestamp = None``
    and ``rescheduled = 0``.  Does nothing except report an error when no
    roadmap has been loaded yet.
    """
    with st.spinner("Processing task completion data..."):
        data = st.session_state.data
        if not data:
            st.error("No roadmap data loaded.")
            return

        for day in data["schedule"]:
            for subject in day["subjects"]:
                for task in subject["tasks"]:
                    task["task_completed"] = False
                    task["completion_timestamp"] = None
                    task["rescheduled"] = 0

        st.session_state.data = data
        st.success("Task completion data processed!")
def add_test(roadmap, date, physics=None, chemistry=None, maths=None):
    """Mark the day matching ``date`` in ``roadmap`` as a test day.

    The matching day is replaced by a dict holding its ``dayNumber``, the
    normalised date, a ``test_portion`` list (one entry per subject) and the
    day's existing ``subjects``.  ``roadmap`` is modified in place and
    returned.

    Args:
        roadmap: Dict with a ``"schedule"`` list of day dicts.
        date: Date string; parsed with ``dateutil`` and normalised to
            ``YYYY-MM-DD`` before comparison.
        physics, chemistry, maths: Chapter lists for the test portion.
            ``None`` means "no chapters"; a fresh list is created per call so
            no list object is shared between calls.
    """
    date = parser.parse(date).strftime("%Y-%m-%d")
    chapters_by_subject = (
        ("Physics", physics),
        ("Chemistry", chemistry),
        ("Maths", maths),
    )
    for i, day in enumerate(roadmap["schedule"]):
        if day["date"] == date:
            roadmap["schedule"][i] = {
                "dayNumber": day["dayNumber"],
                "date": date,
                "test_portion": [
                    {"name": name, "chapters": [] if chapters is None else chapters}
                    for name, chapters in chapters_by_subject
                ],
                "subjects": day["subjects"],
            }
    return roadmap


def check_tot_time(day):
    """Return the summed hours of all tasks of ``day``.

    The hour value is the number before the first space of each task's
    ``time`` string (e.g. ``"0.5 hour"``).
    """
    return sum(
        float(task["time"].split(" ")[0])
        for subject in day["subjects"]
        for task in subject["tasks"]
    )


def extract_tasks(roadmap, test_portions=None, dependencies=None):
    """Split a list of days into backlog, upcoming and test-dependent tasks.

    ``roadmap[0]`` is the day that just ended: its unfinished tasks are
    removed from it (their ``rescheduled`` counter is incremented) and
    collected per subject.  The tasks of all later days are collected per
    subject as the upcoming work.  The later days themselves are not emptied
    here.

    When both ``test_portions`` and ``dependencies`` are given, every task
    whose ``ChapterName`` is listed as a dependency of a test chapter is moved
    out of both collections into a separate per-subject mapping.

    Args:
        roadmap: List of day dicts (modified in place).
        test_portions: Optional list of ``{"name": subject, "chapters": [...]}``.
        dependencies: Optional ``{subject: {chapter: [dependent chapters]}}``.

    Returns:
        ``(roadmap, upcoming, backlog)`` without test information, otherwise
        ``(roadmap, upcoming, backlog, dependent)``.  The mappings are
        ``defaultdict(list)`` keyed by subject name.
    """
    incomplete_tasks_by_subject = defaultdict(list)
    subjectwise_tasks = defaultdict(list)

    if roadmap:
        for subject in roadmap[0]["subjects"]:
            completed, incomplete = [], []
            for task in subject["tasks"]:
                # Truthiness instead of `== True/False`: a flag of None used
                # to match neither comparison, which dropped the task.
                (completed if task.get("task_completed") else incomplete).append(task)

            for task in incomplete:
                task["rescheduled"] = task.get("rescheduled", 0) + 1
            if incomplete:
                incomplete_tasks_by_subject[subject["name"]].extend(incomplete)

            # Keep only completed tasks in the previous day
            subject["tasks"] = completed

    for day in roadmap[1:]:
        for subject in day["subjects"]:
            subjectwise_tasks[subject["name"]].extend(subject["tasks"])

    if not (test_portions and dependencies):
        return roadmap, subjectwise_tasks, incomplete_tasks_by_subject

    dependent_chapters = set()
    for portion in test_portions:
        # A subject without dependency information contributes nothing.
        subject_dependencies = dependencies.get(portion["name"], {})
        for chapter in portion["chapters"]:
            dependent_chapters.update(subject_dependencies.get(chapter, ()))

    dependent_tasks_by_subject = defaultdict(list)
    # Upcoming tasks are scanned before the backlog, which fixes the order of
    # the dependent task lists.
    for pool in (subjectwise_tasks, incomplete_tasks_by_subject):
        for subject_name, tasks in pool.items():
            retained = []
            for task in tasks:
                if task.get("ChapterName") in dependent_chapters:
                    dependent_tasks_by_subject[subject_name].append(task)
                else:
                    retained.append(task)
            pool[subject_name] = retained

    return roadmap, subjectwise_tasks, incomplete_tasks_by_subject, dependent_tasks_by_subject
def get_task_time(task):
    """Return the duration of ``task`` in hours, rounded to 3 decimals.

    The value is the number before the first space of ``task["time"]``
    (e.g. ``"0.5 hour"`` -> ``0.5``); surrounding whitespace is ignored.
    """
    return round(float(task["time"].strip().split(" ")[0]), 3)


def _spread_evenly(total_hours, num_days, max_hours_per_day):
    """Spread ``total_hours`` over ``num_days`` days.

    Every day gets the floor of the average; the remainder is handed out in
    steps of at most one hour starting from the last day, never pushing a day
    above ``max_hours_per_day``.
    """
    base_hours = total_hours // num_days
    remaining_hours = total_hours - (base_hours * num_days)
    distribution = [base_hours] * num_days
    for i in range(num_days - 1, -1, -1):
        if remaining_hours <= 0:
            break
        additional = min(1, remaining_hours, max_hours_per_day - distribution[i])
        distribution[i] += additional
        remaining_hours -= additional
    return distribution


def calculate_time_distribution(roadmap, incomplete_tasks, incomplete_tasks_by_subject, max_hours_per_day):
    """Compute target hours per scheduled day and per additional day.

    Args:
        roadmap: List of day dicts; ``roadmap[0]`` is skipped, the remaining
            days are the ones hours are distributed over.
        incomplete_tasks: ``{subject: [task, ...]}`` whose hours are added to
            the hours already planned on ``roadmap[1:]``.
        incomplete_tasks_by_subject: Optional ``{subject: [task, ...]}`` whose
            hours are always placed on additional days.
        max_hours_per_day: Upper bound of hours for a single day.

    Returns:
        ``(distribution, extra_distribution)``: target hours for each day of
        ``roadmap[1:]`` and target hours for each additional day needed.

    Raises:
        ValueError: if additional days are required but the daily limit is
            not positive (the original loop would never terminate).
    """
    num_days = len(roadmap[1:])

    extra_hours = 0
    if incomplete_tasks_by_subject:
        for tasks in incomplete_tasks_by_subject.values():
            for task in tasks:
                extra_hours += get_task_time(task)
    # Additional days are only forced when there really are hours for them;
    # a mapping that holds only empty lists no longer counts.
    extra_day = extra_hours > 0

    total_hours = 0
    for tasks in incomplete_tasks.values():
        for task in tasks:
            total_hours += get_task_time(task)
    for day in roadmap[1:]:
        # NOTE(review): from day 550 on the limit is forced to 16 for this and
        # all following days - kept as found, intent to be confirmed.
        if day["dayNumber"] >= 550:
            max_hours_per_day = 16
        for subject in day["subjects"]:
            for task in subject["tasks"]:
                total_hours += get_task_time(task)

    if num_days <= 0:
        return [], [total_hours + extra_hours] if total_hours + extra_hours > 0 else []

    max_possible_hours = num_days * max_hours_per_day

    if not extra_day:
        if total_hours <= max_possible_hours:
            return _spread_evenly(total_hours, num_days, max_hours_per_day), []
        # Max out all current days and push the rest to additional days.
        distribution = [max_hours_per_day] * num_days
        remaining_hours = total_hours - max_possible_hours
    else:
        # Regular hours stay on the scheduled days up to their capacity; what
        # does not fit joins the hours reserved for additional days instead
        # of producing a negative headroom.
        regular_hours = min(total_hours, max_possible_hours)
        distribution = _spread_evenly(regular_hours, num_days, max_hours_per_day)
        remaining_hours = extra_hours + (total_hours - regular_hours)

    if remaining_hours > 0 and max_hours_per_day <= 0:
        raise ValueError("max_hours_per_day must be positive to schedule extra days")

    extra_distribution = []
    while remaining_hours > 0:
        hours = min(max_hours_per_day, remaining_hours)
        extra_distribution.append(hours)
        remaining_hours -= hours

    return distribution, extra_distribution
def _round_robin_fill(subjects, queues, budget, day_time, max_hours_per_day):
    """Move tasks from per-subject queues into ``subjects`` in round-robin order.

    One task per subject is taken per round as long as it fits into both the
    remaining ``budget`` of this call and the daily limit.  Returns the
    updated ``day_time``.
    """
    used = 0
    while used < budget and day_time < max_hours_per_day:
        added = False
        for subject in subjects:
            queue = queues.get(subject["name"])
            if not queue:
                continue
            task_time = get_task_time(queue[0])
            if used + task_time <= budget and day_time + task_time <= max_hours_per_day:
                subject["tasks"].append(queue.pop(0))
                used += task_time
                day_time += task_time
                added = True
        if not added:
            break
    return day_time


def add_tasks_for_extra_days(subject_all_tasks, incomplete_tasks, extra_day_tasks, extra_distribution, ratio, max_hours_per_day):
    """Build the additional days needed to hold tasks that did not fit.

    For every entry of ``extra_distribution`` a new day
    ``{"subjects": [{"name": ..., "tasks": [...]}, ...]}`` is appended to
    ``extra_day_tasks``.  Each day is filled in three steps: regular tasks up
    to their share, incomplete tasks up to their share, then more regular
    tasks until the day's target is reached.  The last day additionally
    receives everything still left in both queues.

    Args:
        subject_all_tasks: ``{subject: [task, ...]}`` regular tasks (consumed).
        incomplete_tasks: ``{subject: [task, ...]}`` backlog tasks (consumed).
        extra_day_tasks: List the new days are appended to.
        extra_distribution: Target hours for each additional day.
        ratio: ``(regular %, incomplete %)`` split used while a backlog exists.
        max_hours_per_day: Upper bound of hours for a single day.

    Returns:
        ``extra_day_tasks`` (the same list object).
    """
    # Union of both key sets (order preserved): subjects that only appear in
    # the backlog used to be left out whenever regular tasks existed.
    subject_names = list(dict.fromkeys([*subject_all_tasks, *incomplete_tasks]))
    has_incomplete_tasks = any(incomplete_tasks.values())
    last_index = len(extra_distribution) - 1

    for i, target_time in enumerate(extra_distribution):
        if not subject_all_tasks:
            regular_task_limit = 0
            incomplete_task_limit = target_time
        elif has_incomplete_tasks:
            regular_task_limit = ceil(target_time * ratio[0] / 100)
            incomplete_task_limit = ceil(target_time * ratio[1] / 100)
        else:
            regular_task_limit = target_time
            incomplete_task_limit = 0

        # Create a new day with subjects
        new_day = {"subjects": [{"name": n, "tasks": []} for n in subject_names]}
        subjects = new_day["subjects"]

        # Step 1: regular tasks up to their limit
        day_time = _round_robin_fill(
            subjects, subject_all_tasks, regular_task_limit, 0, max_hours_per_day
        )
        # Step 2: incomplete tasks up to their limit
        day_time = _round_robin_fill(
            subjects, incomplete_tasks, incomplete_task_limit, day_time, max_hours_per_day
        )

        # Step 3: use remaining time for additional regular tasks
        while day_time < target_time:
            added = False
            for subject in subjects:
                queue = subject_all_tasks.get(subject["name"])
                if not queue:
                    continue
                task_time = get_task_time(queue[0])
                if day_time + task_time <= max_hours_per_day:
                    subject["tasks"].append(queue.pop(0))
                    day_time += task_time
                    added = True
                    if day_time > target_time:
                        break
            if not added:
                break

        if i == last_index:
            # Nothing may be lost: the final day takes whatever is left,
            # regular tasks first, then incomplete ones.
            for subject in subjects:
                for queues in (subject_all_tasks, incomplete_tasks):
                    queue = queues.get(subject["name"])
                    if queue:
                        subject["tasks"].extend(queue)
                        queue.clear()

        extra_day_tasks.append(new_day)

    return extra_day_tasks
def _assign_within_budget(day, queues, pending, budget, day_time, max_hours_per_day):
    """Round-robin tasks from ``queues`` into ``pending`` for one day.

    One task per subject of ``day`` is taken per round as long as it fits
    into both ``budget`` and the daily limit.  ``pending`` maps subject name
    to the list of tasks assigned so far.  Returns the updated ``day_time``.
    """
    used = 0
    while used < budget and day_time < max_hours_per_day:
        added = False
        for subject in day["subjects"]:
            name = subject["name"]
            queue = queues.get(name)
            if not queue:
                continue
            task_time = get_task_time(queue[0])
            if used + task_time <= budget and day_time + task_time <= max_hours_per_day:
                pending[name].append(queue.pop(0))
                used += task_time
                day_time += task_time
                added = True
        if not added:
            break
    return day_time


def shift_the_roadmap(roadmap, max_hours_per_day, ratio=(80, 20), dependencies=None, test_portions=None):
    """Redistribute tasks over ``roadmap[1:]`` after day ``roadmap[0]`` ended.

    Unfinished tasks of ``roadmap[0]`` and all tasks of the following days
    are pooled and re-assigned day by day.  With ``ratio == (80, 20)`` the
    backlog is the set of unfinished tasks; with any other ratio the
    "backlog" share is used for tasks belonging to chapters the test portion
    depends on, and the remaining unfinished tasks go to additional days.

    Args:
        roadmap: List of day dicts; a deep copy is modified and returned.
        max_hours_per_day: Upper bound of hours for a single day.
        ratio: ``(regular %, backlog %)`` share of each day's target hours.
        dependencies: ``{subject: {chapter: [chapters]}}`` for test mode.
        test_portions: ``[{"name": subject, "chapters": [...]}]`` for test mode.

    Returns:
        ``(roadmap, extra_day_tasks)`` - the rescheduled days and the list of
        additional days (dicts with only ``"subjects"``) for tasks that did
        not fit.
    """
    roadmap = copy.deepcopy(roadmap)

    # Extract tasks based on ratio mode
    incomplete_tasks_by_subject = None
    if tuple(ratio) == (80, 20):
        roadmap, subject_all_tasks, incomplete_tasks = extract_tasks(roadmap)
    else:
        extracted = extract_tasks(roadmap, test_portions, dependencies)
        if len(extracted) == 4:
            roadmap, subject_all_tasks, incomplete_tasks_by_subject, incomplete_tasks = extracted
        else:
            # extract_tasks returns only three values when test portions or
            # dependencies are missing; treat the unfinished tasks as the
            # backlog instead of failing on the 4-way unpack.
            roadmap, subject_all_tasks, incomplete_tasks = extracted

    # Distribute time across days
    time_distribution, extra_distribution = calculate_time_distribution(
        roadmap, incomplete_tasks, incomplete_tasks_by_subject, max_hours_per_day
    )

    has_incomplete_tasks = any(incomplete_tasks.values())

    # Per-day, per-subject task assignments
    pending_regular_tasks = defaultdict(lambda: defaultdict(list))
    pending_incomplete_tasks = defaultdict(lambda: defaultdict(list))

    for day_index, day in enumerate(roadmap[1:], 1):
        target_time = time_distribution[day_index - 1]

        if has_incomplete_tasks:
            regular_task_limit = ceil(target_time * ratio[0] / 100)
            incomplete_task_limit = ceil(target_time * ratio[1] / 100)
        else:
            regular_task_limit = target_time
            incomplete_task_limit = 0

        # Step 1: regular tasks up to their share
        day_time = _assign_within_budget(
            day, subject_all_tasks, pending_regular_tasks[day_index],
            regular_task_limit, 0, max_hours_per_day,
        )

        # Step 2: backlog tasks up to their share
        if has_incomplete_tasks and incomplete_task_limit > 0:
            day_time = _assign_within_budget(
                day, incomplete_tasks, pending_incomplete_tasks[day_index],
                incomplete_task_limit, day_time, max_hours_per_day,
            )
            # Once the backlog is used up, later days give regular tasks the
            # full target.
            if not any(incomplete_tasks.values()):
                has_incomplete_tasks = False

        # Step 3: use remaining time for additional regular tasks
        while day_time < target_time:
            added = False
            for subject in day["subjects"]:
                name = subject["name"]
                queue = subject_all_tasks.get(name)
                if not queue:
                    continue
                task_time = get_task_time(queue[0])
                if day_time + task_time <= max_hours_per_day:
                    pending_regular_tasks[day_index][name].append(queue.pop(0))
                    day_time += task_time
                    added = True
                    if day_time > target_time:
                        break
            if not added:
                break

    extra_day_tasks = []
    if extra_distribution:
        if incomplete_tasks_by_subject:
            for subject_name, tasks in incomplete_tasks_by_subject.items():
                incomplete_tasks.setdefault(subject_name, []).extend(tasks)
        extra_day_tasks = add_tasks_for_extra_days(
            subject_all_tasks, incomplete_tasks, extra_day_tasks,
            extra_distribution, (80, 20), max_hours_per_day,
        )
    elif len(roadmap) > 1:
        # No additional days: the last scheduled day takes whatever is left.
        last_index = len(roadmap) - 1
        for subject in roadmap[last_index]["subjects"]:
            name = subject["name"]
            for queues, pending in (
                (subject_all_tasks, pending_regular_tasks),
                (incomplete_tasks, pending_incomplete_tasks),
            ):
                queue = queues.get(name)
                if queue:
                    pending[last_index][name].extend(queue)
                    queue.clear()

    # Final appending of tasks
    for day_index, day in enumerate(roadmap[1:], 1):
        for subject in day["subjects"]:
            name = subject["name"]
            subject["tasks"] = (
                pending_regular_tasks[day_index][name]
                + pending_incomplete_tasks[day_index][name]
            )

    return roadmap, extra_day_tasks
def _append_overflow_days(schedule, overflow_days, template_day):
    """Append one day per overflow day to ``schedule``.

    Each new day is a deep copy of ``template_day`` (without any
    ``test_portion``) whose number and date continue from the current last
    day and whose subjects are those of the overflow day.
    """
    for overflow in overflow_days:
        day = copy.deepcopy(template_day)
        day.pop("test_portion", None)
        day["dayNumber"] = schedule[-1]["dayNumber"] + 1
        day["date"] = (
            datetime.strptime(schedule[-1]["date"], "%Y-%m-%d") + timedelta(days=1)
        ).strftime("%Y-%m-%d")
        day["subjects"] = overflow["subjects"]
        schedule.append(day)


def _insert_overflow_before_test(schedule, extra_day_tasks, test_index):
    """Make room at ``test_index`` for the overflow days.

    The schedule grows by ``len(extra_day_tasks)`` empty days, the
    ``subjects`` of every day from ``test_index`` on move that many days
    later, and the overflow subjects take the freed slots.
    """
    num_extra_days = len(extra_day_tasks)

    # Step 1: Add empty days at the end
    last_day = schedule[-1]
    last_date = datetime.strptime(last_day["date"], "%Y-%m-%d")
    last_day_number = last_day["dayNumber"]
    for i in range(num_extra_days):
        schedule.append({
            "dayNumber": last_day_number + i + 1,
            "date": (last_date + timedelta(days=i + 1)).strftime("%Y-%m-%d"),
            "subjects": [],
        })

    # Step 2: Shift 'subjects' from test_index to the end, back to front
    total_days = len(schedule)
    for i in range(total_days - num_extra_days - 1, test_index - 1, -1):
        schedule[i + num_extra_days]["subjects"] = schedule[i]["subjects"]

    # Step 3: Put the overflow into the freed slots starting at test_index
    for i, new_task_day in enumerate(extra_day_tasks):
        schedule[test_index + i]["subjects"] = new_task_day["subjects"]


def _cascade_overflow(current_roadmap, after_checkpoint, extra_day_tasks, ckp_idx,
                      max_hours_per_day, ratio, dependencies, test_portions):
    """Reschedule the days after the checkpoint so they absorb the overflow.

    The overflow tasks become the backlog of a synthetic day placed in front
    of ``after_checkpoint``; the result replaces everything after ``ckp_idx``.
    Overflow of this second pass is appended at the end of the schedule.
    """
    new_checkpoint = copy.deepcopy(after_checkpoint)
    backlog_day = copy.deepcopy(after_checkpoint[0])
    for subject in backlog_day["subjects"]:
        sub_name = subject["name"]
        subject["tasks"] = [
            task
            for extra in extra_day_tasks
            for subj in extra["subjects"]
            if subj["name"] == sub_name
            for task in subj["tasks"]
        ]
    backlog_day["dayNumber"] = new_checkpoint[0]["dayNumber"] - 1
    backlog_day["date"] = (
        datetime.strptime(new_checkpoint[0]["date"], "%Y-%m-%d") - timedelta(days=1)
    ).strftime("%Y-%m-%d")
    new_checkpoint.insert(0, backlog_day)

    curr_roadmap, leftover_days = shift_the_roadmap(
        roadmap=new_checkpoint,
        max_hours_per_day=max_hours_per_day,
        ratio=ratio,
        dependencies=dependencies,
        test_portions=test_portions,
    )

    rebuilt = current_roadmap["schedule"][:ckp_idx + 1]
    rebuilt.extend(curr_roadmap[1:])
    if leftover_days and rebuilt:
        # Previously these days were discarded together with their tasks.
        _append_overflow_days(rebuilt, leftover_days, rebuilt[-1])
    current_roadmap["schedule"] = rebuilt


def update_roadmap(current_roadmap, current_dayNumber, max_hours_per_day, dependencies, no_of_revision_days = 2):
    """Reschedule the roadmap after a day has ended.

    The day before ``current_dayNumber`` is treated as finished; its
    unfinished tasks are pushed into the following days.  How many days are
    rescheduled and with which regular/backlog ratio depends on the distance
    to the next upcoming test day (a day carrying ``test_portion``):

    * more than 30 days away or no test: next 4 days, ratio (80, 20)
    * 26-30 days / down to ``10 + extra revision days``: up to the matching
      checkpoint, ratio (80, 20), 16 h limit
    * down to ``no_of_revision_days``: ratio (50, 50) using the test portion
    * final revision days: ratio (0, 100) using the test portion

    Tasks that do not fit are either cascaded into the days after the
    checkpoint, inserted before the test, or appended as new days.

    Args:
        current_roadmap: Dict with a ``"schedule"`` list of day dicts.
        current_dayNumber: 1-based number of the day that is starting.
        max_hours_per_day: Daily limit used when no test is close.
        dependencies: ``{subject: {chapter: [chapters]}}``.
        no_of_revision_days: Length of the final revision period.

    Returns:
        ``current_roadmap`` unchanged when ``current_dayNumber == 1``;
        otherwise None - the result is stored in
        ``st.session_state.updated_roadmap``.
    """
    if current_dayNumber == 1:
        return current_roadmap

    current_roadmap = copy.deepcopy(current_roadmap)
    schedule = current_roadmap["schedule"]
    day_index = current_dayNumber - 2
    today_index = current_dayNumber - 1

    # Find the next upcoming test day.  Tests that already took place are
    # skipped instead of ending the search.
    test_index = None
    time_to_test = None
    test_portions = None
    for idx, day in enumerate(schedule):
        if "test_portion" in day and idx > today_index:
            test_index = idx
            time_to_test = idx - today_index
            test_portions = day["test_portion"]
            break
    if test_index is not None and time_to_test > 30:
        # Too far away to matter yet: schedule as if there were no test
        # (no strategy was selected for this case before).
        test_index = None

    extra_rev_days = max(no_of_revision_days - 2, 0)
    revision_window = 10 + extra_rev_days

    # Determine scheduling strategy based on time to test
    if test_index is not None:
        max_hours_per_day = 16
        if time_to_test > 25:
            # Far from test: normal scheduling with backlog reduction
            split = day_index + (time_to_test - 25)
            ratio = (80, 20)
            test_portions = None
            dependencies = None
        elif time_to_test > revision_window:
            # Mid-range: focus on current coursework
            split = day_index + (time_to_test - revision_window)
            ratio = (80, 20)
            test_portions = None
            dependencies = None
        elif time_to_test > no_of_revision_days:
            # Approaching test: balance current work with test preparation
            split = day_index + (time_to_test - no_of_revision_days)
            ratio = (50, 50)
        else:
            # Final revision period: focus entirely on test preparation
            split = test_index
            ratio = (0, 100)
    else:
        # No upcoming test: normal scheduling over the next 4 days (or fewer
        # when the schedule ends earlier)
        split = day_index + 4
        ratio = (80, 20)
        test_portions = None
        dependencies = None

    before_checkpoint = schedule[day_index:split]
    after_checkpoint = schedule[split:]

    if not before_checkpoint:
        # Nothing left to reschedule (day number beyond the schedule).
        st.session_state.updated_roadmap = current_roadmap
        return

    new_roadmap, extra_day_tasks = shift_the_roadmap(
        before_checkpoint, max_hours_per_day, ratio, dependencies, test_portions
    )

    # Write the rescheduled days back, matching on the first day with the
    # same date; ckp_idx ends up as the index of the last rescheduled day.
    first_index_by_date = {}
    for idx, existing_day in enumerate(schedule):
        first_index_by_date.setdefault(existing_day["date"], idx)
    ckp_idx = day_index + len(before_checkpoint) - 1
    for day in new_roadmap:
        idx = first_index_by_date.get(day["date"])
        if idx is not None:
            schedule[idx] = day
            ckp_idx = idx

    if extra_day_tasks:
        for day in extra_day_tasks:
            for subject in day["subjects"]:
                for task in subject["tasks"]:
                    task["Critical_Notification"] = "Unable to schedule - Too many backlogs"

        if test_index is not None and time_to_test <= revision_window:
            _insert_overflow_before_test(schedule, extra_day_tasks, test_index)
        elif after_checkpoint:
            _cascade_overflow(
                current_roadmap, after_checkpoint, extra_day_tasks, ckp_idx,
                max_hours_per_day, ratio, dependencies, test_portions,
            )
        else:
            # No later days exist: extend the schedule.
            _append_overflow_days(schedule, extra_day_tasks, schedule[-1])

    st.session_state.updated_roadmap = current_roadmap
# AGENT 2
def generate_sql_for_report(llm, prompt):
    """Ask ``llm`` to turn a plain-language data request into SQLite SQL.

    The model is given the structure of the ``roadmap`` table together with
    ``prompt`` and is instructed to answer with the SQL text only.

    Returns:
        The ``content`` of the model's reply with surrounding whitespace
        removed.
    """
    table_struct = """ CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT, task_completed BOOLEAN, completion_timestamp TEXT ) """

    system_prompt = f"""You are a helper who runs in the background of an AI agent, which helps students for their JEE Preparation. Now your job is to analyze the user's prompt and create an SQL query to extract the related Information from an sqlite3 database with the table structure: {table_struct}. Note: For the time column, the data is formatted like '0.5 hour', '1 hour', '2 hours' and so on, it tells the amount of time required to complete that specific task. So make sure to create queries that compare just the numbers within the text. For the task_type column, the data is either of these (Concept Understanding, Question Practice, Revision or Test) You will also make sure multiple times that you give an SQL Query that adheres to the given table structure, and you output just the SQL query. Do not include anything else like new line statements, ```sql or any other text. Your output is going to be directly fed into a Python script to extract the required information. So, please follow all the given instructions. Verify multiple times that the SQL query is error free for the SQLite3 format."""

    user_prompt = f"""Keeping the table structure in mind: {table_struct}, Convert this prompt to an SQL query for the given table: {prompt}. Make sure your output is just the SQL query, which can directly be used to extract required content."""

    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ]
    reply = llm.invoke(messages)
    return reply.content.strip()
def get_sql_data_for_report(sql_query):
    """Run one or more ``;``-separated statements against ``jee_full_roadmap.db``.

    Args:
        sql_query: SQL text; it is split on ``;`` and empty parts are ignored.

    Returns:
        A list with one dict per statement:
        ``{"query": text, "columns": [names], "rows": [tuples]}``.
        ``columns`` is empty for statements that produce no result set.

    Raises:
        sqlite3.Error: if a statement cannot be executed.  The connection is
            closed in every case.
    """
    # NOTE(review): the SQL comes from a language model and is executed as
    # is; nothing here prevents a modifying statement.  Splitting on ';' also
    # breaks statements that contain a semicolon inside a string literal.
    queries = [q.strip() for q in sql_query.strip().split(';') if q.strip()]

    results = []
    conn = sqlite3.connect("jee_full_roadmap.db")
    try:
        cursor = conn.cursor()
        for query in queries:
            cursor.execute(query)
            # description is None for statements without a result set.
            description = cursor.description
            columns = [desc[0] for desc in description] if description else []
            results.append({
                "query": query,
                "columns": columns,
                "rows": cursor.fetchall(),
            })
    finally:
        conn.close()
    return results


def create_db_for_report(roadmap_data):
    """Rebuild the ``roadmap`` table of ``jee_full_roadmap.db`` from ``roadmap_data``.

    One row is written per task with its day number, date, subject, chapter,
    type, time, subtopic, completion flag and completion timestamp.  The rows
    are collected before the existing table is dropped, so malformed input
    leaves the previous table untouched.  Errors are reported with ``print``
    and not raised.
    """
    try:
        rows = [
            (
                day["dayNumber"],
                day["date"],
                subj["name"],
                task["ChapterName"],
                task["type"],
                task["time"],
                task["subtopic"],
                task["task_completed"],
                task["completion_timestamp"],
            )
            for day in roadmap_data["schedule"]
            for subj in day["subjects"]
            for task in subj["tasks"]
        ]

        conn = sqlite3.connect("jee_full_roadmap.db")
        try:
            cursor = conn.cursor()
            cursor.execute("DROP TABLE IF EXISTS roadmap")
            cursor.execute(""" CREATE TABLE roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT, task_completed BOOLEAN, completion_timestamp TEXT ) """)
            cursor.executemany(""" INSERT INTO roadmap (day_num, date, subject, chapter_name, task_type, time, subtopic, task_completed, completion_timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, rows)
            conn.commit()
        finally:
            # Closing without commit discards a partially inserted batch.
            conn.close()
    except Exception as e:
        print(f"⚠️ Error initializing database: {e}")
    else:
        print("✅ Database created and data inserted successfully.")
# Function to generate report
# Chat model shared by the report planner, the SQL helper and the section
# writer below.
llm = ChatOpenAI(model="gpt-4o-mini")


# One planned report section: its title, what it covers and which data it
# needs.  (Documented with comments only; no docstring is added to the
# pydantic models so the classes themselves stay unchanged.)
class Section(BaseModel):
    name: str = Field(
        description="Name for this section of the report.",
    )
    description: str = Field(
        description="Brief overview of the main topics and concepts to be covered in this section.",
    )
    data_requirements: str = Field(
        description="Description of the data needed from the roadmap database to write this section.",
    )


# Container for the full report plan returned by the planner.
class Sections(BaseModel):
    sections: List[Section] = Field(
        description="Sections of the report.",
    )


# Planner: the chat model configured to answer with a `Sections` object.
planner = llm.with_structured_output(Sections)


# State of the report workflow graph.
class State(TypedDict):
    sections: list[Section]  # List of report sections
    completed_sections: Annotated[list, operator.add]  # All workers write to this key in parallel
    final_report: str  # Final report


# Combined helper-worker state
class ProcessorState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]


def orchestrator(state: State):
    """Plan the report: ask the planner for the sections and their data needs.

    The planner receives the roadmap table schema and four fixed section
    templates.  ``state`` is not read.

    Returns:
        ``{"sections": [...]}`` with the ``Section`` objects of the plan.
    """
    # Schema text shown to the model.  The literal is kept on one line exactly
    # as found, because its text is part of the prompt.
    schema = """CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, -- [yyyy-mm-dd] subject TEXT, -- (Physics, Chemistry or Maths) chapter_name TEXT, task_type TEXT, -- (Concept Understanding, Question Practice, Revision, Test) time TEXT, -- formatted like '0.5 hour', '1 hour', '2 Hours', and so on -- Tells the amount of time required to finish the task subtopic TEXT, task_completed BOOLEAN, -- 0/1 indicates task completion status completion_timestamp TEXT )"""
    # Generate queries
    report_sections = planner.invoke(
        [
            SystemMessage(content=f"""You are responsible for creating a structured plan for a JEE preparation analysis report. Audience: The report is intended primarily for students, but must also be insightful to mentors and parents. Keep the language motivational and supportive, with actionable insights backed by data. Report Format: The report will be composed of exactly 4 concise sections. Your job is to define these sections. Each section must include: - **Name**: A short, descriptive title - **Description**: What the section analyzes and how it helps the student - **Data Requirements**: A plain-English description of what fields and metrics are needed from the roadmap database whose schema is given here: {schema} DO NOT invent new sections or formats. Use exactly the following four section templates and fill in the descriptions and data requirements precisely. --- ### Study Time Analysis **Description**: Analyze how much total time the student planned to spend vs how much they actually completed, across different subjects and task types. This will help the student understand where their time is really going. **Data Requirements**: - Fields: `subject`, `task_type`, `time`, `task_completed` - Metrics: - Total planned time → SUM of all `time` - Total actual time → SUM of `time` where `task_completed = 1` - Grouped by both `subject` and `task_type` --- ### Task Completion Metrics **Description**: Measure the student’s consistency and follow-through by looking at completion rates across subjects and task types. **Data Requirements**: - Fields: `subject`, `task_type`, `task_completed` - Metrics: - Total tasks → COUNT of all tasks - Completed tasks → COUNT of tasks where `task_completed = 1` - Completion percentage per subject and task type --- ### Study Balance Analysis **Description**: Evaluate how the student's study time is distributed across task types (e.g., Practice, Revision, Test) within each subject. This highlights over- or under-emphasis on any category. **Data Requirements**: - Fields: `subject`, `task_type`, `time` - Metrics: - SUM of `time` for each (subject, task_type) pair where task_completed = 1 - Relative distribution of time per subject to detect imbalance --- ### Strengths and Areas for Improvement **Description**: This section analyzes how the student's effort is distributed — not by estimating how long they spent, but by combining how many tasks they completed and how much time those completed tasks represent. This helps identify: - Subjects and task types where the student is showing strong commitment - Areas that may be neglected or inconsistently approached **Data Requirements**: - Fields: subject, task_type, task_completed, time - Metrics (filtered where task_completed = 1): - Total Number of completed tasks - Total amount of time spent - Grouped by subject and task_type --- Important Constraints: - You must include **all the mentioned fields** in the `data_requirements` — no assumptions - Use only **aggregate metrics** — no need for per-task or per-day analysis - Keep descriptions student-focused, clear, and motivational - Do not alter section names or invent new ones - Do not output anything outside the strict format above Your output will be passed into a structured data pipeline. Return only the filled-out section definitions as described above. """),
            HumanMessage(content="""Use the given table structure of the roadmap and decide all the sections of the report along with what should be in it and the clearly mention all the data thats required for it from the roadmap table"""),
        ]
    )
    return {"sections": report_sections.sections}
def processor(state: ProcessorState):
    """Fetch the data for one planned section and have the model write it.

    First an SQL query is generated from the section's data requirements and
    executed; then the model writes the section from the query results.

    Returns:
        ``{"completed_sections": [text]}`` holding the written section.
    """
    section = state['section']

    # Data retrieval for this section
    sql_query = generate_sql_for_report(llm, section.data_requirements)
    rows = get_sql_data_for_report(sql_query)

    # Writing of the section from the retrieved data
    writer_instructions = f"""Create a concise, data-driven JEE preparation report section that provides actionable insights for students, parents, and mentors. Requirements: 1. Begin directly with key metrics and insights - no introductory preamble 2. Use specific numbers, percentages, and ratios to quantify performance 3. Include concise tables or bullet points for clarity where appropriate 4. Highlight patterns related to: - Task completion rates - Time allocation efficiency - Subject/topic focus distribution - Study consistency patterns 5. For each observation, provide a brief actionable recommendation focused on student improvement. 6. Use professional but motivational tone appropriate for academic context 7. Strictly use Markdown for formatting all the tables and the numbers 8. Strictly keep each section very focused and write it under 0 to 50 words 9. Verify the formatting of all the tables multiple times to ensure the markdown is correct. 10. Check all the numbers and calculations made by you multiple times to ensure accuracy Base all analysis strictly on the provided data - avoid assumptions beyond what's explicitly given to you. Don't assume anything else, even a little bit. *Important* If you receive an empty data input, understand that the student hasn't done tasks matching the given data description. Also, know that this report is for the student to improve themselves, and they have no part in making sure the data is logged for this analysis. Deeply analyze the SQL query ->{sql_query} and the data description ->{section.data_requirements} used to extract the data and figure out why there was no data available in the roadmap, which the student went through and write the section accordingly. """
    section_input = f"""Here is the section name: {section.name} and description: {section.description} Data for writing this section: {rows}"""

    written = llm.invoke(
        [
            SystemMessage(content=writer_instructions),
            HumanMessage(content=section_input),
        ]
    )

    return {"completed_sections": [written.content]}


def synthesizer(state: State):
    """Join all written sections into the final report text.

    Sections are separated by a Markdown horizontal rule.
    """
    separator = "\n\n---\n\n"
    return {"final_report": separator.join(state["completed_sections"])}


# Assign processors function
def assign_processors(state: State):
    """Create one ``Send`` to the ``processor`` node per planned section."""
    dispatches = []
    for planned_section in state["sections"]:
        dispatches.append(Send("processor", {"section": planned_section}))
    return dispatches


def generate_report(full_roadmap):
    """Build and run the report workflow and store its result.

    The graph runs orchestrator -> one processor per section -> synthesizer.
    The report database is rebuilt from ``full_roadmap`` before the run; the
    final text ends up in ``st.session_state.final_report``.
    """
    with st.spinner("Generating performance report using AI..."):
        builder = StateGraph(State)

        # Nodes
        for node_name, node_fn in (
            ("orchestrator", orchestrator),
            ("processor", processor),
            ("synthesizer", synthesizer),
        ):
            builder.add_node(node_name, node_fn)

        # Edges
        builder.add_edge(START, "orchestrator")
        builder.add_conditional_edges("orchestrator", assign_processors, ["processor"])
        builder.add_edge("processor", "synthesizer")
        builder.add_edge("synthesizer", END)

        graph = builder.compile()

        # The database must exist before the processors query it
        create_db_for_report(full_roadmap)

        result = graph.invoke({})
        st.session_state.final_report = result["final_report"]
initialize_roadmap_db(): if not os.path.exists("jee_roadmap.db"): try: with open("full_roadmap.json") as f: roadmap_data = json.load(f) conn = sqlite3.connect("jee_roadmap.db") cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT ) """) for day in roadmap_data["schedule"]: date = day["date"] day_num = day["dayNumber"] for subj in day["subjects"]: subject = subj["name"] for task in subj["tasks"]: cursor.execute(""" INSERT INTO roadmap (day_num, date, subject, chapter_name, task_type, time, subtopic) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( day_num, date, subject, task["ChapterName"], task["type"], task["time"], task["subtopic"] )) conn.commit() conn.close() print("✅ Database created and data inserted successfully.") except Exception as e: print(f"⚠️ Error initializing database: {e}") def get_chapters_and_subtopics(): with open("full_roadmap.json", "r") as f: data = json.load(f) ch_subt = { "Physics": {}, "Chemistry": {}, "Maths": {} } for day in data["schedule"]: for subject in day['subjects']: sub = ch_subt[subject['name']] for task in subject['tasks']: sub[task['ChapterName']] = [] for day in data["schedule"]: for subject in day['subjects']: sub = ch_subt[subject['name']] for task in subject['tasks']: if task['subtopic'] not in sub[task['ChapterName']]: sub[task['ChapterName']].append(task['subtopic']) return ch_subt # Function to convert NL query to SQL def generate_sql_from_nl(prompt): table_struct = """CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, -- [yyyy-mm-dd] subject TEXT, -- [Physics, Chemistry or Maths] chapter_name TEXT, task_type TEXT, -- (Concept Understanding, Question Practice, Revision, Test) time TEXT, -- formatted like '0.5 hour', '1 hour', '2 Hours', and so on subtopic TEXT, )""" ch_subt = get_chapters_and_subtopics() response = 
llm.invoke( [ SystemMessage( content=f"""You are an helper who runs in the background of an AI agent, which helps students for their JEE Preparation. Now your Job is to analyze the users prompt and create an SQL query to extract the related Information from an sqlite3 database with the table structure: {table_struct}. Note: - For the time column, the data is formatted like '0.5 hour', '1 hour', '2 hours' and so on. So make sure to create queries that compare just the numbers within the text. - If the student mention about any chapters or subtopics, browse through this json file {ch_subt}, find the one with the closest match to the users query and use only those exact names of Chapers and Subtopics present in this file to create SQL the query. - For date related queries, refer today's date {datetime.now().date()} You will also make sure multiple times that you give an SQL Query that adheres to the given table structure, and you Output just the SQL query. Do not include anyting else like new line statements, ```sql or any other text. Your output is going to be directly fed into a Python script to extract the required information. So, please follow all the given Instructions. """ ), HumanMessage( content=f"""Keeping the table structure in mind: {table_struct}, Convert this prompt to an SQL query for the given table: {prompt}. 
Make sure your output is just the SQL query, which can directly be used to extract required content""" ), ] ) # Return completed section return response.content.strip() # Function to fetch data from SQLite def fetch_data_from_sql(sql_query): conn = sqlite3.connect("jee_roadmap.db") cursor = conn.cursor() cursor.execute(sql_query) columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() data = { "query": sql_query, "columns": columns, "rows": rows } conn.close() return data # Function to convert SQL output to natural language def generate_nl_from_sql_output(prompt, data): response = llm.invoke( [ SystemMessage( content=f"""You are an helpful AI chatbot working under the roadmap section of an AI Agent, whose role is to aid students in their preparation for the JEE examination. You are going to play a very crucial role of a Roadmap Assistant, who helps the student out with whatever query they have related to their roadmap, the data required to answer the users query is already extracted from the Roadmap table of a SQLite3 database and given to you here {data}. Analyse the users query deeply and reply to it with the relevant information from the given data in a supportive manner. If you get empty data as an input, deeply analyze the user's prompt and the sql query and give a suitable reply.""" ), HumanMessage( content=f"""Answer to this users query using the data given to you, while keeping your role in mind: {prompt}""" ), ] ) # Return completed section return response.content.strip() # Main function for chatbot def answer_user_query(prompt): initialize_roadmap_db() query = generate_sql_from_nl(prompt) data = fetch_data_from_sql(query) return generate_nl_from_sql_output(prompt, data) # ---- HOME PAGE ---- if page == "Home": st.title("📚 JEE Roadmap Planner") st.markdown(""" ### Welcome to your JEE Study Roadmap Planner! This tool helps you manage your JEE preparation schedule by: 1. 📊 **Analyzing your study performance** 2. 
🔄 **Redistributing incomplete tasks** 3. 📝 **Providing personalized feedback** Get started by loading your roadmap data and following the step-by-step process. """) st.info("Navigate using the sidebar to access different features of the app.") # Initial data loading if st.button("📂 Load Roadmap Data"): success = load_initial_data() if success: st.session_state.first_load = True # ---- ROADMAP MANAGER PAGE ---- elif page == "Roadmap Manager": # AGENT 2 st.title("🗓️ Roadmap Manager") if st.session_state.data is None: st.warning("Please load roadmap data first from the Home page.") else: st.markdown("### Roadmap Management Steps") st.subheader("Step 1: Process Tasks") if st.button("1️⃣ Mark Tasks as Incomplete"): process_task_completion_data() st.subheader("Step 2: Reschedule Tasks") if st.button("2️⃣ Optimize Task Distribution"): update_roadmap(current_roadmap = st.session_state.data, current_dayNumber = 2, max_hours_per_day = 9, dependencies = st.session_state.dependencies, no_of_revision_days = 2) # Display original and updated roadmaps side by side if st.session_state.data and st.session_state.updated_roadmap: st.subheader("Roadmap Comparison") col1, col2 = st.columns(2) with col1: st.markdown("#### Original Roadmap") with st.expander("View Original Roadmap"): st.json(st.session_state.data) with col2: st.markdown("#### Updated Roadmap") with st.expander("View Updated Roadmap"): st.json(st.session_state.updated_roadmap) for day in st.session_state.updated_roadmap['schedule']: st.write(f"Day: {day['dayNumber']} -> Total Time: {check_tot_time(day)} Hours") # ---- TASK ANALYSIS PAGE ---- elif page == "Task Analysis": # AGENT 1 st.title("📊 Task Analysis") choice = st.selectbox("Choose the roadmap to use for building report", ["Four Day Roadmap", "Full Roadmap"]) if choice == "Four Day Roadmap": if st.session_state.data is None: st.warning("Please load roadmap data first from the Home page.") st.session_state.report_data = st.session_state.data elif choice == "Full 
Roadmap": with open("synthesized_full_roadmap.json", "r") as f: st.session_state.report_data = json.load(f) st.markdown("### Performance Report") if st.button("🔍 Generate Performance Report"): generate_report(st.session_state.report_data) if st.session_state.final_report: st.markdown(st.session_state.final_report) else: st.info("Click the button above to generate your performance report.") # Add visualization options if st.session_state.data: st.subheader("Task Breakdown") # Simple task statistics if st.checkbox("Show Task Statistics"): task_count = 0 subject_counts = {} type_counts = {} for day in st.session_state.report_data["schedule"]: for subject in day["subjects"]: subject_name = subject["name"] if subject_name not in subject_counts: subject_counts[subject_name] = 0 for task in subject["tasks"]: subject_counts[subject_name] += 1 task_count += 1 # Count by task type task_type = task.get("type", "Unknown") if task_type not in type_counts: type_counts[task_type] = 0 type_counts[task_type] += 1 st.write(f"Total tasks: {task_count}") # Create charts for data visualization col1, col2 = st.columns(2) with col1: st.subheader("Subject Distribution") st.bar_chart(subject_counts) with col2: st.subheader("Task Type Distribution") st.bar_chart(type_counts) # ---- ROADMAP CHATBOT PAGE ---- # AGENT 3 elif page == "Roadmap Chatbot": st.title("🤖 Roadmap Chatbot Assistant") user_query = st.text_input("Ask a question about your roadmap:", placeholder="e.g., What are my tasks on 14 Feb 2025?") if st.button("Ask") and user_query: with st.spinner("Thinking..."): try: response = answer_user_query(user_query) st.markdown(response) except Exception as e: st.error(f"Error: {e}")