mcq2vid / app.py
roshcheeku's picture
Update app.py
d60fee2 verified
from flask import Flask, request, jsonify
import os
import uuid
import base64
import pdfplumber
from docx import Document
from werkzeug.utils import secure_filename
from model_utils import extract_mcqs_from_file, split_large_mcq_list
import re
# Flask application object; the route decorators in this file register their handlers on it.
app = Flask(__name__)
# Directory where uploaded files and chunk parts are written.
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'
# Create the upload directory at import time; exist_ok avoids an error when it already exists.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Maps a quiz id to its list of MCQs; read by /submit. Plain dict, held in process memory only.
quiz_data_store = {}
# Maps an upload's file_id to the set of chunk indices received so far by /upload-chunk.
chunk_tracker = {}
def extract_text_from_pdf(filepath):
    """Return the text of every page of the PDF at *filepath*, newline-joined.

    Pages for which ``extract_text()`` yields nothing (``None`` or an empty
    string) are left out of the result.
    """
    with pdfplumber.open(filepath) as document:
        page_texts = (page.extract_text() for page in document.pages)
        # filter(None, ...) drops the falsy results, mirroring a truthiness check.
        return "\n".join(filter(None, page_texts))
def extract_text_from_docx(filepath):
    """Return the text of all paragraphs of the .docx at *filepath*, one per line."""
    paragraphs = Document(filepath).paragraphs
    return "\n".join(paragraph.text for paragraph in paragraphs)
# ✅ Enhanced offline MCQ extractor with explanation + answer detection
# Substrings that mark a line as a likely question stem (matched case-insensitively).
_QUESTION_KEYWORDS = ('which', 'what', 'identify', 'following', 'correct', 'true', 'false', 'not')
# An option line with an explicit delimiter, e.g. "A)" or "B.".
_DELIMITED_OPTION_RE = re.compile(r'^(A|B|C|D)[).]')
# Looser pattern used while collecting options: the delimiter is optional.
_OPTION_RE = re.compile(r'^(A|B|C|D)[).]?\s+(.*)')
# "Correct answer: B". The trailing \b requires the letter to stand alone, so
# "correct answer depends ..." is not read as answer "D".
_ANSWER_RE = re.compile(r'Correct answer\s*[:\-]?\s*([A-D])\b', re.IGNORECASE)
# Lines that terminate an explanation block.
_EXPLANATION_END_RE = re.compile(r'^(Question|Which|What|[A-D][).])')
# How many lines after the options are searched for the answer/explanation.
_ANSWER_LOOKAHEAD = 10


def _looks_like_question(line):
    """Return True if *line* has >= 5 words, is not an option, and has a question keyword."""
    lowered = line.lower()
    return (
        len(line.split()) >= 5
        and not _DELIMITED_OPTION_RE.match(line)
        and any(keyword in lowered for keyword in _QUESTION_KEYWORDS)
    )


def _starts_next_question(lines, index):
    """Return True if lines[index] is a question stem directly followed by an option line."""
    return (
        index + 1 < len(lines)
        and _looks_like_question(lines[index])
        and _DELIMITED_OPTION_RE.match(lines[index + 1]) is not None
    )


def _collect_options(lines, start):
    """Scan forward from *start* for up to four option texts.

    Non-option lines are skipped rather than ending the scan, so wrapped
    option text does not abort collection. Returns ``(option_texts, next_index)``.
    """
    options = []
    j = start
    while j < len(lines) and len(options) < 4:
        match = _OPTION_RE.match(lines[j])
        if match:
            options.append(match.group(2))
        j += 1
    return options, j


def _find_answer_and_explanation(lines, start):
    """Look in the lines after the options for the answer letter and explanation.

    Returns ``(answer, explanation, used)`` where *used* is the set of line
    indices that were taken as the answer line or as explanation text.
    """
    answer = ""
    explanation = ""
    used = set()
    for k in range(start, min(start + _ANSWER_LOOKAHEAD, len(lines))):
        current = lines[k]
        ans_match = _ANSWER_RE.search(current)
        is_explanation = (
            current.lower().startswith("explanation") or current.startswith("(Choice")
        )
        # Stop at the next question so its answer is not attributed to this one.
        if not ans_match and not is_explanation and _starts_next_question(lines, k):
            break
        # Keep the first answer found: it is the one closest to these options.
        if ans_match and not answer:
            answer = ans_match.group(1).upper()
            used.add(k)
        if is_explanation:
            end = k + 1
            while (
                end < len(lines)
                and not _EXPLANATION_END_RE.match(lines[end])
                and not _starts_next_question(lines, end)
            ):
                end += 1
            explanation = " ".join(lines[k:end])
            used.update(range(k, end))
            break
    return answer, explanation, used


