Spaces:
Sleeping
Sleeping
import base64
import os
import re
import shutil
import uuid

import pdfplumber
from docx import Document
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

from model_utils import extract_mcqs_from_file, split_large_mcq_list
# Flask application object used by the request handlers in this module.
app = Flask(__name__)
# Directory that uploaded files and chunk parts are written to; it is created
# at import time so the handlers can write into it immediately.
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# In-memory map of quiz id -> list of MCQ dicts; read by submit_quiz.
quiz_data_store = {}
# In-memory map of upload file_id -> set of chunk indexes received so far.
chunk_tracker = {}
def extract_text_from_pdf(filepath):
    """Return the text of every page of the PDF at *filepath*, newline-joined.

    Pages for which ``extract_text()`` yields a falsy value (``None`` or an
    empty string) are left out of the result.
    """
    with pdfplumber.open(filepath) as document:
        # The generator is consumed inside the ``with`` block, while the PDF
        # is still open.
        extracted = (page.extract_text() for page in document.pages)
        return "\n".join(chunk for chunk in extracted if chunk)
def extract_text_from_docx(filepath):
    """Return the text of all paragraphs of the DOCX at *filepath*, one per line."""
    paragraphs = Document(filepath).paragraphs
    return "\n".join(paragraph.text for paragraph in paragraphs)
# Offline (regex-only) MCQ extractor with answer and explanation detection.

# Substrings that mark a line as a candidate question stem.
_QUESTION_KEYWORDS = (
    'which', 'what', 'identify', 'following', 'correct', 'true', 'false', 'not',
)
# Option line: letter A-D, optional ")" or ".", whitespace, then the option text.
_OPTION_LINE_RE = re.compile(r'^(A|B|C|D)[).]?\s+(.*)')
# Option line with an explicit delimiter; used as a hard boundary marker.
_DELIMITED_OPTION_RE = re.compile(r'^(A|B|C|D)[).]')
_ANSWER_RE = re.compile(r'Correct answer\s*[:\-]?\s*([A-D])', re.IGNORECASE)
# Lines at which a running explanation block ends.
_EXPLANATION_STOP_RE = re.compile(r'^(Question|Which|What|[A-D][).])')
# How many lines after the options are searched for the answer/explanation.
_LOOKAHEAD_LINES = 10


def _looks_like_question(line):
    """Return True when *line* passes the question-stem heuristic.

    A stem has at least five words, does not start with a delimited option
    label ("A.", "B)", ...) and contains one of the question keywords.
    """
    if len(line.split()) < 5 or _DELIMITED_OPTION_RE.match(line):
        return False
    lowered = line.lower()
    return any(keyword in lowered for keyword in _QUESTION_KEYWORDS)


def _collect_options(lines, start):
    """Collect up to four option lines, scanning forward from index *start*.

    Returns ``(options, next_index)`` where *options* is a list of
    ``(letter, text)`` tuples and *next_index* is the index just past the
    last line examined.
    """
    options = []
    pos = start
    while pos < len(lines) and len(options) < 4:
        match = _OPTION_LINE_RE.match(lines[pos])
        if match:
            options.append((match.group(1), match.group(2)))
        elif not options and _looks_like_question(lines[pos]):
            # Another stem appears before any option was seen: the options
            # further down belong to that closer stem, not to the candidate
            # we started from (typically an explanation sentence).
            break
        pos += 1
    return options, pos


def _find_answer_and_explanation(lines, start):
    """Search the lines after the options for the answer letter and explanation.

    Returns ``(answer, explanation)``; either may be an empty string.
    """
    answer = ""
    explanation = ""
    for k in range(start, min(start + _LOOKAHEAD_LINES, len(lines))):
        line = lines[k]
        if _DELIMITED_OPTION_RE.match(line):
            # Reached the option list of the next question; anything from
            # here on (including its "Correct answer" line) is not ours.
            break
        if not answer:
            # First match wins so a later question's answer cannot overwrite it.
            answer_match = _ANSWER_RE.search(line)
            if answer_match:
                answer = answer_match.group(1).upper()
        if line.lower().startswith("explanation") or line.startswith("(Choice"):
            end = k
            while end < len(lines) and not _EXPLANATION_STOP_RE.match(lines[end]):
                end += 1
            explanation = " ".join(lines[k:end])
            break
    return answer, explanation


