Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, jsonify, send_file, session, redirect, url_for, flash | |
| import os | |
| import json | |
| import tempfile | |
| from werkzeug.utils import secure_filename | |
| from exam_question_system import ExamQuestionSystem | |
| from datetime import datetime | |
| import uuid | |
| import threading | |
| import time | |
| from docx import Document | |
| import PyPDF2 | |
| app = Flask(__name__) | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size | |
| app.config['UPLOAD_FOLDER'] = 'uploads' | |
| app.config['TEMP_INPUT_FOLDER'] = os.path.join(tempfile.gettempdir(), 'eqg_inputs') | |
| app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 # Disable caching for development | |
| app.secret_key = 'your-secret-key-change-this-in-production' | |
| # Create necessary directories if they don't exist | |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
| os.makedirs(app.config['TEMP_INPUT_FOLDER'], exist_ok=True) | |
| # Global variables for question system | |
| question_system = None | |
| system_loading = False | |
| system_load_error = None | |
| def read_file_content(filepath): | |
| """Read content from a file based on its extension.""" | |
| ext = os.path.splitext(filepath)[1].lower() | |
| try: | |
| if ext == '.pdf': | |
| text = "" | |
| with open(filepath, 'rb') as f: | |
| reader = PyPDF2.PdfReader(f) | |
| for page in reader.pages: | |
| text += page.extract_text() + "\n" | |
| return text | |
| elif ext == '.docx': | |
| doc = Document(filepath) | |
| return "\n".join([paragraph.text for paragraph in doc.paragraphs]) | |
| else: | |
| # Default to text file | |
| with open(filepath, 'r', encoding='utf-8', errors='replace') as f: | |
| return f.read() | |
| except Exception as e: | |
| raise Exception(f"Error reading {ext} file: {str(e)}") | |
| def get_question_system(): | |
| """Get or initialize the question generation system.""" | |
| global question_system, system_loading, system_load_error | |
| if question_system is None and not system_loading: | |
| if system_load_error: | |
| raise Exception(f"System failed to load: {system_load_error}") | |
| system_loading = True | |
| try: | |
| print("Initializing question generation system...") | |
| question_system = ExamQuestionSystem() | |
| print("Question generation system loaded successfully!") | |
| except Exception as e: | |
| system_load_error = str(e) | |
| system_loading = False | |
| raise e | |
| finally: | |
| system_loading = False | |
| if system_loading: | |
| raise Exception("System is still loading, please wait...") | |
| return question_system | |
| # Utility: parse human-readable duration text into minutes when possible | |
| def parse_duration_to_minutes(duration_text): | |
| try: | |
| if not duration_text: | |
| return None | |
| text = duration_text.strip().lower() | |
| # Normalize | |
| text = text.replace('hrs', 'h').replace('hr', 'h').replace('hours', 'h').replace('hour', 'h') | |
| text = text.replace('minutes', 'm').replace('minute', 'm').replace('mins', 'm').replace('min', 'm') | |
| # Patterns like '2h 30m' | |
| import re | |
| hours = 0 | |
| minutes = 0 | |
| # Match hours | |
| h_match = re.search(r"(\d+)\s*h", text) | |
| if h_match: | |
| hours = int(h_match.group(1)) | |
| # Match minutes | |
| m_match = re.search(r"(\d+)\s*m", text) | |
| if m_match: | |
| minutes = int(m_match.group(1)) | |
| if h_match or m_match: | |
| return hours * 60 + minutes | |
| # If only a number, treat as minutes | |
| just_num = re.fullmatch(r"\s*(\d+)\s*", duration_text) | |
| if just_num: | |
| return int(just_num.group(1)) | |
| # If something like '3 hour' without m, captured above; if unparseable, return None | |
| return None | |
| except Exception: | |
| return None | |
| def index(): | |
| """Welcome page before the step flow.""" | |
| # Clear any existing session data | |
| session.clear() | |
| return render_template('welcome.html', project_name='AutoExamGen') | |
| def step1_input(): | |
| """Step 1: Syllabus input page.""" | |
| return render_template('step1_input.html') | |
| def step2_configuration(): | |
| """Step 2: Question configuration page.""" | |
| if request.method == 'GET': | |
| # If user tries to access /step2 directly, redirect to step1 | |
| return redirect(url_for('step1_input')) | |
| # Handle POST request (form submission from step1) | |
| content = None | |
| try: | |
| # Get text input (from textarea) | |
| text_input = request.form.get('text_input', '').strip() | |
| # Check if file was uploaded | |
| if 'file' in request.files: | |
| file = request.files['file'] | |
| if file and file.filename != '': | |
| try: | |
| # Save the uploaded file | |
| filename = secure_filename(file.filename) | |
| if not os.path.exists(app.config['UPLOAD_FOLDER']): | |
| os.makedirs(app.config['UPLOAD_FOLDER']) | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| # Store file path in session | |
| session['content_file'] = filepath | |
| session.pop('content_text', None) # Clear any text content if it exists | |
| # Read the file content for processing | |
| content = read_file_content(filepath) | |
| print(f"File uploaded successfully: {filename}, Content length: {len(content)}") | |
| except Exception as e: | |
| error_msg = f'Error processing file: {str(e)}' | |
| print(error_msg) | |
| flash(error_msg, 'error') | |
| return redirect(url_for('step1_input')) | |
| # If no file but text content is provided | |
| if not content and text_input: | |
| # For small text content, store directly in session | |
| if len(text_input) < 2000: | |
| session['content_text'] = text_input | |
| content = text_input | |
| else: | |
| # For large content, save to a temporary file | |
| temp_file = os.path.join(app.config['UPLOAD_FOLDER'], f'temp_{int(time.time())}.txt') | |
| with open(temp_file, 'w', encoding='utf-8') as f: | |
| f.write(text_input) | |
| session['content_file'] = temp_file | |
| content = text_input | |
| print(f"Text input processed, Content length: {len(content)}") | |
| # Validate that we have content | |
| if not content or not content.strip(): | |
| flash('Please provide either a file or paste content', 'error') | |
| return redirect(url_for('step1_input')) | |
| # Process the content for question generation | |
| try: | |
| # Initialize question system if not already done | |
| print("Initializing question system...") | |
| question_system = get_question_system() | |
| print("Question system initialized successfully") | |
| # Store word count for progress display | |
| word_count = len(content.split()) | |
| session['word_count'] = word_count | |
| print(f"Content processed: {word_count} words") | |
| # Render the configuration page | |
| return render_template('step2_config.html', | |
| word_count=word_count, | |
| has_syllabus=bool(session.get('syllabus_text', ''))) | |
| except Exception as e: | |
| error_msg = f'Error initializing question system: {str(e)}' | |
| print(error_msg) | |
| import traceback | |
| traceback.print_exc() | |
| flash(error_msg, 'error') | |
| return redirect(url_for('step1_input')) | |
| except Exception as e: | |
| error_msg = f'An error occurred: {str(e)}' | |
| print(error_msg) | |
| import traceback | |
| traceback.print_exc() | |
| flash(error_msg, 'error') | |
| return redirect(url_for('step1_input')) | |
| def step3_generate(): | |
| """Step 3: Generate and display question paper.""" | |
| try: | |
| # Get form data | |
| num_questions = int(request.form.get('num_questions', 5)) | |
| question_types = request.form.getlist('question_types') | |
| # Get content from session or uploaded file | |
| content = None | |
| # Check for uploaded file first | |
| if 'content_file' in session and os.path.exists(session['content_file']): | |
| try: | |
| content = read_file_content(session['content_file']) | |
| except Exception as e: | |
| flash(f'Error reading uploaded file: {str(e)}', 'error') | |
| return redirect(url_for('step1_input')) | |
| # Check for direct text content | |
| elif 'content_text' in session and session['content_text']: | |
| content = session['content_text'] | |
| # If no content found, redirect to step 1 | |
| if not content: | |
| flash('No content found. Please provide content first.', 'error') | |
| return redirect(url_for('step1_input')) | |
| # Initialize question system | |
| try: | |
| question_system = get_question_system() | |
| # Helper function to safely get integer values from form | |
| def get_int(form, key, default=0): | |
| try: | |
| return int(form.get(key, default)) | |
| except (ValueError, TypeError): | |
| return default | |
| # Store configuration in session with all required fields and safe defaults | |
| config = { | |
| 'exam_name': request.form.get('exam_name', 'Sample Exam'), | |
| 'subject_name': request.form.get('subject_name', 'Subject'), | |
| 'duration': get_int(request.form, 'duration', 60), | |
| 'short_questions': get_int(request.form, 'short_questions', 2), | |
| 'short_marks': get_int(request.form, 'short_marks', 2), | |
| 'long_questions': get_int(request.form, 'long_questions', 1), | |
| 'long_marks': get_int(request.form, 'long_marks', 5), | |
| 'long_attempt': get_int(request.form, 'long_attempt', 1), | |
| 'mcq_questions': get_int(request.form, 'mcq_questions', 2), | |
| 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| } | |
| session['exam_config'] = config | |
| # Get the syllabus/content text | |
| content = "" | |
| if 'content_text' in session: | |
| content = session['content_text'] | |
| elif 'content_file' in session and os.path.exists(session['content_file']): | |
| content = read_file_content(session['content_file']) | |
| if not content.strip(): | |
| raise ValueError("No content available for question generation.") | |
| # Generate questions based on content and configuration | |
| print("Generating questions from content...") | |
| # Get number of questions for each type | |
| num_mcq = config['mcq_questions'] | |
| num_short = config['short_questions'] | |
| num_long = config['long_questions'] | |
| # Generate questions using the question system | |
| # We request enough questions for all sections, but without auto-MCQ generation | |
| # so we can handle categorization manually | |
| total_questions_needed = num_mcq + num_short + num_long | |
| results = question_system.generate_exam_questions( | |
| input_text=content, | |
| max_questions=total_questions_needed, | |
| include_mcq=False, # We'll generate options manually for specific questions | |
| syllabus_text=content | |
| ) | |
| all_questions = results.get('questions', []) | |
| # Initialize categories | |
| generated_questions = { | |
| 'mcq_questions': [], | |
| 'short_questions': [], | |
| 'long_questions': [] | |
| } | |
| # Distribute questions | |
| # We prioritize Short and Long questions first to ensure they get content, | |
| # as MCQs are easier to fallback/generate | |
| # Filter out questions that are too simple for Long answers | |
| long_candidates = [q for q in all_questions if len(q.get('context', '').split()) > 10] | |
| short_candidates = [q for q in all_questions if q not in long_candidates] | |
| # If we don't have enough long candidates, take from short | |
| if len(long_candidates) < num_long: | |
| needed = num_long - len(long_candidates) | |
| long_candidates.extend(short_candidates[:needed]) | |
| short_candidates = short_candidates[needed:] | |
| # 3. Process Long Questions (Prioritize these) | |
| for _ in range(num_long): | |
| if long_candidates: | |
| q = long_candidates.pop(0) | |
| q['type'] = 'long_answer' | |
| generated_questions['long_questions'].append(q) | |
| # Remove from all_questions so we don't reuse | |
| if q in all_questions: | |
| all_questions.remove(q) | |
| # 2. Process Short Questions | |
| for _ in range(num_short): | |
| if short_candidates: | |
| q = short_candidates.pop(0) | |
| q['type'] = 'short_answer' | |
| generated_questions['short_questions'].append(q) | |
| if q in all_questions: | |
| all_questions.remove(q) | |
| elif all_questions: # Fallback to any remaining | |
| q = all_questions.pop(0) | |
| q['type'] = 'short_answer' | |
| generated_questions['short_questions'].append(q) | |
| # 1. Process MCQs (Use remaining questions) | |
| # Extract global keywords for distractors | |
| global_keywords = [k[1] for k in results.get('keywords', [])] | |
| for _ in range(num_mcq): | |
| if all_questions: | |
| q = all_questions.pop(0) | |
| # Generate options for this question | |
| try: | |
| mcq_data = question_system.option_generator.create_mcq_options( | |
| q['question'], | |
| q['context'], | |
| correct_answer=q.get('correct_answer'), | |
| global_keywords=global_keywords | |
| ) | |
| if mcq_data and 'options' in mcq_data: | |
| q.update(mcq_data) | |
| q['type'] = 'mcq' | |
| generated_questions['mcq_questions'].append(q) | |
| else: | |
| # Fallback if option generation fails | |
| q['type'] = 'short_answer' | |
| generated_questions['short_questions'].append(q) | |
| except Exception as e: | |
| print(f"Error generating options: {e}") | |
| q['type'] = 'short_answer' | |
| generated_questions['short_questions'].append(q) | |
| # Store the generated questions | |
| session['generated_questions'] = generated_questions | |
| # Calculate and store total marks | |
| total_marks = ( | |
| (len(session['generated_questions']['short_questions']) * config['short_marks']) + | |
| (len(session['generated_questions']['long_questions']) * config['long_marks']) + | |
| len(session['generated_questions']['mcq_questions']) # 1 mark per MCQ | |
| ) | |
| session['total_marks'] = total_marks | |
| # Redirect to results page | |
| return redirect(url_for('show_results')) | |
| except Exception as e: | |
| error_msg = f'Error generating questions: {str(e)}' | |
| print(error_msg) | |
| flash(error_msg, 'error') | |
| return redirect(url_for('step1_input')) | |
| except Exception as e: | |
| error_msg = f'An error occurred: {str(e)}' | |
| print(error_msg) | |
| flash(error_msg, 'error') | |
| return redirect(url_for('step1_input')) | |
| def download_paper(): | |
| """Download the generated question paper as HTML.""" | |
| try: | |
| if 'question_paper' not in session: | |
| return redirect(url_for('index')) | |
| # Generate a unique filename | |
| filename = f"question_paper_{uuid.uuid4().hex[:8]}.html" | |
| # Render the printable version | |
| # Determine display duration similarly to step3 | |
| cfg = session['config'] | |
| display_duration = cfg.get('exam_duration') if cfg.get('exam_duration') else int(round(session['total_marks'] * 1.5)) | |
| html_content = render_template('printable_paper.html', | |
| question_paper=session['question_paper'], | |
| config=cfg, | |
| total_marks=session['total_marks'], | |
| exam_date=session['exam_date'], | |
| display_duration=display_duration) | |
| # Create a temporary file | |
| temp_file = os.path.join(tempfile.gettempdir(), filename) | |
| with open(temp_file, 'w', encoding='utf-8') as f: | |
| f.write(html_content) | |
| return send_file(temp_file, as_attachment=True, download_name=filename) | |
| except Exception as e: | |
| return jsonify({'error': f'Error downloading paper: {str(e)}'}), 500 | |
| def show_results(): | |
| """Display the generated questions.""" | |
| if 'generated_questions' not in session or 'exam_config' not in session: | |
| flash('No questions generated yet. Please start from the beginning.', 'error') | |
| return redirect(url_for('step1_input')) | |
| # Get config with defaults | |
| config = session.get('exam_config', {}) | |
| questions = session.get('generated_questions', {}) | |
| # Ensure all required question types exist in the questions dictionary | |
| for qtype in ['mcq_questions', 'short_questions', 'long_questions']: | |
| if qtype not in questions: | |
| questions[qtype] = [] | |
| # Calculate total marks | |
| total_marks = 0 | |
| if 'mcq_questions' in questions: | |
| total_marks += len(questions['mcq_questions']) | |
| if 'short_questions' in questions: | |
| total_marks += len(questions['short_questions']) * 2 # 2 marks per short question | |
| if 'long_questions' in questions: | |
| total_marks += len(questions['long_questions']) * 5 # 5 marks per long question | |
| # Get exam date from config or use current date | |
| exam_date = config.get('timestamp', datetime.now().strftime('%Y-%m-%d %H:%M:%S')) | |
| return render_template('step3_result.html', | |
| question_paper=questions, # Changed from questions to question_paper | |
| config=config, | |
| total_marks=total_marks, | |
| exam_date=datetime.strptime(exam_date, '%Y-%m-%d %H:%M:%S').strftime('%B %d, %Y'), | |
| display_duration=config.get('duration', 60)) # Use configured duration or default to 60 minutes | |
| def health_check(): | |
| """Health check endpoint.""" | |
| return jsonify({'status': 'healthy', 'service': 'Exam Question Generator'}) | |
| def warmup(): | |
| """Warmup endpoint to initialize the system.""" | |
| try: | |
| system = get_question_system() | |
| return jsonify({ | |
| 'status': 'ready', | |
| 'message': 'Question generation system is ready', | |
| 'uses_transformers': system.question_generator.use_transformers | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'status': 'loading' if 'still loading' in str(e) else 'error', | |
| 'message': str(e) | |
| }), 202 if 'still loading' in str(e) else 500 | |
| if __name__ == '__main__': | |
| print("π Starting Flask Web Application...") | |
| print("π Exam Question Generator Web Interface") | |
| print("π Access the application at: http://localhost:5000") | |
| print("π‘ Using rule-based question generation for faster startup") | |
| print("β‘ System will initialize on first use") | |
| # Configure Flask for better timeout handling | |
| app.config['PERMANENT_SESSION_LIFETIME'] = 1800 # 30 minutes | |
| port = int(os.environ.get("PORT", 7860)) # HF uses 7860 | |
| app.run(host="0.0.0.0", port=port, debug=False) | |