import os
import secrets
import sys
import uuid
from html import escape
from pathlib import Path

from flask import Flask, Response, jsonify, render_template, request, send_file, session

from newest_model import PREFERRED_PRODUCTION_CHAT_MODELS, select_groq_chat_models
from src.config import (
    APP_PASSWORD,
    APP_USERNAME,
    AVAILABLE_MODELS,
    DATA_DIR,
    DEFAULT_OUTPUT_SHEET_NAME,
    SPACE_ID,
)
from src.process_runner import stop_process, stream_process
from src.utils import reference_sync_status, save_manual_references_to_hub
from src.workbook_io import read_workbook_sheets, resolve_allowed_path, save_uploaded_excel

APP_ROOT = Path(__file__).resolve().parent
UPLOAD_DIR = APP_ROOT / DATA_DIR / "uploads"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Download/apply routes only accept files inside data/ to avoid arbitrary file reads.
ALLOWED_FILE_ROOTS = [APP_ROOT / DATA_DIR]

app = Flask(
    __name__,
    template_folder=str(APP_ROOT / "ui" / "templates"),
    static_folder=str(APP_ROOT / "ui" / "static"),
)
# Reject request bodies larger than 100 MiB.
app.config["MAX_CONTENT_LENGTH"] = 100 * 1024 * 1024
# NOTE(review): the fallback is a fixed, public value. Session cookies are signed
# with this key, so APP_SECRET_KEY should be set in any shared deployment.
app.secret_key = os.getenv("APP_SECRET_KEY", "local-dev-secret")

# Template for the per-session UI state stored under session["ui_state"].
DEFAULT_STATE = {
    "clean_path": "",
    "clean_filename": "",
    "clean_sheets": [],
    "clean_selected_sheet": "",
    "output_sheet": DEFAULT_OUTPUT_SHEET_NAME,
    "models": "",
    "apply_workbook_path": "",
    "apply_workbook_filename": "",
    "apply_sheets": [],
    "apply_selected_sheet": "",
    "apply_blueprint_path": "",
    "apply_blueprint_filename": "",
}


def fresh_state():
    """Create a clean UI state for one browser session.

    List values are copied so sessions never share (and mutate) the lists
    held in ``DEFAULT_STATE``.
    """
    return {
        key: list(value) if isinstance(value, list) else value
        for key, value in DEFAULT_STATE.items()
    }


def get_state():
    """Return the current browser's state, creating it on first visit."""
    if "ui_state" not in session:
        session["ui_state"] = fresh_state()
    return session["ui_state"]


def mark_state_changed():
    """Tell Flask to re-sign the session cookie after nested state edits."""
    session.modified = True


def auth_required_response() -> Response:
    """Ask the browser for basic-auth credentials (HTTP 401)."""
    return Response(
        "Authentication required",
        401,
        {"WWW-Authenticate": 'Basic realm="MasterMap Cleaner"'},
    )


def missing_auth_config_response() -> Response:
    """Fail closed on Hugging Face if password protection was not configured."""
    return Response(
        "App login credentials are not configured.",
        503,
    )


@app.before_request
def require_basic_auth():
    """Protect every app route with a shared password when configured.

    Returns ``None`` to let the request proceed, or a 401/503 response.
    When no credentials are configured the app is open, unless ``SPACE_ID``
    is set, in which case every request is refused with 503.
    """
    if not APP_PASSWORD or not APP_USERNAME:
        if SPACE_ID:
            return missing_auth_config_response()
        return None

    auth = request.authorization
    if not auth:
        return auth_required_response()

    # compare_digest() only accepts str arguments that are pure ASCII; comparing
    # UTF-8 bytes keeps a non-ASCII login attempt a clean 401 instead of a
    # TypeError (HTTP 500).
    supplied_username = (auth.username or "").encode("utf-8")
    supplied_password = (auth.password or "").encode("utf-8")
    # Both comparisons always run so timing does not reveal which field failed.
    valid_username = secrets.compare_digest(supplied_username, APP_USERNAME.encode("utf-8"))
    valid_password = secrets.compare_digest(supplied_password, APP_PASSWORD.encode("utf-8"))
    if valid_username and valid_password:
        return None
    return auth_required_response()


@app.after_request
def prevent_browser_cache(response):
    """Add no-cache headers to every response and return it."""
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    response.headers["Pragma"] = "no-cache"
    response.headers["Expires"] = "0"
    return response


