import json import os import copy import sqlite3 import operator import pandas as pd from math import ceil from fastapi import FastAPI, Query from contextlib import asynccontextmanager from datetime import datetime, timedelta from dateutil import parser from collections import defaultdict from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage from typing import Annotated, List, Optional, Literal from pydantic import BaseModel, Field from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.constants import Send from fastapi.responses import HTMLResponse # Session State replacement session_state = { "data": None, "test_results": None, "full_roadmap": None, "report_data": None, "final_report": None, "dependencies": None, "updated_roadmap": None, "chapter_analysis": [] } # AGENT 1 @asynccontextmanager async def lifespan(app: FastAPI): try: with open("fourdayRoadmap.json", "r") as f: session_state["data"] = json.load(f) with open("synthesized_full_roadmap.json", "r") as f: session_state["full_roadmap"] = json.load(f) with open("dependencies.json", 'r') as file: session_state["dependencies"] = json.load(file) # Process tasks as incomplete process_task_data() load_ag4_data() print("✅ Roadmaps loaded successfully.") except Exception as e: print(f"❌ Error loading roadmaps: {e}") yield print("🛑 Shutting down.") # Initialize FastAPI app app = FastAPI( title="JEE Roadmap Planner API", description="API for managing and analyzing JEE Roadmaps", version="1.0.0", lifespan=lifespan ) # Function to mark tasks as incomplete def process_task_data(): data = session_state["data"] for day in data["schedule"]: for subject in day["subjects"]: for task in subject["tasks"]: task["task_completed"] = False task["completion_timestamp"] = None task["rescheduled"] = 0 session_state["data"] = data print("Task data processed!") def add_test(roadmap, date, physics = [], chemistry = [], maths = []): date = parser.parse(date).strftime("%Y-%m-%d") found = False for i, day in enumerate(roadmap["schedule"]): if day["date"] == date: found = True day["test_portion"] = [{ "name": "Physics", "chapters": physics }, { "name": "Chemistry", "chapters": chemistry }, { "name": "Maths", "chapters": maths }] break if not found: print("Kindly check the Entered Date(YYYY-MM-DD), it's not available in the roadmap") return roadmap def add_tasks(roadmap, tasks): from_date = datetime.strptime(parser.parse(tasks.from_date).strftime("%Y-%m-%d"), "%Y-%m-%d") to_date = datetime.strptime(parser.parse(tasks.to_date).strftime("%Y-%m-%d"), "%Y-%m-%d") current_date = from_date date_found = False while current_date <= to_date: date_str = current_date.strftime("%Y-%m-%d") # Find the day in the roadmap day = next((d for d in roadmap["schedule"] if d["date"] == date_str), None) if not day: current_date += timedelta(days=1) continue date_found = True # Ensure 'teacher_tasks' exists if "teacher_tasks" not in day: day["teacher_tasks"] = [] for subject_block in tasks.subjects: subject_name = subject_block.subject new_tasks = [task.model_dump() for task in subject_block.tasks] # Check if subject already exists for the day subject_entry = next((sub for sub in day["teacher_tasks"] if sub["name"] == subject_name), None) if subject_entry: for task in new_tasks: if task not in subject_entry["tasks"]: subject_entry["tasks"].append(task) else: day["teacher_tasks"].append({ "name": subject_name, "tasks": new_tasks }) current_date += timedelta(days=1) if not date_found: print("Kindly check the Entered Dates(YYYY-MM-DD), they are not available in the roadmap") return roadmap def check_tot_time(day): tot_time = 0 for subject in day['subjects']: for task in subject["tasks"]: tot_time += float(task['time'].split(" ")[0]) return tot_time def extract_tasks(roadmap, test_portions=None, dependencies=None): incomplete_tasks_by_subject = defaultdict(list) subjectwise_tasks = defaultdict(list) prev_day = roadmap[0] for subject in prev_day["subjects"]: subject_name = subject["name"] tasks = subject["tasks"] # Separate completed and incomplete tasks incomplete_tasks = [task for task in tasks if task['task_completed'] == False] completed_tasks = [task for task in tasks if task['task_completed'] == True] for task in incomplete_tasks: task['rescheduled'] += 1 # Store incomplete tasks per subject if incomplete_tasks: incomplete_tasks_by_subject[subject_name].extend(incomplete_tasks) # Keep only completed tasks in the previous day subject["tasks"] = completed_tasks for day_index, day in enumerate(roadmap[1:]): for subject in day["subjects"]: subject_name = subject["name"] subjectwise_tasks[subject_name].extend(subject["tasks"]) if test_portions and dependencies: dependent_tasks_by_subject = defaultdict(list) dependent_chapters = set() for subject in test_portions: sub_name = subject['name'] for chapter in subject['chapters']: if chapter in dependencies[sub_name]: dependent_chapters.update(dependencies[sub_name][chapter]) for subject, tasks in subjectwise_tasks.items(): retained_tasks = [] for task in tasks: if task.get("ChapterName") in dependent_chapters: dependent_tasks_by_subject[subject].append(task) else: retained_tasks.append(task) subjectwise_tasks[subject] = retained_tasks for subject, tasks in incomplete_tasks_by_subject.items(): retained_tasks = [] for task in tasks: if task.get("ChapterName") in dependent_chapters: dependent_tasks_by_subject[subject].append(task) else: retained_tasks.append(task) incomplete_tasks_by_subject[subject] = retained_tasks return roadmap, subjectwise_tasks, incomplete_tasks_by_subject, dependent_tasks_by_subject return roadmap, subjectwise_tasks, incomplete_tasks_by_subject def get_task_time(task): return round(float(task['time'].split(" ")[0]), 3) def calculate_time_distribution(roadmap, incomplete_tasks, incomplete_tasks_by_subject, max_hours_per_day): total_hours = 0 num_days = len(roadmap[1:]) extra_day=False extra_hours = 0 if incomplete_tasks_by_subject: for subject in incomplete_tasks_by_subject: for task in incomplete_tasks_by_subject[subject]: extra_hours += get_task_time(task) extra_day=True for subject in incomplete_tasks: for task in incomplete_tasks[subject]: total_hours += get_task_time(task) for day in roadmap[1:]: if day['dayNumber'] >= 550: max_hours_per_day = 16 for subject in day["subjects"]: for task in subject["tasks"]: total_hours += get_task_time(task) if num_days <= 0: return [], [total_hours + extra_hours] if total_hours+extra_hours > 0 else [] max_possible_hours = num_days * max_hours_per_day if total_hours <= max_possible_hours and not extra_day: # Calculate base hours per day (minimum) base_hours = total_hours // num_days # Calculate remaining hours remaining_hours = total_hours - (base_hours * num_days) # Start with all days having base hours distribution = [base_hours] * num_days # Distribute remaining hours starting from the last day for i in range(num_days - 1, -1, -1): if remaining_hours > 0: additional = min(1, remaining_hours, max_hours_per_day - distribution[i]) distribution[i] += additional remaining_hours -= additional return distribution, [] # Otherwise, max out all current days and prepare for extra days distribution = [max_hours_per_day] * num_days remaining_hours = total_hours - max_possible_hours if extra_day: base_hours = total_hours // num_days remaining_hours = total_hours - (base_hours * num_days) distribution = [base_hours] * num_days for i in range(num_days - 1, -1, -1): if remaining_hours > 0: additional = min(1, remaining_hours, max_hours_per_day - distribution[i]) distribution[i] += additional remaining_hours -= additional remaining_hours = extra_hours extra_distribution = [] while remaining_hours > 0: hours = min(max_hours_per_day, remaining_hours) extra_distribution.append(hours) remaining_hours -= hours return distribution, extra_distribution def add_tasks_for_extra_days(subject_all_tasks, incomplete_tasks, extra_day_tasks, extra_distribution, ratio, max_hours_per_day): subject_names = list(subject_all_tasks.keys()) or list(incomplete_tasks.keys()) has_incomplete_tasks = any(tasks for tasks in incomplete_tasks.values()) for i, target_time in enumerate(extra_distribution): day_time = 0 if subject_all_tasks: regular_task_limit = ceil(target_time * ratio[0] / 100) if has_incomplete_tasks else target_time incomplete_task_limit = ceil(target_time * ratio[1] / 100) if has_incomplete_tasks else 0 else: regular_task_limit = 0 incomplete_task_limit = target_time # Create a new day with subjects new_day = {"subjects": [{"name": n, "tasks": []} for n in subject_names]} # Step 1: Allocate regular tasks up to their limit regular_time = 0 while regular_time < regular_task_limit and day_time < max_hours_per_day: added = False for subject in new_day["subjects"]: subject_name = subject["name"] if not subject_all_tasks[subject_name]: continue next_task = subject_all_tasks[subject_name][0] task_time = get_task_time(next_task) if regular_time + task_time <= regular_task_limit and day_time + task_time <= max_hours_per_day: subject["tasks"].append(subject_all_tasks[subject_name].pop(0)) regular_time += task_time day_time += task_time added = True if not added: break # Step 2: Allocate incomplete tasks up to their limit incomplete_time = 0 while incomplete_time < incomplete_task_limit and day_time < max_hours_per_day: added = False for subject in new_day["subjects"]: subject_name = subject["name"] if not incomplete_tasks[subject_name]: continue next_task = incomplete_tasks[subject_name][0] task_time = get_task_time(next_task) if incomplete_time + task_time <= incomplete_task_limit and day_time + task_time <= max_hours_per_day: subject["tasks"].append(incomplete_tasks[subject_name].pop(0)) incomplete_time += task_time day_time += task_time added = True if not added: break # Step 3: Use remaining time for additional regular tasks if available if day_time < target_time: while day_time < target_time: added = False for subject in new_day["subjects"]: subject_name = subject["name"] if not subject_all_tasks[subject_name]: continue next_task = subject_all_tasks[subject_name][0] task_time = get_task_time(next_task) if day_time + task_time <= max_hours_per_day: subject["tasks"].append(subject_all_tasks[subject_name].pop(0)) day_time += task_time added = True if day_time > target_time: break if not added: break if i == len(extra_distribution) - 1: for subject in new_day["subjects"]: subject_name = subject["name"] # Add remaining regular tasks while subject_all_tasks[subject_name]: subject["tasks"].append(subject_all_tasks[subject_name].pop(0)) # Add remaining incomplete tasks while incomplete_tasks[subject_name]: subject["tasks"].append(incomplete_tasks[subject_name].pop(0)) extra_day_tasks.append(new_day) return extra_day_tasks def shift_the_roadmap(roadmap, max_hours_per_day, ratio=(80, 20), dependencies=None, test_portions=None): roadmap = copy.deepcopy(roadmap) # Extract tasks based on ratio mode if ratio == (80, 20): roadmap, subject_all_tasks, incomplete_tasks = extract_tasks(roadmap) dependent_tasks = None incomplete_tasks_by_subject = None else: roadmap, subject_all_tasks, incomplete_tasks_by_subject, dependent_tasks = extract_tasks( roadmap, test_portions, dependencies ) incomplete_tasks = dependent_tasks # Distribute time across days time_distribution, extra_distribution = calculate_time_distribution(roadmap, incomplete_tasks, incomplete_tasks_by_subject, max_hours_per_day) # Check if there are any incomplete tasks has_incomplete_tasks = any(tasks for tasks in incomplete_tasks.values()) # Prepare containers for task assignments pending_regular_tasks = defaultdict(lambda: defaultdict(list)) pending_incomplete_tasks = defaultdict(lambda: defaultdict(list)) # Redistribute tasks for each day for day_index, day in enumerate(roadmap[1:], 1): target_time = time_distribution[day_index - 1] day_time = 0 # Set task limits based on whether incomplete tasks exist regular_task_limit = ceil(target_time * ratio[0] / 100) if has_incomplete_tasks else target_time incomplete_task_limit = ceil(target_time * ratio[1] / 100) if has_incomplete_tasks else 0 # Step 1: Allocate regular tasks up to their limit (either 80% or 100%) regular_time = 0 while regular_time < regular_task_limit and day_time < max_hours_per_day: added = False for subject in day["subjects"]: subject_name = subject["name"] if not subject_all_tasks[subject_name]: continue next_task = subject_all_tasks[subject_name][0] task_time = get_task_time(next_task) if regular_time + task_time <= regular_task_limit and day_time + task_time <= max_hours_per_day: pending_regular_tasks[day_index][subject_name].append(subject_all_tasks[subject_name].pop(0)) regular_time += task_time day_time += task_time added = True if not added: break # Step 2: Allocate incomplete tasks if they exist if has_incomplete_tasks and incomplete_task_limit > 0: incomplete_time = 0 while incomplete_time < incomplete_task_limit and day_time < max_hours_per_day: added = False for subject in day["subjects"]: subject_name = subject["name"] if not incomplete_tasks[subject_name]: continue next_task = incomplete_tasks[subject_name][0] task_time = get_task_time(next_task) if incomplete_time + task_time <= incomplete_task_limit and day_time + task_time <= max_hours_per_day: pending_incomplete_tasks[day_index][subject_name].append(incomplete_tasks[subject_name].pop(0)) incomplete_time += task_time day_time += task_time added = True # Check if we've depleted all incomplete tasks if not any(tasks for tasks in incomplete_tasks.values()): has_incomplete_tasks = False break if not added: break # Step 3: Use remaining time for additional regular tasks if available if day_time < target_time: while day_time < target_time: added = False for subject in day["subjects"]: subject_name = subject["name"] if not subject_all_tasks[subject_name]: continue next_task = subject_all_tasks[subject_name][0] task_time = get_task_time(next_task) if day_time + task_time <= max_hours_per_day: pending_regular_tasks[day_index][subject_name].append(subject_all_tasks[subject_name].pop(0)) day_time += task_time added = True if day_time > target_time: break if not added: break extra_day_tasks = [] if extra_distribution: if incomplete_tasks_by_subject: for subject, tasks in incomplete_tasks_by_subject.items(): incomplete_tasks[subject].extend(tasks) extra_day_tasks = add_tasks_for_extra_days(subject_all_tasks, incomplete_tasks, extra_day_tasks, extra_distribution, (80, 20), max_hours_per_day) # Final appending of tasks for day_index, day in enumerate(roadmap[1:], 1): for subject in day["subjects"]: subject_name = subject["name"] subject["tasks"] = ( pending_regular_tasks[day_index][subject_name] + pending_incomplete_tasks[day_index][subject_name] ) else: for day_index, day in enumerate(roadmap[1:], 1): if day_index == len(roadmap) - 1: for subject in day["subjects"]: subject_name = subject["name"] # Add remaining regular tasks while subject_all_tasks[subject_name]: task = subject_all_tasks[subject_name].pop(0) pending_regular_tasks[day_index][subject_name].append(task) # Add remaining incomplete tasks while incomplete_tasks[subject_name]: task = incomplete_tasks[subject_name].pop(0) pending_incomplete_tasks[day_index][subject_name].append(task) # Final appending of tasks for subject in day["subjects"]: subject_name = subject["name"] subject["tasks"] = ( pending_regular_tasks[day_index][subject_name] + pending_incomplete_tasks[day_index][subject_name] ) return roadmap, extra_day_tasks def update_roadmap(current_roadmap, current_dayNumber, max_hours_per_day, dependencies, no_of_revision_days = 2): if current_dayNumber <= 1 or current_dayNumber > len(current_roadmap['schedule']): session_state["updated_roadmap"] = current_roadmap current_roadmap = copy.deepcopy(current_roadmap) day_index = current_dayNumber-2 test_index = None if "supplementary_tasks" in current_roadmap['schedule'][day_index]: current_roadmap['schedule'][day_index+1]['supplementary_tasks'] = [] for subject in current_roadmap['schedule'][day_index]["supplementary_tasks"]: subject_name = subject["name"] tasks = subject["tasks"] # Separate completed and incomplete tasks incomplete_tasks = [task for task in tasks if task['task_completed'] == False] completed_tasks = [task for task in tasks if task['task_completed'] == True] for task in incomplete_tasks: task['rescheduled'] += 1 # Move incomplete tasks to next day if incomplete_tasks: current_roadmap['schedule'][day_index+1]['supplementary_tasks'].append({ "name":subject_name, "tasks":incomplete_tasks }) # Keep only completed tasks in the previous day subject["tasks"] = completed_tasks # Check if a test exists in any specified day for day in current_roadmap['schedule']: if 'test_portion' in day: test_index = current_roadmap['schedule'].index(day) if test_index > (current_dayNumber-1): time_to_test = test_index - (current_dayNumber-1) test_portions = day['test_portion'] break else: test_index = None break extra_rev_days = max(no_of_revision_days - 2, 0) # Determine scheduling strategy based on time to test if test_index is not None: if 30 >= time_to_test > 25: # Far from test: Normal scheduling with backlog reduction before_checkpoint = current_roadmap['schedule'][day_index:day_index+(time_to_test-25)] after_checkpoint = current_roadmap['schedule'][day_index+(time_to_test-25):] max_hours_per_day = 16 ratio = (80, 20) test_portions = None dependencies = None elif 25 >= time_to_test > (10 + extra_rev_days): # Mid-range: focus on current coursework before_checkpoint = current_roadmap['schedule'][day_index:day_index+(time_to_test-(10+extra_rev_days))] after_checkpoint = current_roadmap['schedule'][day_index+(time_to_test-(10+extra_rev_days)):] max_hours_per_day = 16 ratio = (80, 20) test_portions = None dependencies = None elif (10 + extra_rev_days) >= time_to_test > no_of_revision_days: # Approaching test: Balance current work with test preparation before_checkpoint = current_roadmap['schedule'][day_index:day_index+(time_to_test-no_of_revision_days)] after_checkpoint = current_roadmap['schedule'][day_index+(time_to_test-no_of_revision_days):] max_hours_per_day = 16 ratio = (50, 50) elif 0 < time_to_test <= no_of_revision_days: # Final revision period: Focus entirely on test preparation before_checkpoint = current_roadmap['schedule'][day_index:test_index] after_checkpoint = current_roadmap['schedule'][test_index:] max_hours_per_day = 16 ratio = (0, 100) else: # No upcoming test: Normal scheduling if day_index + 4 <= len(current_roadmap['schedule'])-1: before_checkpoint = current_roadmap['schedule'][day_index:day_index+4] after_checkpoint = current_roadmap['schedule'][day_index+4:] else: before_checkpoint = current_roadmap['schedule'][day_index:] after_checkpoint = [] ratio = (80, 20) test_portions = None dependencies = None new_roadmap, extra_day_tasks = shift_the_roadmap(before_checkpoint, max_hours_per_day, ratio, dependencies, test_portions) for day in new_roadmap: new_date = day["date"] for idx, existing_day in enumerate(current_roadmap['schedule']): if existing_day['date'] == new_date: current_roadmap['schedule'][idx] = day ckp_idx = idx break if extra_day_tasks: for day in extra_day_tasks: for subject in day["subjects"]: for task in subject['tasks']: task["Critical_Notification"] = "Unable to schedule - Too many backlogs" num_extra_days = len(extra_day_tasks) if test_index is not None: if 30 >= time_to_test > (10 + extra_rev_days): new_checkpoint = copy.deepcopy(after_checkpoint) day = copy.deepcopy(after_checkpoint[0]) for subject in day['subjects']: sub_name = subject["name"] subject['tasks'] = [ task for day in extra_day_tasks for subj in day["subjects"] if subj["name"] == sub_name for task in subj["tasks"] ] day["dayNumber"] = new_checkpoint[0]["dayNumber"] - 1 day["date"] = (datetime.strptime(new_checkpoint[0]["date"], "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d") new_checkpoint.insert(0, day) curr_roadmap, extra_days = shift_the_roadmap(roadmap=new_checkpoint, max_hours_per_day = max_hours_per_day, ratio = ratio, dependencies = dependencies, test_portions = test_portions) new_roadmap = current_roadmap['schedule'][:ckp_idx+1] new_roadmap.extend(curr_roadmap[1:]) current_roadmap['schedule'] = new_roadmap for tasks in extra_days: day = copy.deepcopy(new_roadmap[-1]) day["dayNumber"] = current_roadmap['schedule'][-1]["dayNumber"] + 1 day["date"] = (datetime.strptime(current_roadmap['schedule'][-1]["date"], "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d") day['subjects'] = tasks['subjects'] current_roadmap['schedule'].append(day) elif 0 < time_to_test <= (10 + extra_rev_days): # Step 1: Add empty days at the end last_day = current_roadmap['schedule'][-1] last_date = datetime.strptime(last_day["date"], "%Y-%m-%d") last_day_number = last_day["dayNumber"] for i in range(num_extra_days): new_day = { "dayNumber": last_day_number + i + 1, "date": (last_date + timedelta(days=i + 1)).strftime("%Y-%m-%d"), "subjects": [] } current_roadmap['schedule'].append(new_day) # Step 2: Shift 'subject' key from test_index to end in reverse order total_days = len(current_roadmap['schedule']) for i in range(total_days - num_extra_days - 1, test_index - 1, -1): from_day = current_roadmap['schedule'][i] to_day = current_roadmap['schedule'][i + num_extra_days] to_day["subjects"] = from_day["subjects"] # Step 3: Insert the extra_day_tasks into the cleared slots starting at test_index for i, new_task_day in enumerate(extra_day_tasks): target_day = current_roadmap['schedule'][test_index + i] target_day["subjects"] = new_task_day["subjects"] else: if day_index + 4 <= len(current_roadmap['schedule'])-1: new_checkpoint = copy.deepcopy(after_checkpoint) day = copy.deepcopy(after_checkpoint[0]) for subject in day['subjects']: sub_name = subject["name"] subject['tasks'] = [ task for day in extra_day_tasks for subj in day["subjects"] if subj["name"] == sub_name for task in subj["tasks"] ] day["dayNumber"] = new_checkpoint[0]["dayNumber"] - 1 day["date"] = (datetime.strptime(new_checkpoint[0]["date"], "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d") new_checkpoint.insert(0, day) curr_roadmap, extra_days = shift_the_roadmap(roadmap=new_checkpoint, max_hours_per_day = max_hours_per_day, ratio = ratio, dependencies = dependencies, test_portions = test_portions) new_roadmap = current_roadmap['schedule'][:ckp_idx+1] new_roadmap.extend(curr_roadmap[1:]) current_roadmap['schedule'] = new_roadmap for tasks in extra_days: day = copy.deepcopy(new_roadmap[-1]) day["dayNumber"] = current_roadmap['schedule'][-1]["dayNumber"] + 1 day["date"] = (datetime.strptime(current_roadmap['schedule'][-1]["date"], "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d") day['subjects'] = tasks['subjects'] current_roadmap['schedule'].append(day) else: for tasks in extra_day_tasks: day = copy.deepcopy(new_roadmap[-1]) day["dayNumber"] = current_roadmap['schedule'][-1]["dayNumber"] + 1 day["date"] = (datetime.strptime(current_roadmap['schedule'][-1]["date"], "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d") day['subjects'] = tasks['subjects'] current_roadmap['schedule'].append(day) session_state['updated_roadmap'] = current_roadmap # AGENT 2 def generate_sql_for_report(llm, prompt): table_struct = """ CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT, task_completed BOOLEAN, completion_timestamp TEXT ) """ response = llm.invoke( [ SystemMessage(content=f"""You are a helper who runs in the background of an AI agent, which helps students for their JEE Preparation. Now your job is to analyze the user's prompt and create an SQL query to extract the related Information from an sqlite3 database with the table structure: {table_struct}. Note: For the time column, the data is formatted like '0.5 hour', '1 hour', '2 hours' and so on, it tells the amount of time required to complete that specific task. So make sure to create queries that compare just the numbers within the text. For the task_type column, the data is either of these (Concept Understanding, Question Practice, Revision or Test) You will also make sure multiple times that you give an SQL Query that adheres to the given table structure, and you output just the SQL query. Do not include anything else like new line statements, ```sql or any other text. Your output is going to be directly fed into a Python script to extract the required information. So, please follow all the given instructions. Verify multiple times that the SQL query is error free for the SQLite3 format."""), HumanMessage(content=f"""Keeping the table structure in mind: {table_struct}, Convert this prompt to an SQL query for the given table: {prompt}. Make sure your output is just the SQL query, which can directly be used to extract required content.""") ] ) return response.content.strip() def get_sql_data_for_report(sql_query): conn = sqlite3.connect("jee_full_roadmap.db") cursor = conn.cursor() results = [] queries = [q.strip() for q in sql_query.strip().split(';') if q.strip()] for query in queries: cursor.execute(query) columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() results.append({ "query": query, "columns": columns, "rows": rows }) conn.close() return results def create_db_for_report(roadmap_data): try: conn = sqlite3.connect("jee_full_roadmap.db") cursor = conn.cursor() cursor.execute("DROP TABLE IF EXISTS roadmap") cursor.execute(""" CREATE TABLE roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT, task_completed BOOLEAN, completion_timestamp TEXT ) """) for day in roadmap_data["schedule"]: date = day["date"] day_num = day["dayNumber"] for subj in day["subjects"]: subject = subj["name"] for task in subj["tasks"]: cursor.execute(""" INSERT INTO roadmap (day_num, date, subject, chapter_name, task_type, time, subtopic, task_completed, completion_timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( day_num, date, subject, task["ChapterName"], task["type"], task["time"], task["subtopic"], task["task_completed"], task["completion_timestamp"] )) conn.commit() conn.close() print("✅ Database created and data inserted successfully.") except Exception as e: print(f"⚠️ Error initializing database: {e}") # Function to generate report llm = ChatOpenAI(model="gpt-4o-mini") class Section(BaseModel): name: str = Field( description="Name for this section of the report.", ) description: str = Field( description="Brief overview of the main topics and concepts to be covered in this section.", ) data_requirements: str = Field( description="Description of the data needed from the roadmap database to write this section.", ) class Sections(BaseModel): sections: List[Section] = Field( description="Sections of the report.", ) planner = llm.with_structured_output(Sections) class State(TypedDict): sections: list[Section] # List of report sections completed_sections: Annotated[list, operator.add] # All workers write to this key in parallel final_report: str # Final report # Combined helper-worker state class ProcessorState(TypedDict): section: Section completed_sections: Annotated[list, operator.add] def orchestrator(state: State): """Orchestrator that generates a plan for the report with data requirements""" schema = """CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, -- [yyyy-mm-dd] subject TEXT, -- (Physics, Chemistry or Maths) chapter_name TEXT, task_type TEXT, -- (Concept Understanding, Question Practice, Revision, Test) time TEXT, -- formatted like '0.5 hour', '1 hour', '2 Hours', and so on -- Tells the amount of time required to finish the task subtopic TEXT, task_completed BOOLEAN, -- 0/1 indicates task completion status completion_timestamp TEXT )""" # Generate queries report_sections = planner.invoke( [ SystemMessage(content=f"""You are responsible for creating a structured plan for a JEE preparation analysis report. Audience: The report is intended primarily for students, but must also be insightful to mentors and parents. Keep the language motivational and supportive, with actionable insights backed by data. Report Format: The report will be composed of exactly 4 concise sections. Your job is to define these sections. Each section must include: - **Name**: A short, descriptive title - **Description**: What the section analyzes and how it helps the student - **Data Requirements**: A plain-English description of what fields and metrics are needed from the roadmap database whose schema is given here: {schema} DO NOT invent new sections or formats. Use exactly the following four section templates and fill in the descriptions and data requirements precisely. --- ### Study Time Analysis **Description**: Analyze how much total time the student planned to spend vs how much they actually completed, across different subjects and task types. This will help the student understand where their time is really going. **Data Requirements**: - Fields: `subject`, `task_type`, `time`, `task_completed` - Metrics: - Total planned time → SUM of all `time` - Total actual time → SUM of `time` where `task_completed = 1` - Grouped by both `subject` and `task_type` --- ### Task Completion Metrics **Description**: Measure the student’s consistency and follow-through by looking at completion rates across subjects and task types. **Data Requirements**: - Fields: `subject`, `task_type`, `task_completed` - Metrics: - Total tasks → COUNT of all tasks - Completed tasks → COUNT of tasks where `task_completed = 1` - Completion percentage per subject and task type --- ### Study Balance Analysis **Description**: Evaluate how the student's study time is distributed across task types (e.g., Practice, Revision, Test) within each subject. This highlights over- or under-emphasis on any category. **Data Requirements**: - Fields: `subject`, `task_type`, `time` - Metrics: - SUM of `time` for each (subject, task_type) pair where task_completed = 1 - Relative distribution of time per subject to detect imbalance --- ### Strengths and Areas for Improvement **Description**: This section analyzes how the student's effort is distributed — not by estimating how long they spent, but by combining how many tasks they completed and how much time those completed tasks represent. This helps identify: - Subjects and task types where the student is showing strong commitment - Areas that may be neglected or inconsistently approached **Data Requirements**: - Fields: subject, task_type, task_completed, time - Metrics (filtered where task_completed = 1): - Total Number of completed tasks - Total amount of time spent - Grouped by subject and task_type --- Important Constraints: - You must include **all the mentioned fields** in the `data_requirements` — no assumptions - Use only **aggregate metrics** — no need for per-task or per-day analysis - Keep descriptions student-focused, clear, and motivational - Do not alter section names or invent new ones - Do not output anything outside the strict format above Your output will be passed into a structured data pipeline. Return only the filled-out section definitions as described above. """), HumanMessage(content="""Use the given table structure of the roadmap and decide all the sections of the report along with what should be in it and the clearly mention all the data thats required for it from the roadmap table"""), ] ) return {"sections": report_sections.sections} def processor(state: ProcessorState): """Combined helper and worker - gets data and writes section in one step""" section = state['section'] # HELPER PART: Get data for this section sql_query = generate_sql_for_report(llm, section.data_requirements) rows = get_sql_data_for_report(sql_query) # WORKER PART: Write the section using the data section_result = llm.invoke( [ SystemMessage( content=f"""Create a concise, data-driven JEE preparation report section that provides actionable insights for students, parents, and mentors. Requirements: 1. Begin directly with key metrics and insights - no introductory preamble 2. Use specific numbers, percentages, and ratios to quantify performance 3. Include concise tables or bullet points for clarity where appropriate 4. Highlight patterns related to: - Task completion rates - Time allocation efficiency - Subject/topic focus distribution - Study consistency patterns 5. For each observation, provide a brief actionable recommendation focused on student improvement. 6. Use professional but motivational tone appropriate for academic context 7. Strictly use Markdown for formatting all the tables and the numbers 8. Strictly keep each section very focused and write it under 0 to 50 words 9. Verify the formatting of all the tables multiple times to ensure the markdown is correct. 10. Check all the numbers and calculations made by you multiple times to ensure accuracy Base all analysis strictly on the provided data - avoid assumptions beyond what's explicitly given to you. Don't assume anything else, even a little bit. *Important* If you receive an empty data input, understand that the student hasn't done tasks matching the given data description. Also, know that this report is for the student to improve themselves, and they have no part in making sure the data is logged for this analysis. Deeply analyze the SQL query ->{sql_query} and the data description ->{section.data_requirements} used to extract the data and figure out why there was no data available in the roadmap, which the student went through and write the section accordingly. """ ), HumanMessage( content=f"""Here is the section name: {section.name} and description: {section.description} Data for writing this section: {rows}""" ), ] ) # Return completed section return {"completed_sections": [section_result.content]} def synthesizer(state: State): """Synthesize full report from sections""" # List of completed sections completed_sections = state["completed_sections"] # Format completed section to str to use as context for final sections completed_report_sections = "\n\n---\n\n".join(completed_sections) return {"final_report": completed_report_sections} # Assign processors function def assign_processors(state: State): """Assign a processor to each section in the plan""" return [Send("processor", {"section": s}) for s in state["sections"]] def generate_report(full_roadmap): # Build workflow workflow_builder = StateGraph(State) # Add the nodes workflow_builder.add_node("orchestrator", orchestrator) workflow_builder.add_node("processor", processor) workflow_builder.add_node("synthesizer", synthesizer) # Add edges to connect nodes workflow_builder.add_edge(START, "orchestrator") workflow_builder.add_conditional_edges("orchestrator", assign_processors, ["processor"]) workflow_builder.add_edge("processor", "synthesizer") workflow_builder.add_edge("synthesizer", END) # Compile the workflow workflow = workflow_builder.compile() # Initialize database create_db_for_report(full_roadmap) # Invoke state = workflow.invoke({}) session_state['final_report'] = state["final_report"] # AGENT 3 def initialize_roadmap_db(): if not os.path.exists("jee_roadmap.db"): try: with open("full_roadmap.json") as f: roadmap_data = json.load(f) conn = sqlite3.connect("jee_roadmap.db") cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, subject TEXT, chapter_name TEXT, task_type TEXT, time TEXT, subtopic TEXT ) """) for day in roadmap_data["schedule"]: date = day["date"] day_num = day["dayNumber"] for subj in day["subjects"]: subject = subj["name"] for task in subj["tasks"]: cursor.execute(""" INSERT INTO roadmap (day_num, date, subject, chapter_name, task_type, time, subtopic) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( day_num, date, subject, task["ChapterName"], task["type"], task["time"], task["subtopic"] )) conn.commit() conn.close() print("✅ Database created and data inserted successfully.") except Exception as e: print(f"⚠️ Error initializing database: {e}") def get_chapters_and_subtopics(): with open("full_roadmap.json", "r") as f: data = json.load(f) ch_subt = { "Physics": {}, "Chemistry": {}, "Maths": {} } for day in data["schedule"]: for subject in day['subjects']: sub = ch_subt[subject['name']] for task in subject['tasks']: sub[task['ChapterName']] = [] for day in data["schedule"]: for subject in day['subjects']: sub = ch_subt[subject['name']] for task in subject['tasks']: if task['subtopic'] not in sub[task['ChapterName']]: sub[task['ChapterName']].append(task['subtopic']) return ch_subt # Function to convert NL query to SQL def generate_sql_from_nl(prompt): table_struct = """CREATE TABLE IF NOT EXISTS roadmap ( id INTEGER PRIMARY KEY AUTOINCREMENT, day_num INTEGER, date TEXT, -- [yyyy-mm-dd] subject TEXT, -- [Physics, Chemistry or Maths] chapter_name TEXT, task_type TEXT, -- (Concept Understanding, Question Practice, Revision, Test) time TEXT, -- formatted like '0.5 hour', '1 hour', '2 Hours', and so on subtopic TEXT, )""" ch_subt = get_chapters_and_subtopics() response = llm.invoke( [ SystemMessage( content=f"""You are an helper who runs in the background of an AI agent, which helps students for their JEE Preparation. Now your Job is to analyze the users prompt and create an SQL query to extract the related Information from an sqlite3 database with the table structure: {table_struct}. Note: - For the time column, the data is formatted like '0.5 hour', '1 hour', '2 hours' and so on. So make sure to create queries that compare just the numbers within the text. - If the student mention about any chapters or subtopics, browse through this json file {ch_subt}, find the one with the closest match to the users query and use only those exact names of Chapers and Subtopics present in this file to create SQL the query. - For date related queries, refer today's date {datetime.now().date()} - If the user ask's you general questions, Return a Dummy query like {"SELECT * FROM your_table WHERE FALSE;"} You will also make sure multiple times that you give an SQL Query that adheres to the given table structure, and you Output just the SQL query. Do not include anyting else like new line statements, ```sql or any other text. Your output is going to be directly fed into a Python script to extract the required information. So, please follow all the given Instructions. """ ), HumanMessage( content=f"""Keeping the table structure in mind: {table_struct}, Convert this prompt to an SQL query for the given table: {prompt}. Make sure your output is just the SQL query, which can directly be used to extract required content""" ), ] ) # Return completed section return response.content.strip() # Function to fetch data from SQLite def fetch_data_from_sql(sql_query): conn = sqlite3.connect("jee_roadmap.db") cursor = conn.cursor() cursor.execute(sql_query) columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() data = { "query": sql_query, "columns": columns, "rows": rows } conn.close() return data # Function to convert SQL output to natural language def generate_nl_from_sql_output(prompt, data): response = llm.invoke( [ SystemMessage( content=f"""You are an helpful AI chatbot working under the roadmap section of an AI Agent, whose role is to aid students in their preparation for the JEE examination. You are going to play a very crucial role of a Roadmap Assistant, who helps the student out with whatever query they have related to their roadmap, the data required to answer the users query is already extracted from the Roadmap table of a SQLite3 database and given to you here {data}. Analyse the users query deeply and reply to it with the relevant information from the given data in a supportive manner. If you get empty data as an input, deeply analyze the user's prompt and the sql query and give a suitable reply. If you find the user's prompt to be conversational in nature, please respond accordingly.""" ), HumanMessage( content=f"""Answer to this users query using the data given to you, while keeping your role in mind: {prompt}""" ), ] ) # Return completed section return response.content.strip() # Main function for chatbot def answer_user_query(prompt): initialize_roadmap_db() query = generate_sql_from_nl(prompt) data = fetch_data_from_sql(query) return generate_nl_from_sql_output(prompt, data) ## Agent 4 - Analysis and Accountability Agent class Plan(BaseModel): result: Literal["More Tasks", "Revise Chapter", "Strong Chapter"] = Field( ..., description="""In general what is needed to be done to make the student perform better in this concept the next time?""" ) tasks: List[Literal["Concept Understanding", "Question Practice", "Revision", "Test", "No Tasks"]] = Field( ..., description="""If more tasks are needed to be scheduled for the student, what kind of tasks must be scheduled for the student to perform better the next time?""" ) class Analysis(BaseModel): correct_answers: int = Field( ..., description="Count and find out the number of 'Correct?' keys with True as the value" ) why_correct: str = Field( ..., description="""Give a deep analysis on which area must the student be strong in, in order to answer the questions which were answered correctly""" ) wrong_answers: int = Field( ..., description="ount and find out the number of 'Correct?' keys with False as the value" ) why_wrong: str = Field( ..., description="""Give a deep analysis on which area should the student be weak in, to answer those questions incorrectly""" ) action: Plan = Field( ..., description="""Deeply analyze the difficulty levels of the answered questions along with the change in the ELO scores, as the student gives right and wrong answers. With this analysis, determine what would the student require more of to perform better the next time.""" ) analyzer = llm.with_structured_output(Analysis) def get_llm_response(data): response = analyzer.invoke( [ SystemMessage( content=f"""You are a smart AI mentor who is an expert in making sure that the students prepare perfectly for their JEE examinaton. In order to make sure of this, you are asking a set of 5 questions to the student from the list of subtopics, which the student claims to have completed. Now, once the student answers all those 5 questions. You will be given with all the details you need to analyze how good the student has performed. The details which will be given to you are: 1). The Subject from which the question was asked. Which will be either Physics, Chemistry, or Maths 2). The Chapter from which the question was asked from. 3). The tag of the Question - It tells the kind of work the student needs to do, to answer that question. Which will be either Concept Understanding, Question Practice, Revision or Test. 4). The Complexity level of the question. Which will be either Easy, Hard or Medium. 5). The Questions which were asked to the student. 6). The 4 options which were given to the student. 7). The Option that was selected by the student. 8). The Correct Option. 9). Whether the student has answered the question correctly or not. 10). The ELO score of the student after answering that question. Your job is to strictly follow the given output structure the whole time you would be producing the analysis. You will not deviate in any manner from the given output json structure, if you do so then the whole application will crash. The complete data Required to analyze the student's performance is given to you here: {data} """ ), HumanMessage( content=f"""Perform the analysis with absolute accuracy and make no mistake. The analysis will be used to understand whether the student actually knows something in that subtopic of the subject, or he has just marked it as completed just for the sake of it. Make sure your output is perfectly formatted with the given structure""" ), ] ) return response def ag4_update_roadmap(test_data, roadmap, chapter_analysis, current_dayNumber): data_dict = test_data.to_dict(orient="records") response = get_llm_response( json.dumps(data_dict, indent=4) ).model_dump() subject = data_dict[0]["Subject"] chapter = data_dict[0]["Chapter"] analysis = { "subject": subject, "chapter": chapter, "positives": response['why_correct'], "negatives": response['why_wrong'], } chapter_analysis.append(analysis) if response['action']['tasks'][0] != "No Tasks": tasks = { "Physics": [], "Chemistry": [], "Maths": [] } for task in response['action']['tasks']: tasks[subject].append({ "ChapterName": chapter, "type": task, "rescheduled": 0, "completed": False, "completion_timestamp": None }) day = roadmap['schedule'][current_dayNumber] if "supplementary_tasks" not in day: day["supplementary_tasks"] = [{ "name":"Physics", "tasks":[] },{ "name":"Chemistry", "tasks":[] },{ "name":"Maths", "tasks":[] }] for sub in day["supplementary_tasks"]: sub_name = sub["name"] for task in tasks[sub_name]: if task not in sub["tasks"]: sub["tasks"].append(task) session_state['updated_roadmap'] = roadmap session_state['chapter_analysis'] = chapter_analysis @app.get("/", response_class=HTMLResponse) def root(): return """
Select an agent:
""" # --- AGENT 1: Task Analysis (Task Analysis Page) --- @app.get("/agent1") def agent1(choice: Optional[str] = Query("Four Day Roadmap", description="Choose roadmap: 'Four Day Roadmap' or 'Full Roadmap'")): """ Agent 1 - Task Analysis: Builds a performance report based on selected roadmap. """ # Handle choice of roadmap if choice == "Four Day Roadmap": if session_state["data"] is None: return {"error": "Roadmap data not loaded. Load data first."} session_state["report_data"] = session_state["data"] elif choice == "Full Roadmap": with open("synthesized_full_roadmap.json", "r") as f: session_state["report_data"] = json.load(f) # Generate performance report if session_state["report_data"]: generate_report(session_state["report_data"]) return { "final_report": session_state["final_report"] } return {"message": "No report data available."} @app.get("/testscheduler") def testscheduler( date : str = Query("2025-02-23", description="Enter the date to schedule the test"), physics : list = Query(["Properties of Solids and Liquids"], description="Enter the chapters for test in physics as a list"), chemistry : list = Query(["Equilibrium"], description="Enter the chapters for test in chemistry as a list"), maths : list = Query(["Limits,Continuity and Differentiability"], description="Enter the chapters for test in maths as a list"), ): """ Helps the teacher in Scheduling of Tests in the roadmap """ session_state["data"] = add_test(session_state["data"], date, physics, chemistry, maths) return {"sucessful": "Test Succesfully Scheduled in the roadmap"} class Task(BaseModel): chapter: str = Field(..., description="The chapter associated with this task.") description: str = Field(..., description="A brief explanation of what needs to be done.") estimated_time: float = Field(..., description="Estimated time in hours to complete the task.") class SubjectTasks(BaseModel): subject: Literal["Physics", "Chemistry", "Maths"] = Field(..., description="The subject the tasks belong to.") tasks: List[Task] = Field(default_factory=list, description="The list of tasks in this subject.") class Tasks(BaseModel): from_date: str = Field(..., description="The start date for the tasks") to_date: str = Field(..., description="The end date for the tasks.") subjects: List[SubjectTasks] = Field(default_factory=list, description="Subjectwise Task list") @app.post("/taskadder") def taskadder(tasks: Tasks): """ Helps the teacher in scheduling tasks in the roadmap. If no tasks are given, the subject entry will still be added with an empty task list. """ session_state["data"] = add_tasks(session_state["data"], tasks) return {"successful": "Task successfully added to the roadmap"} #HTML code to add tasks @app.get("/task-addition-form", response_class=HTMLResponse) def task_form(): return """