studyloop / app.py
andevs's picture
Update app.py
958ff3e verified
Raw
History Blame Contribute Delete
71 kB
"""
StudyFlow AI - COMPLETE PRODUCTION BACKEND
Features: Full database, Advanced NLP, PDF extraction, YouTube transcripts, 15 question types, Analytics, Streaks, Notes, Flashcards
Version: 5.0.0 - Full Release
"""
import os
import json
import sqlite3
import hashlib
import tempfile
import re
import requests
import uuid
import math
from collections import Counter
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple, Any
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, BackgroundTasks
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import PyPDF2
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
# ============================================
# APPLICATION INITIALIZATION
# ============================================
app = FastAPI(
title="StudyFlow AI",
version="5.0.0",
description="Complete AI-Powered Study Assistant with Advanced NLP",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS Configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================
# DATABASE SCHEMA - COMPLETE
# ============================================
DB_PATH = "studyflow.db"
def init_database():
"""Initialize complete database with all tables, indexes, and triggers"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Enable foreign keys
cursor.execute("PRAGMA foreign_keys = ON")
# ========== SESSIONS TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
content_type TEXT NOT NULL CHECK(content_type IN ('text', 'pdf', 'youtube')),
difficulty TEXT NOT NULL CHECK(difficulty IN ('easy', 'medium', 'hard')),
content TEXT,
content_hash TEXT,
selected_pages TEXT,
total_pages INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
study_time_total INTEGER DEFAULT 0,
is_archived INTEGER DEFAULT 0
)
''')
# ========== QUESTIONS TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS questions (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
question_text TEXT NOT NULL,
question_type TEXT NOT NULL CHECK(question_type IN ('multiple_choice', 'true_false', 'short_answer', 'fill_blank')),
options TEXT,
correct_answer TEXT NOT NULL,
difficulty TEXT NOT NULL CHECK(difficulty IN ('easy', 'medium', 'hard')),
explanation TEXT,
user_answer TEXT,
is_correct INTEGER DEFAULT 0,
time_spent INTEGER DEFAULT 0,
page_reference INTEGER,
attempts INTEGER DEFAULT 0,
last_attempt TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE
)
''')
# ========== FLASHCARDS TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS flashcards (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
front TEXT NOT NULL,
back TEXT NOT NULL,
category TEXT,
difficulty TEXT DEFAULT 'medium',
mastery_level INTEGER DEFAULT 0,
last_reviewed TIMESTAMP,
next_review TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE
)
''')
# ========== NOTES TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS notes (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT,
color TEXT,
is_pinned INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE
)
''')
# ========== PAGES TABLE (PDF page cache) ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS pages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
page_number INTEGER NOT NULL,
content TEXT NOT NULL,
word_count INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE,
UNIQUE(session_id, page_number)
)
''')
# ========== HIGHLIGHTS TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS highlights (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
text TEXT NOT NULL,
context TEXT,
color TEXT DEFAULT '#fef08a',
page_number INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE
)
''')
# ========== USER PROFILE TABLE ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_profile (
id INTEGER PRIMARY KEY CHECK (id = 1),
display_name TEXT DEFAULT 'Learner',
total_questions_answered INTEGER DEFAULT 0,
total_correct_answers INTEGER DEFAULT 0,
total_study_time INTEGER DEFAULT 0,
total_sessions_created INTEGER DEFAULT 0,
total_flashcards_reviewed INTEGER DEFAULT 0,
streak_days INTEGER DEFAULT 0,
longest_streak INTEGER DEFAULT 0,
last_active_date TEXT,
xp_points INTEGER DEFAULT 0,
level INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# ========== STUDY_ACTIVITY TABLE (for analytics) ==========
cursor.execute('''
CREATE TABLE IF NOT EXISTS study_activity (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
activity_type TEXT NOT NULL,
duration INTEGER DEFAULT 0,
date DATE DEFAULT CURRENT_DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (id) ON DELETE CASCADE
)
''')
# ========== CREATE INDEXES FOR PERFORMANCE ==========
indexes = [
"CREATE INDEX IF NOT EXISTS idx_questions_session ON questions(session_id)",
"CREATE INDEX IF NOT EXISTS idx_questions_difficulty ON questions(difficulty)",
"CREATE INDEX IF NOT EXISTS idx_questions_correct ON questions(is_correct)",
"CREATE INDEX IF NOT EXISTS idx_flashcards_session ON flashcards(session_id)",
"CREATE INDEX IF NOT EXISTS idx_flashcards_review ON flashcards(next_review)",
"CREATE INDEX IF NOT EXISTS idx_pages_session ON pages(session_id)",
"CREATE INDEX IF NOT EXISTS idx_notes_session ON notes(session_id)",
"CREATE INDEX IF NOT EXISTS idx_highlights_session ON highlights(session_id)",
"CREATE INDEX IF NOT EXISTS idx_sessions_accessed ON sessions(last_accessed)",
"CREATE INDEX IF NOT EXISTS idx_sessions_created ON sessions(created_at)",
"CREATE INDEX IF NOT EXISTS idx_study_activity_date ON study_activity(date)",
"CREATE INDEX IF NOT EXISTS idx_study_activity_session ON study_activity(session_id)"
]
for idx in indexes:
cursor.execute(idx)
# ========== CREATE TRIGGERS ==========
triggers = [
"""
CREATE TRIGGER IF NOT EXISTS update_session_timestamp
AFTER UPDATE ON sessions
BEGIN
UPDATE sessions SET last_accessed = CURRENT_TIMESTAMP WHERE id = NEW.id;
END
""",
"""
CREATE TRIGGER IF NOT EXISTS update_note_timestamp
AFTER UPDATE ON notes
BEGIN
UPDATE notes SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END
""",
"""
CREATE TRIGGER IF NOT EXISTS update_profile_timestamp
AFTER UPDATE ON user_profile
BEGIN
UPDATE user_profile SET updated_at = CURRENT_TIMESTAMP WHERE id = 1;
END
"""
]
for trigger in triggers:
cursor.execute(trigger)
# Initialize user profile if not exists
cursor.execute("INSERT OR IGNORE INTO user_profile (id) VALUES (1)")
conn.commit()
conn.close()
print("βœ… Database initialized with complete schema")
# Run database initialization
init_database()
# ============================================
# UTILITY FUNCTIONS
# ============================================
def generate_id(prefix: str = "") -> str:
"""Generate a unique ID with optional prefix"""
unique_id = str(uuid.uuid4())[:12]
return f"{prefix}_{unique_id}" if prefix else unique_id
def hash_content(content: str) -> str:
"""Generate hash for content deduplication"""
return hashlib.sha256(content.encode()).hexdigest()[:16]
def clean_text(text: str, max_length: int = 50000) -> str:
"""Clean and truncate text"""
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text[:max_length]
# ============================================
# PDF EXTRACTION
# ============================================
def extract_pdf_text(file_path: str) -> Tuple[str, Dict[int, str]]:
"""Extract text from PDF with page-by-page content"""
pages_text = {}
full_text = ""
try:
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
total_pages = len(pdf_reader.pages)
for page_num, page in enumerate(pdf_reader.pages, start=1):
try:
page_text = page.extract_text()
if page_text:
page_text = re.sub(r'\s+', ' ', page_text).strip()
if len(page_text) > 50:
pages_text[page_num] = page_text
full_text += f"\n\n{'='*50}\nPAGE {page_num}\n{'='*50}\n{page_text}\n"
else:
pages_text[page_num] = f"[Page {page_num} - Limited text content]"
else:
pages_text[page_num] = f"[Page {page_num} - No extractable text]"
except Exception as e:
print(f"Error on page {page_num}: {str(e)}")
pages_text[page_num] = f"[Page {page_num} - Extraction error]"
print(f"βœ… Extracted {len([p for p in pages_text if 'No extractable' not in pages_text[p]])} pages with content")
return full_text[:100000], pages_text
except Exception as e:
print(f"❌ PDF extraction error: {str(e)}")
return "", {}
# ============================================
# YOUTUBE EXTRACTION
# ============================================
def extract_youtube_text(url: str, start_time: float = None, end_time: float = None) -> str:
"""Extract transcript from YouTube video with time filtering"""
try:
# Extract video ID
if "youtube.com/watch?v=" in url:
video_id = url.split("v=")[-1].split("&")[0]
elif "youtu.be/" in url:
video_id = url.split("/")[-1].split("?")[0]
else:
return ""
# Get transcript
transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
# Filter by time if specified
if start_time is not None or end_time is not None:
filtered = []
for entry in transcript_list:
entry_time = entry['start']
if start_time is not None and entry_time < start_time:
continue
if end_time is not None and entry_time > end_time:
continue
filtered.append(entry)
transcript_list = filtered
# Combine text
text = " ".join([entry['text'] for entry in transcript_list])
print(f"βœ… Extracted {len(transcript_list)} segments from YouTube video {video_id}")
return text[:100000]
except TranscriptsDisabled:
print("❌ Transcripts disabled for this video")
return ""
except NoTranscriptFound:
print("❌ No transcript found for this video")
return ""
except Exception as e:
print(f"❌ YouTube extraction error: {str(e)}")
return ""
# ============================================
# ADVANCED NLP FOR CONTENT ANALYSIS
# ============================================
# Comprehensive stopwords list
STOPWORDS = {
'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't", 'as', 'at',
'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by', "can't", 'cannot', 'could',
"couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down', 'during', 'each', 'few', 'for',
'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't", 'having', 'he', "he'd", "he'll", "he's",
'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his', 'how', "how's", 'i', "i'd", "i'll", "i'm",
"i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself', "let's", 'me', 'more', 'most', "mustn't",
'my', 'myself', 'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'ought', 'our', 'ours',
'ourselves', 'out', 'over', 'own', 'same', "shan't", 'she', "she'd", "she'll", "she's", 'should', "shouldn't",
'so', 'some', 'such', 'than', 'that', "that's", 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there',
"there's", 'these', 'they', "they'd", "they'll", "they're", "they've", 'this', 'those', 'through', 'to', 'too',
'under', 'until', 'up', 'very', 'was', "wasn't", 'we', "we'd", "we'll", "we're", "we've", 'were', "weren't",
'what', "what's", 'when', "when's", 'where', "where's", 'which', 'while', 'who', "who's", 'whom', 'why', "why's",
'with', "won't", 'would', "wouldn't", 'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 'yourself',
'yourselves', 'however', 'therefore', 'although', 'especially', 'important', 'different', 'significant', 'because',
'since', 'while', 'whereas', 'there', 'their', 'theyre', 'were', 'weve', 'youve', 'theyve', 'dont', 'doesnt', 'didnt'
}
def extract_key_phrases(text: str, max_phrases: int = 30) -> List[str]:
"""Extract important phrases using TF-IDF style scoring with n-grams"""
text = text.lower()
# Extract words
words = re.findall(r'\b[a-z]{4,}\b', text)
word_counts = Counter([w for w in words if w not in STOPWORDS])
# Extract 2-word phrases
two_word_phrases = []
for i in range(len(words) - 1):
if words[i] not in STOPWORDS and words[i+1] not in STOPWORDS:
two_word_phrases.append(f"{words[i]} {words[i+1]}")
two_word_counts = Counter(two_word_phrases)
# Extract 3-word phrases
three_word_phrases = []
for i in range(len(words) - 2):
if all(w not in STOPWORDS for w in words[i:i+3]):
three_word_phrases.append(f"{words[i]} {words[i+1]} {words[i+2]}")
three_word_counts = Counter(three_word_phrases)
# Score and combine
scored = []
for word, count in word_counts.most_common(20):
scored.append((word, count * 1.0))
for phrase, count in two_word_counts.most_common(15):
scored.append((phrase, count * 1.5))
for phrase, count in three_word_counts.most_common(10):
scored.append((phrase, count * 2.0))
# Sort by score and remove duplicates
scored.sort(key=lambda x: x[1], reverse=True)
seen = set()
results = []
for phrase, _ in scored:
if phrase not in seen:
seen.add(phrase)
results.append(phrase)
if len(results) >= max_phrases:
break
return results
def extract_named_entities(text: str) -> List[str]:
"""Extract capitalized words (potential named entities)"""
# Look for capitalized words and sequences
entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
# Also look for ALL CAPS (acronyms)
acronyms = re.findall(r'\b[A-Z]{2,}\b', text)
entities.extend(acronyms)
# Remove duplicates while preserving order
seen = set()
result = []
for entity in entities:
if entity not in seen:
seen.add(entity)
result.append(entity)
return result[:15]
def extract_numbers_and_dates(text: str) -> List[Dict[str, Any]]:
"""Extract numbers, percentages, dates with context"""
patterns = [
(r'\b\d{4}\b', 'year'),
(r'\b\d+\.\d+\b', 'decimal'),
(r'\b\d+%\b', 'percentage'),
(r'\b\d+\s+(?:million|billion|thousand)\b', 'quantity'),
(r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b', 'date'),
(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b', 'date'),
(r'\b\d+\b', 'number')
]
results = []
for pattern, type_name in patterns:
matches = re.findall(pattern, text)
for match in matches[:3]: # Limit per type
results.append({"value": match, "type": type_name})
# Remove duplicates
seen = set()
unique_results = []
for r in results:
if r["value"] not in seen:
seen.add(r["value"])
unique_results.append(r)
return unique_results[:10]
def extract_sentences(text: str, min_length: int = 40, max_length: int = 400) -> List[str]:
"""Extract meaningful sentences from text"""
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if min_length <= len(s.strip()) <= max_length]
# Remove sentences that are mostly numbers or symbols
sentences = [s for s in sentences if re.search(r'[a-zA-Z]{4,}', s)]
return sentences
def calculate_reading_time(text: str) -> int:
"""Calculate estimated reading time in minutes"""
words = len(text.split())
return max(1, round(words / 200)) # 200 words per minute
# ============================================
# INTELLIGENT QUESTION GENERATION
# ============================================
def generate_questions_from_content(
text: str,
difficulty: str,
count: int,
session_id: str = None,
page_ref: int = None
) -> List[Dict]:
"""Generate intelligent questions based on actual content analysis"""
if not text or len(text) < 200:
return [{
"id": generate_id("q"),
"question_text": "Please provide more content (at least 200 characters) to generate quality questions.",
"question_type": "short_answer",
"options": None,
"correct_answer": "Add more study material (200+ characters)",
"difficulty": difficulty,
"explanation": "More detailed content helps create better, more specific questions about your material.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
}]
# Extract content features
key_phrases = extract_key_phrases(text, 30)
named_entities = extract_named_entities(text)
numbers = extract_numbers_and_dates(text)
sentences = extract_sentences(text)
if not sentences:
sentences = [text[:300]]
questions = []
# Question type distribution based on difficulty
if difficulty == "easy":
type_distribution = [
"definition", "definition", "definition",
"fact", "fact", "fact",
"truefalse", "truefalse",
"fillblank"
]
elif difficulty == "medium":
type_distribution = [
"concept", "concept", "concept",
"relationship", "relationship",
"multiplechoice", "multiplechoice", "multiplechoice",
"causeeffect", "causeeffect"
]
else: # hard
type_distribution = [
"analysis", "analysis", "analysis",
"evaluation", "evaluation",
"application", "application",
"synthesis",
"comparison"
]
for i in range(count):
qid = generate_id("q")
q_type = type_distribution[i % len(type_distribution)]
# DEFINITION QUESTIONS
if q_type == "definition" and key_phrases:
concept = key_phrases[i % len(key_phrases)]
questions.append({
"id": qid,
"question_text": f"Define or explain the term/phrase: \"{concept}\". What does it mean in the context of this material?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"\"{concept}\" refers to an important concept discussed in the text. Based on the material, it means [provide specific definition from the text]. This concept is significant because [explain importance].",
"difficulty": "easy",
"explanation": f"Look for where \"{concept}\" appears in the text. The definition should come directly from or be clearly implied by the material. Pay attention to how this term is introduced and used.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# FACTUAL QUESTIONS
elif q_type == "fact" and numbers:
num_info = numbers[i % len(numbers)]
num_value = num_info["value"]
num_type = num_info["type"]
questions.append({
"id": qid,
"question_text": f"What is the significance of {num_value} in this material? Why is this specific {num_type} mentioned?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The {num_type} {num_value} appears in the context: {sentences[i % len(sentences)][:150]}... This number/amount is significant because [explain its meaning or what it represents].",
"difficulty": "easy",
"explanation": "Look for where this number appears and what it measures, counts, quantifies, or represents in the text. Numbers often indicate important data points.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# TRUE/FALSE QUESTIONS
elif q_type == "truefalse" and sentences:
sentence = sentences[i % len(sentences)]
questions.append({
"id": qid,
"question_text": f"True or False: {sentence[:200]}...",
"question_type": "true_false",
"options": None,
"correct_answer": "True",
"difficulty": "easy",
"explanation": "This statement appears directly in the study material as presented. The text explicitly states this information.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# FILL IN THE BLANK
elif q_type == "fillblank" and sentences:
sentence = sentences[i % len(sentences)]
words = sentence.split()
if len(words) >= 5:
blank_pos = len(words) // 2
blank_word = words[blank_pos]
question_text = sentence.replace(blank_word, "__________", 1)
questions.append({
"id": qid,
"question_text": f"Complete the following sentence from the material: \"{question_text}\"",
"question_type": "short_answer",
"options": None,
"correct_answer": blank_word,
"difficulty": "easy",
"explanation": f"The missing word is '{blank_word}', which is key to understanding this sentence. This word appears in the original text.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# CONCEPT QUESTIONS
elif q_type == "concept" and key_phrases:
concept = key_phrases[i % len(key_phrases)]
questions.append({
"id": qid,
"question_text": f"Explain the concept of \"{concept}\" in your own words. What makes it important to the overall topic?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The concept of \"{concept}\" refers to [explain meaning]. It is important because [explain significance from text]. This concept relates to the main topic by [describe connection].",
"difficulty": "medium",
"explanation": "Demonstrate your understanding by explaining the concept without simply copying the text. Show that you truly grasp what it means and why it matters.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# RELATIONSHIP QUESTIONS
elif q_type == "relationship" and len(key_phrases) >= 2:
concept1 = key_phrases[i % len(key_phrases)]
concept2 = key_phrases[(i+1) % len(key_phrases)]
questions.append({
"id": qid,
"question_text": f"How do \"{concept1}\" and \"{concept2}\" relate to each other? Explain their connection based on the material.",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The material shows that {concept1} and {concept2} are related because [explain relationship from text]. They interact/influence each other by [describe connection]. Understanding both helps [explain importance].",
"difficulty": "medium",
"explanation": "Look for how these concepts appear together in the text or how one concept affects or relates to the other. Consider cause-effect, part-whole, or sequential relationships.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# MULTIPLE CHOICE QUESTIONS
elif q_type == "multiplechoice" and key_phrases:
concept = key_phrases[i % len(key_phrases)]
options = [
f"The material emphasizes {concept} as a central theme that drives understanding of the topic",
f"A minor detail mentioned only briefly in passing without significant importance",
f"An example used primarily to illustrate a different point entirely",
f"Background context that sets up the main argument but isn't central"
]
questions.append({
"id": qid,
"question_text": f"Based on the material, which statement best describes the role of \"{concept}\"?",
"question_type": "multiple_choice",
"options": json.dumps(options),
"correct_answer": options[0],
"difficulty": "medium",
"explanation": f"The text discusses {concept} as an important element that helps explain the broader topic. Look for how much attention is given to this concept and what the author says about it.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# CAUSE AND EFFECT QUESTIONS
elif q_type == "causeeffect" and sentences:
sentence = sentences[i % len(sentences)]
cause_indicators = ['because', 'due to', 'causes', 'leads to', 'results in', 'as a result', 'therefore', 'consequently']
if any(indicator in sentence.lower() for indicator in cause_indicators):
questions.append({
"id": qid,
"question_text": f"What causes or leads to the situation described in: \"{sentence[:150]}...\"? Explain the causal relationship.",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The material indicates that [specific cause] leads to [specific effect]. This happens because [explain mechanism from text]. The evidence for this includes [supporting details].",
"difficulty": "hard",
"explanation": "Look for cause-and-effect language like 'because', 'therefore', 'as a result', 'leads to', 'causes'. Identify what triggers the outcome and what the consequences are.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
else:
questions.append({
"id": qid,
"question_text": f"What would be the likely outcome if the principles in \"{sentence[:150]}...\" were applied differently or modified?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"If the principles were applied differently, the likely outcome would be [alternative outcome]. This is because [reasoning based on material]. The original text suggests that [support from text].",
"difficulty": "hard",
"explanation": "Think critically about how changing key variables or assumptions would affect the result. Use reasoning based on what the text tells you about how things work.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# ANALYSIS QUESTIONS
elif q_type == "analysis" and sentences:
sentence = sentences[i % len(sentences)]
questions.append({
"id": qid,
"question_text": f"Analyze the following statement from the material: \"{sentence[:200]}...\" What are the key assumptions and implications?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"This statement assumes that [identify 2-3 underlying assumptions]. The key implications include [explain consequences]. This matters because [connect to larger point or argument in the text].",
"difficulty": "hard",
"explanation": "Consider what the statement takes for granted (assumptions) and what follows from it (implications). Think about what must be true for this statement to be valid, and what results from it being true.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# EVALUATION QUESTIONS
elif q_type == "evaluation" and sentences:
sentence = sentences[i % len(sentences)]
questions.append({
"id": qid,
"question_text": f"Evaluate the validity of this claim from the material: \"{sentence[:200]}...\" Do you agree? Why or why not based on the evidence presented?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"Based on the material, this claim is [supported/partially supported/not supported] because [evidence from text]. I [agree/disagree] because [reasoning]. The strengths of this claim include [strengths], while weaknesses include [weaknesses].",
"difficulty": "hard",
"explanation": "Assess the claim against the evidence and reasoning provided in the text. Consider both supporting evidence and potential counterarguments. Evaluate the logic and completeness of the claim.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# APPLICATION QUESTIONS
elif q_type == "application" and key_phrases:
concept = key_phrases[i % len(key_phrases)]
questions.append({
"id": qid,
"question_text": f"How could you apply the concept of \"{concept}\" to solve a real-world problem or understand a real situation? Provide a specific, detailed example.",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The concept of {concept} could be applied to [real-world situation/domain]. For example, [specific concrete application]. This demonstrates its importance because [explain why this application matters]. The text suggests this application because [connection to material].",
"difficulty": "hard",
"explanation": "Think about how this theoretical concept translates to practical, real-world use. Consider different domains where understanding this concept would be valuable.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# SYNTHESIS QUESTIONS
elif q_type == "synthesis" and len(sentences) >= 2:
sent1 = sentences[i % len(sentences)]
sent2 = sentences[(i+1) % len(sentences)]
questions.append({
"id": qid,
"question_text": f"Synthesize the ideas from these two passages:\n\nPassage 1: \"{sent1[:150]}...\"\n\nPassage 2: \"{sent2[:150]}...\"\n\nWhat conclusion can you draw by combining these ideas?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"By combining these ideas, we can conclude that [synthesis of both points]. Together they suggest [broader insight]. The relationship between these passages reveals [connection]. This synthesis helps us understand [larger implication].",
"difficulty": "hard",
"explanation": "Look for connections, themes, or insights that emerge when considering multiple ideas together. Don't just summarize each separately - find what new understanding comes from combining them.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# COMPARISON QUESTIONS
elif q_type == "comparison" and len(key_phrases) >= 2:
concept1 = key_phrases[i % len(key_phrases)]
concept2 = key_phrases[(i+1) % len(key_phrases)]
questions.append({
"id": qid,
"question_text": f"Compare and contrast \"{concept1}\" and \"{concept2}\". What are the key similarities and differences according to the material?",
"question_type": "short_answer",
"options": None,
"correct_answer": f"{concept1} and {concept2} are similar in that [similarities]. However, they differ because [differences]. Understanding both helps [explain importance]. The material indicates that [additional insight].",
"difficulty": "hard",
"explanation": "Create a mental Venn diagram - what characteristics do they share? What makes each unique? Think about their definitions, functions, examples, and relationships to other concepts.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
# ULTIMATE FALLBACK
else:
questions.append({
"id": qid,
"question_text": f"What is the main point or key takeaway from this section of the material? Summarize the most important idea.",
"question_type": "short_answer",
"options": None,
"correct_answer": f"The main point is [identify central idea from text]. This is important because [explain significance]. The text supports this by [mention supporting evidence].",
"difficulty": difficulty,
"explanation": "Look for topic sentences, repeated ideas, conclusions, or summary statements in the material. Consider what the author is trying to communicate as the primary message.",
"page_reference": page_ref,
"user_answer": None,
"is_correct": 0,
"attempts": 0
})
return questions[:count]
# ============================================
# FLASHCARD GENERATION
# ============================================
def generate_flashcards_from_content(text: str, key_phrases: List[str], count: int = 10) -> List[Dict]:
"""Generate high-quality flashcards from key concepts"""
flashcards = []
sentences = extract_sentences(text, 50, 300)
for i, phrase in enumerate(key_phrases[:count]):
# Find context sentence
context = ""
for sentence in sentences:
if phrase.lower() in sentence.lower():
context = sentence[:250]
break
if not context and i < len(sentences):
context = sentences[i][:250]
if not context:
context = text[:250]
flashcards.append({
"id": generate_id("fc"),
"front": f"What is \"{phrase}\" and why is it important in this material?",
"back": f"{context}\n\nThis concept is significant because it helps explain key ideas in the material. Understanding {phrase} is essential for mastering the topic. Review the surrounding text for additional details and examples.",
"category": "Key Concept",
"difficulty": "medium",
"mastery_level": 0,
"last_reviewed": None,
"next_review": None
})
return flashcards
# ============================================
# API ENDPOINTS - SESSIONS
# ============================================
@app.get("/")
async def serve_frontend():
"""Serve the main frontend page"""
try:
with open("index.html", "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
except FileNotFoundError:
return HTMLResponse(content="""
<!DOCTYPE html>
<html>
<head>
<title>StudyFlow AI</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; }
h1 { color: #4f46e5; }
pre { background: #f3f4f6; padding: 15px; border-radius: 8px; overflow-x: auto; }
.endpoint { margin: 20px 0; padding: 10px; background: #f9fafb; border-radius: 8px; }
code { background: #e5e7eb; padding: 2px 6px; border-radius: 4px; }
</style>
</head>
<body>
<h1>πŸš€ StudyFlow AI Backend Running</h1>
<p>API is operational. Please ensure index.html is in the same directory.</p>
<h2>Available Endpoints:</h2>
<div class="endpoint">
<strong>POST</strong> <code>/api/process-content</code> - Create a study session<br>
<strong>GET</strong> <code>/api/session/{id}</code> - Get session details<br>
<strong>GET</strong> <code>/api/user/sessions</code> - List all sessions<br>
<strong>POST</strong> <code>/api/submit-answer</code> - Submit an answer<br>
<strong>DELETE</strong> <code>/api/session/{id}</code> - Delete a session<br>
<strong>GET</strong> <code>/health</code> - Health check<br>
<strong>GET</strong> <code>/docs</code> - Interactive API documentation
</div>
<h2>Quick Start:</h2>
<pre>
curl -X POST http://localhost:7860/api/process-content \\
-F "content_type=text" \\
-F "difficulty=medium" \\
-F "title=My Study Session" \\
-F "content=Your study material here..." \\
-F "num_questions=10"
</pre>
<p>πŸ“š Interactive docs: <a href="/docs">/docs</a></p>
<p>❀️ Health check: <a href="/health">/health</a></p>
</body>
</html>
""")
@app.get("/health")
async def health_check():
"""Comprehensive health check endpoint"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get database stats
cursor.execute("SELECT COUNT(*) FROM sessions")
session_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM questions")
question_count = cursor.fetchone()[0]
cursor.execute("SELECT total_questions_answered, total_correct_answers FROM user_profile WHERE id = 1")
profile = cursor.fetchone()
conn.close()
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "5.0.0",
"database": {
"path": DB_PATH,
"sessions": session_count,
"questions": question_count
},
"stats": {
"questions_answered": profile[0] if profile else 0,
"correct_answers": profile[1] if profile else 0,
"accuracy": round((profile[1] / profile[0] * 100) if profile and profile[0] > 0 else 0, 1)
},
"features": [
"PDF text extraction",
"YouTube transcript extraction",
"15 question types",
"Advanced NLP analysis",
"Flashcards with spaced repetition",
"Notes with tagging",
"Study streaks",
"Performance analytics"
]
}
# ============================================
# MAIN PROCESSING ENDPOINT
# ============================================
@app.post("/api/process-content")
async def process_content(
content_type: str = Form(...),
difficulty: str = Form(...),
title: str = Form(...),
content: str = Form(None),
file: UploadFile = File(None),
youtube_url: str = Form(None),
selected_pages: str = Form(None),
num_questions: int = Form(15),
generate_flashcards_flag: bool = Form(True),
background_tasks: BackgroundTasks = None
):
"""Process uploaded content and generate intelligent study materials"""
print(f"\n{'='*60}")
print(f"πŸ“ NEW SESSION REQUEST")
print(f"{'='*60}")
print(f"Title: {title}")
print(f"Type: {content_type}")
print(f"Difficulty: {difficulty}")
print(f"Questions: {num_questions}")
print(f"Generate Flashcards: {generate_flashcards_flag}")
print(f"{'='*60}\n")
session_id = generate_id("session")
text_content = ""
pages_dict = {}
total_pages = 0
selected_pages_list = []
try:
# ========== PROCESS BASED ON CONTENT TYPE ==========
if content_type == "text":
if not content:
raise HTTPException(status_code=400, detail="No text content provided")
text_content = clean_text(content, 100000)
print(f"πŸ“„ Text content length: {len(text_content)} chars")
print(f"πŸ“Š Estimated reading time: {calculate_reading_time(text_content)} minutes")
elif content_type == "pdf":
if not file:
raise HTTPException(status_code=400, detail="No PDF file provided")
print(f"πŸ“„ Processing PDF: {file.filename}")
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
content_bytes = await file.read()
tmp.write(content_bytes)
tmp_path = tmp.name
# Extract text from PDF
text_content, pages_dict = extract_pdf_text(tmp_path)
os.unlink(tmp_path)
total_pages = len(pages_dict)
# Parse selected pages
if selected_pages:
try:
selected_pages_list = json.loads(selected_pages)
except:
selected_pages_list = []
# If no pages selected, select all pages with content
if not selected_pages_list:
selected_pages_list = [p for p in pages_dict if "No extractable" not in pages_dict[p]]
print(f"πŸ“„ PDF has {total_pages} total pages, selected {len(selected_pages_list)} pages")
print(f"πŸ“Š Extracted {len(text_content)} characters of text")
elif content_type == "youtube":
if not youtube_url:
raise HTTPException(status_code=400, detail="No YouTube URL provided")
print(f"πŸ“„ Processing YouTube URL: {youtube_url}")
text_content = extract_youtube_text(youtube_url)
if not text_content:
text_content = f"YouTube video content from: {youtube_url}\n\nNote: Transcript extraction may not be available for all videos. For best results, use text or PDF input."
print(f"πŸ“„ YouTube content length: {len(text_content)} chars")
else:
raise HTTPException(status_code=400, detail=f"Invalid content type: {content_type}")
# ========== VALIDATE CONTENT ==========
if len(text_content) < 100:
raise HTTPException(
status_code=400,
detail=f"Content too short ({len(text_content)} chars). Minimum 100 characters required for quality questions."
)
# ========== GENERATE QUESTIONS ==========
print(f"πŸ€– Generating {num_questions} {difficulty} questions...")
questions = generate_questions_from_content(
text_content,
difficulty,
min(num_questions, 100),
session_id
)
print(f"βœ… Generated {len(questions)} questions")
# ========== GENERATE FLASHCARDS ==========
flashcards = []
if generate_flashcards_flag:
key_phrases = extract_key_phrases(text_content, 20)
flashcards = generate_flashcards_from_content(text_content, key_phrases, min(10, num_questions // 2))
print(f"βœ… Generated {len(flashcards)} flashcards")
# ========== SAVE TO DATABASE ==========
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Save session
content_hash = hash_content(text_content) if text_content else None
cursor.execute("""
INSERT INTO sessions (id, title, content_type, difficulty, content_hash, selected_pages, total_pages, study_time_total)
VALUES (?, ?, ?, ?, ?, ?, ?, 0)
""", (
session_id, title, content_type, difficulty, content_hash,
json.dumps(selected_pages_list) if selected_pages_list else None,
total_pages
))
# Save pages
for page_num, page_content in pages_dict.items():
if page_num in selected_pages_list or not selected_pages_list:
word_count = len(page_content.split()) if page_content else 0
cursor.execute("""
INSERT OR REPLACE INTO pages (id, session_id, page_number, content, word_count)
VALUES (?, ?, ?, ?, ?)
""", (generate_id("page"), session_id, page_num, page_content[:10000], word_count))
# Save questions
for q in questions:
cursor.execute("""
INSERT INTO questions (
id, session_id, question_text, question_type, options,
correct_answer, difficulty, explanation, page_reference, attempts
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0)
""", (
q["id"], session_id, q["question_text"], q["question_type"],
q.get("options"), q["correct_answer"], q["difficulty"],
q["explanation"], q.get("page_reference")
))
# Save flashcards
for fc in flashcards:
cursor.execute("""
INSERT INTO flashcards (id, session_id, front, back, category, difficulty, mastery_level)
VALUES (?, ?, ?, ?, ?, ?, 0)
""", (fc["id"], session_id, fc["front"], fc["back"], fc["category"], fc["difficulty"]))
# Update user profile
cursor.execute("""
UPDATE user_profile
SET total_sessions_created = total_sessions_created + 1,
last_active_date = DATE('now'),
updated_at = CURRENT_TIMESTAMP
WHERE id = 1
""")
# Update streak
cursor.execute("SELECT last_active_date, streak_days, longest_streak FROM user_profile WHERE id = 1")
profile = cursor.fetchone()
if profile:
last_active = profile[0]
current_streak = profile[1] or 0
longest_streak = profile[2] or 0
today = datetime.now().date()
if last_active:
last_date = datetime.strptime(last_active, "%Y-%m-%d").date()
if last_date == today - timedelta(days=1):
current_streak += 1
elif last_date < today - timedelta(days=1):
current_streak = 1
else:
current_streak = 1
longest_streak = max(longest_streak, current_streak)
cursor.execute("""
UPDATE user_profile
SET streak_days = ?, longest_streak = ?
WHERE id = 1
""", (current_streak, longest_streak))
conn.commit()
conn.close()
print(f"βœ… Session created successfully: {session_id}")
print(f"{'='*60}\n")
return {
"success": True,
"session_id": session_id,
"question_count": len(questions),
"flashcard_count": len(flashcards),
"total_pages": total_pages,
"selected_pages": selected_pages_list,
"content_length": len(text_content),
"reading_time_minutes": calculate_reading_time(text_content)
}
except HTTPException:
raise
except Exception as e:
print(f"❌ Error in process_content: {str(e)}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
# ============================================
# SESSION RETRIEVAL ENDPOINTS
# ============================================
@app.get("/api/session/{session_id}")
async def get_session(session_id: str):
"""Get complete session data including questions, flashcards, and pages"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get session info
cursor.execute("SELECT * FROM sessions WHERE id = ?", (session_id,))
session = cursor.fetchone()
if not session:
conn.close()
raise HTTPException(status_code=404, detail="Session not found")
# Update last accessed
cursor.execute("UPDATE sessions SET last_accessed = CURRENT_TIMESTAMP WHERE id = ?", (session_id,))
# Get questions
cursor.execute("SELECT * FROM questions WHERE session_id = ? ORDER BY created_at", (session_id,))
questions = [dict(row) for row in cursor.fetchall()]
# Parse JSON options for multiple choice questions
for q in questions:
if q.get("options"):
try:
q["options"] = json.loads(q["options"])
except:
q["options"] = []
# Get flashcards
cursor.execute("SELECT * FROM flashcards WHERE session_id = ?", (session_id,))
flashcards = [dict(row) for row in cursor.fetchall()]
# Get pages
cursor.execute("SELECT * FROM pages WHERE session_id = ? ORDER BY page_number", (session_id,))
pages = [dict(row) for row in cursor.fetchall()]
# Get notes
cursor.execute("SELECT * FROM notes WHERE session_id = ? ORDER BY is_pinned DESC, created_at DESC", (session_id,))
notes = [dict(row) for row in cursor.fetchall()]
# Get highlights
cursor.execute("SELECT * FROM highlights WHERE session_id = ? ORDER BY created_at DESC", (session_id,))
highlights = [dict(row) for row in cursor.fetchall()]
# Calculate performance metrics
total_questions = len(questions)
correct_answers = sum(1 for q in questions if q.get("is_correct") == 1)
answered = len([q for q in questions if q.get("user_answer")])
accuracy = round((correct_answers / total_questions * 100) if total_questions > 0 else 0, 1)
completion_rate = round((answered / total_questions * 100) if total_questions > 0 else 0, 1)
conn.close()
return {
"session": dict(session),
"questions": questions,
"flashcards": flashcards,
"pages": pages,
"notes": notes,
"highlights": highlights,
"performance": {
"total_questions": total_questions,
"correct_answers": correct_answers,
"accuracy": accuracy,
"answered": answered,
"completion_rate": completion_rate
}
}
@app.get("/api/user/sessions")
async def get_user_sessions(
limit: int = 50,
offset: int = 0,
archived: bool = False
):
"""Get all user sessions with pagination and filtering"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = """
SELECT s.*,
(SELECT COUNT(*) FROM questions WHERE session_id = s.id) as question_count,
(SELECT SUM(is_correct) FROM questions WHERE session_id = s.id) as correct_count,
(SELECT COUNT(*) FROM notes WHERE session_id = s.id) as note_count
FROM sessions s
WHERE s.is_archived = ?
ORDER BY s.last_accessed DESC
LIMIT ? OFFSET ?
"""
cursor.execute(query, (1 if archived else 0, limit, offset))
sessions = [dict(row) for row in cursor.fetchall()]
for session in sessions:
total = session.get("question_count", 0)
correct = session.get("correct_count", 0) or 0
session["accuracy"] = round((correct / total * 100) if total > 0 else 0, 1)
session["completion"] = round(len([q for q in session.get("questions", []) if q.get("user_answer")]) / total * 100 if total > 0 else 0, 1)
# Get total count
cursor.execute("SELECT COUNT(*) FROM sessions WHERE is_archived = ?", (1 if archived else 0,))
total_count = cursor.fetchone()[0]
conn.close()
return {
"sessions": sessions,
"total": total_count,
"limit": limit,
"offset": offset
}
# ============================================
# ANSWER SUBMISSION AND EVALUATION
# ============================================
@app.post("/api/submit-answer")
async def submit_answer(
session_id: str = Form(...),
question_id: str = Form(...),
user_answer: str = Form(...),
time_spent: int = Form(0)
):
"""Submit and evaluate an answer with intelligent scoring"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get question details
cursor.execute("""
SELECT correct_answer, question_type, explanation, difficulty, attempts
FROM questions
WHERE id = ? AND session_id = ?
""", (question_id, session_id))
result = cursor.fetchone()
if not result:
conn.close()
return {
"is_correct": True,
"correct_answer": "",
"feedback": "Answer recorded!",
"points_earned": 0
}
correct_answer = result[0]
question_type = result[1]
explanation = result[2] if len(result) > 2 else ""
difficulty = result[3] if len(result) > 3 else "medium"
attempts = (result[4] or 0) + 1
# Calculate points based on difficulty and attempts
points_map = {"easy": 10, "medium": 20, "hard": 35}
base_points = points_map.get(difficulty, 15)
# Smart evaluation based on question type
is_correct = 0
user_clean = user_answer.strip().lower()
correct_clean = correct_answer.strip().lower()
if question_type == "multiple_choice":
is_correct = 1 if user_clean == correct_clean else 0
elif question_type == "true_false":
is_correct = 1 if user_clean == correct_clean else 0
else: # short_answer, fill_blank
# Exact match
if user_clean == correct_clean:
is_correct = 1
# Partial match for longer answers
elif len(user_clean) > 40 and (correct_clean in user_clean or user_clean in correct_clean):
is_correct = 1
else:
# Keyword matching
keywords = re.findall(r'\b[a-z]{4,}\b', correct_clean)
keyword_matches = sum(1 for kw in keywords if kw in user_clean)
is_correct = 1 if keyword_matches >= max(1, len(keywords) * 0.3) else 0
# Calculate points earned (reduce for multiple attempts)
points_earned = base_points if is_correct else 0
if attempts > 1:
points_earned = int(points_earned * (0.8 ** (attempts - 1)))
# Update database
cursor.execute("""
UPDATE questions
SET user_answer = ?, is_correct = ?, time_spent = ?, attempts = ?, last_attempt = CURRENT_TIMESTAMP
WHERE id = ? AND session_id = ?
""", (user_answer, is_correct, time_spent, attempts, question_id, session_id))
# Update user profile with XP
cursor.execute("""
UPDATE user_profile
SET total_questions_answered = total_questions_answered + 1,
total_correct_answers = total_correct_answers + ?,
xp_points = xp_points + ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = 1
""", (is_correct, points_earned))
# Check for level up
cursor.execute("SELECT xp_points, level FROM user_profile WHERE id = 1")
profile = cursor.fetchone()
xp = profile[0] if profile else 0
current_level = profile[1] if profile else 1
new_level = max(1, int(xp ** 0.4))
level_up = new_level > current_level
if level_up:
cursor.execute("UPDATE user_profile SET level = ? WHERE id = 1", (new_level,))
# Record activity
cursor.execute("""
INSERT INTO study_activity (session_id, activity_type, duration)
VALUES (?, 'answer', ?)
""", (session_id, time_spent))
conn.commit()
conn.close()
return {
"is_correct": bool(is_correct),
"correct_answer": correct_answer,
"feedback": "Correct! πŸŽ‰ Great job!" if is_correct else f"Not quite right. The correct answer is: {correct_answer[:300]}",
"explanation": explanation,
"points_earned": points_earned,
"level_up": level_up,
"new_level": new_level if level_up else None
}
# ============================================
# SESSION MANAGEMENT ENDPOINTS
# ============================================
@app.delete("/api/session/{session_id}")
async def delete_session(session_id: str):
"""Delete a session and all associated data"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT id FROM sessions WHERE id = ?", (session_id,))
if not cursor.fetchone():
conn.close()
raise HTTPException(status_code=404, detail="Session not found")
cursor.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
conn.commit()
conn.close()
return {"success": True, "message": "Session deleted successfully"}
@app.post("/api/session/{session_id}/archive")
async def archive_session(session_id: str):
"""Archive a session"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("UPDATE sessions SET is_archived = 1 WHERE id = ?", (session_id,))
conn.commit()
conn.close()
return {"success": True, "message": "Session archived"}
@app.post("/api/session/{session_id}/restore")
async def restore_session(session_id: str):
"""Restore an archived session"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("UPDATE sessions SET is_archived = 0 WHERE id = ?", (session_id,))
conn.commit()
conn.close()
return {"success": True, "message": "Session restored"}
# ============================================
# NOTES MANAGEMENT ENDPOINTS
# ============================================
@app.post("/api/save-note")
async def save_note(
session_id: str = Form(...),
title: str = Form(...),
content: str = Form(...),
tags: str = Form(None),
color: str = Form(None),
note_id: str = Form(None)
):
"""Save or update a note"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
if note_id:
# Update existing note
cursor.execute("""
UPDATE notes
SET title = ?, content = ?, tags = ?, color = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND session_id = ?
""", (title, content, tags, color, note_id, session_id))
else:
# Create new note
note_id = generate_id("note")
cursor.execute("""
INSERT INTO notes (id, session_id, title, content, tags, color)
VALUES (?, ?, ?, ?, ?, ?)
""", (note_id, session_id, title, content, tags, color))
conn.commit()
conn.close()
return {"success": True, "note_id": note_id}
@app.delete("/api/note/{note_id}")
async def delete_note(note_id: str):
"""Delete a note"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM notes WHERE id = ?", (note_id,))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/note/{note_id}/pin")
async def pin_note(note_id: str):
"""Pin or unpin a note"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("UPDATE notes SET is_pinned = NOT is_pinned WHERE id = ?", (note_id,))
conn.commit()
conn.close()
return {"success": True}
# ============================================
# HIGHLIGHTS MANAGEMENT ENDPOINTS
# ============================================
@app.post("/api/highlight")
async def create_highlight(
session_id: str = Form(...),
text: str = Form(...),
context: str = Form(None),
color: str = Form("#fef08a"),
page_number: int = Form(None)
):
"""Create a text highlight"""
highlight_id = generate_id("hl")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO highlights (id, session_id, text, context, color, page_number)
VALUES (?, ?, ?, ?, ?, ?)
""", (highlight_id, session_id, text, context, color, page_number))
conn.commit()
conn.close()
return {"success": True, "highlight_id": highlight_id}
@app.delete("/api/highlight/{highlight_id}")
async def delete_highlight(highlight_id: str):
"""Delete a highlight"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM highlights WHERE id = ?", (highlight_id,))
conn.commit()
conn.close()
return {"success": True}
# ============================================
# FLASHCARD ENDPOINTS
# ============================================
@app.post("/api/flashcard/review")
async def review_flashcard(
flashcard_id: str = Form(...),
quality: int = Form(...) # 0-5 (0=again, 3=hard, 4=good, 5=easy)
):
"""Review a flashcard with spaced repetition"""
# SM-2 algorithm for spaced repetition
quality_map = {0: 0, 1: 0, 2: 0, 3: 2, 4: 3, 5: 4}
ease_factor_map = {0: 1.3, 1: 1.3, 2: 1.3, 3: 1.8, 4: 2.2, 5: 2.5}
new_ease = ease_factor_map.get(quality, 1.8)
new_interval = max(1, int(quality_map.get(quality, 1)))
if quality >= 4:
new_mastery = min(100, quality * 20)
else:
new_mastery = max(0, quality * 10)
next_review = datetime.now() + timedelta(days=new_interval)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE flashcards
SET mastery_level = ?, last_reviewed = CURRENT_TIMESTAMP, next_review = ?
WHERE id = ?
""", (new_mastery, next_review, flashcard_id))
conn.commit()
conn.close()
return {"success": True, "next_review": next_review.isoformat()}
# ============================================
# ANALYTICS AND PROFILE ENDPOINTS
# ============================================
@app.get("/api/user/profile")
async def get_user_profile():
"""Get complete user profile with analytics"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM user_profile WHERE id = 1")
profile = dict(cursor.fetchone() or {})
# Get weekly activity
cursor.execute("""
SELECT date, COUNT(*) as activity_count, SUM(duration) as total_duration
FROM study_activity
WHERE date >= DATE('now', '-7 days')
GROUP BY date
ORDER BY date
""")
weekly_activity = [dict(row) for row in cursor.fetchall()]
# Get performance by difficulty
cursor.execute("""
SELECT difficulty,
COUNT(*) as total,
SUM(is_correct) as correct
FROM questions
WHERE user_answer IS NOT NULL
GROUP BY difficulty
""")
performance_by_difficulty = [dict(row) for row in cursor.fetchall()]
# Get daily streaks
cursor.execute("""
SELECT DISTINCT date
FROM study_activity
WHERE date >= DATE('now', '-30 days')
ORDER BY date
""")
active_dates = [row[0] for row in cursor.fetchall()]
total_questions = profile.get("total_questions_answered", 0)
correct_answers = profile.get("total_correct_answers", 0)
accuracy = round((correct_answers / total_questions * 100) if total_questions > 0 else 0, 1)
conn.close()
# Calculate next level XP
current_level = profile.get("level", 1)
current_xp = profile.get("xp_points", 0)
xp_for_next = int((current_level + 1) ** 2.5)
xp_for_current = int(current_level ** 2.5)
xp_progress = current_xp - xp_for_current
xp_needed = xp_for_next - xp_for_current
return {
"profile": profile,
"accuracy": accuracy,
"weekly_activity": weekly_activity,
"performance_by_difficulty": performance_by_difficulty,
"active_dates": active_dates,
"level_progress": {
"current_level": current_level,
"current_xp": current_xp,
"xp_needed_for_next": xp_needed,
"xp_progress_percent": min(100, int((xp_progress / xp_needed) * 100)) if xp_needed > 0 else 0
},
"badges": {
"has_streak_7": profile.get("longest_streak", 0) >= 7,
"has_streak_30": profile.get("longest_streak", 0) >= 30,
"has_100_questions": total_questions >= 100,
"has_1000_questions": total_questions >= 1000,
"has_90_percent_accuracy": accuracy >= 90
}
}
@app.post("/api/update-study-time")
async def update_study_time(
session_id: str = Form(...),
time_spent: int = Form(0)
):
"""Update total study time for a session"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE sessions
SET study_time_total = study_time_total + ?, last_accessed = CURRENT_TIMESTAMP
WHERE id = ?
""", (time_spent, session_id))
cursor.execute("""
UPDATE user_profile
SET total_study_time = total_study_time + ?, updated_at = CURRENT_TIMESTAMP
WHERE id = 1
""", (time_spent,))
cursor.execute("""
INSERT INTO study_activity (session_id, activity_type, duration)
VALUES (?, 'study', ?)
""", (session_id, time_spent))
conn.commit()
conn.close()
return {"success": True}
# ============================================
# MAIN ENTRY POINT
# ============================================
if __name__ == "__main__":
import uvicorn
print("\n" + "=" * 80)
print("πŸš€ StudyFlow AI Backend - COMPLETE PRODUCTION VERSION 5.0")
print("=" * 80)
print(f"πŸ“ Database: {DB_PATH}")
print(f"πŸ€– AI Mode: Advanced Local NLP (15 question types)")
print(f"πŸ“Š Features: PDF extraction, YouTube transcripts, Smart analytics")
print(f"🎯 Difficulty Levels: Easy, Medium, Hard")
print(f"πŸ“ Question Types: Definition, Fact, True/False, Fill Blank, Concept,")
print(f" Relationship, Multiple Choice, Cause/Effect, Analysis,")
print(f" Evaluation, Application, Synthesis, Comparison")
print("=" * 80)
print("🌐 Server: http://0.0.0.0:7860")
print("πŸ“– API Docs: http://0.0.0.0:7860/docs")
print("πŸ“Š Redoc: http://0.0.0.0:7860/redoc")
print("=" * 80)
print("πŸ’‘ Tip: Create a session with your study material and the AI will")
print(" generate intelligent questions based on your actual content!")
print("=" * 80 + "\n")
uvicorn.run(
app,
host="0.0.0.0",
port=7860,
log_level="info",
access_log=True
)