def default_models() -> str:
    """Prefer configured Groq models, falling back to the curated production list.

    Returns a comma-separated string: the entries of ``AVAILABLE_MODELS`` that
    also appear (case-insensitively) in ``PREFERRED_PRODUCTION_CHAT_MODELS``,
    or the whole preferred list when none match.
    """
    preferred_model_ids = {model.lower() for model in PREFERRED_PRODUCTION_CHAT_MODELS}
    env_preferred_models = [
        model for model in AVAILABLE_MODELS if model.lower() in preferred_model_ids
    ]
    return ",".join(env_preferred_models or PREFERRED_PRODUCTION_CHAT_MODELS)


def read_readme_guide() -> str:
    """Render README.md content for the in-app collapsible guide.

    Returns an HTML string. A leading ``---``-delimited header block is
    dropped before rendering. If the optional ``markdown`` package is not
    installed, the escaped raw text is returned instead.
    """
    readme_path = APP_ROOT / "README.md"
    if not readme_path.is_file():
        # NOTE(review): the markup around this message was lost in the reviewed
        # copy of the file; <p> is a reconstruction -- confirm.
        return "<p>User guide is not available.</p>"

    text = readme_path.read_text(encoding="utf-8")
    if text.startswith("---"):
        # Split into ("", header, body) and keep only the body.
        parts = text.split("---", 2)
        if len(parts) == 3:
            text = parts[2].lstrip()

    try:
        import markdown
    except ImportError:
        # NOTE(review): wrapper tag reconstructed as <pre> -- confirm.
        return f"<pre>{escape(text)}</pre>"
    return markdown.markdown(text, extensions=["extra", "sane_lists"])
def render_page(message: str = "", error: str = ""):
    """Render index.html from the current session's state.

    Before rendering, the selected sheet for the clean and apply workbooks is
    re-validated against the sheets currently stored in the session.
    """
    ui_state = get_state()
    for prefix in ("clean", "apply"):
        sheets_key = f"{prefix}_sheets"
        selected_key = f"{prefix}_selected_sheet"
        if ui_state[sheets_key]:
            ui_state[selected_key] = pick_sheet(
                ui_state[sheets_key], ui_state[selected_key], ui_state
            )
    mark_state_changed()

    # Values the template needs in addition to the raw session state.
    extra_context = {
        "default_output_sheet": DEFAULT_OUTPUT_SHEET_NAME,
        "default_models": default_models(),
        "readme_guide_html": read_readme_guide(),
        "can_apply": can_apply_blueprint(),
        "message": message,
        "error": error,
    }
    return render_template("index.html", **ui_state, **extra_context)
def can_apply_blueprint() -> bool:
    """Report whether the session has everything the Apply button needs.

    That is: a workbook path, a blueprint path, a sheet list, and a
    selected target sheet, all non-empty.
    """
    ui_state = get_state()
    required_keys = (
        "apply_workbook_path",
        "apply_blueprint_path",
        "apply_sheets",
        "apply_selected_sheet",
    )
    return all(ui_state[key] for key in required_keys)
def wants_json_response() -> bool:
    """Return True when the request's Accept header mentions application/json.

    AJAX upload routes send that header; plain form posts get a rendered page.
    """
    accept_header = request.headers.get("Accept", "")
    return "application/json" in accept_header
def ui_state_payload(message: str = "", error: str = ""):
    """Build the JSON-ready dict the frontend uses to update without a reload.

    Contains the message/error pair, the apply-related session fields, and
    the computed ``can_apply`` flag.
    """
    ui_state = get_state()
    exposed_keys = (
        "apply_workbook_path",
        "apply_workbook_filename",
        "apply_sheets",
        "apply_selected_sheet",
        "apply_blueprint_path",
        "apply_blueprint_filename",
    )
    payload = {"message": message, "error": error}
    for key in exposed_keys:
        payload[key] = ui_state[key]
    payload["can_apply"] = can_apply_blueprint()
    return payload
def pick_sheet(sheets, preferred_sheet=None, state=None):
    """Pick the sheet to select from ``sheets``.

    Priority: ``preferred_sheet`` if it is non-empty and present, then the
    state's ``output_sheet`` if present, then the first sheet, else ``""``.
    A falsy ``state`` is replaced by the current session state.
    """
    active_state = state if state else get_state()
    if preferred_sheet and preferred_sheet in sheets:
        return preferred_sheet

    configured_output = active_state["output_sheet"]
    if configured_output in sheets:
        return configured_output

    if sheets:
        return sheets[0]
    return ""
