Spaces:
Running
Running
| from flask import Blueprint, render_template, request, jsonify, current_app, url_for | |
| from flask_login import login_required, current_user | |
| from utils import get_db_connection | |
| import requests | |
| import time | |
| import os | |
| import json | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from bs4 import BeautifulSoup | |
| import math | |
| import imgkit | |
| from gemini_classifier import classify_questions_with_gemini | |
| from nova_classifier import classify_questions_with_nova | |
| from json_processor import _process_json_and_generate_pdf | |
| from json_processor import _process_json_and_generate_pdf | |
| neetprep_bp = Blueprint('neetprep_bp', __name__) | |
| # ... (Constants and GraphQL queries remain the same) ... | |
| ENDPOINT_URL = "https://www.neetprep.com/graphql" | |
| USER_ID = "VXNlcjozNTY5Mzcw=" | |
| HEADERS = { | |
| 'accept': '*/*', | |
| 'content-type': 'application/json', | |
| 'origin': 'https://www.neetprep.com', | |
| 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36', | |
| } | |
| # --- Queries --- | |
| query_template_step1 = 'query GetAttempts {{ testAttempts( limit: {limit}, offset: {offset}, where: {{ userId: "{userId}" }} ) {{ id completed }} }}' | |
| query_template_step2 = 'query GetIncorrectIds {{ incorrectQuestions( testAttemptId: "{attemptId}", first: 200 ) {{ id }} }}' | |
| query_template_step3 = ''' | |
| query GetQuestionDetails {{ | |
| question(id: "{questionId}") {{ | |
| id | |
| question | |
| options | |
| correctOptionIndex | |
| level | |
| topics(first: 1) {{ | |
| edges {{ | |
| node {{ | |
| name | |
| subjects(first: 1) {{ | |
| edges {{ | |
| node {{ name }} | |
| }} | |
| }} | |
| }} | |
| }} | |
| }} | |
| }} | |
| }} | |
| ''' | |
| def fetch_question_details(q_id): | |
| """Worker function to fetch details for a single question.""" | |
| result = run_hardcoded_query(query_template_step3, questionId=q_id) | |
| if result and 'data' in result and 'question' in result['data'] and result['data']['question']: | |
| return result['data']['question'] | |
| return None | |
| def index(): | |
| """Renders the main NeetPrep UI with topics and counts.""" | |
| conn = get_db_connection() | |
| selected_subject = request.args.get('subject', 'All') | |
| AVAILABLE_SUBJECTS = ["All", "Biology", "Chemistry", "Physics", "Mathematics"] | |
| neetprep_topic_counts = {} | |
| unclassified_count = 0 | |
| if current_user.neetprep_enabled: | |
| # Get NeetPrep question counts per topic, filtered by subject | |
| if selected_subject != 'All': | |
| neetprep_topics_query = 'SELECT topic, COUNT(*) as count FROM neetprep_questions WHERE subject = ? GROUP BY topic' | |
| neetprep_topics = conn.execute(neetprep_topics_query, (selected_subject,)).fetchall() | |
| else: | |
| neetprep_topics_query = 'SELECT topic, COUNT(*) as count FROM neetprep_questions GROUP BY topic' | |
| neetprep_topics = conn.execute(neetprep_topics_query).fetchall() | |
| neetprep_topic_counts = {row['topic']: row['count'] for row in neetprep_topics} | |
| unclassified_count = conn.execute("SELECT COUNT(*) as count FROM neetprep_questions WHERE topic = 'Unclassified'").fetchone()['count'] | |
| # Get classified question counts per chapter for the current user, filtered by subject | |
| query_params = [current_user.id] | |
| base_query = """ | |
| SELECT q.chapter, COUNT(*) as count | |
| FROM questions q | |
| JOIN sessions s ON q.session_id = s.id | |
| WHERE s.user_id = ? AND q.subject IS NOT NULL AND q.chapter IS NOT NULL | |
| """ | |
| if selected_subject != 'All': | |
| base_query += " AND q.subject = ? " | |
| query_params.append(selected_subject) | |
| base_query += " GROUP BY q.chapter" | |
| classified_chapters = conn.execute(base_query, query_params).fetchall() | |
| classified_chapter_counts = {row['chapter']: row['count'] for row in classified_chapters} | |
| # Combine the topics | |
| all_topics = set(neetprep_topic_counts.keys()) | set(classified_chapter_counts.keys()) | |
| combined_topics = [] | |
| for topic in sorted(list(all_topics)): | |
| combined_topics.append({ | |
| 'topic': topic, | |
| 'neetprep_count': neetprep_topic_counts.get(topic, 0), | |
| 'my_questions_count': classified_chapter_counts.get(topic, 0) | |
| }) | |
| conn.close() | |
| return render_template('neetprep.html', | |
| topics=combined_topics, | |
| unclassified_count=unclassified_count, | |
| available_subjects=AVAILABLE_SUBJECTS, | |
| selected_subject=selected_subject, | |
| neetprep_enabled=current_user.neetprep_enabled) | |
| def sync_neetprep_data(): | |
| data = request.json | |
| force_sync = data.get('force', False) | |
| print(f"NeetPrep sync started by user {current_user.id}. Force sync: {force_sync}") | |
| try: | |
| conn = get_db_connection() | |
| if force_sync: | |
| print("Force sync enabled. Clearing processed attempts and questions tables.") | |
| conn.execute('DELETE FROM neetprep_processed_attempts') | |
| conn.execute('DELETE FROM neetprep_questions') | |
| conn.commit() | |
| processed_attempts_rows = conn.execute('SELECT attempt_id FROM neetprep_processed_attempts').fetchall() | |
| processed_attempt_ids = {row['attempt_id'] for row in processed_attempts_rows} | |
| all_attempt_ids = [] | |
| offset = 0 | |
| limit = 100 | |
| print("Fetching test attempts from NeetPrep API...") | |
| while True: | |
| result = run_hardcoded_query(query_template_step1, limit=limit, offset=offset, userId=USER_ID) | |
| if not result or 'data' not in result or not result['data'].get('testAttempts'): | |
| break | |
| attempts = result['data']['testAttempts'] | |
| if not attempts: break | |
| all_attempt_ids.extend([a['id'] for a in attempts if a.get('completed')]) | |
| offset += limit | |
| time.sleep(0.2) | |
| new_attempts = [aid for aid in all_attempt_ids if aid not in processed_attempt_ids] | |
| print(f"Found {len(new_attempts)} new attempts to process.") | |
| if not new_attempts: | |
| conn.close() | |
| return jsonify({'status': 'No new test attempts to sync. Everything is up-to-date.'}), 200 | |
| incorrect_question_ids = set() | |
| print("Fetching incorrect question IDs for new attempts...") | |
| for attempt_id in new_attempts: | |
| result = run_hardcoded_query(query_template_step2, attemptId=attempt_id) | |
| if result and 'data' in result and result['data'].get('incorrectQuestions'): | |
| for q in result['data']['incorrectQuestions']: | |
| incorrect_question_ids.add(q['id']) | |
| time.sleep(0.2) | |
| existing_question_ids_rows = conn.execute('SELECT id FROM neetprep_questions').fetchall() | |
| existing_question_ids = {row['id'] for row in existing_question_ids_rows} | |
| new_question_ids = list(incorrect_question_ids - existing_question_ids) | |
| print(f"Found {len(new_question_ids)} new unique incorrect questions to fetch details for.") | |
| if not new_question_ids: | |
| for attempt_id in new_attempts: | |
| conn.execute('INSERT INTO neetprep_processed_attempts (attempt_id) VALUES (?)', (attempt_id,)) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'status': 'Sync complete. No new questions found, but attempts log updated.'}), 200 | |
| questions_to_insert = [] | |
| total_new = len(new_question_ids) | |
| completed = 0 | |
| print(f"Fetching details for {total_new} questions...") | |
| with ThreadPoolExecutor(max_workers=10) as executor: | |
| future_to_qid = {executor.submit(fetch_question_details, qid): qid for qid in new_question_ids} | |
| for future in as_completed(future_to_qid): | |
| q_data = future.result() | |
| if q_data: | |
| topic_name = "Unclassified" | |
| try: | |
| topic_name = q_data['topics']['edges'][0]['node']['name'] | |
| except (IndexError, TypeError, KeyError): pass | |
| questions_to_insert.append((q_data.get('id'), q_data.get('question'), json.dumps(q_data.get('options', [])), q_data.get('correctOptionIndex'), q_data.get('level', 'N/A'), topic_name, "Unclassified")) | |
| completed += 1 | |
| percentage = int((completed / total_new) * 100) | |
| sys.stdout.write(f'\rSync Progress: {completed}/{total_new} ({percentage}%)') | |
| sys.stdout.flush() | |
| print("\nAll questions fetched.") | |
| if questions_to_insert: | |
| conn.executemany("INSERT INTO neetprep_questions (id, question_text, options, correct_answer_index, level, topic, subject) VALUES (?, ?, ?, ?, ?, ?, ?)", questions_to_insert) | |
| for attempt_id in new_attempts: | |
| conn.execute('INSERT INTO neetprep_processed_attempts (attempt_id) VALUES (?)', (attempt_id,)) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'status': f'Sync complete. Added {len(questions_to_insert)} new questions.'}), 200 | |
| except Exception as e: | |
| current_app.logger.error(f"Error during NeetPrep sync: {repr(e)}") | |
| if 'conn' in locals() and conn: | |
| conn.close() | |
| return jsonify({'error': f"A critical error occurred during sync: {repr(e)}"}), 500 | |
| def classify_unclassified_questions(): | |
| """Classifies all questions marked as 'Unclassified' in batches.""" | |
| print("Starting classification of 'Unclassified' questions.") | |
| conn = get_db_connection() | |
| unclassified_questions = conn.execute("SELECT id, question_text FROM neetprep_questions WHERE topic = 'Unclassified'").fetchall() | |
| total_to_classify = len(unclassified_questions) | |
| if total_to_classify == 0: | |
| conn.close() | |
| return jsonify({'status': 'No unclassified questions to process.'}) | |
| batch_size = 10 | |
| num_batches = math.ceil(total_to_classify / batch_size) | |
| total_classified_count = 0 | |
| print(f"Found {total_to_classify} questions. Processing in {num_batches} batches of {batch_size}.") | |
| for i in range(num_batches): | |
| batch_start_time = time.time() | |
| start_index = i * batch_size | |
| end_index = start_index + batch_size | |
| batch = unclassified_questions[start_index:end_index] | |
| question_texts = [q['question_text'] for q in batch] | |
| question_ids = [q['id'] for q in batch] | |
| print(f"\nProcessing Batch {i+1}/{num_batches}...") | |
| try: | |
| # Choose classifier based on user preference | |
| classifier_model = getattr(current_user, 'classifier_model', 'gemini') | |
| if classifier_model == 'nova': | |
| print("Classifying with Nova API...") | |
| classification_result = classify_questions_with_nova(question_texts, start_index=0) | |
| model_name = "Nova" | |
| else: | |
| print("Classifying with Gemini API...") | |
| classification_result = classify_questions_with_gemini(question_texts, start_index=0) | |
| model_name = "Gemini" | |
| if not classification_result or not classification_result.get('data'): | |
| print(f"Batch {i+1} failed: {model_name} API did not return valid data.") | |
| continue | |
| update_count_in_batch = 0 | |
| for item in classification_result.get('data', []): | |
| item_index = item.get('index') | |
| if item_index is not None and 1 <= item_index <= len(question_ids): | |
| # The item['index'] is 1-based, so we convert to 0-based | |
| matched_id = question_ids[item_index - 1] | |
| new_topic = item.get('chapter_title') | |
| if new_topic: | |
| conn.execute('UPDATE neetprep_questions SET topic = ? WHERE id = ?', (new_topic, matched_id)) | |
| update_count_in_batch += 1 | |
| conn.commit() | |
| total_classified_count += update_count_in_batch | |
| print(f"Batch {i+1} complete. Classified {update_count_in_batch} questions.") | |
| # Wait before the next batch | |
| if i < num_batches - 1: | |
| print("Waiting 6 seconds before next batch...") | |
| time.sleep(6) | |
| except Exception as e: | |
| print(f"\nAn error occurred during batch {i+1}: {repr(e)}") | |
| continue | |
| conn.close() | |
| print(f"\nClassification finished. In total, {total_classified_count} questions were updated.") | |
| return jsonify({'status': f'Classification complete. Updated {total_classified_count} of {total_to_classify} questions.'}) | |
| from rich.table import Table | |
| from rich.console import Console | |
| def generate_neetprep_pdf(): | |
| if request.is_json: | |
| data = request.json | |
| else: | |
| data = request.form | |
| pdf_type = data.get('type') | |
| topics_str = data.get('topics') | |
| topics = json.loads(topics_str) if topics_str and topics_str != '[]' else [] | |
| source_filter = data.get('source', 'all') # 'all', 'neetprep', or 'classified' | |
| conn = get_db_connection() | |
| all_questions = [] | |
| include_neetprep = source_filter in ['all', 'neetprep'] and current_user.neetprep_enabled | |
| include_classified = source_filter in ['all', 'classified'] | |
| # Fetch NeetPrep questions if enabled and filter allows | |
| if include_neetprep: | |
| if pdf_type == 'quiz' and topics: | |
| placeholders = ', '.join('?' for _ in topics) | |
| neetprep_questions_from_db = conn.execute(f"SELECT * FROM neetprep_questions WHERE topic IN ({placeholders})", topics).fetchall() | |
| for q in neetprep_questions_from_db: | |
| try: | |
| html_content = f"""<html><head><meta charset="utf-8"></head><body>{q['question_text']}</body></html>""" | |
| img_filename = f"neetprep_{q['id']}.jpg" | |
| img_path = os.path.join(current_app.config['TEMP_FOLDER'], img_filename) | |
| imgkit.from_string(html_content, img_path, options={'width': 800}) | |
| all_questions.append({ | |
| 'image_path': f"/tmp/{img_filename}", | |
| 'details': {'id': q['id'], 'options': json.loads(q['options']), 'correct_answer_index': q['correct_answer_index'], 'user_answer_index': None, 'source': 'neetprep', 'topic': q['topic'], 'subject': q['subject']} | |
| }) | |
| except Exception as e: | |
| current_app.logger.error(f"Failed to convert NeetPrep question {q['id']} to image: {e}") | |
| elif pdf_type == 'all': | |
| neetprep_questions_from_db = conn.execute("SELECT * FROM neetprep_questions").fetchall() | |
| for q in neetprep_questions_from_db: | |
| all_questions.append({"id": q['id'], "question_text": q['question_text'], "options": json.loads(q['options']), "correct_answer_index": q['correct_answer_index'], "user_answer_index": None, "status": "wrong", "source": "neetprep", "custom_fields": {"difficulty": q['level'], "topic": q['topic'], "subject": q['subject']}}) | |
| elif pdf_type == 'selected' and topics: | |
| placeholders = ', '.join('?' for _ in topics) | |
| neetprep_questions_from_db = conn.execute(f"SELECT * FROM neetprep_questions WHERE topic IN ({placeholders})", topics).fetchall() | |
| for q in neetprep_questions_from_db: | |
| all_questions.append({"id": q['id'], "question_text": q['question_text'], "options": json.loads(q['options']), "correct_answer_index": q['correct_answer_index'], "user_answer_index": None, "status": "wrong", "source": "neetprep", "custom_fields": {"difficulty": q['level'], "topic": q['topic'], "subject": q['subject']}}) | |
| # Fetch classified questions if filter allows | |
| if include_classified: | |
| if topics and pdf_type in ['quiz', 'selected']: | |
| placeholders = ', '.join('?' for _ in topics) | |
| classified_questions_from_db = conn.execute(f""" | |
| SELECT q.* FROM questions q JOIN sessions s ON q.session_id = s.id | |
| WHERE q.chapter IN ({placeholders}) AND s.user_id = ? | |
| """, (*topics, current_user.id)).fetchall() | |
| for q in classified_questions_from_db: | |
| image_info = conn.execute("SELECT processed_filename, note_filename FROM images WHERE id = ?", (q['image_id'],)).fetchone() | |
| if image_info and image_info['processed_filename']: | |
| if pdf_type == 'quiz': | |
| all_questions.append({ | |
| 'image_path': f"/processed/{image_info['processed_filename']}", | |
| 'details': {'id': q['id'], 'options': [], 'correct_answer_index': q['actual_solution'], 'user_answer_index': q['marked_solution'], 'source': 'classified', 'topic': q['chapter'], 'subject': q['subject'], 'note_filename': image_info['note_filename']} | |
| }) | |
| else: | |
| all_questions.append({"id": q['id'], "question_text": f"<img src=\"{os.path.join(current_app.config['PROCESSED_FOLDER'], image_info['processed_filename'])}\" />", "options": [], "correct_answer_index": q['actual_solution'], "user_answer_index": q['marked_solution'], "status": q['status'], "source": "classified", "custom_fields": {"subject": q['subject'], "chapter": q['chapter'], "question_number": q['question_number']}}) | |
| elif pdf_type == 'all': | |
| classified_questions_from_db = conn.execute(""" | |
| SELECT q.* FROM questions q JOIN sessions s ON q.session_id = s.id | |
| WHERE s.user_id = ? AND q.subject IS NOT NULL AND q.chapter IS NOT NULL | |
| """, (current_user.id,)).fetchall() | |
| for q in classified_questions_from_db: | |
| image_info = conn.execute("SELECT processed_filename FROM images WHERE id = ?", (q['image_id'],)).fetchone() | |
| if image_info and image_info['processed_filename']: | |
| all_questions.append({"id": q['id'], "question_text": f"<img src=\"{os.path.join(current_app.config['PROCESSED_FOLDER'], image_info['processed_filename'])}\" />", "options": [], "correct_answer_index": q['actual_solution'], "user_answer_index": q['marked_solution'], "status": q['status'], "source": "classified", "custom_fields": {"subject": q['subject'], "chapter": q['chapter'], "question_number": q['question_number']}}) | |
| conn.close() | |
| # Check if topics are required but not provided | |
| if pdf_type in ['quiz', 'selected'] and not topics: | |
| return jsonify({'error': 'No topics selected.'}), 400 | |
| if not all_questions: | |
| return jsonify({'error': 'No questions found for the selected criteria.'}), 404 | |
| if pdf_type == 'quiz': | |
| return render_template('quiz_v2.html', questions=all_questions) | |
| test_name = "All Incorrect Questions" | |
| if pdf_type == 'selected': | |
| test_name = f"Incorrect Questions - {', '.join(topics)}" | |
| final_json_output = { | |
| "version": "2.1", "test_name": test_name, | |
| "config": { "font_size": 22, "auto_generate_pdf": False, "layout": data.get('layout', {}) }, | |
| "metadata": { "source_book": "NeetPrep & Classified", "student_id": USER_ID, "tags": ", ".join(topics) }, | |
| "questions": all_questions, "view": True | |
| } | |
| try: | |
| result, status_code = _process_json_and_generate_pdf(final_json_output, current_user.id) | |
| if status_code != 200: | |
| return jsonify(result), status_code | |
| if result.get('success'): | |
| return jsonify({'success': True, 'pdf_url': result.get('view_url')}) | |
| else: | |
| return jsonify({'error': result.get('error', 'Failed to generate PDF via internal call.')}), 500 | |
| except Exception as e: | |
| current_app.logger.error(f"Failed to call _process_json_and_generate_pdf: {repr(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| def edit_neetprep_questions(): | |
| """Renders the page for editing NeetPrep questions.""" | |
| conn = get_db_connection() | |
| topics = conn.execute('SELECT DISTINCT topic FROM neetprep_questions ORDER BY topic').fetchall() | |
| questions = conn.execute('SELECT id, question_text, topic, subject FROM neetprep_questions ORDER BY id').fetchall() | |
| questions_plain = [] | |
| for q in questions: | |
| q_dict = dict(q) | |
| soup = BeautifulSoup(q_dict['question_text'], 'html.parser') | |
| plain_text = soup.get_text(strip=True) | |
| q_dict['question_text_plain'] = (plain_text[:100] + '...') if len(plain_text) > 100 else plain_text | |
| questions_plain.append(q_dict) | |
| conn.close() | |
| return render_template('neetprep_edit.html', questions=questions_plain, topics=[t['topic'] for t in topics]) | |
| def update_neetprep_question(question_id): | |
| """Handles updating a question's metadata.""" | |
| # This route modifies global neetprep data. In a real multi-user app, | |
| # this should be restricted to admin users. For now, @login_required is a basic protection. | |
| data = request.json | |
| new_topic = data.get('topic') | |
| new_subject = data.get('subject') | |
| if not new_topic or not new_subject: | |
| return jsonify({'error': 'Topic and Subject cannot be empty.'}), 400 | |
| try: | |
| conn = get_db_connection() | |
| conn.execute( | |
| 'UPDATE neetprep_questions SET topic = ?, subject = ? WHERE id = ?', | |
| (new_topic, new_subject, question_id) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| current_app.logger.error(f"Error updating question {question_id}: {repr(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| def get_neetprep_suggestions(question_id): | |
| """Get AI classification suggestions for a NeetPrep question using NVIDIA NIM.""" | |
| import os | |
| from nvidia_prompts import BIOLOGY_PROMPT_TEMPLATE, CHEMISTRY_PROMPT_TEMPLATE, PHYSICS_PROMPT_TEMPLATE, MATHEMATICS_PROMPT_TEMPLATE, GENERAL_CLASSIFICATION_PROMPT | |
| data = request.json or {} | |
| subject = data.get('subject') # Can be None for auto-detection | |
| conn = get_db_connection() | |
| question = conn.execute('SELECT question_text FROM neetprep_questions WHERE id = ?', (question_id,)).fetchone() | |
| conn.close() | |
| if not question: | |
| return jsonify({'success': True, 'suggestions': ['Unclassified'], 'subject': 'Biology', 'warning': 'Question not found'}) | |
| # Strip HTML from question text for classification | |
| question_text = question['question_text'] or '' | |
| if not question_text: | |
| return jsonify({'success': True, 'suggestions': ['Unclassified'], 'subject': 'Biology', 'warning': 'No question text'}) | |
| soup = BeautifulSoup(question_text, 'html.parser') | |
| plain_text = soup.get_text(strip=True) | |
| if not plain_text: | |
| return jsonify({'success': True, 'suggestions': ['Unclassified'], 'subject': 'Biology', 'warning': 'Empty question text'}) | |
| NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY") | |
| if not NVIDIA_API_KEY: | |
| return jsonify({'error': 'NVIDIA_API_KEY not set', 'suggestions': ['Unclassified'], 'subject': 'Biology'}), 200 | |
| # Get the appropriate prompt template | |
| def get_nvidia_prompt(subj, input_questions): | |
| if not subj or subj.lower() == 'auto': | |
| return GENERAL_CLASSIFICATION_PROMPT.format(input_questions=input_questions) | |
| if subj.lower() == 'biology': return BIOLOGY_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'chemistry': return CHEMISTRY_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'physics': return PHYSICS_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'mathematics': return MATHEMATICS_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| return GENERAL_CLASSIFICATION_PROMPT.format(input_questions=input_questions) | |
| prompt_content = get_nvidia_prompt(subject, f"1. {plain_text[:500]}") # Limit text length | |
| try: | |
| res = requests.post( | |
| 'https://integrate.api.nvidia.com/v1/chat/completions', | |
| headers={'Authorization': f'Bearer {NVIDIA_API_KEY}', 'Accept': 'application/json', 'Content-Type': 'application/json'}, | |
| json={"model": "nvidia/nemotron-3-nano-30b-a3b", "messages": [{"content": prompt_content, "role": "user"}], "temperature": 0.2, "top_p": 1, "max_tokens": 1024, "stream": False}, | |
| timeout=30 | |
| ) | |
| res.raise_for_status() | |
| content = res.json()['choices'][0]['message']['content'] | |
| # Parse JSON from response | |
| if "```json" in content: content = content.split("```json")[1].split("```")[0].strip() | |
| elif "```" in content: content = content.split("```")[1].split("```")[0].strip() | |
| result_data = json.loads(content) | |
| suggestions = [] | |
| detected_subject = subject or 'Biology' # Default | |
| other_subjects = [] | |
| if result_data.get('data'): | |
| item = result_data['data'][0] | |
| # Extract detected subject from AI response | |
| detected_subject = item.get('subject', subject) or 'Biology' | |
| # Extract other possible subjects | |
| other_subjects = item.get('other_possible_subjects', []) | |
| if isinstance(other_subjects, str): | |
| other_subjects = [other_subjects] | |
| primary = item.get('chapter_title') | |
| if primary and primary != 'Unclassified': | |
| suggestions.append(primary) | |
| others = item.get('other_possible_chapters', []) | |
| if isinstance(others, list): | |
| suggestions.extend([c for c in others if c and c != 'Unclassified']) | |
| # Always return at least one suggestion | |
| if not suggestions: | |
| suggestions = ['Unclassified'] | |
| return jsonify({ | |
| 'success': True, | |
| 'suggestions': suggestions[:5], | |
| 'subject': detected_subject, | |
| 'other_possible_subjects': other_subjects | |
| }) | |
| except Exception as e: | |
| current_app.logger.error(f"Error getting suggestions for {question_id}: {repr(e)}") | |
| return jsonify({ | |
| 'success': True, | |
| 'suggestions': ['Unclassified'], | |
| 'subject': subject or 'Biology', | |
| 'warning': str(e) | |
| }) | |
| def get_neetprep_suggestions_batch(): | |
| """Batch endpoint for getting topic suggestions for multiple NeetPrep questions at once. | |
| Requires a subject to be specified. Processes up to 8 questions in a single API call.""" | |
| import os | |
| from nvidia_prompts import BIOLOGY_PROMPT_TEMPLATE, CHEMISTRY_PROMPT_TEMPLATE, PHYSICS_PROMPT_TEMPLATE, MATHEMATICS_PROMPT_TEMPLATE | |
| data = request.json | |
| question_ids = data.get('question_ids', []) | |
| subject = data.get('subject') | |
| current_app.logger.info(f"[BATCH-NEETPREP] Received request for {len(question_ids)} questions, subject={subject}") | |
| if not question_ids: | |
| return jsonify({'error': 'No question_ids provided'}), 400 | |
| if not subject or subject.lower() == 'auto': | |
| return jsonify({'error': 'Subject must be specified for batch requests'}), 400 | |
| if len(question_ids) > 8: | |
| return jsonify({'error': 'Maximum 8 questions per batch'}), 400 | |
| NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY") | |
| if not NVIDIA_API_KEY: | |
| return jsonify({'error': 'NVIDIA_API_KEY not set'}), 500 | |
| def get_nvidia_prompt(subj, input_questions): | |
| if subj.lower() == 'biology': return BIOLOGY_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'chemistry': return CHEMISTRY_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'physics': return PHYSICS_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| if subj.lower() == 'mathematics': return MATHEMATICS_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| return BIOLOGY_PROMPT_TEMPLATE.format(input_questions=input_questions) | |
| try: | |
| conn = get_db_connection() | |
| questions_data = [] | |
| for idx, qid in enumerate(question_ids): | |
| question = conn.execute('SELECT question_text FROM neetprep_questions WHERE id = ?', (qid,)).fetchone() | |
| if question and question['question_text']: | |
| soup = BeautifulSoup(question['question_text'], 'html.parser') | |
| plain_text = soup.get_text(strip=True) | |
| if plain_text: | |
| questions_data.append({'id': qid, 'index': idx + 1, 'text': plain_text[:500]}) | |
| current_app.logger.debug(f"[BATCH-NEETPREP] Question {qid}: got text ({len(plain_text)} chars)") | |
| else: | |
| current_app.logger.warning(f"[BATCH-NEETPREP] Question {qid}: empty plain text after HTML strip") | |
| else: | |
| current_app.logger.warning(f"[BATCH-NEETPREP] Question {qid}: not found or no question_text") | |
| conn.close() | |
| current_app.logger.info(f"[BATCH-NEETPREP] Got question text for {len(questions_data)}/{len(question_ids)} questions") | |
| if not questions_data: | |
| return jsonify({'error': 'Could not obtain question text for any questions'}), 400 | |
| # Build multi-question prompt | |
| input_questions = "\n".join(f"{q['index']}. {q['text']}" for q in questions_data) | |
| prompt_content = get_nvidia_prompt(subject, input_questions) | |
| current_app.logger.info(f"[BATCH-NEETPREP] Sending {len(questions_data)} questions to NVIDIA API") | |
| current_app.logger.debug(f"[BATCH-NEETPREP] Prompt preview: {input_questions[:500]}...") | |
| # Make single API call for all questions | |
| res = requests.post( | |
| 'https://integrate.api.nvidia.com/v1/chat/completions', | |
| headers={'Authorization': f'Bearer {NVIDIA_API_KEY}', 'Accept': 'application/json', 'Content-Type': 'application/json'}, | |
| json={"model": "nvidia/nemotron-3-nano-30b-a3b", "messages": [{"content": prompt_content, "role": "user"}], "temperature": 0.2, "top_p": 1, "max_tokens": 4096, "stream": False}, | |
| timeout=60 | |
| ) | |
| res.raise_for_status() | |
| content = res.json()['choices'][0]['message']['content'] | |
| current_app.logger.info(f"[BATCH-NEETPREP] NVIDIA API response length: {len(content)} chars") | |
| current_app.logger.debug(f"[BATCH-NEETPREP] Raw response: {content[:1000]}...") | |
| # Parse JSON from response | |
| if "```json" in content: | |
| content = content.split("```json")[1].split("```")[0].strip() | |
| elif "```" in content: | |
| content = content.split("```")[1].split("```")[0].strip() | |
| parsed_data = json.loads(content) | |
| current_app.logger.info(f"[BATCH-NEETPREP] Parsed data has {len(parsed_data.get('data', []))} items") | |
| # Build results for each question | |
| results = {} | |
| fallback_topics = { | |
| 'biology': ['Cell: The Unit of Life', 'Biomolecules', 'Human Reproduction'], | |
| 'chemistry': ['Organic Chemistry – Some Basic Principles and Techniques (GOC)', 'Chemical Bonding and Molecular Structure'], | |
| 'physics': ['Laws of Motion', 'Work, Energy and Power', 'Current Electricity'], | |
| 'mathematics': ['Calculus', 'Algebra', 'Coordinate Geometry'] | |
| } | |
| if parsed_data.get('data'): | |
| data_items = parsed_data['data'] | |
| current_app.logger.info(f"[BATCH-NEETPREP] AI returned {len(data_items)} items, we have {len(questions_data)} questions") | |
| # Helper to extract suggestions from item (handles both old and new format) | |
| def extract_suggestions(item): | |
| suggestions = [] | |
| # Try new compact format first, then old format | |
| primary = item.get('ch') or item.get('chapter_title') | |
| if primary and primary != 'Unclassified': | |
| suggestions.append(primary) | |
| # Handle alternatives - new format uses 'alt' (string), old uses 'other_possible_chapters' (list) | |
| alt = item.get('alt') | |
| if alt and alt != 'Unclassified' and alt != 'null': | |
| suggestions.append(alt) | |
| others = item.get('other_possible_chapters') | |
| if isinstance(others, list): | |
| suggestions.extend([c for c in others if c and c != 'Unclassified']) | |
| return suggestions | |
| # Helper to get index from item | |
| def get_item_index(item): | |
| return item.get('i') or item.get('index') or 0 | |
| # Try to match by index first, then fall back to order-based matching | |
| matched_by_index = 0 | |
| for item in data_items: | |
| item_index = get_item_index(item) | |
| current_app.logger.debug(f"[BATCH-NEETPREP] Processing item with index={item_index}: {item}") | |
| matching_q = next((q for q in questions_data if q['index'] == item_index), None) | |
| if matching_q: | |
| matched_by_index += 1 | |
| suggestions = extract_suggestions(item) | |
| if not suggestions: | |
| suggestions = fallback_topics.get(subject.lower(), ['Unclassified']) | |
| results[matching_q['id']] = { | |
| 'success': True, | |
| 'suggestions': suggestions[:5], | |
| 'subject': subject, | |
| 'other_possible_subjects': [] | |
| } | |
| current_app.logger.info(f"[BATCH-NEETPREP] Question {matching_q['id']}: matched by index, suggestions={suggestions[:3]}") | |
| # If index matching failed, try order-based matching | |
| if matched_by_index == 0 and len(data_items) > 0: | |
| current_app.logger.warning(f"[BATCH-NEETPREP] Index matching failed, trying order-based matching") | |
| for i, item in enumerate(data_items): | |
| if i < len(questions_data): | |
| q = questions_data[i] | |
| if q['id'] not in results: | |
| suggestions = extract_suggestions(item) | |
| if not suggestions: | |
| suggestions = fallback_topics.get(subject.lower(), ['Unclassified']) | |
| results[q['id']] = { | |
| 'success': True, | |
| 'suggestions': suggestions[:5], | |
| 'subject': subject, | |
| 'other_possible_subjects': [] | |
| } | |
| current_app.logger.info(f"[BATCH-NEETPREP] Question {q['id']}: matched by order, suggestions={suggestions[:3]}") | |
| # Fill in any missing results | |
| for q in questions_data: | |
| if q['id'] not in results: | |
| current_app.logger.warning(f"[BATCH-NEETPREP] Question {q['id']}: using fallback (no match in API response)") | |
| results[q['id']] = { | |
| 'success': True, | |
| 'suggestions': fallback_topics.get(subject.lower(), ['Unclassified']), | |
| 'subject': subject, | |
| 'other_possible_subjects': [] | |
| } | |
| current_app.logger.info(f"[BATCH-NEETPREP] Returning results for {len(results)} questions") | |
| return jsonify({'success': True, 'results': results}) | |
| except Exception as e: | |
| current_app.logger.error(f"Error in batch suggestions: {repr(e)}") | |
| fallback_result = { | |
| 'success': True, | |
| 'suggestions': ['Unclassified'], | |
| 'subject': subject, | |
| 'other_possible_subjects': [] | |
| } | |
| return jsonify({ | |
| 'success': False, | |
| 'error': str(e), | |
| 'results': {qid: fallback_result for qid in question_ids} | |
| }) | |
| # ============== BOOKMARK FEATURE ============== | |
| def get_bookmark_collections(): | |
| """Get all bookmark collections (sessions of type neetprep_collection) for the user.""" | |
| conn = get_db_connection() | |
| collections = conn.execute(""" | |
| SELECT s.id, s.name, s.subject, s.tags, s.notes, s.created_at, | |
| COUNT(b.id) as question_count | |
| FROM sessions s | |
| LEFT JOIN neetprep_bookmarks b ON s.id = b.session_id AND b.user_id = ? | |
| WHERE s.user_id = ? AND s.session_type = 'neetprep_collection' | |
| GROUP BY s.id | |
| ORDER BY s.created_at DESC | |
| """, (current_user.id, current_user.id)).fetchall() | |
| conn.close() | |
| return jsonify({'success': True, 'collections': [dict(c) for c in collections]}) | |
| def create_bookmark_collection(): | |
| """Create a new bookmark collection (session).""" | |
| import uuid | |
| data = request.json | |
| name = data.get('name', 'New Collection') | |
| subject = data.get('subject', '') | |
| tags = data.get('tags', '') | |
| notes = data.get('notes', '') | |
| session_id = str(uuid.uuid4()) | |
| conn = get_db_connection() | |
| conn.execute(""" | |
| INSERT INTO sessions (id, name, subject, tags, notes, user_id, session_type, persist) | |
| VALUES (?, ?, ?, ?, ?, ?, 'neetprep_collection', 1) | |
| """, (session_id, name, subject, tags, notes, current_user.id)) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True, 'session_id': session_id, 'name': name}) | |
| def delete_bookmark_collection(session_id): | |
| """Delete a bookmark collection and all its bookmarks.""" | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id FROM sessions WHERE id = ? AND user_id = ?', (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| conn.execute('DELETE FROM neetprep_bookmarks WHERE session_id = ? AND user_id = ?', (session_id, current_user.id)) | |
| conn.execute('DELETE FROM sessions WHERE id = ?', (session_id,)) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True}) | |
| def update_bookmark_collection(session_id): | |
| """Update collection metadata.""" | |
| data = request.json | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id FROM sessions WHERE id = ? AND user_id = ?', (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| updates = [] | |
| params = [] | |
| if 'name' in data: | |
| updates.append('name = ?') | |
| params.append(data['name']) | |
| if 'subject' in data: | |
| updates.append('subject = ?') | |
| params.append(data['subject']) | |
| if 'tags' in data: | |
| updates.append('tags = ?') | |
| params.append(data['tags']) | |
| if 'notes' in data: | |
| updates.append('notes = ?') | |
| params.append(data['notes']) | |
| if updates: | |
| params.append(session_id) | |
| conn.execute(f"UPDATE sessions SET {', '.join(updates)} WHERE id = ?", params) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True}) | |
| def add_bookmark(): | |
| """Add a question to a collection.""" | |
| data = request.json | |
| question_id = data.get('question_id') | |
| session_id = data.get('session_id') | |
| question_type = data.get('question_type', 'neetprep') # 'neetprep' or 'classified' | |
| if not question_id or not session_id: | |
| return jsonify({'error': 'question_id and session_id are required'}), 400 | |
| conn = get_db_connection() | |
| # Verify session ownership | |
| session = conn.execute('SELECT id FROM sessions WHERE id = ? AND user_id = ?', (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| # Check if already bookmarked | |
| existing = conn.execute( | |
| 'SELECT id FROM neetprep_bookmarks WHERE user_id = ? AND neetprep_question_id = ? AND session_id = ? AND question_type = ?', | |
| (current_user.id, str(question_id), session_id, question_type) | |
| ).fetchone() | |
| if existing: | |
| conn.close() | |
| return jsonify({'success': True, 'message': 'Already bookmarked'}) | |
| conn.execute( | |
| 'INSERT INTO neetprep_bookmarks (user_id, neetprep_question_id, session_id, question_type) VALUES (?, ?, ?, ?)', | |
| (current_user.id, str(question_id), session_id, question_type) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True}) | |
| def remove_bookmark(): | |
| """Remove a question from a collection.""" | |
| data = request.json | |
| question_id = data.get('question_id') | |
| session_id = data.get('session_id') | |
| question_type = data.get('question_type', 'neetprep') | |
| if not question_id or not session_id: | |
| return jsonify({'error': 'question_id and session_id are required'}), 400 | |
| conn = get_db_connection() | |
| # Verify session ownership | |
| session = conn.execute('SELECT id FROM sessions WHERE id = ? AND user_id = ?', (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| conn.execute( | |
| 'DELETE FROM neetprep_bookmarks WHERE user_id = ? AND neetprep_question_id = ? AND session_id = ? AND question_type = ?', | |
| (current_user.id, str(question_id), session_id, question_type) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True}) | |
| def bulk_bookmark(): | |
| """Add multiple neetprep questions to a collection at once.""" | |
| data = request.json | |
| question_ids = data.get('question_ids', []) | |
| session_id = data.get('session_id') | |
| if not question_ids or not session_id: | |
| return jsonify({'error': 'question_ids and session_id are required'}), 400 | |
| conn = get_db_connection() | |
| # Verify session ownership | |
| session = conn.execute('SELECT id FROM sessions WHERE id = ? AND user_id = ?', (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| added_count = 0 | |
| for qid in question_ids: | |
| existing = conn.execute( | |
| 'SELECT id FROM neetprep_bookmarks WHERE user_id = ? AND neetprep_question_id = ? AND session_id = ?', | |
| (current_user.id, qid, session_id) | |
| ).fetchone() | |
| if not existing: | |
| conn.execute( | |
| 'INSERT INTO neetprep_bookmarks (user_id, neetprep_question_id, session_id) VALUES (?, ?, ?)', | |
| (current_user.id, qid, session_id) | |
| ) | |
| added_count += 1 | |
| conn.commit() | |
| conn.close() | |
| return jsonify({'success': True, 'added_count': added_count}) | |
| def get_collection_questions(session_id): | |
| """Get all questions in a bookmark collection.""" | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id, name, subject, tags, notes FROM sessions WHERE id = ? AND user_id = ?', | |
| (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| questions = conn.execute(""" | |
| SELECT nq.id, nq.question_text, nq.options, nq.correct_answer_index, | |
| nq.level, nq.topic, nq.subject, b.created_at as bookmarked_at | |
| FROM neetprep_bookmarks b | |
| JOIN neetprep_questions nq ON b.neetprep_question_id = nq.id | |
| WHERE b.session_id = ? AND b.user_id = ? | |
| ORDER BY b.created_at DESC | |
| """, (session_id, current_user.id)).fetchall() | |
| conn.close() | |
| return jsonify({ | |
| 'success': True, | |
| 'collection': dict(session), | |
| 'questions': [dict(q) for q in questions] | |
| }) | |
| def view_collection(session_id): | |
| """View a bookmark collection with its questions.""" | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id, name, subject, tags, notes, created_at FROM sessions WHERE id = ? AND user_id = ?', | |
| (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| from flask import redirect, flash | |
| flash('Collection not found', 'danger') | |
| return redirect(url_for('dashboard.dashboard', filter='collections')) | |
| # Fetch neetprep questions | |
| neetprep_questions = conn.execute(""" | |
| SELECT nq.id, nq.question_text, nq.options, nq.correct_answer_index, | |
| nq.level, nq.topic, nq.subject, b.created_at as bookmarked_at, 'neetprep' as question_type | |
| FROM neetprep_bookmarks b | |
| JOIN neetprep_questions nq ON b.neetprep_question_id = nq.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'neetprep' | |
| ORDER BY nq.topic, b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| # Fetch classified questions | |
| classified_questions = conn.execute(""" | |
| SELECT q.id, q.question_text, NULL as options, q.actual_solution as correct_answer_index, | |
| NULL as level, q.chapter as topic, q.subject, b.created_at as bookmarked_at, 'classified' as question_type, | |
| i.processed_filename as image_filename, q.question_number | |
| FROM neetprep_bookmarks b | |
| JOIN questions q ON CAST(b.neetprep_question_id AS INTEGER) = q.id | |
| LEFT JOIN images i ON q.image_id = i.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'classified' | |
| ORDER BY q.chapter, b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| conn.close() | |
| # Combine all questions | |
| all_questions = [] | |
| for q in neetprep_questions: | |
| qd = dict(q) | |
| qd['image_filename'] = None | |
| all_questions.append(qd) | |
| for q in classified_questions: | |
| all_questions.append(dict(q)) | |
| # Group questions by topic | |
| topics = {} | |
| for q in all_questions: | |
| topic = q['topic'] or 'Unclassified' | |
| if topic not in topics: | |
| topics[topic] = [] | |
| topics[topic].append(q) | |
| return render_template('collection_view.html', | |
| collection=dict(session), | |
| questions=all_questions, | |
| topics=topics, | |
| question_count=len(all_questions)) | |
| def get_question_collections(question_id): | |
| """Get which collections a question is bookmarked in.""" | |
| conn = get_db_connection() | |
| collections = conn.execute(""" | |
| SELECT s.id, s.name | |
| FROM neetprep_bookmarks b | |
| JOIN sessions s ON b.session_id = s.id | |
| WHERE b.neetprep_question_id = ? AND b.user_id = ? | |
| """, (question_id, current_user.id)).fetchall() | |
| conn.close() | |
| return jsonify({ | |
| 'success': True, | |
| 'collections': [dict(c) for c in collections] | |
| }) | |
| def get_batch_bookmark_statuses(): | |
| """Get bookmark statuses for multiple questions at once.""" | |
| data = request.get_json() | |
| question_ids = data.get('question_ids', []) | |
| if not question_ids: | |
| return jsonify({'success': True, 'bookmarks': {}}) | |
| conn = get_db_connection() | |
| # Build query for all question IDs | |
| placeholders = ','.join(['?' for _ in question_ids]) | |
| query = f""" | |
| SELECT neetprep_question_id, session_id | |
| FROM neetprep_bookmarks | |
| WHERE neetprep_question_id IN ({placeholders}) AND user_id = ? | |
| """ | |
| bookmarks = conn.execute(query, question_ids + [current_user.id]).fetchall() | |
| conn.close() | |
| # Group by question_id | |
| result = {} | |
| for b in bookmarks: | |
| qid = str(b['neetprep_question_id']) | |
| if qid not in result: | |
| result[qid] = [] | |
| result[qid].append(b['session_id']) | |
| return jsonify({ | |
| 'success': True, | |
| 'bookmarks': result | |
| }) | |
| def collection_quiz(session_id): | |
| """Start a quiz from a bookmark collection.""" | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id, name FROM sessions WHERE id = ? AND user_id = ?', | |
| (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| from flask import redirect, flash | |
| flash('Collection not found', 'danger') | |
| return redirect(url_for('dashboard.dashboard', filter='collections')) | |
| all_questions = [] | |
| # Get neetprep bookmarked questions | |
| neetprep_questions = conn.execute(""" | |
| SELECT nq.id, nq.question_text, nq.options, nq.correct_answer_index, | |
| nq.level, nq.topic, nq.subject | |
| FROM neetprep_bookmarks b | |
| JOIN neetprep_questions nq ON b.neetprep_question_id = nq.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'neetprep' | |
| ORDER BY b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| for q in neetprep_questions: | |
| try: | |
| html_content = f"""<html><head><meta charset="utf-8"></head><body>{q['question_text']}</body></html>""" | |
| img_filename = f"neetprep_{q['id']}.jpg" | |
| img_path = os.path.join(current_app.config['TEMP_FOLDER'], img_filename) | |
| imgkit.from_string(html_content, img_path, options={'width': 800}) | |
| all_questions.append({ | |
| 'image_path': f"/tmp/{img_filename}", | |
| 'details': { | |
| 'id': q['id'], | |
| 'options': json.loads(q['options']) if q['options'] else [], | |
| 'correct_answer_index': q['correct_answer_index'], | |
| 'user_answer_index': None, | |
| 'source': 'neetprep', | |
| 'topic': q['topic'], | |
| 'subject': q['subject'] | |
| } | |
| }) | |
| except Exception as e: | |
| current_app.logger.error(f"Failed to convert question {q['id']} to image: {e}") | |
| # Get classified bookmarked questions | |
| classified_questions = conn.execute(""" | |
| SELECT q.id, q.actual_solution as correct_answer_index, q.marked_solution as user_answer_index, | |
| q.chapter as topic, q.subject, i.processed_filename, i.note_filename | |
| FROM neetprep_bookmarks b | |
| JOIN questions q ON CAST(b.neetprep_question_id AS INTEGER) = q.id | |
| LEFT JOIN images i ON q.image_id = i.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'classified' | |
| ORDER BY b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| for q in classified_questions: | |
| if q['processed_filename']: | |
| all_questions.append({ | |
| 'image_path': f"/processed/{q['processed_filename']}", | |
| 'details': { | |
| 'id': q['id'], | |
| 'options': [], | |
| 'correct_answer_index': q['correct_answer_index'], | |
| 'user_answer_index': q['user_answer_index'], | |
| 'source': 'classified', | |
| 'topic': q['topic'], | |
| 'subject': q['subject'], | |
| 'note_filename': q['note_filename'] | |
| } | |
| }) | |
| conn.close() | |
| if not all_questions: | |
| from flask import redirect, flash | |
| flash('No questions in this collection', 'warning') | |
| return redirect(url_for('neetprep_bp.view_collection', session_id=session_id)) | |
| return render_template('quiz_v2.html', questions=all_questions) | |
| def generate_collection_pdf(session_id): | |
| """Generate a PDF from a bookmark collection.""" | |
| conn = get_db_connection() | |
| # Verify ownership | |
| session = conn.execute('SELECT id, name, subject FROM sessions WHERE id = ? AND user_id = ?', | |
| (session_id, current_user.id)).fetchone() | |
| if not session: | |
| conn.close() | |
| return jsonify({'error': 'Collection not found'}), 404 | |
| # Get neetprep bookmarked questions | |
| neetprep_questions = conn.execute(""" | |
| SELECT nq.id, nq.question_text, nq.options, nq.correct_answer_index, | |
| nq.level, nq.topic, nq.subject | |
| FROM neetprep_bookmarks b | |
| JOIN neetprep_questions nq ON b.neetprep_question_id = nq.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'neetprep' | |
| ORDER BY nq.topic, b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| # Get classified bookmarked questions | |
| classified_questions = conn.execute(""" | |
| SELECT q.id, q.question_text, q.actual_solution as correct_answer_index, | |
| q.chapter as topic, q.subject, i.processed_filename | |
| FROM neetprep_bookmarks b | |
| JOIN questions q ON CAST(b.neetprep_question_id AS INTEGER) = q.id | |
| LEFT JOIN images i ON q.image_id = i.id | |
| WHERE b.session_id = ? AND b.user_id = ? AND b.question_type = 'classified' | |
| ORDER BY q.chapter, b.created_at | |
| """, (session_id, current_user.id)).fetchall() | |
| conn.close() | |
| if not neetprep_questions and not classified_questions: | |
| return jsonify({'error': 'No questions in this collection'}), 400 | |
| data = request.json or {} | |
| all_questions = [] | |
| for q in neetprep_questions: | |
| all_questions.append({ | |
| "id": q['id'], | |
| "question_text": q['question_text'], | |
| "options": json.loads(q['options']) if q['options'] else [], | |
| "correct_answer_index": q['correct_answer_index'], | |
| "user_answer_index": None, | |
| "status": "wrong", | |
| "source": "neetprep", | |
| "custom_fields": { | |
| "difficulty": q['level'], | |
| "topic": q['topic'], | |
| "subject": q['subject'] | |
| } | |
| }) | |
| for q in classified_questions: | |
| if q['processed_filename']: | |
| # Use absolute path for PDF generation | |
| abs_img_path = os.path.abspath(os.path.join(current_app.config['PROCESSED_FOLDER'], q['processed_filename'])) | |
| all_questions.append({ | |
| "id": q['id'], | |
| "question_text": f"<img src=\"{abs_img_path}\" style=\"max-width:100%;\" />", | |
| "options": [], | |
| "correct_answer_index": q['correct_answer_index'], | |
| "user_answer_index": None, | |
| "status": "wrong", | |
| "source": "classified", | |
| "custom_fields": { | |
| "topic": q['topic'], | |
| "subject": q['subject'] | |
| } | |
| }) | |
| topics = list(set(q['custom_fields']['topic'] for q in all_questions if q['custom_fields'].get('topic'))) | |
| final_json_output = { | |
| "version": "2.1", | |
| "test_name": session['name'] or "Bookmark Collection", | |
| "config": {"font_size": 22, "auto_generate_pdf": False, "layout": data.get('layout', {})}, | |
| "metadata": {"source_book": "NeetPrep Collection", "tags": ", ".join(topics)}, | |
| "questions": all_questions, | |
| "view": True | |
| } | |
| try: | |
| result, status_code = _process_json_and_generate_pdf(final_json_output, current_user.id) | |
| if status_code != 200: | |
| return jsonify(result), status_code | |
| if result.get('success'): | |
| return jsonify({'success': True, 'pdf_url': result.get('view_url')}) | |
| else: | |
| return jsonify({'error': result.get('error', 'Failed to generate PDF')}), 500 | |
| except Exception as e: | |
| current_app.logger.error(f"Failed to generate collection PDF: {repr(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| # ============== END BOOKMARK FEATURE ============== | |
| def run_hardcoded_query(query_template, **kwargs): | |
| """Helper function to run a GraphQL query.""" | |
| final_query = query_template.format(**kwargs) | |
| payload = {'query': final_query, 'variables': {}} | |
| try: | |
| response = requests.post(ENDPOINT_URL, headers=HEADERS, json=payload, timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| current_app.logger.error(f"NeetPrep API Request Error: {repr(e)}") | |
| return None | |