def extract_mcqs_offline(text):
    """Extract multiple-choice questions from plain text using regex heuristics.

    Args:
        text: Raw document text. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of dicts with the keys ``'question'`` (stem line),
        ``'options'`` (list of four option texts), ``'answer'`` (upper-case
        letter, or ``''`` when none was found) and ``'explanation'`` (joined
        explanation lines, or ``''``). Only questions for which four option
        lines were found are returned.
    """
    if not text:
        return []

    lines = [line.strip() for line in text.splitlines() if line.strip()]
    mcqs = []
    i = 0
    while i < len(lines):
        if _looks_like_question(lines[i]):
            options, after_options = _collect_options(lines, i + 1)
            if len(options) == 4:
                answer, explanation = _find_answer_and_explanation(lines, after_options)
                mcqs.append({
                    'question': lines[i],
                    'options': [option_text for _, option_text in options],
                    'answer': answer,
                    'explanation': explanation,
                })
                # Resume right after the options; the answer/explanation lines
                # are re-scanned but are rejected as stems by _collect_options
                # once the next real stem is reached.
                i = after_options
                continue
        i += 1
    return mcqs
def process_final_file(filename: str, file_bytes: bytes):
    """Save an uploaded file temporarily, extract its MCQs and build the response.

    Args:
        filename: Client-supplied file name; its extension selects the
            extraction path (pdf, docx, xls, xlsx or csv).
        file_bytes: Complete file content.

    Returns:
        A Flask JSON response ``{'quiz_id': ..., 'mcqs': ...}`` on success, or
        a ``(response, status)`` tuple with an ``'error'`` key: 400 for a
        missing name or unsupported type, 500 when extraction raises.

    NOTE(review): the returned ``quiz_id`` is not stored in
    ``quiz_data_store`` by this function, so ``submit_quiz`` only accepts ids
    registered elsewhere (e.g. by ``upload_chunk``) -- confirm this is intended.
    """
    if not filename:
        return jsonify({'error': 'Missing filename'}), 400

    # A name without a dot has no extension; do not treat the whole name as one.
    ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
    # Validate before touching the disk so rejected uploads are never written.
    if ext not in ('pdf', 'docx', 'xls', 'xlsx', 'csv'):
        return jsonify({'error': 'Unsupported file type'}), 400

    uid = str(uuid.uuid4())
    safe_name = secure_filename(filename)
    # secure_filename() can strip the extension (e.g. for non-ASCII names);
    # re-attach it since the extractor presumably inspects the path suffix.
    if not safe_name.lower().endswith(f".{ext}"):
        safe_name = f"{safe_name}.{ext}"
    save_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{uid}_{safe_name}")

    try:
        with open(save_path, 'wb') as out_file:
            out_file.write(file_bytes)
        try:
            if ext in ('pdf', 'docx'):
                if ext == 'pdf':
                    text = extract_text_from_pdf(save_path)
                else:
                    text = extract_text_from_docx(save_path)
                mcqs = extract_mcqs_from_file(save_path, raw_text=text)
                if not mcqs:
                    # Fall back to the regex-based extractor.
                    mcqs = extract_mcqs_offline(text)
            else:
                mcqs = extract_mcqs_from_file(save_path)
            quiz_id = str(uuid.uuid4())
            return jsonify({'quiz_id': quiz_id, 'mcqs': mcqs})
        except Exception as e:
            # Request boundary: report any extraction failure as a JSON 500.
            return jsonify({'error': f'Extraction error: {str(e)}'}), 500
    finally:
        # The saved copy is only needed during extraction; remove it so the
        # upload folder does not grow with every request.
        try:
            os.remove(save_path)
        except OSError:
            pass
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the Flask app.
def home():
    """Return the plain-text banner confirming that the service is up."""
    banner = "MCQ Extraction API is running!"
    return banner
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the Flask app.
def upload_single():
    """Handle a whole-file upload sent either as JSON or as multipart form data.

    JSON bodies must be an object with ``filename`` and base64-encoded
    ``file_data``; multipart requests must carry the file under ``file``.

    Returns:
        Whatever ``process_final_file`` returns, or a ``(response, 400)``
        tuple when the request is malformed or carries no file.
    """
    if request.is_json:
        # silent=True turns malformed JSON into None instead of raising, and
        # the isinstance check rejects valid-but-non-object bodies (null,
        # lists, ...) that would otherwise fail on ``.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        filename = data.get('filename')
        file_data = data.get('file_data')
        if not filename or not file_data:
            return jsonify({'error': 'Missing filename or file_data'}), 400

        try:
            file_bytes = base64.b64decode(file_data)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass; TypeError covers
            # non-string payloads.
            return jsonify({'error': f'Invalid base64 data: {e}'}), 400
        return process_final_file(filename, file_bytes)

    if 'file' in request.files:
        uploaded = request.files['file']
        # FileStorage.filename may be None; secure_filename needs a string.
        filename = secure_filename(uploaded.filename or '')
        file_bytes = uploaded.read()
        return process_final_file(filename, file_bytes)

    return jsonify({'error': 'No file provided'}), 400
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the Flask app.
def upload_chunk():
    """Receive one chunk of a chunked upload; process the file once complete.

    Form fields: ``file_id`` (upload identifier), ``chunk_index`` (0-based),
    ``total_chunks`` and ``filename``; the chunk bytes arrive under the
    ``chunk`` file field.

    Returns:
        * a status JSON while chunks are still outstanding;
        * once all chunks arrived: ``{'quizzes': [...], 'total_mcqs': n}``
          with the MCQs split into parts of at most 500, each part stored in
          ``quiz_data_store`` under ``<quiz_id>_part<k>``;
        * a ``(response, status)`` error tuple for invalid input or when
          ``process_final_file`` reports an error.
    """
    file_id = request.form.get('file_id')
    original_name = request.form.get('filename')

    try:
        chunk_index = int(request.form.get('chunk_index', 0))
        total_chunks = int(request.form.get('total_chunks', 1))
    except (TypeError, ValueError):
        return jsonify({'error': 'chunk_index and total_chunks must be integers'}), 400
    if total_chunks < 1 or not 0 <= chunk_index < total_chunks:
        return jsonify({'error': 'chunk_index out of range'}), 400

    if 'chunk' not in request.files:
        return jsonify({'error': 'Missing file chunk'}), 400

    # file_id is client-controlled and becomes a directory name that is later
    # deleted; sanitise it so it cannot point outside the upload folder.
    safe_id = secure_filename(file_id or '')
    if not safe_id:
        return jsonify({'error': 'Invalid file_id'}), 400

    dirpath = os.path.join(app.config['UPLOAD_FOLDER'], safe_id)
    os.makedirs(dirpath, exist_ok=True)
    request.files['chunk'].save(os.path.join(dirpath, f"{chunk_index}.part"))
    chunk_tracker.setdefault(safe_id, set()).add(chunk_index)

    if len(chunk_tracker[safe_id]) != total_chunks:
        return jsonify({'status': f'Chunk {chunk_index + 1}/{total_chunks} received'})

    # All chunks present: stitch them together in index order.
    assembled = bytearray()
    try:
        for index in range(total_chunks):
            with open(os.path.join(dirpath, f"{index}.part"), 'rb') as part_file:
                assembled.extend(part_file.read())
    except FileNotFoundError:
        # Can only happen when requests disagreed about total_chunks.
        return jsonify({'error': 'Uploaded chunks are incomplete'}), 400
    finally:
        # Parts are no longer needed whether or not assembly succeeded.
        chunk_tracker.pop(safe_id, None)
        shutil.rmtree(dirpath, ignore_errors=True)

    if not original_name:
        return jsonify({'error': 'Missing filename'}), 400

    result = process_final_file(original_name, bytes(assembled))
    if isinstance(result, tuple):
        # (response, status) tuples are error results; pass them through.
        return result

    try:
        result_data = result.get_json()
    except Exception:
        return result
    if not isinstance(result_data, dict) or 'mcqs' not in result_data:
        return result

    mcqs = result_data['mcqs']
    quiz_id = result_data['quiz_id']
    quizzes = []
    parts = split_large_mcq_list(mcqs, max_per_chunk=500)
    for part_number, part_mcqs in enumerate(parts, start=1):
        part_id = f"{quiz_id}_part{part_number}"
        quiz_data_store[part_id] = part_mcqs
        quizzes.append({
            "quiz_id": part_id,
            "count": len(part_mcqs),
            "mcqs": part_mcqs,
        })
    return jsonify({
        "quizzes": quizzes,
        "total_mcqs": len(mcqs),
    })
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the Flask app.
def submit_quiz():
    """Score submitted answers against a stored quiz.

    Expects a JSON object with ``quiz_id`` (a key of ``quiz_data_store``) and
    ``answers`` (a list of answer letters, positionally matched to the quiz's
    questions).

    Returns:
        JSON with ``total`` (number of questions in the quiz), ``correct``
        and ``accuracy`` (percentage rounded to two decimals), or a
        ``(response, 400)`` tuple for a malformed body, unknown quiz id or
        non-list ``answers``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    quiz_id = data.get('quiz_id')
    answers = data.get('answers')
    # The isinstance check also keeps unhashable values away from the dict lookup.
    if not quiz_id or not isinstance(quiz_id, str) or quiz_id not in quiz_data_store:
        return jsonify({'error': 'Invalid quiz_id'}), 400
    if not isinstance(answers, list):
        return jsonify({'error': 'answers must be a list'}), 400

    mcqs = quiz_data_store[quiz_id]
    correct = 0
    # zip() stops at the shorter sequence, so surplus answers are ignored.
    for submitted, mcq in zip(answers, mcqs):
        expected = str(mcq.get('answer') or '').strip().upper()
        # A question without a known answer can never be scored as correct,
        # and non-string submissions simply count as wrong.
        if expected and isinstance(submitted, str) and submitted.strip().upper() == expected:
            correct += 1

    total = len(mcqs)
    accuracy = (correct / total) * 100 if total else 0
    return jsonify({
        'total': total,
        'correct': correct,
        'accuracy': round(accuracy, 2),
    })
# When executed as a script, start Flask's built-in server listening on all
# interfaces at port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)