def update_ui_state_from_form(form):
    """Copy submitted UI selections into the session state.

    A non-empty form value replaces the stored one; otherwise the stored
    value is kept. ``output_sheet`` additionally falls back to
    ``DEFAULT_OUTPUT_SHEET_NAME`` when both are empty.
    """
    ui_state = get_state()
    last_resort = {"output_sheet": DEFAULT_OUTPUT_SHEET_NAME}
    for field in ("clean_selected_sheet", "output_sheet", "models", "apply_selected_sheet"):
        value = form.get(field) or ui_state[field]
        if field in last_resort:
            value = value or last_resort[field]
        ui_state[field] = value
    mark_state_changed()
@app.route("/")
def index():
    """Serve the main page with no status message."""
    page = render_page()
    return page
@app.route("/prepare-clean", methods=["POST"])
def prepare_clean():
    """Store an uploaded workbook as the cleaning input for this session.

    Any failure while saving or reading the upload is shown on the page.
    """
    try:
        update_ui_state_from_form(request.form)
        ui_state = get_state()
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
        sheets = read_workbook_sheets(path)
    except Exception as exc:
        return render_page(error=str(exc))

    stored_path = str(path)
    form = request.form
    clean_choice = pick_sheet(sheets, form.get("clean_selected_sheet"), ui_state)
    apply_choice = pick_sheet(
        sheets,
        form.get("apply_selected_sheet") or ui_state["output_sheet"],
        ui_state,
    )

    # The uploaded workbook becomes both the cleaning input and default apply target.
    ui_state.update(
        {
            "clean_path": stored_path,
            "clean_filename": filename,
            "clean_sheets": sheets,
            "clean_selected_sheet": clean_choice,
            "apply_workbook_path": stored_path,
            "apply_workbook_filename": filename,
            "apply_sheets": sheets,
            "apply_selected_sheet": apply_choice,
        }
    )
    mark_state_changed()
    return render_page(message=f"Loaded {filename}.")
@app.route("/remove-clean", methods=["POST"])
def remove_clean():
    """Forget this session's cleaning workbook.

    The apply workbook is cleared too when it points at the same path.
    Only session state is changed.
    """
    update_ui_state_from_form(request.form)
    ui_state = get_state()

    removed_path = ui_state["clean_path"]
    apply_uses_same_file = ui_state["apply_workbook_path"] == removed_path

    ui_state.update(
        {
            "clean_path": "",
            "clean_filename": "",
            "clean_sheets": [],
            "clean_selected_sheet": "",
        }
    )
    if apply_uses_same_file:
        ui_state.update(
            {
                "apply_workbook_path": "",
                "apply_workbook_filename": "",
                "apply_sheets": [],
                "apply_selected_sheet": "",
            }
        )
    mark_state_changed()
    return render_page(message="File removed.")
@app.route("/prepare-apply-workbook", methods=["POST"])
def prepare_apply_workbook():
    """Store an uploaded workbook as the target for Blueprint corrections.

    Replies with JSON when the client asks for it (400 on failure),
    otherwise with the rendered page.
    """
    try:
        update_ui_state_from_form(request.form)
        ui_state = get_state()
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
        sheets = read_workbook_sheets(path)
    except Exception as exc:
        failure = str(exc)
        if wants_json_response():
            return jsonify(ui_state_payload(error=failure)), 400
        return render_page(error=failure)

    chosen_sheet = pick_sheet(sheets, request.form.get("apply_selected_sheet"), ui_state)
    ui_state.update(
        {
            "apply_workbook_path": str(path),
            "apply_workbook_filename": filename,
            "apply_sheets": sheets,
            "apply_selected_sheet": chosen_sheet,
        }
    )
    mark_state_changed()

    success = f"Loaded apply workbook {filename}."
    if not wants_json_response():
        return render_page(message=success)
    return jsonify(ui_state_payload(message=success))
