Chess_Analyzer / app.py
Fu01978's picture
Update app.py
2aa6fbb verified
from flask import Flask, render_template, request, Response, jsonify, stream_with_context
import json
import queue
import threading
import uuid
from analyzer import analyze_game
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# In-memory job store (fine for a local tool).
# Maps a job_id string to the {"pgn": ..., "depth": ...} payload that submit()
# stores; stream() pops the entry when the job is consumed. Nothing in this
# module expires entries, so a job that is never streamed stays in memory.
_jobs: dict[str, dict] = {}
@app.route("/")
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route("/api/analyze", methods=["POST"])
def submit():
    """Accept a PGN + depth, queue the job, return a job_id.

    The request body is parsed as JSON regardless of its Content-Type and
    must be an object with:

    * ``pgn``   -- required, a non-empty string (surrounding whitespace is
      stripped).
    * ``depth`` -- optional, anything ``int()`` accepts; defaults to 12 and is
      clamped to the range 6..24.

    Returns:
        ``{"job_id": <uuid4 string>}`` on success, or ``({"error": ...}, 400)``
        when the body is not a JSON object, the PGN is missing or not a
        string, or the depth cannot be converted to an integer.
    """
    # silent=True yields None for unparseable JSON, so every malformed request
    # gets the same JSON error shape instead of Flask's default error page.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # `or ""` treats an explicit null the same as a missing key.
    raw_pgn = data.get("pgn") or ""
    if not isinstance(raw_pgn, str):
        return jsonify({"error": "PGN must be a string"}), 400
    pgn = raw_pgn.strip()
    if not pgn:
        return jsonify({"error": "No PGN provided"}), 400

    try:
        requested_depth = int(data.get("depth", 12))
    except (TypeError, ValueError, OverflowError):
        # TypeError: null/list/dict; ValueError: non-numeric string or NaN;
        # OverflowError: Infinity (accepted by Python's JSON parser).
        return jsonify({"error": "Depth must be an integer"}), 400
    depth = max(6, min(requested_depth, 24))

    job_id = str(uuid.uuid4())
    _jobs[job_id] = {"pgn": pgn, "depth": depth}
    return jsonify({"job_id": job_id})
@app.route("/api/stream/<job_id>")
def stream(job_id: str):
    """Stream the events of a queued job to the client as Server-Sent Events.

    The job is removed from the store as soon as it is looked up, so each
    job_id can be streamed only once. The analysis runs on a daemon thread;
    every event it reports through the callback (and a final
    ``{"type": "error", ...}`` event if it raises) is forwarded as one
    ``data:`` frame. The stream ends when the worker posts its ``None``
    sentinel.

    Returns:
        A ``text/event-stream`` response, or ``("Job not found", 404)`` when
        the job_id is unknown or already consumed.
    """
    pending = _jobs.pop(job_id, None)
    if not pending:
        return "Job not found", 404

    game_text, search_depth = pending["pgn"], pending["depth"]
    events: queue.Queue = queue.Queue()

    def forward(event):
        # Progress callback handed to the analyzer.
        events.put(event)

    def worker():
        try:
            analyze_game(game_text, search_depth, forward)
        except Exception as exc:
            events.put({"type": "error", "message": str(exc)})
        finally:
            # Sentinel: always posted so the response generator terminates.
            events.put(None)

    def sse_frames():
        # Block on the queue until the worker signals completion with None.
        while (event := events.get()) is not None:
            yield f"data: {json.dumps(event)}\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
        "Connection": "keep-alive",
    }

    threading.Thread(target=worker, daemon=True).start()
    return Response(
        stream_with_context(sse_frames()),
        mimetype="text/event-stream",
        headers=sse_headers,
    )
if __name__ == "__main__":
    import os

    # The server binds to every interface (0.0.0.0), and Flask's debug mode
    # enables the Werkzeug interactive debugger, which can execute arbitrary
    # code from the browser. Debug is therefore opt-in via FLASK_DEBUG and
    # defaults to off.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled, threaded=True, host='0.0.0.0', port=7860)