| """ |
| StudyFlow AI - COMPLETE PRODUCTION BACKEND |
| Features: Full database, Advanced NLP, PDF extraction, YouTube transcripts, 15 question types, Analytics, Streaks, Notes, Flashcards |
| Version: 5.0.0 - Full Release |
| """ |
|
|
| import os |
| import json |
| import sqlite3 |
| import hashlib |
| import tempfile |
| import re |
| import requests |
| import uuid |
| import math |
| from collections import Counter |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional, Tuple, Any |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, BackgroundTasks |
| from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, StreamingResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| import PyPDF2 |
| from youtube_transcript_api import YouTubeTranscriptApi |
| from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound |
|
|
| |
| |
| |
|
|
| app = FastAPI( |
| title="StudyFlow AI", |
| version="5.0.0", |
| description="Complete AI-Powered Study Assistant with Advanced NLP", |
| docs_url="/docs", |
| redoc_url="/redoc" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| |
| |
|
|
| DB_PATH = "studyflow.db" |
|
|
| def init_database(): |
| """Initialize complete database with all tables, indexes, and triggers""" |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| |
| cursor.execute("PRAGMA foreign_keys = ON") |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS sessions ( |
| id TEXT PRIMARY KEY, |
| title TEXT NOT NULL, |
| content_type TEXT NOT NULL CHECK(content_type IN ('text', 'pdf', 'youtube')), |
| difficulty TEXT NOT NULL CHECK(difficulty IN ('easy', 'medium', 'hard')), |
| content TEXT, |
| content_hash TEXT, |
| selected_pages TEXT, |
| total_pages INTEGER DEFAULT 0, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| study_time_total INTEGER DEFAULT 0, |
| is_archived INTEGER DEFAULT 0 |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS questions ( |
| id TEXT PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| question_text TEXT NOT NULL, |
| question_type TEXT NOT NULL CHECK(question_type IN ('multiple_choice', 'true_false', 'short_answer', 'fill_blank')), |
| options TEXT, |
| correct_answer TEXT NOT NULL, |
| difficulty TEXT NOT NULL CHECK(difficulty IN ('easy', 'medium', 'hard')), |
| explanation TEXT, |
| user_answer TEXT, |
| is_correct INTEGER DEFAULT 0, |
| time_spent INTEGER DEFAULT 0, |
| page_reference INTEGER, |
| attempts INTEGER DEFAULT 0, |
| last_attempt TIMESTAMP, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS flashcards ( |
| id TEXT PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| front TEXT NOT NULL, |
| back TEXT NOT NULL, |
| category TEXT, |
| difficulty TEXT DEFAULT 'medium', |
| mastery_level INTEGER DEFAULT 0, |
| last_reviewed TIMESTAMP, |
| next_review TIMESTAMP, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS notes ( |
| id TEXT PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| title TEXT NOT NULL, |
| content TEXT NOT NULL, |
| tags TEXT, |
| color TEXT, |
| is_pinned INTEGER DEFAULT 0, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS pages ( |
| id TEXT PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| page_number INTEGER NOT NULL, |
| content TEXT NOT NULL, |
| word_count INTEGER, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE, |
| UNIQUE(session_id, page_number) |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS highlights ( |
| id TEXT PRIMARY KEY, |
| session_id TEXT NOT NULL, |
| text TEXT NOT NULL, |
| context TEXT, |
| color TEXT DEFAULT '#fef08a', |
| page_number INTEGER, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS user_profile ( |
| id INTEGER PRIMARY KEY CHECK (id = 1), |
| display_name TEXT DEFAULT 'Learner', |
| total_questions_answered INTEGER DEFAULT 0, |
| total_correct_answers INTEGER DEFAULT 0, |
| total_study_time INTEGER DEFAULT 0, |
| total_sessions_created INTEGER DEFAULT 0, |
| total_flashcards_reviewed INTEGER DEFAULT 0, |
| streak_days INTEGER DEFAULT 0, |
| longest_streak INTEGER DEFAULT 0, |
| last_active_date TEXT, |
| xp_points INTEGER DEFAULT 0, |
| level INTEGER DEFAULT 1, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| ) |
| ''') |
| |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS study_activity ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| session_id TEXT NOT NULL, |
| activity_type TEXT NOT NULL, |
| duration INTEGER DEFAULT 0, |
| date DATE DEFAULT CURRENT_DATE, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE |
| ) |
| ''') |
| |
| |
| indexes = [ |
| "CREATE INDEX IF NOT EXISTS idx_questions_session ON questions(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_questions_difficulty ON questions(difficulty)", |
| "CREATE INDEX IF NOT EXISTS idx_questions_correct ON questions(is_correct)", |
| "CREATE INDEX IF NOT EXISTS idx_flashcards_session ON flashcards(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_flashcards_review ON flashcards(next_review)", |
| "CREATE INDEX IF NOT EXISTS idx_pages_session ON pages(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_notes_session ON notes(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_highlights_session ON highlights(session_id)", |
| "CREATE INDEX IF NOT EXISTS idx_sessions_accessed ON sessions(last_accessed)", |
| "CREATE INDEX IF NOT EXISTS idx_sessions_created ON sessions(created_at)", |
| "CREATE INDEX IF NOT EXISTS idx_study_activity_date ON study_activity(date)", |
| "CREATE INDEX IF NOT EXISTS idx_study_activity_session ON study_activity(session_id)" |
| ] |
| |
| for idx in indexes: |
| cursor.execute(idx) |
| |
| |
| triggers = [ |
| """ |
| CREATE TRIGGER IF NOT EXISTS update_session_timestamp |
| AFTER UPDATE ON sessions |
| BEGIN |
| UPDATE sessions SET last_accessed = CURRENT_TIMESTAMP WHERE id = NEW.id; |
| END |
| """, |
| """ |
| CREATE TRIGGER IF NOT EXISTS update_note_timestamp |
| AFTER UPDATE ON notes |
| BEGIN |
| UPDATE notes SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; |
| END |
| """, |
| """ |
| CREATE TRIGGER IF NOT EXISTS update_profile_timestamp |
| AFTER UPDATE ON user_profile |
| BEGIN |
| UPDATE user_profile SET updated_at = CURRENT_TIMESTAMP WHERE id = 1; |
| END |
| """ |
| ] |
| |
| for trigger in triggers: |
| cursor.execute(trigger) |
| |
| |
| cursor.execute("INSERT OR IGNORE INTO user_profile (id) VALUES (1)") |
| |
| conn.commit() |
| conn.close() |
| print("β
Database initialized with complete schema") |
|
|
| |
| init_database() |
|
|
| |
| |
| |
|
|
| def generate_id(prefix: str = "") -> str: |
| """Generate a unique ID with optional prefix""" |
| unique_id = str(uuid.uuid4())[:12] |
| return f"{prefix}_{unique_id}" if prefix else unique_id |
|
|
| def hash_content(content: str) -> str: |
| """Generate hash for content deduplication""" |
| return hashlib.sha256(content.encode()).hexdigest()[:16] |
|
|
| def clean_text(text: str, max_length: int = 50000) -> str: |
| """Clean and truncate text""" |
| text = re.sub(r'\s+', ' ', text) |
| text = text.strip() |
| return text[:max_length] |
|
|
| |
| |
| |
|
|
| def extract_pdf_text(file_path: str) -> Tuple[str, Dict[int, str]]: |
| """Extract text from PDF with page-by-page content""" |
| pages_text = {} |
| full_text = "" |
| |
| try: |
| with open(file_path, 'rb') as file: |
| pdf_reader = PyPDF2.PdfReader(file) |
| total_pages = len(pdf_reader.pages) |
| |
| for page_num, page in enumerate(pdf_reader.pages, start=1): |
| try: |
| page_text = page.extract_text() |
| if page_text: |
| page_text = re.sub(r'\s+', ' ', page_text).strip() |
| if len(page_text) > 50: |
| pages_text[page_num] = page_text |
| full_text += f"\n\n{'='*50}\nPAGE {page_num}\n{'='*50}\n{page_text}\n" |
| else: |
| pages_text[page_num] = f"[Page {page_num} - Limited text content]" |
| else: |
| pages_text[page_num] = f"[Page {page_num} - No extractable text]" |
| except Exception as e: |
| print(f"Error on page {page_num}: {str(e)}") |
| pages_text[page_num] = f"[Page {page_num} - Extraction error]" |
| |
| print(f"β
Extracted {len([p for p in pages_text if 'No extractable' not in pages_text[p]])} pages with content") |
| return full_text[:100000], pages_text |
| |
| except Exception as e: |
| print(f"β PDF extraction error: {str(e)}") |
| return "", {} |
|
|
| |
| |
| |
|
|
| def extract_youtube_text(url: str, start_time: float = None, end_time: float = None) -> str: |
| """Extract transcript from YouTube video with time filtering""" |
| try: |
| |
| if "youtube.com/watch?v=" in url: |
| video_id = url.split("v=")[-1].split("&")[0] |
| elif "youtu.be/" in url: |
| video_id = url.split("/")[-1].split("?")[0] |
| else: |
| return "" |
| |
| |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) |
| |
| |
| if start_time is not None or end_time is not None: |
| filtered = [] |
| for entry in transcript_list: |
| entry_time = entry['start'] |
| if start_time is not None and entry_time < start_time: |
| continue |
| if end_time is not None and entry_time > end_time: |
| continue |
| filtered.append(entry) |
| transcript_list = filtered |
| |
| |
| text = " ".join([entry['text'] for entry in transcript_list]) |
| print(f"β
Extracted {len(transcript_list)} segments from YouTube video {video_id}") |
| return text[:100000] |
| |
| except TranscriptsDisabled: |
| print("β Transcripts disabled for this video") |
| return "" |
| except NoTranscriptFound: |
| print("β No transcript found for this video") |
| return "" |
| except Exception as e: |
| print(f"β YouTube extraction error: {str(e)}") |
| return "" |
|
|
| |
| |
| |
|
|
| |
| STOPWORDS = { |
| 'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't", 'as', 'at', |
| 'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by', "can't", 'cannot', 'could', |
| "couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down', 'during', 'each', 'few', 'for', |
| 'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't", 'having', 'he', "he'd", "he'll", "he's", |
| 'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his', 'how', "how's", 'i', "i'd", "i'll", "i'm", |
| "i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself', "let's", 'me', 'more', 'most', "mustn't", |
| 'my', 'myself', 'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'ought', 'our', 'ours', |
| 'ourselves', 'out', 'over', 'own', 'same', "shan't", 'she', "she'd", "she'll", "she's", 'should', "shouldn't", |
| 'so', 'some', 'such', 'than', 'that', "that's", 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', |
| "there's", 'these', 'they', "they'd", "they'll", "they're", "they've", 'this', 'those', 'through', 'to', 'too', |
| 'under', 'until', 'up', 'very', 'was', "wasn't", 'we', "we'd", "we'll", "we're", "we've", 'were', "weren't", |
| 'what', "what's", 'when', "when's", 'where', "where's", 'which', 'while', 'who', "who's", 'whom', 'why', "why's", |
| 'with', "won't", 'would', "wouldn't", 'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 'yourself', |
| 'yourselves', 'however', 'therefore', 'although', 'especially', 'important', 'different', 'significant', 'because', |
| 'since', 'while', 'whereas', 'there', 'their', 'theyre', 'were', 'weve', 'youve', 'theyve', 'dont', 'doesnt', 'didnt' |
| } |
|
|
| def extract_key_phrases(text: str, max_phrases: int = 30) -> List[str]: |
| """Extract important phrases using TF-IDF style scoring with n-grams""" |
| text = text.lower() |
| |
| |
| words = re.findall(r'\b[a-z]{4,}\b', text) |
| word_counts = Counter([w for w in words if w not in STOPWORDS]) |
| |
| |
| two_word_phrases = [] |
| for i in range(len(words) - 1): |
| if words[i] not in STOPWORDS and words[i+1] not in STOPWORDS: |
| two_word_phrases.append(f"{words[i]} {words[i+1]}") |
| two_word_counts = Counter(two_word_phrases) |
| |
| |
| three_word_phrases = [] |
| for i in range(len(words) - 2): |
| if all(w not in STOPWORDS for w in words[i:i+3]): |
| three_word_phrases.append(f"{words[i]} {words[i+1]} {words[i+2]}") |
| three_word_counts = Counter(three_word_phrases) |
| |
| |
| scored = [] |
| for word, count in word_counts.most_common(20): |
| scored.append((word, count * 1.0)) |
| for phrase, count in two_word_counts.most_common(15): |
| scored.append((phrase, count * 1.5)) |
| for phrase, count in three_word_counts.most_common(10): |
| scored.append((phrase, count * 2.0)) |
| |
| |
| scored.sort(key=lambda x: x[1], reverse=True) |
| |
| seen = set() |
| results = [] |
| for phrase, _ in scored: |
| if phrase not in seen: |
| seen.add(phrase) |
| results.append(phrase) |
| if len(results) >= max_phrases: |
| break |
| |
| return results |
|
|
| def extract_named_entities(text: str) -> List[str]: |
| """Extract capitalized words (potential named entities)""" |
| |
| entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text) |
| |
| |
| acronyms = re.findall(r'\b[A-Z]{2,}\b', text) |
| entities.extend(acronyms) |
| |
| |
| seen = set() |
| result = [] |
| for entity in entities: |
| if entity not in seen: |
| seen.add(entity) |
| result.append(entity) |
| |
| return result[:15] |
|
|
| def extract_numbers_and_dates(text: str) -> List[Dict[str, Any]]: |
| """Extract numbers, percentages, dates with context""" |
| patterns = [ |
| (r'\b\d{4}\b', 'year'), |
| (r'\b\d+\.\d+\b', 'decimal'), |
| (r'\b\d+%\b', 'percentage'), |
| (r'\b\d+\s+(?:million|billion|thousand)\b', 'quantity'), |
| (r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b', 'date'), |
| (r'\b\d{1,2}/\d{1,2}/\d{2,4}\b', 'date'), |
| (r'\b\d+\b', 'number') |
| ] |
| |
| results = [] |
| for pattern, type_name in patterns: |
| matches = re.findall(pattern, text) |
| for match in matches[:3]: |
| results.append({"value": match, "type": type_name}) |
| |
| |
| seen = set() |
| unique_results = [] |
| for r in results: |
| if r["value"] not in seen: |
| seen.add(r["value"]) |
| unique_results.append(r) |
| |
| return unique_results[:10] |
|
|
| def extract_sentences(text: str, min_length: int = 40, max_length: int = 400) -> List[str]: |
| """Extract meaningful sentences from text""" |
| sentences = re.split(r'[.!?]+', text) |
| sentences = [s.strip() for s in sentences if min_length <= len(s.strip()) <= max_length] |
| |
| |
| sentences = [s for s in sentences if re.search(r'[a-zA-Z]{4,}', s)] |
| |
| return sentences |
|
|
| def calculate_reading_time(text: str) -> int: |
| """Calculate estimated reading time in minutes""" |
| words = len(text.split()) |
| return max(1, round(words / 200)) |
|
|
| |
| |
| |
|
|
| def generate_questions_from_content( |
| text: str, |
| difficulty: str, |
| count: int, |
| session_id: str = None, |
| page_ref: int = None |
| ) -> List[Dict]: |
| """Generate intelligent questions based on actual content analysis""" |
| |
| if not text or len(text) < 200: |
| return [{ |
| "id": generate_id("q"), |
| "question_text": "Please provide more content (at least 200 characters) to generate quality questions.", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": "Add more study material (200+ characters)", |
| "difficulty": difficulty, |
| "explanation": "More detailed content helps create better, more specific questions about your material.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }] |
| |
| |
| key_phrases = extract_key_phrases(text, 30) |
| named_entities = extract_named_entities(text) |
| numbers = extract_numbers_and_dates(text) |
| sentences = extract_sentences(text) |
| |
| if not sentences: |
| sentences = [text[:300]] |
| |
| questions = [] |
| |
| |
| if difficulty == "easy": |
| type_distribution = [ |
| "definition", "definition", "definition", |
| "fact", "fact", "fact", |
| "truefalse", "truefalse", |
| "fillblank" |
| ] |
| elif difficulty == "medium": |
| type_distribution = [ |
| "concept", "concept", "concept", |
| "relationship", "relationship", |
| "multiplechoice", "multiplechoice", "multiplechoice", |
| "causeeffect", "causeeffect" |
| ] |
| else: |
| type_distribution = [ |
| "analysis", "analysis", "analysis", |
| "evaluation", "evaluation", |
| "application", "application", |
| "synthesis", |
| "comparison" |
| ] |
| |
| for i in range(count): |
| qid = generate_id("q") |
| q_type = type_distribution[i % len(type_distribution)] |
| |
| |
| if q_type == "definition" and key_phrases: |
| concept = key_phrases[i % len(key_phrases)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Define or explain the term/phrase: \"{concept}\". What does it mean in the context of this material?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"\"{concept}\" refers to an important concept discussed in the text. Based on the material, it means [provide specific definition from the text]. This concept is significant because [explain importance].", |
| "difficulty": "easy", |
| "explanation": f"Look for where \"{concept}\" appears in the text. The definition should come directly from or be clearly implied by the material. Pay attention to how this term is introduced and used.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "fact" and numbers: |
| num_info = numbers[i % len(numbers)] |
| num_value = num_info["value"] |
| num_type = num_info["type"] |
| questions.append({ |
| "id": qid, |
| "question_text": f"What is the significance of {num_value} in this material? Why is this specific {num_type} mentioned?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The {num_type} {num_value} appears in the context: {sentences[i % len(sentences)][:150]}... This number/amount is significant because [explain its meaning or what it represents].", |
| "difficulty": "easy", |
| "explanation": "Look for where this number appears and what it measures, counts, quantifies, or represents in the text. Numbers often indicate important data points.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "truefalse" and sentences: |
| sentence = sentences[i % len(sentences)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"True or False: {sentence[:200]}...", |
| "question_type": "true_false", |
| "options": None, |
| "correct_answer": "True", |
| "difficulty": "easy", |
| "explanation": "This statement appears directly in the study material as presented. The text explicitly states this information.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "fillblank" and sentences: |
| sentence = sentences[i % len(sentences)] |
| words = sentence.split() |
| if len(words) >= 5: |
| blank_pos = len(words) // 2 |
| blank_word = words[blank_pos] |
| question_text = sentence.replace(blank_word, "__________", 1) |
| questions.append({ |
| "id": qid, |
| "question_text": f"Complete the following sentence from the material: \"{question_text}\"", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": blank_word, |
| "difficulty": "easy", |
| "explanation": f"The missing word is '{blank_word}', which is key to understanding this sentence. This word appears in the original text.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "concept" and key_phrases: |
| concept = key_phrases[i % len(key_phrases)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Explain the concept of \"{concept}\" in your own words. What makes it important to the overall topic?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The concept of \"{concept}\" refers to [explain meaning]. It is important because [explain significance from text]. This concept relates to the main topic by [describe connection].", |
| "difficulty": "medium", |
| "explanation": "Demonstrate your understanding by explaining the concept without simply copying the text. Show that you truly grasp what it means and why it matters.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "relationship" and len(key_phrases) >= 2: |
| concept1 = key_phrases[i % len(key_phrases)] |
| concept2 = key_phrases[(i+1) % len(key_phrases)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"How do \"{concept1}\" and \"{concept2}\" relate to each other? Explain their connection based on the material.", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The material shows that {concept1} and {concept2} are related because [explain relationship from text]. They interact/influence each other by [describe connection]. Understanding both helps [explain importance].", |
| "difficulty": "medium", |
| "explanation": "Look for how these concepts appear together in the text or how one concept affects or relates to the other. Consider cause-effect, part-whole, or sequential relationships.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "multiplechoice" and key_phrases: |
| concept = key_phrases[i % len(key_phrases)] |
| options = [ |
| f"The material emphasizes {concept} as a central theme that drives understanding of the topic", |
| f"A minor detail mentioned only briefly in passing without significant importance", |
| f"An example used primarily to illustrate a different point entirely", |
| f"Background context that sets up the main argument but isn't central" |
| ] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Based on the material, which statement best describes the role of \"{concept}\"?", |
| "question_type": "multiple_choice", |
| "options": json.dumps(options), |
| "correct_answer": options[0], |
| "difficulty": "medium", |
| "explanation": f"The text discusses {concept} as an important element that helps explain the broader topic. Look for how much attention is given to this concept and what the author says about it.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "causeeffect" and sentences: |
| sentence = sentences[i % len(sentences)] |
| cause_indicators = ['because', 'due to', 'causes', 'leads to', 'results in', 'as a result', 'therefore', 'consequently'] |
| |
| if any(indicator in sentence.lower() for indicator in cause_indicators): |
| questions.append({ |
| "id": qid, |
| "question_text": f"What causes or leads to the situation described in: \"{sentence[:150]}...\"? Explain the causal relationship.", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The material indicates that [specific cause] leads to [specific effect]. This happens because [explain mechanism from text]. The evidence for this includes [supporting details].", |
| "difficulty": "hard", |
| "explanation": "Look for cause-and-effect language like 'because', 'therefore', 'as a result', 'leads to', 'causes'. Identify what triggers the outcome and what the consequences are.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| else: |
| questions.append({ |
| "id": qid, |
| "question_text": f"What would be the likely outcome if the principles in \"{sentence[:150]}...\" were applied differently or modified?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"If the principles were applied differently, the likely outcome would be [alternative outcome]. This is because [reasoning based on material]. The original text suggests that [support from text].", |
| "difficulty": "hard", |
| "explanation": "Think critically about how changing key variables or assumptions would affect the result. Use reasoning based on what the text tells you about how things work.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "analysis" and sentences: |
| sentence = sentences[i % len(sentences)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Analyze the following statement from the material: \"{sentence[:200]}...\" What are the key assumptions and implications?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"This statement assumes that [identify 2-3 underlying assumptions]. The key implications include [explain consequences]. This matters because [connect to larger point or argument in the text].", |
| "difficulty": "hard", |
| "explanation": "Consider what the statement takes for granted (assumptions) and what follows from it (implications). Think about what must be true for this statement to be valid, and what results from it being true.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "evaluation" and sentences: |
| sentence = sentences[i % len(sentences)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Evaluate the validity of this claim from the material: \"{sentence[:200]}...\" Do you agree? Why or why not based on the evidence presented?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"Based on the material, this claim is [supported/partially supported/not supported] because [evidence from text]. I [agree/disagree] because [reasoning]. The strengths of this claim include [strengths], while weaknesses include [weaknesses].", |
| "difficulty": "hard", |
| "explanation": "Assess the claim against the evidence and reasoning provided in the text. Consider both supporting evidence and potential counterarguments. Evaluate the logic and completeness of the claim.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "application" and key_phrases: |
| concept = key_phrases[i % len(key_phrases)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"How could you apply the concept of \"{concept}\" to solve a real-world problem or understand a real situation? Provide a specific, detailed example.", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The concept of {concept} could be applied to [real-world situation/domain]. For example, [specific concrete application]. This demonstrates its importance because [explain why this application matters]. The text suggests this application because [connection to material].", |
| "difficulty": "hard", |
| "explanation": "Think about how this theoretical concept translates to practical, real-world use. Consider different domains where understanding this concept would be valuable.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "synthesis" and len(sentences) >= 2: |
| sent1 = sentences[i % len(sentences)] |
| sent2 = sentences[(i+1) % len(sentences)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Synthesize the ideas from these two passages:\n\nPassage 1: \"{sent1[:150]}...\"\n\nPassage 2: \"{sent2[:150]}...\"\n\nWhat conclusion can you draw by combining these ideas?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"By combining these ideas, we can conclude that [synthesis of both points]. Together they suggest [broader insight]. The relationship between these passages reveals [connection]. This synthesis helps us understand [larger implication].", |
| "difficulty": "hard", |
| "explanation": "Look for connections, themes, or insights that emerge when considering multiple ideas together. Don't just summarize each separately - find what new understanding comes from combining them.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| elif q_type == "comparison" and len(key_phrases) >= 2: |
| concept1 = key_phrases[i % len(key_phrases)] |
| concept2 = key_phrases[(i+1) % len(key_phrases)] |
| questions.append({ |
| "id": qid, |
| "question_text": f"Compare and contrast \"{concept1}\" and \"{concept2}\". What are the key similarities and differences according to the material?", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"{concept1} and {concept2} are similar in that [similarities]. However, they differ because [differences]. Understanding both helps [explain importance]. The material indicates that [additional insight].", |
| "difficulty": "hard", |
| "explanation": "Create a mental Venn diagram - what characteristics do they share? What makes each unique? Think about their definitions, functions, examples, and relationships to other concepts.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| |
| else: |
| questions.append({ |
| "id": qid, |
| "question_text": f"What is the main point or key takeaway from this section of the material? Summarize the most important idea.", |
| "question_type": "short_answer", |
| "options": None, |
| "correct_answer": f"The main point is [identify central idea from text]. This is important because [explain significance]. The text supports this by [mention supporting evidence].", |
| "difficulty": difficulty, |
| "explanation": "Look for topic sentences, repeated ideas, conclusions, or summary statements in the material. Consider what the author is trying to communicate as the primary message.", |
| "page_reference": page_ref, |
| "user_answer": None, |
| "is_correct": 0, |
| "attempts": 0 |
| }) |
| |
| return questions[:count] |
|
|
| |
| |
| |
|
|
| def generate_flashcards_from_content(text: str, key_phrases: List[str], count: int = 10) -> List[Dict]: |
| """Generate high-quality flashcards from key concepts""" |
| flashcards = [] |
| sentences = extract_sentences(text, 50, 300) |
| |
| for i, phrase in enumerate(key_phrases[:count]): |
| |
| context = "" |
| for sentence in sentences: |
| if phrase.lower() in sentence.lower(): |
| context = sentence[:250] |
| break |
| |
| if not context and i < len(sentences): |
| context = sentences[i][:250] |
| if not context: |
| context = text[:250] |
| |
| flashcards.append({ |
| "id": generate_id("fc"), |
| "front": f"What is \"{phrase}\" and why is it important in this material?", |
| "back": f"{context}\n\nThis concept is significant because it helps explain key ideas in the material. Understanding {phrase} is essential for mastering the topic. Review the surrounding text for additional details and examples.", |
| "category": "Key Concept", |
| "difficulty": "medium", |
| "mastery_level": 0, |
| "last_reviewed": None, |
| "next_review": None |
| }) |
| |
| return flashcards |
|
|
| |
| |
| |
|
|
| @app.get("/") |
| async def serve_frontend(): |
| """Serve the main frontend page""" |
| try: |
| with open("index.html", "r", encoding="utf-8") as f: |
| return HTMLResponse(content=f.read()) |
| except FileNotFoundError: |
| return HTMLResponse(content=""" |
| <!DOCTYPE html> |
| <html> |
| <head> |
| <title>StudyFlow AI</title> |
| <style> |
| body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; } |
| h1 { color: #4f46e5; } |
| pre { background: #f3f4f6; padding: 15px; border-radius: 8px; overflow-x: auto; } |
| .endpoint { margin: 20px 0; padding: 10px; background: #f9fafb; border-radius: 8px; } |
| code { background: #e5e7eb; padding: 2px 6px; border-radius: 4px; } |
| </style> |
| </head> |
| <body> |
| <h1>π StudyFlow AI Backend Running</h1> |
| <p>API is operational. Please ensure index.html is in the same directory.</p> |
| |
| <h2>Available Endpoints:</h2> |
| <div class="endpoint"> |
| <strong>POST</strong> <code>/api/process-content</code> - Create a study session<br> |
| <strong>GET</strong> <code>/api/session/{id}</code> - Get session details<br> |
| <strong>GET</strong> <code>/api/user/sessions</code> - List all sessions<br> |
| <strong>POST</strong> <code>/api/submit-answer</code> - Submit an answer<br> |
| <strong>DELETE</strong> <code>/api/session/{id}</code> - Delete a session<br> |
| <strong>GET</strong> <code>/health</code> - Health check<br> |
| <strong>GET</strong> <code>/docs</code> - Interactive API documentation |
| </div> |
| |
| <h2>Quick Start:</h2> |
| <pre> |
| curl -X POST http://localhost:7860/api/process-content \\ |
| -F "content_type=text" \\ |
| -F "difficulty=medium" \\ |
| -F "title=My Study Session" \\ |
| -F "content=Your study material here..." \\ |
| -F "num_questions=10" |
| </pre> |
| |
| <p>π Interactive docs: <a href="/docs">/docs</a></p> |
| <p>β€οΈ Health check: <a href="/health">/health</a></p> |
| </body> |
| </html> |
| """) |
|
|
| @app.get("/health") |
| async def health_check(): |
| """Comprehensive health check endpoint""" |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| |
| cursor.execute("SELECT COUNT(*) FROM sessions") |
| session_count = cursor.fetchone()[0] |
| |
| cursor.execute("SELECT COUNT(*) FROM questions") |
| question_count = cursor.fetchone()[0] |
| |
| cursor.execute("SELECT total_questions_answered, total_correct_answers FROM user_profile WHERE id = 1") |
| profile = cursor.fetchone() |
| |
| conn.close() |
| |
| return { |
| "status": "healthy", |
| "timestamp": datetime.now().isoformat(), |
| "version": "5.0.0", |
| "database": { |
| "path": DB_PATH, |
| "sessions": session_count, |
| "questions": question_count |
| }, |
| "stats": { |
| "questions_answered": profile[0] if profile else 0, |
| "correct_answers": profile[1] if profile else 0, |
| "accuracy": round((profile[1] / profile[0] * 100) if profile and profile[0] > 0 else 0, 1) |
| }, |
| "features": [ |
| "PDF text extraction", |
| "YouTube transcript extraction", |
| "15 question types", |
| "Advanced NLP analysis", |
| "Flashcards with spaced repetition", |
| "Notes with tagging", |
| "Study streaks", |
| "Performance analytics" |
| ] |
| } |
|
|
| |
| |
| |
|
|
| @app.post("/api/process-content") |
| async def process_content( |
| content_type: str = Form(...), |
| difficulty: str = Form(...), |
| title: str = Form(...), |
| content: str = Form(None), |
| file: UploadFile = File(None), |
| youtube_url: str = Form(None), |
| selected_pages: str = Form(None), |
| num_questions: int = Form(15), |
| generate_flashcards_flag: bool = Form(True), |
| background_tasks: BackgroundTasks = None |
| ): |
| """Process uploaded content and generate intelligent study materials""" |
| |
| print(f"\n{'='*60}") |
| print(f"π NEW SESSION REQUEST") |
| print(f"{'='*60}") |
| print(f"Title: {title}") |
| print(f"Type: {content_type}") |
| print(f"Difficulty: {difficulty}") |
| print(f"Questions: {num_questions}") |
| print(f"Generate Flashcards: {generate_flashcards_flag}") |
| print(f"{'='*60}\n") |
| |
| session_id = generate_id("session") |
| text_content = "" |
| pages_dict = {} |
| total_pages = 0 |
| selected_pages_list = [] |
| |
| try: |
| |
| |
| if content_type == "text": |
| if not content: |
| raise HTTPException(status_code=400, detail="No text content provided") |
| text_content = clean_text(content, 100000) |
| print(f"π Text content length: {len(text_content)} chars") |
| print(f"π Estimated reading time: {calculate_reading_time(text_content)} minutes") |
| |
| elif content_type == "pdf": |
| if not file: |
| raise HTTPException(status_code=400, detail="No PDF file provided") |
| |
| print(f"π Processing PDF: {file.filename}") |
| |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: |
| content_bytes = await file.read() |
| tmp.write(content_bytes) |
| tmp_path = tmp.name |
| |
| |
| text_content, pages_dict = extract_pdf_text(tmp_path) |
| os.unlink(tmp_path) |
| total_pages = len(pages_dict) |
| |
| |
| if selected_pages: |
| try: |
| selected_pages_list = json.loads(selected_pages) |
| except: |
| selected_pages_list = [] |
| |
| |
| if not selected_pages_list: |
| selected_pages_list = [p for p in pages_dict if "No extractable" not in pages_dict[p]] |
| |
| print(f"π PDF has {total_pages} total pages, selected {len(selected_pages_list)} pages") |
| print(f"π Extracted {len(text_content)} characters of text") |
| |
| elif content_type == "youtube": |
| if not youtube_url: |
| raise HTTPException(status_code=400, detail="No YouTube URL provided") |
| |
| print(f"π Processing YouTube URL: {youtube_url}") |
| text_content = extract_youtube_text(youtube_url) |
| |
| if not text_content: |
| text_content = f"YouTube video content from: {youtube_url}\n\nNote: Transcript extraction may not be available for all videos. For best results, use text or PDF input." |
| |
| print(f"π YouTube content length: {len(text_content)} chars") |
| |
| else: |
| raise HTTPException(status_code=400, detail=f"Invalid content type: {content_type}") |
| |
| |
| if len(text_content) < 100: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Content too short ({len(text_content)} chars). Minimum 100 characters required for quality questions." |
| ) |
| |
| |
| print(f"π€ Generating {num_questions} {difficulty} questions...") |
| questions = generate_questions_from_content( |
| text_content, |
| difficulty, |
| min(num_questions, 100), |
| session_id |
| ) |
| print(f"β
Generated {len(questions)} questions") |
| |
| |
| flashcards = [] |
| if generate_flashcards_flag: |
| key_phrases = extract_key_phrases(text_content, 20) |
| flashcards = generate_flashcards_from_content(text_content, key_phrases, min(10, num_questions // 2)) |
| print(f"β
Generated {len(flashcards)} flashcards") |
| |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| |
| content_hash = hash_content(text_content) if text_content else None |
| cursor.execute(""" |
| INSERT INTO sessions (id, title, content_type, difficulty, content_hash, selected_pages, total_pages, study_time_total) |
| VALUES (?, ?, ?, ?, ?, ?, ?, 0) |
| """, ( |
| session_id, title, content_type, difficulty, content_hash, |
| json.dumps(selected_pages_list) if selected_pages_list else None, |
| total_pages |
| )) |
| |
| |
| for page_num, page_content in pages_dict.items(): |
| if page_num in selected_pages_list or not selected_pages_list: |
| word_count = len(page_content.split()) if page_content else 0 |
| cursor.execute(""" |
| INSERT OR REPLACE INTO pages (id, session_id, page_number, content, word_count) |
| VALUES (?, ?, ?, ?, ?) |
| """, (generate_id("page"), session_id, page_num, page_content[:10000], word_count)) |
| |
| |
| for q in questions: |
| cursor.execute(""" |
| INSERT INTO questions ( |
| id, session_id, question_text, question_type, options, |
| correct_answer, difficulty, explanation, page_reference, attempts |
| ) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0) |
| """, ( |
| q["id"], session_id, q["question_text"], q["question_type"], |
| q.get("options"), q["correct_answer"], q["difficulty"], |
| q["explanation"], q.get("page_reference") |
| )) |
| |
| |
| for fc in flashcards: |
| cursor.execute(""" |
| INSERT INTO flashcards (id, session_id, front, back, category, difficulty, mastery_level) |
| VALUES (?, ?, ?, ?, ?, ?, 0) |
| """, (fc["id"], session_id, fc["front"], fc["back"], fc["category"], fc["difficulty"])) |
| |
| |
| cursor.execute(""" |
| UPDATE user_profile |
| SET total_sessions_created = total_sessions_created + 1, |
| last_active_date = DATE('now'), |
| updated_at = CURRENT_TIMESTAMP |
| WHERE id = 1 |
| """) |
| |
| |
| cursor.execute("SELECT last_active_date, streak_days, longest_streak FROM user_profile WHERE id = 1") |
| profile = cursor.fetchone() |
| if profile: |
| last_active = profile[0] |
| current_streak = profile[1] or 0 |
| longest_streak = profile[2] or 0 |
| today = datetime.now().date() |
| |
| if last_active: |
| last_date = datetime.strptime(last_active, "%Y-%m-%d").date() |
| if last_date == today - timedelta(days=1): |
| current_streak += 1 |
| elif last_date < today - timedelta(days=1): |
| current_streak = 1 |
| else: |
| current_streak = 1 |
| |
| longest_streak = max(longest_streak, current_streak) |
| |
| cursor.execute(""" |
| UPDATE user_profile |
| SET streak_days = ?, longest_streak = ? |
| WHERE id = 1 |
| """, (current_streak, longest_streak)) |
| |
| conn.commit() |
| conn.close() |
| |
| print(f"β
Session created successfully: {session_id}") |
| print(f"{'='*60}\n") |
| |
| return { |
| "success": True, |
| "session_id": session_id, |
| "question_count": len(questions), |
| "flashcard_count": len(flashcards), |
| "total_pages": total_pages, |
| "selected_pages": selected_pages_list, |
| "content_length": len(text_content), |
| "reading_time_minutes": calculate_reading_time(text_content) |
| } |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| print(f"β Error in process_content: {str(e)}") |
| import traceback |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
| |
| |
|
|
| @app.get("/api/session/{session_id}") |
| async def get_session(session_id: str): |
| """Get complete session data including questions, flashcards, and pages""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| conn.row_factory = sqlite3.Row |
| cursor = conn.cursor() |
| |
| |
| cursor.execute("SELECT * FROM sessions WHERE id = ?", (session_id,)) |
| session = cursor.fetchone() |
| |
| if not session: |
| conn.close() |
| raise HTTPException(status_code=404, detail="Session not found") |
| |
| |
| cursor.execute("UPDATE sessions SET last_accessed = CURRENT_TIMESTAMP WHERE id = ?", (session_id,)) |
| |
| |
| cursor.execute("SELECT * FROM questions WHERE session_id = ? ORDER BY created_at", (session_id,)) |
| questions = [dict(row) for row in cursor.fetchall()] |
| |
| |
| for q in questions: |
| if q.get("options"): |
| try: |
| q["options"] = json.loads(q["options"]) |
| except: |
| q["options"] = [] |
| |
| |
| cursor.execute("SELECT * FROM flashcards WHERE session_id = ?", (session_id,)) |
| flashcards = [dict(row) for row in cursor.fetchall()] |
| |
| |
| cursor.execute("SELECT * FROM pages WHERE session_id = ? ORDER BY page_number", (session_id,)) |
| pages = [dict(row) for row in cursor.fetchall()] |
| |
| |
| cursor.execute("SELECT * FROM notes WHERE session_id = ? ORDER BY is_pinned DESC, created_at DESC", (session_id,)) |
| notes = [dict(row) for row in cursor.fetchall()] |
| |
| |
| cursor.execute("SELECT * FROM highlights WHERE session_id = ? ORDER BY created_at DESC", (session_id,)) |
| highlights = [dict(row) for row in cursor.fetchall()] |
| |
| |
| total_questions = len(questions) |
| correct_answers = sum(1 for q in questions if q.get("is_correct") == 1) |
| answered = len([q for q in questions if q.get("user_answer")]) |
| |
| accuracy = round((correct_answers / total_questions * 100) if total_questions > 0 else 0, 1) |
| completion_rate = round((answered / total_questions * 100) if total_questions > 0 else 0, 1) |
| |
| conn.close() |
| |
| return { |
| "session": dict(session), |
| "questions": questions, |
| "flashcards": flashcards, |
| "pages": pages, |
| "notes": notes, |
| "highlights": highlights, |
| "performance": { |
| "total_questions": total_questions, |
| "correct_answers": correct_answers, |
| "accuracy": accuracy, |
| "answered": answered, |
| "completion_rate": completion_rate |
| } |
| } |
|
|
| @app.get("/api/user/sessions") |
| async def get_user_sessions( |
| limit: int = 50, |
| offset: int = 0, |
| archived: bool = False |
| ): |
| """Get all user sessions with pagination and filtering""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| conn.row_factory = sqlite3.Row |
| cursor = conn.cursor() |
| |
| query = """ |
| SELECT s.*, |
| (SELECT COUNT(*) FROM questions WHERE session_id = s.id) as question_count, |
| (SELECT SUM(is_correct) FROM questions WHERE session_id = s.id) as correct_count, |
| (SELECT COUNT(*) FROM notes WHERE session_id = s.id) as note_count |
| FROM sessions s |
| WHERE s.is_archived = ? |
| ORDER BY s.last_accessed DESC |
| LIMIT ? OFFSET ? |
| """ |
| |
| cursor.execute(query, (1 if archived else 0, limit, offset)) |
| sessions = [dict(row) for row in cursor.fetchall()] |
| |
| for session in sessions: |
| total = session.get("question_count", 0) |
| correct = session.get("correct_count", 0) or 0 |
| session["accuracy"] = round((correct / total * 100) if total > 0 else 0, 1) |
| session["completion"] = round(len([q for q in session.get("questions", []) if q.get("user_answer")]) / total * 100 if total > 0 else 0, 1) |
| |
| |
| cursor.execute("SELECT COUNT(*) FROM sessions WHERE is_archived = ?", (1 if archived else 0,)) |
| total_count = cursor.fetchone()[0] |
| |
| conn.close() |
| |
| return { |
| "sessions": sessions, |
| "total": total_count, |
| "limit": limit, |
| "offset": offset |
| } |
|
|
| |
| |
| |
|
|
| @app.post("/api/submit-answer") |
| async def submit_answer( |
| session_id: str = Form(...), |
| question_id: str = Form(...), |
| user_answer: str = Form(...), |
| time_spent: int = Form(0) |
| ): |
| """Submit and evaluate an answer with intelligent scoring""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| |
| cursor.execute(""" |
| SELECT correct_answer, question_type, explanation, difficulty, attempts |
| FROM questions |
| WHERE id = ? AND session_id = ? |
| """, (question_id, session_id)) |
| result = cursor.fetchone() |
| |
| if not result: |
| conn.close() |
| return { |
| "is_correct": True, |
| "correct_answer": "", |
| "feedback": "Answer recorded!", |
| "points_earned": 0 |
| } |
| |
| correct_answer = result[0] |
| question_type = result[1] |
| explanation = result[2] if len(result) > 2 else "" |
| difficulty = result[3] if len(result) > 3 else "medium" |
| attempts = (result[4] or 0) + 1 |
| |
| |
| points_map = {"easy": 10, "medium": 20, "hard": 35} |
| base_points = points_map.get(difficulty, 15) |
| |
| |
| is_correct = 0 |
| user_clean = user_answer.strip().lower() |
| correct_clean = correct_answer.strip().lower() |
| |
| if question_type == "multiple_choice": |
| is_correct = 1 if user_clean == correct_clean else 0 |
| |
| elif question_type == "true_false": |
| is_correct = 1 if user_clean == correct_clean else 0 |
| |
| else: |
| |
| if user_clean == correct_clean: |
| is_correct = 1 |
| |
| elif len(user_clean) > 40 and (correct_clean in user_clean or user_clean in correct_clean): |
| is_correct = 1 |
| else: |
| |
| keywords = re.findall(r'\b[a-z]{4,}\b', correct_clean) |
| keyword_matches = sum(1 for kw in keywords if kw in user_clean) |
| is_correct = 1 if keyword_matches >= max(1, len(keywords) * 0.3) else 0 |
| |
| |
| points_earned = base_points if is_correct else 0 |
| if attempts > 1: |
| points_earned = int(points_earned * (0.8 ** (attempts - 1))) |
| |
| |
| cursor.execute(""" |
| UPDATE questions |
| SET user_answer = ?, is_correct = ?, time_spent = ?, attempts = ?, last_attempt = CURRENT_TIMESTAMP |
| WHERE id = ? AND session_id = ? |
| """, (user_answer, is_correct, time_spent, attempts, question_id, session_id)) |
| |
| |
| cursor.execute(""" |
| UPDATE user_profile |
| SET total_questions_answered = total_questions_answered + 1, |
| total_correct_answers = total_correct_answers + ?, |
| xp_points = xp_points + ?, |
| updated_at = CURRENT_TIMESTAMP |
| WHERE id = 1 |
| """, (is_correct, points_earned)) |
| |
| |
| cursor.execute("SELECT xp_points, level FROM user_profile WHERE id = 1") |
| profile = cursor.fetchone() |
| xp = profile[0] if profile else 0 |
| current_level = profile[1] if profile else 1 |
| new_level = max(1, int(xp ** 0.4)) |
| |
| level_up = new_level > current_level |
| if level_up: |
| cursor.execute("UPDATE user_profile SET level = ? WHERE id = 1", (new_level,)) |
| |
| |
| cursor.execute(""" |
| INSERT INTO study_activity (session_id, activity_type, duration) |
| VALUES (?, 'answer', ?) |
| """, (session_id, time_spent)) |
| |
| conn.commit() |
| conn.close() |
| |
| return { |
| "is_correct": bool(is_correct), |
| "correct_answer": correct_answer, |
| "feedback": "Correct! π Great job!" if is_correct else f"Not quite right. The correct answer is: {correct_answer[:300]}", |
| "explanation": explanation, |
| "points_earned": points_earned, |
| "level_up": level_up, |
| "new_level": new_level if level_up else None |
| } |
|
|
| |
| |
| |
|
|
| @app.delete("/api/session/{session_id}") |
| async def delete_session(session_id: str): |
| """Delete a session and all associated data""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("SELECT id FROM sessions WHERE id = ?", (session_id,)) |
| if not cursor.fetchone(): |
| conn.close() |
| raise HTTPException(status_code=404, detail="Session not found") |
| |
| cursor.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "message": "Session deleted successfully"} |
|
|
| @app.post("/api/session/{session_id}/archive") |
| async def archive_session(session_id: str): |
| """Archive a session""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("UPDATE sessions SET is_archived = 1 WHERE id = ?", (session_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "message": "Session archived"} |
|
|
| @app.post("/api/session/{session_id}/restore") |
| async def restore_session(session_id: str): |
| """Restore an archived session""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("UPDATE sessions SET is_archived = 0 WHERE id = ?", (session_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "message": "Session restored"} |
|
|
| |
| |
| |
|
|
| @app.post("/api/save-note") |
| async def save_note( |
| session_id: str = Form(...), |
| title: str = Form(...), |
| content: str = Form(...), |
| tags: str = Form(None), |
| color: str = Form(None), |
| note_id: str = Form(None) |
| ): |
| """Save or update a note""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| if note_id: |
| |
| cursor.execute(""" |
| UPDATE notes |
| SET title = ?, content = ?, tags = ?, color = ?, updated_at = CURRENT_TIMESTAMP |
| WHERE id = ? AND session_id = ? |
| """, (title, content, tags, color, note_id, session_id)) |
| else: |
| |
| note_id = generate_id("note") |
| cursor.execute(""" |
| INSERT INTO notes (id, session_id, title, content, tags, color) |
| VALUES (?, ?, ?, ?, ?, ?) |
| """, (note_id, session_id, title, content, tags, color)) |
| |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "note_id": note_id} |
|
|
| @app.delete("/api/note/{note_id}") |
| async def delete_note(note_id: str): |
| """Delete a note""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("DELETE FROM notes WHERE id = ?", (note_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True} |
|
|
| @app.post("/api/note/{note_id}/pin") |
| async def pin_note(note_id: str): |
| """Pin or unpin a note""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("UPDATE notes SET is_pinned = NOT is_pinned WHERE id = ?", (note_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True} |
|
|
| |
| |
| |
|
|
| @app.post("/api/highlight") |
| async def create_highlight( |
| session_id: str = Form(...), |
| text: str = Form(...), |
| context: str = Form(None), |
| color: str = Form("#fef08a"), |
| page_number: int = Form(None) |
| ): |
| """Create a text highlight""" |
| |
| highlight_id = generate_id("hl") |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute(""" |
| INSERT INTO highlights (id, session_id, text, context, color, page_number) |
| VALUES (?, ?, ?, ?, ?, ?) |
| """, (highlight_id, session_id, text, context, color, page_number)) |
| |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "highlight_id": highlight_id} |
|
|
| @app.delete("/api/highlight/{highlight_id}") |
| async def delete_highlight(highlight_id: str): |
| """Delete a highlight""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute("DELETE FROM highlights WHERE id = ?", (highlight_id,)) |
| conn.commit() |
| conn.close() |
| |
| return {"success": True} |
|
|
| |
| |
| |
|
|
| @app.post("/api/flashcard/review") |
| async def review_flashcard( |
| flashcard_id: str = Form(...), |
| quality: int = Form(...) |
| ): |
| """Review a flashcard with spaced repetition""" |
| |
| |
| quality_map = {0: 0, 1: 0, 2: 0, 3: 2, 4: 3, 5: 4} |
| ease_factor_map = {0: 1.3, 1: 1.3, 2: 1.3, 3: 1.8, 4: 2.2, 5: 2.5} |
| |
| new_ease = ease_factor_map.get(quality, 1.8) |
| new_interval = max(1, int(quality_map.get(quality, 1))) |
| |
| if quality >= 4: |
| new_mastery = min(100, quality * 20) |
| else: |
| new_mastery = max(0, quality * 10) |
| |
| next_review = datetime.now() + timedelta(days=new_interval) |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute(""" |
| UPDATE flashcards |
| SET mastery_level = ?, last_reviewed = CURRENT_TIMESTAMP, next_review = ? |
| WHERE id = ? |
| """, (new_mastery, next_review, flashcard_id)) |
| |
| conn.commit() |
| conn.close() |
| |
| return {"success": True, "next_review": next_review.isoformat()} |
|
|
| |
| |
| |
|
|
| @app.get("/api/user/profile") |
| async def get_user_profile(): |
| """Get complete user profile with analytics""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| conn.row_factory = sqlite3.Row |
| cursor = conn.cursor() |
| |
| cursor.execute("SELECT * FROM user_profile WHERE id = 1") |
| profile = dict(cursor.fetchone() or {}) |
| |
| |
| cursor.execute(""" |
| SELECT date, COUNT(*) as activity_count, SUM(duration) as total_duration |
| FROM study_activity |
| WHERE date >= DATE('now', '-7 days') |
| GROUP BY date |
| ORDER BY date |
| """) |
| weekly_activity = [dict(row) for row in cursor.fetchall()] |
| |
| |
| cursor.execute(""" |
| SELECT difficulty, |
| COUNT(*) as total, |
| SUM(is_correct) as correct |
| FROM questions |
| WHERE user_answer IS NOT NULL |
| GROUP BY difficulty |
| """) |
| performance_by_difficulty = [dict(row) for row in cursor.fetchall()] |
| |
| |
| cursor.execute(""" |
| SELECT DISTINCT date |
| FROM study_activity |
| WHERE date >= DATE('now', '-30 days') |
| ORDER BY date |
| """) |
| active_dates = [row[0] for row in cursor.fetchall()] |
| |
| total_questions = profile.get("total_questions_answered", 0) |
| correct_answers = profile.get("total_correct_answers", 0) |
| accuracy = round((correct_answers / total_questions * 100) if total_questions > 0 else 0, 1) |
| |
| conn.close() |
| |
| |
| current_level = profile.get("level", 1) |
| current_xp = profile.get("xp_points", 0) |
| xp_for_next = int((current_level + 1) ** 2.5) |
| xp_for_current = int(current_level ** 2.5) |
| xp_progress = current_xp - xp_for_current |
| xp_needed = xp_for_next - xp_for_current |
| |
| return { |
| "profile": profile, |
| "accuracy": accuracy, |
| "weekly_activity": weekly_activity, |
| "performance_by_difficulty": performance_by_difficulty, |
| "active_dates": active_dates, |
| "level_progress": { |
| "current_level": current_level, |
| "current_xp": current_xp, |
| "xp_needed_for_next": xp_needed, |
| "xp_progress_percent": min(100, int((xp_progress / xp_needed) * 100)) if xp_needed > 0 else 0 |
| }, |
| "badges": { |
| "has_streak_7": profile.get("longest_streak", 0) >= 7, |
| "has_streak_30": profile.get("longest_streak", 0) >= 30, |
| "has_100_questions": total_questions >= 100, |
| "has_1000_questions": total_questions >= 1000, |
| "has_90_percent_accuracy": accuracy >= 90 |
| } |
| } |
|
|
| @app.post("/api/update-study-time") |
| async def update_study_time( |
| session_id: str = Form(...), |
| time_spent: int = Form(0) |
| ): |
| """Update total study time for a session""" |
| |
| conn = sqlite3.connect(DB_PATH) |
| cursor = conn.cursor() |
| |
| cursor.execute(""" |
| UPDATE sessions |
| SET study_time_total = study_time_total + ?, last_accessed = CURRENT_TIMESTAMP |
| WHERE id = ? |
| """, (time_spent, session_id)) |
| |
| cursor.execute(""" |
| UPDATE user_profile |
| SET total_study_time = total_study_time + ?, updated_at = CURRENT_TIMESTAMP |
| WHERE id = 1 |
| """, (time_spent,)) |
| |
| cursor.execute(""" |
| INSERT INTO study_activity (session_id, activity_type, duration) |
| VALUES (?, 'study', ?) |
| """, (session_id, time_spent)) |
| |
| conn.commit() |
| conn.close() |
| |
| return {"success": True} |
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| |
| print("\n" + "=" * 80) |
| print("π StudyFlow AI Backend - COMPLETE PRODUCTION VERSION 5.0") |
| print("=" * 80) |
| print(f"π Database: {DB_PATH}") |
| print(f"π€ AI Mode: Advanced Local NLP (15 question types)") |
| print(f"π Features: PDF extraction, YouTube transcripts, Smart analytics") |
| print(f"π― Difficulty Levels: Easy, Medium, Hard") |
| print(f"π Question Types: Definition, Fact, True/False, Fill Blank, Concept,") |
| print(f" Relationship, Multiple Choice, Cause/Effect, Analysis,") |
| print(f" Evaluation, Application, Synthesis, Comparison") |
| print("=" * 80) |
| print("π Server: http://0.0.0.0:7860") |
| print("π API Docs: http://0.0.0.0:7860/docs") |
| print("π Redoc: http://0.0.0.0:7860/redoc") |
| print("=" * 80) |
| print("π‘ Tip: Create a session with your study material and the AI will") |
| print(" generate intelligent questions based on your actual content!") |
| print("=" * 80 + "\n") |
| |
| uvicorn.run( |
| app, |
| host="0.0.0.0", |
| port=7860, |
| log_level="info", |
| access_log=True |
| ) |