@app.route("/prepare-apply-blueprint", methods=["POST"])
def prepare_apply_blueprint():
    """Store an uploaded Blueprint workbook for this session.

    If the session's apply workbook still exists on disk, its sheet list and
    selected sheet are refreshed first. Replies with JSON when the client
    asks for it (400 on failure), otherwise with the rendered page.
    """
    try:
        update_ui_state_from_form(request.form)
        ui_state = get_state()
        if ui_state["apply_workbook_path"] and Path(ui_state["apply_workbook_path"]).is_file():
            refreshed_sheets = read_workbook_sheets(Path(ui_state["apply_workbook_path"]))
            ui_state["apply_sheets"] = refreshed_sheets
            ui_state["apply_selected_sheet"] = pick_sheet(
                ui_state["apply_sheets"],
                request.form.get("apply_selected_sheet"),
                ui_state,
            )
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
    except Exception as exc:
        failure = str(exc)
        if wants_json_response():
            return jsonify(ui_state_payload(error=failure)), 400
        return render_page(error=failure)

    ui_state["apply_blueprint_path"] = str(path)
    ui_state["apply_blueprint_filename"] = filename
    mark_state_changed()

    success = f"Loaded blueprint {filename}."
    if not wants_json_response():
        return render_page(message=success)
    return jsonify(ui_state_payload(message=success))
@app.route("/models")
def models_endpoint():
    """Return the selected Groq chat models as JSON.

    At most as many models as the preferred production list holds are
    requested. Any failure is reported as a 500 with the error text.
    """
    try:
        model_ids = select_groq_chat_models(limit=len(PREFERRED_PRODUCTION_CHAT_MODELS))
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    else:
        return jsonify({"models": model_ids})
@app.route("/references/status")
def references_status():
    """Return the reference-sync status as JSON for the UI."""
    sync_status = reference_sync_status()
    return jsonify(sync_status)
@app.route("/references/save", methods=["POST"])
def save_references():
    """Save manual references to the Hub and report the outcome as JSON.

    Any failure is returned as a 400 with the error text.
    """
    try:
        result = save_manual_references_to_hub(APP_ROOT)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 400

    # Keys returned by the save helper take precedence over the default message.
    payload = {"message": "Manual references saved to Hugging Face."}
    payload.update(result)
    return jsonify(payload)