def extract_mcqs_offline(text):
    """Heuristically extract multiple-choice questions from plain text.

    A question is a line of at least five words that contains one of the
    question keywords and is followed (not necessarily immediately) by four
    option lines starting with A-D. After the options, up to
    ``_ANSWER_LOOKAHEAD`` lines are searched for a "Correct answer: X" line
    and an explanation block starting with "Explanation" or "(Choice".

    Lines already used as an answer or explanation are never re-interpreted as
    question stems; otherwise explanation text containing words such as
    "correct" or "not" would become a bogus question and swallow the options
    of the real question that follows.

    Args:
        text: Raw text; blank lines are ignored and lines are stripped.

    Returns:
        A list of dicts with keys ``question`` (str), ``options`` (list of
        four option texts), ``answer`` (``"A"``-``"D"`` or ``""`` when none
        was found) and ``explanation`` (str, possibly empty).
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    total = len(lines)
    mcqs = []
    consumed = set()  # indices already assigned to an answer/explanation
    i = 0
    while i < total:
        if i in consumed or not _looks_like_question(lines[i]):
            i += 1
            continue
        options, j = _collect_options(lines, i + 1)
        if len(options) != 4:
            # Not a complete MCQ; try the next line as a question candidate.
            i += 1
            continue
        answer, explanation, used = _find_answer_and_explanation(lines, j)
        consumed.update(used)
        mcqs.append({
            'question': lines[i],
            'options': options,
            'answer': answer,
            'explanation': explanation,
        })
        i = j
    return mcqs
def process_final_file(filename: str, file_bytes: bytes):
    """Save an uploaded file temporarily, extract its MCQs and build the response.

    PDF and DOCX files are converted to text first; the text is handed to
    ``extract_mcqs_from_file`` and, if that yields nothing, to the offline
    regex extractor. Spreadsheet/CSV files go straight to
    ``extract_mcqs_from_file``.

    The extracted MCQs are stored in ``quiz_data_store`` under the returned
    ``quiz_id`` so that the id can be used with ``/submit``.

    Args:
        filename: Original file name; only its extension selects the parser.
        file_bytes: Raw file content.

    Returns:
        On success a JSON response ``{'quiz_id': ..., 'mcqs': ...}``.
        On failure a ``(response, status)`` tuple: 400 for a missing name or
        unsupported extension, 500 if extraction raised.
    """
    if not filename:
        return jsonify({'error': 'Missing filename'}), 400

    # Check the extension before touching the disk so rejected uploads leave no file behind.
    ext = filename.rsplit('.', 1)[-1].lower()
    text_extractors = {'pdf': extract_text_from_pdf, 'docx': extract_text_from_docx}
    if ext not in text_extractors and ext not in ('xls', 'xlsx', 'csv'):
        return jsonify({'error': 'Unsupported file type'}), 400

    uid = str(uuid.uuid4())
    safe_name = secure_filename(filename)
    save_path = os.path.join(app.config['UPLOAD_FOLDER'], f"{uid}_{safe_name}")
    try:
        with open(save_path, 'wb') as f:
            f.write(file_bytes)
        try:
            if ext in text_extractors:
                text = text_extractors[ext](save_path)
                mcqs = extract_mcqs_from_file(save_path, raw_text=text)
                if not mcqs:
                    mcqs = extract_mcqs_offline(text)
            else:
                mcqs = extract_mcqs_from_file(save_path)
        except Exception as e:
            return jsonify({'error': f'Extraction error: {str(e)}'}), 500
        quiz_id = str(uuid.uuid4())
        # Without this the id returned to the client is unknown to /submit.
        quiz_data_store[quiz_id] = mcqs or []
        return jsonify({'quiz_id': quiz_id, 'mcqs': mcqs})
    finally:
        # The file is only needed during extraction; remove it so the upload
        # folder does not grow without bound. Best effort: a failed delete
        # must not mask the real response.
        try:
            os.remove(save_path)
        except OSError:
            pass
@app.route('/')
def home():
    """Root endpoint: return a plain-text message saying the API is up."""
    status_message = "MCQ Extraction API is running!"
    return status_message
@app.route('/upload', methods=['POST'])
def upload_single():
    """Accept one complete file and return the MCQs extracted from it.

    Two request formats are supported:

    * JSON body ``{"filename": str, "file_data": <base64 string>}``
    * multipart form upload with the file in the ``file`` field

    Returns whatever ``process_final_file`` returns, or a 400 JSON error when
    the request carries no usable file.
    """
    if request.is_json:
        # silent=True: malformed JSON yields None instead of an exception, so
        # every bad body gets the same JSON-formatted 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        filename = data.get('filename')
        file_data = data.get('file_data')
        if not isinstance(filename, str) or not filename or not file_data:
            return jsonify({'error': 'Missing filename or file_data'}), 400
        try:
            file_bytes = base64.b64decode(file_data)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError; TypeError covers non-string payloads.
            return jsonify({'error': f'Invalid base64 data: {e}'}), 400
        return process_final_file(filename, file_bytes)

    if 'file' in request.files:
        uploaded = request.files['file']
        # filename can be absent on a file part; normalise to '' so
        # secure_filename is not handed None.
        filename = secure_filename(uploaded.filename or '')
        file_bytes = uploaded.read()
        return process_final_file(filename, file_bytes)

    return jsonify({'error': 'No file provided'}), 400
@app.route('/upload-chunk', methods=['POST'])
def upload_chunk():
    """Receive one chunk of a large upload; process the file once all chunks arrived.

    Form fields: ``file_id`` (identifier shared by all chunks of one file),
    ``chunk_index`` (0-based, default 0), ``total_chunks`` (default 1),
    ``filename`` (original name, needed when the last chunk arrives) and the
    file part ``chunk``.

    While chunks are outstanding a status message is returned. When the last
    one arrives the parts are concatenated in index order, the temporary
    parts are deleted, and the file goes through ``process_final_file``. The
    resulting MCQs are split into parts of at most 500, each stored in
    ``quiz_data_store`` under ``<quiz_id>_part<N>``.

    Returns 400 for missing/invalid form fields; otherwise the status message,
    the quiz parts, or the error response from ``process_final_file``.
    """
    try:
        chunk_index = int(request.form.get('chunk_index', 0))
        total_chunks = int(request.form.get('total_chunks', 1))
    except ValueError:
        return jsonify({'error': 'chunk_index and total_chunks must be integers'}), 400
    if total_chunks < 1 or not 0 <= chunk_index < total_chunks:
        return jsonify({'error': 'chunk_index must satisfy 0 <= chunk_index < total_chunks'}), 400

    # file_id comes from the client and becomes a directory name that is later
    # listed and emptied; sanitise it so it cannot escape the upload folder
    # (e.g. "../.." or an absolute path).
    file_id = secure_filename(request.form.get('file_id') or '')
    if not file_id:
        return jsonify({'error': 'Missing or invalid file_id'}), 400
    original_name = request.form.get('filename')

    if 'chunk' not in request.files:
        return jsonify({'error': 'Missing file chunk'}), 400

    dirpath = os.path.join(app.config['UPLOAD_FOLDER'], file_id)
    os.makedirs(dirpath, exist_ok=True)
    request.files['chunk'].save(os.path.join(dirpath, f"{chunk_index}.part"))

    received = chunk_tracker.setdefault(file_id, set())
    received.add(chunk_index)
    if len(received) != total_chunks:
        return jsonify({'status': f'Chunk {chunk_index + 1}/{total_chunks} received'})

    if not original_name:
        # Keep the chunks on disk so the client can retry with a filename.
        return jsonify({'error': 'Missing filename'}), 400

    # All chunks present: stitch them together in index order.
    assembled = bytearray()
    for i in range(total_chunks):
        with open(os.path.join(dirpath, f"{i}.part"), 'rb') as pf:
            assembled.extend(pf.read())
    del chunk_tracker[file_id]
    for leftover in os.listdir(dirpath):
        os.remove(os.path.join(dirpath, leftover))
    os.rmdir(dirpath)

    result = process_final_file(original_name, bytes(assembled))
    # Error results are (response, status) tuples; pass them through untouched.
    if isinstance(result, tuple):
        return result
    try:
        result_data = result.get_json()
    except Exception:
        return result
    if not isinstance(result_data, dict) or 'mcqs' not in result_data:
        return result

    mcqs = result_data['mcqs'] or []
    quiz_id = result_data['quiz_id']
    quizzes = []
    for part_number, part_mcqs in enumerate(split_large_mcq_list(mcqs, max_per_chunk=500), start=1):
        part_id = f"{quiz_id}_part{part_number}"
        quiz_data_store[part_id] = part_mcqs
        quizzes.append({
            "quiz_id": part_id,
            "count": len(part_mcqs),
            "mcqs": part_mcqs
        })
    return jsonify({
        "quizzes": quizzes,
        "total_mcqs": len(mcqs)
    })
@app.route('/submit', methods=['POST'])
def submit_quiz():
    """Score submitted answers against a stored quiz.

    Expects a JSON object ``{"quiz_id": str, "answers": [str, ...]}`` where
    ``answers[i]`` is the chosen letter for question ``i``. Answers beyond
    the number of stored questions are ignored; missing or non-string answers
    count as wrong.

    Returns ``{'total', 'correct', 'accuracy'}`` (accuracy as a percentage
    rounded to two decimals), or a 400 JSON error for a malformed body, an
    unknown ``quiz_id`` or a non-list ``answers``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    quiz_id = data.get('quiz_id')
    answers = data.get('answers')
    # isinstance also guards the dict lookup against unhashable JSON values.
    if not isinstance(quiz_id, str) or quiz_id not in quiz_data_store:
        return jsonify({'error': 'Invalid quiz_id'}), 400
    if not isinstance(answers, list):
        return jsonify({'error': 'answers must be a list'}), 400

    mcqs = quiz_data_store[quiz_id]
    correct = 0
    # zip stops at the shorter sequence, so surplus answers are ignored.
    for submitted, mcq in zip(answers, mcqs):
        expected = str(mcq.get('answer') or '').strip().upper()
        # A question with no known answer can never be scored as correct,
        # even when the submitted answer is also empty.
        if expected and isinstance(submitted, str) and submitted.strip().upper() == expected:
            correct += 1

    total = len(mcqs)
    accuracy = (correct / total) * 100 if total else 0
    return jsonify({
        'total': total,
        'correct': correct,
        'accuracy': round(accuracy, 2)
    })
if __name__ == "__main__":
    # Executed as a script: start Flask's built-in server on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")