cert-study-app / app.py
Kentlo's picture
์ค‘๋ณต๋‹ต๋ณ€ ๊ฐ€๋Šฅ ๋ฒ„์ „
08348fb
# ==============================================
# app.py โ€” CBT + REVIEW(โญ) + WRONG + ํ†ตํ•ฉ ๋…ธํŠธ + OCR Parser (์ตœ์ ํ™” + ์ž๋™ DBํ™” ์™„์„ฑ๋ณธ)
# ==============================================
import os
import json
from datetime import datetime
from flask import Flask, request, jsonify, render_template
from dotenv import load_dotenv
from db import init_db, SessionLocal
from models import Question, Attempt
from ingest import ingest_questions
try:
from pdf_parser_adaptive import parse_pdf as _parse_pdf
except Exception:
_parse_pdf = None
load_dotenv()
app = Flask(__name__)
# -----------------------------
# Paths
# -----------------------------
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
UPLOAD_DIR = os.path.join(DATA_DIR, "uploads")
PARSED_DIR = os.path.join(DATA_DIR, "parsed_json")
for d in [DATA_DIR, UPLOAD_DIR, PARSED_DIR]:
os.makedirs(d, exist_ok=True)
DEFAULT_USER = "default"
# -----------------------------
# DB init (ํŒจ์น˜/๊ตฌ๋ฒ„์ „ ๋ชจ๋‘ ํ˜ธํ™˜)
# -----------------------------
try:
init_db(apply_light_migrations=True)
except TypeError:
init_db()
# -----------------------------
# Utils
# -----------------------------
def normalize_options(raw):
"""
์–ด๋–ค ํ˜•ํƒœ๋กœ ์ €์žฅ๋˜์–ด ์žˆ์–ด๋„ "ํ‘œ์‹œ์šฉ ๋ฌธ์ž์—ด ๋ฆฌ์ŠคํŠธ"๋กœ ๋ณ€ํ™˜.
- ํ‘œ์ค€: [{"key":"A","text":"..."}] -> ["A. ...", ...]
- ๋ ˆ๊ฑฐ์‹œ: {"A":"..."} -> ["A. ...", ...]
- ๋ ˆ๊ฑฐ์‹œ: ["...","..."] -> ๊ทธ๋Œ€๋กœ
"""
if not raw:
return []
# ํ‘œ์ค€: list[dict]
if isinstance(raw, list) and raw and isinstance(raw[0], dict):
out = []
for o in raw:
k = str(o.get("key", "")).strip()
t = str(o.get("text", "")).strip()
if k and t:
out.append(f"{k}. {t}")
elif t:
out.append(t)
elif k:
out.append(k)
return out
# ๋ ˆ๊ฑฐ์‹œ: dict
if isinstance(raw, dict):
try:
return [f"{k}. {v}" for k, v in sorted(raw.items(), key=lambda kv: kv[0])]
except Exception:
return [str(raw)]
# ๋ ˆ๊ฑฐ์‹œ: list[str]
if isinstance(raw, list):
return [str(x) for x in raw]
return []
def parse_pdf_wrapper(pdf_path):
if not _parse_pdf:
raise RuntimeError("pdf_parser_adaptive ๋ชจ๋“ˆ์ด ์—†์Šต๋‹ˆ๋‹ค.")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_json = os.path.join(PARSED_DIR, f"parsed_{ts}.json")
_parse_pdf(pdf_path, out_json, use_llm=True, lang="korean", dpi=200)
return out_json
def first_question(db):
"""
์ฒซ ๋ฌธ์ œ ์„ ํƒ:
- page_no/q_no_on_page/global_no ์ปฌ๋Ÿผ์ด ์žˆ์œผ๋ฉด PDF ์ˆœ์„œ๋กœ
- ์—†์œผ๋ฉด id๋กœ
"""
q = db.query(Question)
if hasattr(Question, "page_no") and hasattr(Question, "q_no_on_page"):
return q.order_by(
Question.page_no.asc().nulls_last(),
Question.q_no_on_page.asc().nulls_last(),
Question.id.asc()
).first()
if hasattr(Question, "global_no"):
return q.order_by(
Question.global_no.asc().nulls_last(),
Question.id.asc()
).first()
return q.order_by(Question.id.asc()).first()
def api_payload(db, q: Question):
"""
๊ธฐ์กด ํ”„๋ก ํŠธ ํ˜ธํ™˜ ํ•„๋“œ ์œ ์ง€ + ์‹ ๊ทœ ํ•„๋“œ(์žˆ์œผ๋ฉด)๋งŒ ์ถ”๊ฐ€
"""
total = db.query(Question).count()
payload = {
"id": q.id,
"question": q.stem,
"options": normalize_options(q.get_options()),
"answer": q.answer, # ๋ ˆ๊ฑฐ์‹œ ์œ ์ง€
"explanation": q.explanation,
"total": total,
}
# ์‹ ๊ทœ(๋ชจ๋ธ์ด ์ง€์›ํ•  ๋•Œ๋งŒ)
if hasattr(q, "get_answer_keys"):
payload["answer_keys"] = q.get_answer_keys()
if hasattr(q, "get_answer_steps"):
payload["answer_steps"] = q.get_answer_steps()
if hasattr(q, "get_images"):
payload["images"] = q.get_images()
# ์ˆœ์„œ ๋ฉ”ํƒ€(์žˆ์„ ๋•Œ๋งŒ)
if hasattr(q, "page_no"):
payload["page_no"] = q.page_no
if hasattr(q, "q_no_on_page"):
payload["q_no_on_page"] = q.q_no_on_page
if hasattr(q, "global_no"):
payload["global_no"] = q.global_no
return payload
def is_correct(q: Question, chosen):
"""
์ตœ์†Œ ๋ณ€๊ฒฝ์œผ๋กœ ์ •๋‹ต ํŒ์ • ๊ฐœ์„ :
- models.py์— get_answer_keys/get_answer_steps ์žˆ์œผ๋ฉด ๊ทธ๊ฑธ ์šฐ์„  ์‚ฌ์šฉ
- ์—†์œผ๋ฉด ๋ ˆ๊ฑฐ์‹œ answer(string)๋กœ ๋น„๊ต
"""
# chosen ํ‘œ์ค€ํ™” (string/list ๋ชจ๋‘ ํ—ˆ์šฉ)
if isinstance(chosen, list):
chosen_list = [str(x).strip().upper() for x in chosen if str(x).strip()]
else:
s = str(chosen).strip()
if "," in s:
chosen_list = [x.strip().upper() for x in s.split(",") if x.strip()]
else:
chosen_list = [s.upper()] if s else []
# Steps ์šฐ์„ 
if hasattr(q, "get_answer_steps"):
ans_steps = [str(x).strip().upper() for x in (q.get_answer_steps() or [])]
if ans_steps:
return chosen_list == ans_steps # ์ˆœ์„œ/์ค‘๋ณต๊นŒ์ง€ ๋™์ผ
# ๋ณต์ˆ˜์ •๋‹ต(์ค‘๋ณต ํฌํ•จ) ์šฐ์„ 
if hasattr(q, "get_answer_keys"):
ans_keys = [str(x).strip().upper() for x in (q.get_answer_keys() or [])]
if ans_keys:
from collections import Counter
return Counter(chosen_list) == Counter(ans_keys)
# ๋ ˆ๊ฑฐ์‹œ ๋‹จ์ผ ๋น„๊ต
return str(chosen).strip().upper() == str(q.answer or "").strip().upper()
def auto_ingest_json_if_empty(json_path="data/questions.json", source_name="az104_dump"):
"""
โœ… ์„œ๋ฒ„ ์‹œ์ž‘ ์‹œ ์ž๋™ DBํ™”:
- DB์— ๋ฌธ์ œ๊ฐ€ 0๊ฐœ๋ฉด json_path๋ฅผ ์ž๋™ ingest
- DB ์‚ญ์ œ ํ›„ ์žฌ์‹คํ–‰ํ•ด๋„ ์ž๋™์œผ๋กœ ๋‹ค์‹œ ์Œ“์ž„
"""
# JSON ์กด์žฌ ํ™•์ธ
abs_json = json_path if os.path.isabs(json_path) else os.path.join(BASE_DIR, json_path)
if not os.path.exists(abs_json):
print(f"[WARN] JSON not found: {abs_json}")
return
db = SessionLocal()
try:
n = db.query(Question).count()
finally:
db.close()
if n > 0:
print(f"[INFO] DB already has {n} questions. skip auto ingest.")
return
print(f"[INFO] AUTO INGEST start: {abs_json}")
inserted = ingest_questions(abs_json, source_name=source_name)
print(f"[INFO] AUTO INGEST done: inserted={inserted}")
# โœ… ์—ฌ๊ธฐ์„œ ์ž๋™ ingest ์‹คํ–‰ (์›ํ•˜๋Š” ๊ฒฝ๋กœ: data/questions.json)
auto_ingest_json_if_empty("data/questions.json", "az104_dump")
# -----------------------------
# Pages
# -----------------------------
@app.route("/")
def home():
return render_template("index.html")
@app.route("/quiz")
def quiz():
return render_template("quiz.html")
@app.route("/review")
def review():
return render_template("review.html")
@app.route("/wrong")
def wrong():
return render_template("wrong.html")
@app.route("/healthz")
def healthz():
return jsonify({"ok": True})
# -----------------------------
# Admin: upload pdf -> parse -> ingest
# -----------------------------
@app.route("/admin/upload", methods=["GET", "POST"])
def upload_pdf():
if request.method == "GET":
return render_template("upload.html")
f = request.files.get("file")
if not f:
return jsonify({"error": "file ๋ˆ„๋ฝ"}), 400
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
save_path = os.path.join(UPLOAD_DIR, f"{ts}_{f.filename}")
f.save(save_path)
try:
out_json = parse_pdf_wrapper(save_path)
inserted = ingest_questions(out_json, source_name=f.filename)
return jsonify({"ok": True, "inserted": inserted})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
# -----------------------------
# API: question
# - โœ… id ์—†๊ฑฐ๋‚˜/์—†๋Š” id๋ฉด ์ฒซ ๋ฌธ์ œ fallback (DB ์‚ญ์ œ ํ›„ 404 ๋ฐฉ์ง€)
# -----------------------------
@app.route("/api/question", methods=["GET"])
def api_question():
qid = request.args.get("id", type=int)
db = SessionLocal()
try:
q = None
if qid:
q = db.query(Question).filter(Question.id == qid).first()
if not q:
q = first_question(db)
if not q:
return jsonify({"error": "๋ฌธ์ œ ์—†์Œ"}), 404
return jsonify(api_payload(db, q))
finally:
db.close()
# -----------------------------
# API: answer
# - โœ… chosen list/string ๋‘˜ ๋‹ค ํ—ˆ์šฉ + (๊ฐ€๋Šฅํ•˜๋ฉด) ๋ณต์ˆ˜/์ˆœ์„œ ์ •๋‹ต ํŒ์ •
# -----------------------------
@app.route("/api/answer", methods=["POST"])
def api_answer():
data = request.get_json(force=True) or {}
qid = data.get("question_id")
chosen = data.get("chosen")
user_id = str(data.get("user_id", DEFAULT_USER))
if not qid or chosen is None:
return jsonify({"error": "question_id ๋˜๋Š” chosen ๋ˆ„๋ฝ"}), 400
db = SessionLocal()
try:
q = db.query(Question).filter(Question.id == int(qid)).first()
if not q:
return jsonify({"error": "ํ•ด๋‹น ๋ฌธ์ œ ์—†์Œ"}), 404
correct = is_correct(q, chosen)
chosen_store = json.dumps(chosen, ensure_ascii=False) if isinstance(chosen, list) else str(chosen)
db.add(Attempt(
user_id=user_id,
question_id=q.id,
chosen=chosen_store,
correct=bool(correct),
note_type="wrong" if not correct else None
))
db.commit()
return jsonify({"correct": bool(correct), "answer": q.answer, "explanation": q.explanation})
finally:
db.close()
# -----------------------------
# API: next
# - โœ… ์ตœ์†Œ ์ˆ˜์ •: current_id ์—†์œผ๋ฉด ์ฒซ ๋ฌธ์ œ
# - (PDF ์ˆœ์„œ next๊นŒ์ง€ ์™„์ „ ๊ตฌํ˜„์€ ์ฝ”๋“œ๊ฐ€ ๊ธธ์–ด์ ธ์„œ, ์—ฌ๊ธฐ์„  ๋ ˆ๊ฑฐ์‹œ id-next ์œ ์ง€)
# -----------------------------
@app.route("/api/next", methods=["GET"])
def api_next():
current_id = request.args.get("current_id", type=int, default=None)
db = SessionLocal()
try:
if not current_id:
q = first_question(db)
if not q:
return jsonify({"end": True})
return jsonify(api_payload(db, q))
nxt = (
db.query(Question)
.filter(Question.id > current_id)
.order_by(Question.id.asc())
.first()
)
if not nxt:
return jsonify({"end": True})
return jsonify(api_payload(db, nxt))
finally:
db.close()
# -----------------------------
# Review add/remove
# -----------------------------
@app.route("/api/review_add", methods=["POST"])
def review_add():
data = request.get_json(force=True) or {}
qid = data.get("question_id")
user_id = str(data.get("user_id", DEFAULT_USER))
db = SessionLocal()
try:
db.add(Attempt(user_id=user_id, question_id=int(qid), correct=False, note_type="review"))
db.commit()
return jsonify({"message": "โญ ๋ณต์Šต ๋ชฉ๋ก์— ์ถ”๊ฐ€๋จ"})
finally:
db.close()
@app.route("/api/review_remove", methods=["POST"])
def review_remove():
data = request.get_json(force=True) or {}
qid = data.get("question_id")
user_id = str(data.get("user_id", DEFAULT_USER))
db = SessionLocal()
try:
db.query(Attempt).filter(
Attempt.user_id == user_id,
Attempt.question_id == int(qid),
Attempt.note_type == "review"
).delete()
db.commit()
return jsonify({"message": "๐Ÿ—‘๏ธ ๋ณต์Šต์—์„œ ์ œ๊ฑฐ๋จ"})
finally:
db.close()
# โœ… ์˜ค๋‹ต + ๋ณต์Šต ํ†ตํ•ฉ ์กฐํšŒ (๊ธฐ์กด ๊ทธ๋Œ€๋กœ ์œ ์ง€)
@app.route("/api/wrong_review", methods=["GET"])
def wrong_review():
user_id = request.args.get("user_id", DEFAULT_USER)
db = SessionLocal()
try:
rows = (
db.query(Attempt, Question)
.join(Question, Attempt.question_id == Question.id)
.filter(Attempt.user_id == user_id, Attempt.note_type.in_(["wrong", "review"]))
.order_by(Attempt.id.desc())
.all()
)
seen = set()
items = []
for att, q in rows:
if q.id in seen:
continue
seen.add(q.id)
items.append({
"question_id": q.id,
"stem": q.stem,
"options": q.get_options(),
"answer": q.answer,
"explanation": q.explanation,
"chosen": att.chosen
})
return jsonify({"count": len(items), "items": items})
finally:
db.close()
# โœ… ์˜ค๋‹ต์—์„œ ์ œ๊ฑฐ (๊ธฐ์กด ๊ทธ๋Œ€๋กœ ์œ ์ง€)
@app.route("/api/wrong_remove", methods=["POST"])
def wrong_remove():
data = request.get_json(force=True) or {}
qid = data.get("question_id")
user_id = str(data.get("user_id", DEFAULT_USER))
db = SessionLocal()
try:
db.query(Attempt).filter(
Attempt.user_id == user_id,
Attempt.question_id == int(qid),
Attempt.note_type == "wrong"
).delete()
db.commit()
return jsonify({"message": "๐Ÿ—‘๏ธ ์˜ค๋‹ต์—์„œ ์ œ๊ฑฐ๋จ"})
finally:
db.close()
if __name__ == "__main__":
port = int(os.getenv("PORT", 7860))
print(f"[INFO] ์„œ๋ฒ„ ์‹œ์ž‘: 0.0.0.0:{port} (HF/๋กœ์ปฌ ๊ณตํ†ต)")
app.run(host="0.0.0.0", port=port, debug=False)