@app.route("/sheets")
def sheets_endpoint():
    """Return workbook sheet names for dynamic apply-sheet selection.

    Query parameter ``path`` names the workbook. Responds 400 when the path
    is rejected by ``resolve_allowed_path``, 404 when no file exists there,
    and 500 when the workbook cannot be read.
    """
    try:
        workbook_path = resolve_allowed_path(request.args.get("path", ""), APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        # A disallowed path is a client error, matching the /run and /apply routes.
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500

    try:
        if not workbook_path.is_file():
            return jsonify({"error": "Workbook is not available."}), 404
        return jsonify({"sheets": read_workbook_sheets(workbook_path)})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/download-blueprint")
def download_blueprint():
    """Download either the session Blueprint or an explicitly requested run file.

    Query parameter ``path`` overrides the session's blueprint path. Responds
    400 when the path is rejected by ``resolve_allowed_path`` and 404 when no
    blueprint file is available.
    """
    state = get_state()
    requested_path = request.args.get("path") or state["apply_blueprint_path"]
    if not requested_path:
        return jsonify({"error": "Blueprint has not been generated yet."}), 404

    try:
        blueprint_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        # Previously uncaught (HTTP 500); /run and /apply already answer 400 here.
        return jsonify({"error": str(exc)}), 400

    # is_file() rather than exists(): a directory cannot be sent as an attachment.
    if not blueprint_path.is_file():
        return jsonify({"error": "Blueprint has not been generated yet."}), 404
    return send_file(blueprint_path, as_attachment=True, download_name="Blueprint.xlsx")
@app.route("/download-cleaned-workbook")
def download_cleaned_workbook():
    """Download the cleaned workbook copy for this session/run.

    Query parameters ``path`` and ``filename`` override the session's clean
    workbook path and name. Responds 400 when the path is rejected by
    ``resolve_allowed_path`` and 404 when no file is available.
    """
    state = get_state()
    requested_path = request.args.get("path") or state["clean_path"]
    if not requested_path:
        return jsonify({"error": "Cleaned workbook is not available."}), 404

    try:
        workbook_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        # Previously uncaught (HTTP 500); /run and /apply already answer 400 here.
        return jsonify({"error": str(exc)}), 400

    if not workbook_path.is_file():
        return jsonify({"error": "Cleaned workbook is not available."}), 404

    download_name = request.args.get("filename") or state["clean_filename"] or workbook_path.name
    return send_file(
        workbook_path,
        as_attachment=True,
        download_name=f"cleaned_{download_name}",
    )
@app.route("/download-applied-workbook")
def download_applied_workbook():
    """Download the workbook after Blueprint corrections have been applied.

    Query parameters ``path`` and ``filename`` override the session's apply
    workbook path and name. Responds 400 when the path is rejected by
    ``resolve_allowed_path`` and 404 when no file is available.
    """
    state = get_state()
    requested_path = request.args.get("path") or state["apply_workbook_path"]
    if not requested_path:
        return jsonify({"error": "Applied workbook is not available."}), 404

    try:
        workbook_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        # Previously uncaught (HTTP 500); /run and /apply already answer 400 here.
        return jsonify({"error": str(exc)}), 400

    if not workbook_path.is_file():
        return jsonify({"error": "Applied workbook is not available."}), 404

    download_name = request.args.get("filename") or state["apply_workbook_filename"] or workbook_path.name
    return send_file(
        workbook_path,
        as_attachment=True,
        download_name=f"cleaned_{download_name}",
    )
@app.route("/run")
def run():
    """Start the cleaning subprocess and stream its logs as server-sent events.

    Query parameters: ``input`` (workbook path, required), ``sheet`` (source
    sheet, required), ``output_sheet``, ``models`` and ``job_id``. Responds
    400 when a required value is missing, the job id is malformed, or the
    input path is rejected by ``resolve_allowed_path``.
    """
    import re  # local import: only this route needs it

    # A missing or empty job id gets a fresh one, so runs never share a blueprint file.
    job_id = request.args.get("job_id") or uuid.uuid4().hex
    # job_id is client-supplied and becomes part of a filename below; refuse
    # anything that could contain a path separator.
    if not re.fullmatch(r"[A-Za-z0-9._-]{1,128}", job_id):
        return jsonify({"error": "Invalid job id."}), 400

    input_path = request.args.get("input", "")
    sheet = request.args.get("sheet", "")
    output_sheet = request.args.get("output_sheet", DEFAULT_OUTPUT_SHEET_NAME)
    model_list = request.args.get("models", "")
    if not input_path or not sheet:
        return jsonify({"error": "Input file and source sheet are required."}), 400

    try:
        workbook_path = resolve_allowed_path(input_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    # Each run gets its own Blueprint so simultaneous users cannot overwrite it.
    blueprint_path = UPLOAD_DIR / f"Blueprint_{job_id}.xlsx"
    state = get_state()
    state["apply_blueprint_path"] = str(blueprint_path)
    state["apply_blueprint_filename"] = blueprint_path.name
    mark_state_changed()

    command = [
        sys.executable,
        "-u",  # unbuffered child output
        str(APP_ROOT / "main.py"),
        "--input",
        str(workbook_path),
        "--sheet",
        sheet,
        "--output_sheet",
        output_sheet,
        "--blueprint",
        str(blueprint_path),
    ]
    if model_list:
        command.extend(["--models", model_list])
    return Response(stream_process(command, cwd=APP_ROOT, job_id=job_id), mimetype="text/event-stream")
@app.route("/stop", methods=["POST"])
def stop():
    """Ask the process runner to stop the job named by ``job_id``.

    Responds 404 when the runner reports no such active run.
    """
    job_id = request.args.get("job_id", "")
    if stop_process(job_id):
        return jsonify({"stopped": True})
    return jsonify({"stopped": False, "message": "No active run found."}), 404
@app.route("/apply")
def apply_blueprint():
    """Run apply_blueprint.py on a workbook and stream its output.

    Query parameters: ``input`` (workbook path), ``blueprint`` (blueprint
    path) and ``sheet`` (target sheet, defaulting to the output sheet name).
    Responds 400 when a value is missing or a path is rejected.
    """
    args = request.args
    input_path = args.get("input", "")
    blueprint_path = args.get("blueprint", "")
    sheet = args.get("sheet", DEFAULT_OUTPUT_SHEET_NAME)
    if not (input_path and blueprint_path and sheet):
        return jsonify({"error": "Workbook, blueprint, and target sheet are required."}), 400

    try:
        workbook_path = resolve_allowed_path(input_path, APP_ROOT, ALLOWED_FILE_ROOTS)
        resolved_blueprint_path = resolve_allowed_path(blueprint_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    script_path = APP_ROOT / "apply_blueprint.py"
    command = [sys.executable, "-u", str(script_path)]
    command += ["--input", str(workbook_path)]
    command += ["--blueprint", str(resolved_blueprint_path)]
    command += ["--sheet", sheet]

    event_stream = stream_process(command, cwd=APP_ROOT)
    return Response(event_stream, mimetype="text/event-stream")
if __name__ == "__main__":
    # Local entry point: Flask's built-in server, debugger off, threaded.
    app.run(threaded=True, debug=False)