Spaces:
Sleeping
Sleeping
| # ============================================== | |
| # app.py โ CBT + REVIEW(โญ) + WRONG + ํตํฉ ๋ ธํธ + OCR Parser (์ต์ ํ + ์๋ DBํ ์์ฑ๋ณธ) | |
| # ============================================== | |
| import os | |
| import json | |
| from datetime import datetime | |
| from flask import Flask, request, jsonify, render_template | |
| from dotenv import load_dotenv | |
| from db import init_db, SessionLocal | |
| from models import Question, Attempt | |
| from ingest import ingest_questions | |
| try: | |
| from pdf_parser_adaptive import parse_pdf as _parse_pdf | |
| except Exception: | |
| _parse_pdf = None | |
| load_dotenv() | |
| app = Flask(__name__) | |
| # ----------------------------- | |
| # Paths | |
| # ----------------------------- | |
| BASE_DIR = os.path.abspath(os.path.dirname(__file__)) | |
| DATA_DIR = os.path.join(BASE_DIR, "data") | |
| UPLOAD_DIR = os.path.join(DATA_DIR, "uploads") | |
| PARSED_DIR = os.path.join(DATA_DIR, "parsed_json") | |
| for d in [DATA_DIR, UPLOAD_DIR, PARSED_DIR]: | |
| os.makedirs(d, exist_ok=True) | |
| DEFAULT_USER = "default" | |
| # ----------------------------- | |
| # DB init (ํจ์น/๊ตฌ๋ฒ์ ๋ชจ๋ ํธํ) | |
| # ----------------------------- | |
| try: | |
| init_db(apply_light_migrations=True) | |
| except TypeError: | |
| init_db() | |
| # ----------------------------- | |
| # Utils | |
| # ----------------------------- | |
| def normalize_options(raw): | |
| """ | |
| ์ด๋ค ํํ๋ก ์ ์ฅ๋์ด ์์ด๋ "ํ์์ฉ ๋ฌธ์์ด ๋ฆฌ์คํธ"๋ก ๋ณํ. | |
| - ํ์ค: [{"key":"A","text":"..."}] -> ["A. ...", ...] | |
| - ๋ ๊ฑฐ์: {"A":"..."} -> ["A. ...", ...] | |
| - ๋ ๊ฑฐ์: ["...","..."] -> ๊ทธ๋๋ก | |
| """ | |
| if not raw: | |
| return [] | |
| # ํ์ค: list[dict] | |
| if isinstance(raw, list) and raw and isinstance(raw[0], dict): | |
| out = [] | |
| for o in raw: | |
| k = str(o.get("key", "")).strip() | |
| t = str(o.get("text", "")).strip() | |
| if k and t: | |
| out.append(f"{k}. {t}") | |
| elif t: | |
| out.append(t) | |
| elif k: | |
| out.append(k) | |
| return out | |
| # ๋ ๊ฑฐ์: dict | |
| if isinstance(raw, dict): | |
| try: | |
| return [f"{k}. {v}" for k, v in sorted(raw.items(), key=lambda kv: kv[0])] | |
| except Exception: | |
| return [str(raw)] | |
| # ๋ ๊ฑฐ์: list[str] | |
| if isinstance(raw, list): | |
| return [str(x) for x in raw] | |
| return [] | |
| def parse_pdf_wrapper(pdf_path): | |
| if not _parse_pdf: | |
| raise RuntimeError("pdf_parser_adaptive ๋ชจ๋์ด ์์ต๋๋ค.") | |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| out_json = os.path.join(PARSED_DIR, f"parsed_{ts}.json") | |
| _parse_pdf(pdf_path, out_json, use_llm=True, lang="korean", dpi=200) | |
| return out_json | |
| def first_question(db): | |
| """ | |
| ์ฒซ ๋ฌธ์ ์ ํ: | |
| - page_no/q_no_on_page/global_no ์ปฌ๋ผ์ด ์์ผ๋ฉด PDF ์์๋ก | |
| - ์์ผ๋ฉด id๋ก | |
| """ | |
| q = db.query(Question) | |
| if hasattr(Question, "page_no") and hasattr(Question, "q_no_on_page"): | |
| return q.order_by( | |
| Question.page_no.asc().nulls_last(), | |
| Question.q_no_on_page.asc().nulls_last(), | |
| Question.id.asc() | |
| ).first() | |
| if hasattr(Question, "global_no"): | |
| return q.order_by( | |
| Question.global_no.asc().nulls_last(), | |
| Question.id.asc() | |
| ).first() | |
| return q.order_by(Question.id.asc()).first() | |
| def api_payload(db, q: Question): | |
| """ | |
| ๊ธฐ์กด ํ๋ก ํธ ํธํ ํ๋ ์ ์ง + ์ ๊ท ํ๋(์์ผ๋ฉด)๋ง ์ถ๊ฐ | |
| """ | |
| total = db.query(Question).count() | |
| payload = { | |
| "id": q.id, | |
| "question": q.stem, | |
| "options": normalize_options(q.get_options()), | |
| "answer": q.answer, # ๋ ๊ฑฐ์ ์ ์ง | |
| "explanation": q.explanation, | |
| "total": total, | |
| } | |
| # ์ ๊ท(๋ชจ๋ธ์ด ์ง์ํ ๋๋ง) | |
| if hasattr(q, "get_answer_keys"): | |
| payload["answer_keys"] = q.get_answer_keys() | |
| if hasattr(q, "get_answer_steps"): | |
| payload["answer_steps"] = q.get_answer_steps() | |
| if hasattr(q, "get_images"): | |
| payload["images"] = q.get_images() | |
| # ์์ ๋ฉํ(์์ ๋๋ง) | |
| if hasattr(q, "page_no"): | |
| payload["page_no"] = q.page_no | |
| if hasattr(q, "q_no_on_page"): | |
| payload["q_no_on_page"] = q.q_no_on_page | |
| if hasattr(q, "global_no"): | |
| payload["global_no"] = q.global_no | |
| return payload | |
| def is_correct(q: Question, chosen): | |
| """ | |
| ์ต์ ๋ณ๊ฒฝ์ผ๋ก ์ ๋ต ํ์ ๊ฐ์ : | |
| - models.py์ get_answer_keys/get_answer_steps ์์ผ๋ฉด ๊ทธ๊ฑธ ์ฐ์ ์ฌ์ฉ | |
| - ์์ผ๋ฉด ๋ ๊ฑฐ์ answer(string)๋ก ๋น๊ต | |
| """ | |
| # chosen ํ์คํ (string/list ๋ชจ๋ ํ์ฉ) | |
| if isinstance(chosen, list): | |
| chosen_list = [str(x).strip().upper() for x in chosen if str(x).strip()] | |
| else: | |
| s = str(chosen).strip() | |
| if "," in s: | |
| chosen_list = [x.strip().upper() for x in s.split(",") if x.strip()] | |
| else: | |
| chosen_list = [s.upper()] if s else [] | |
| # Steps ์ฐ์ | |
| if hasattr(q, "get_answer_steps"): | |
| ans_steps = [str(x).strip().upper() for x in (q.get_answer_steps() or [])] | |
| if ans_steps: | |
| return chosen_list == ans_steps # ์์/์ค๋ณต๊น์ง ๋์ผ | |
| # ๋ณต์์ ๋ต(์ค๋ณต ํฌํจ) ์ฐ์ | |
| if hasattr(q, "get_answer_keys"): | |
| ans_keys = [str(x).strip().upper() for x in (q.get_answer_keys() or [])] | |
| if ans_keys: | |
| from collections import Counter | |
| return Counter(chosen_list) == Counter(ans_keys) | |
| # ๋ ๊ฑฐ์ ๋จ์ผ ๋น๊ต | |
| return str(chosen).strip().upper() == str(q.answer or "").strip().upper() | |
| def auto_ingest_json_if_empty(json_path="data/questions.json", source_name="az104_dump"): | |
| """ | |
| โ ์๋ฒ ์์ ์ ์๋ DBํ: | |
| - DB์ ๋ฌธ์ ๊ฐ 0๊ฐ๋ฉด json_path๋ฅผ ์๋ ingest | |
| - DB ์ญ์ ํ ์ฌ์คํํด๋ ์๋์ผ๋ก ๋ค์ ์์ | |
| """ | |
| # JSON ์กด์ฌ ํ์ธ | |
| abs_json = json_path if os.path.isabs(json_path) else os.path.join(BASE_DIR, json_path) | |
| if not os.path.exists(abs_json): | |
| print(f"[WARN] JSON not found: {abs_json}") | |
| return | |
| db = SessionLocal() | |
| try: | |
| n = db.query(Question).count() | |
| finally: | |
| db.close() | |
| if n > 0: | |
| print(f"[INFO] DB already has {n} questions. skip auto ingest.") | |
| return | |
| print(f"[INFO] AUTO INGEST start: {abs_json}") | |
| inserted = ingest_questions(abs_json, source_name=source_name) | |
| print(f"[INFO] AUTO INGEST done: inserted={inserted}") | |
| # โ ์ฌ๊ธฐ์ ์๋ ingest ์คํ (์ํ๋ ๊ฒฝ๋ก: data/questions.json) | |
| auto_ingest_json_if_empty("data/questions.json", "az104_dump") | |
| # ----------------------------- | |
| # Pages | |
| # ----------------------------- | |
| def home(): | |
| return render_template("index.html") | |
| def quiz(): | |
| return render_template("quiz.html") | |
| def review(): | |
| return render_template("review.html") | |
| def wrong(): | |
| return render_template("wrong.html") | |
| def healthz(): | |
| return jsonify({"ok": True}) | |
| # ----------------------------- | |
| # Admin: upload pdf -> parse -> ingest | |
| # ----------------------------- | |
| def upload_pdf(): | |
| if request.method == "GET": | |
| return render_template("upload.html") | |
| f = request.files.get("file") | |
| if not f: | |
| return jsonify({"error": "file ๋๋ฝ"}), 400 | |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| save_path = os.path.join(UPLOAD_DIR, f"{ts}_{f.filename}") | |
| f.save(save_path) | |
| try: | |
| out_json = parse_pdf_wrapper(save_path) | |
| inserted = ingest_questions(out_json, source_name=f.filename) | |
| return jsonify({"ok": True, "inserted": inserted}) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": str(e)}), 500 | |
| # ----------------------------- | |
| # API: question | |
| # - โ id ์๊ฑฐ๋/์๋ id๋ฉด ์ฒซ ๋ฌธ์ fallback (DB ์ญ์ ํ 404 ๋ฐฉ์ง) | |
| # ----------------------------- | |
| def api_question(): | |
| qid = request.args.get("id", type=int) | |
| db = SessionLocal() | |
| try: | |
| q = None | |
| if qid: | |
| q = db.query(Question).filter(Question.id == qid).first() | |
| if not q: | |
| q = first_question(db) | |
| if not q: | |
| return jsonify({"error": "๋ฌธ์ ์์"}), 404 | |
| return jsonify(api_payload(db, q)) | |
| finally: | |
| db.close() | |
| # ----------------------------- | |
| # API: answer | |
| # - โ chosen list/string ๋ ๋ค ํ์ฉ + (๊ฐ๋ฅํ๋ฉด) ๋ณต์/์์ ์ ๋ต ํ์ | |
| # ----------------------------- | |
| def api_answer(): | |
| data = request.get_json(force=True) or {} | |
| qid = data.get("question_id") | |
| chosen = data.get("chosen") | |
| user_id = str(data.get("user_id", DEFAULT_USER)) | |
| if not qid or chosen is None: | |
| return jsonify({"error": "question_id ๋๋ chosen ๋๋ฝ"}), 400 | |
| db = SessionLocal() | |
| try: | |
| q = db.query(Question).filter(Question.id == int(qid)).first() | |
| if not q: | |
| return jsonify({"error": "ํด๋น ๋ฌธ์ ์์"}), 404 | |
| correct = is_correct(q, chosen) | |
| chosen_store = json.dumps(chosen, ensure_ascii=False) if isinstance(chosen, list) else str(chosen) | |
| db.add(Attempt( | |
| user_id=user_id, | |
| question_id=q.id, | |
| chosen=chosen_store, | |
| correct=bool(correct), | |
| note_type="wrong" if not correct else None | |
| )) | |
| db.commit() | |
| return jsonify({"correct": bool(correct), "answer": q.answer, "explanation": q.explanation}) | |
| finally: | |
| db.close() | |
| # ----------------------------- | |
| # API: next | |
| # - โ ์ต์ ์์ : current_id ์์ผ๋ฉด ์ฒซ ๋ฌธ์ | |
| # - (PDF ์์ next๊น์ง ์์ ๊ตฌํ์ ์ฝ๋๊ฐ ๊ธธ์ด์ ธ์, ์ฌ๊ธฐ์ ๋ ๊ฑฐ์ id-next ์ ์ง) | |
| # ----------------------------- | |
| def api_next(): | |
| current_id = request.args.get("current_id", type=int, default=None) | |
| db = SessionLocal() | |
| try: | |
| if not current_id: | |
| q = first_question(db) | |
| if not q: | |
| return jsonify({"end": True}) | |
| return jsonify(api_payload(db, q)) | |
| nxt = ( | |
| db.query(Question) | |
| .filter(Question.id > current_id) | |
| .order_by(Question.id.asc()) | |
| .first() | |
| ) | |
| if not nxt: | |
| return jsonify({"end": True}) | |
| return jsonify(api_payload(db, nxt)) | |
| finally: | |
| db.close() | |
| # ----------------------------- | |
| # Review add/remove | |
| # ----------------------------- | |
| def review_add(): | |
| data = request.get_json(force=True) or {} | |
| qid = data.get("question_id") | |
| user_id = str(data.get("user_id", DEFAULT_USER)) | |
| db = SessionLocal() | |
| try: | |
| db.add(Attempt(user_id=user_id, question_id=int(qid), correct=False, note_type="review")) | |
| db.commit() | |
| return jsonify({"message": "โญ ๋ณต์ต ๋ชฉ๋ก์ ์ถ๊ฐ๋จ"}) | |
| finally: | |
| db.close() | |
| def review_remove(): | |
| data = request.get_json(force=True) or {} | |
| qid = data.get("question_id") | |
| user_id = str(data.get("user_id", DEFAULT_USER)) | |
| db = SessionLocal() | |
| try: | |
| db.query(Attempt).filter( | |
| Attempt.user_id == user_id, | |
| Attempt.question_id == int(qid), | |
| Attempt.note_type == "review" | |
| ).delete() | |
| db.commit() | |
| return jsonify({"message": "๐๏ธ ๋ณต์ต์์ ์ ๊ฑฐ๋จ"}) | |
| finally: | |
| db.close() | |
| # โ ์ค๋ต + ๋ณต์ต ํตํฉ ์กฐํ (๊ธฐ์กด ๊ทธ๋๋ก ์ ์ง) | |
| def wrong_review(): | |
| user_id = request.args.get("user_id", DEFAULT_USER) | |
| db = SessionLocal() | |
| try: | |
| rows = ( | |
| db.query(Attempt, Question) | |
| .join(Question, Attempt.question_id == Question.id) | |
| .filter(Attempt.user_id == user_id, Attempt.note_type.in_(["wrong", "review"])) | |
| .order_by(Attempt.id.desc()) | |
| .all() | |
| ) | |
| seen = set() | |
| items = [] | |
| for att, q in rows: | |
| if q.id in seen: | |
| continue | |
| seen.add(q.id) | |
| items.append({ | |
| "question_id": q.id, | |
| "stem": q.stem, | |
| "options": q.get_options(), | |
| "answer": q.answer, | |
| "explanation": q.explanation, | |
| "chosen": att.chosen | |
| }) | |
| return jsonify({"count": len(items), "items": items}) | |
| finally: | |
| db.close() | |
| # โ ์ค๋ต์์ ์ ๊ฑฐ (๊ธฐ์กด ๊ทธ๋๋ก ์ ์ง) | |
| def wrong_remove(): | |
| data = request.get_json(force=True) or {} | |
| qid = data.get("question_id") | |
| user_id = str(data.get("user_id", DEFAULT_USER)) | |
| db = SessionLocal() | |
| try: | |
| db.query(Attempt).filter( | |
| Attempt.user_id == user_id, | |
| Attempt.question_id == int(qid), | |
| Attempt.note_type == "wrong" | |
| ).delete() | |
| db.commit() | |
| return jsonify({"message": "๐๏ธ ์ค๋ต์์ ์ ๊ฑฐ๋จ"}) | |
| finally: | |
| db.close() | |
| if __name__ == "__main__": | |
| port = int(os.getenv("PORT", 7860)) | |
| print(f"[INFO] ์๋ฒ ์์: 0.0.0.0:{port} (HF/๋ก์ปฌ ๊ณตํต)") | |
| app.run(host="0.0.0.0", port=port, debug=False) | |