import os
import secrets
import sys
import uuid
from html import escape
from pathlib import Path

from flask import Flask, Response, jsonify, render_template, request, send_file, session

from newest_model import PREFERRED_PRODUCTION_CHAT_MODELS, select_groq_chat_models
from src.config import (
    APP_PASSWORD,
    APP_USERNAME,
    AVAILABLE_MODELS,
    DATA_DIR,
    DEFAULT_OUTPUT_SHEET_NAME,
    SPACE_ID,
)
from src.process_runner import stop_process, stream_process
from src.utils import reference_sync_status, save_manual_references_to_hub
from src.workbook_io import read_workbook_sheets, resolve_allowed_path, save_uploaded_excel

APP_ROOT = Path(__file__).resolve().parent
UPLOAD_DIR = APP_ROOT / DATA_DIR / "uploads"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Download/apply routes only accept files inside data/ to avoid arbitrary file reads.
ALLOWED_FILE_ROOTS = [APP_ROOT / DATA_DIR]

app = Flask(
    __name__,
    template_folder=str(APP_ROOT / "ui" / "templates"),
    static_folder=str(APP_ROOT / "ui" / "static"),
)
# Cap request bodies (uploads) at 100 MiB.
app.config["MAX_CONTENT_LENGTH"] = 100 * 1024 * 1024
# NOTE(review): the fallback key is a fixed, public string; session cookies are
# only trustworthy when APP_SECRET_KEY is set in the environment.
app.secret_key = os.getenv("APP_SECRET_KEY", "local-dev-secret")

# Template for the per-session UI state stored under session["ui_state"].
DEFAULT_STATE = {
    "clean_path": "",
    "clean_filename": "",
    "clean_sheets": [],
    "clean_selected_sheet": "",
    "output_sheet": DEFAULT_OUTPUT_SHEET_NAME,
    "models": "",
    "apply_workbook_path": "",
    "apply_workbook_filename": "",
    "apply_sheets": [],
    "apply_selected_sheet": "",
    "apply_blueprint_path": "",
    "apply_blueprint_filename": "",
}


def fresh_state():
    """Create a clean UI state for one browser session.

    List values are copied so that sessions never share (and mutate) the
    lists stored in DEFAULT_STATE.
    """
    return {
        key: list(value) if isinstance(value, list) else value
        for key, value in DEFAULT_STATE.items()
    }


def get_state():
    """Return the current browser's state, creating it on first visit."""
    if "ui_state" not in session:
        session["ui_state"] = fresh_state()
    return session["ui_state"]


def mark_state_changed():
    """Tell Flask to re-sign the session cookie after nested state edits."""
    session.modified = True


def auth_required_response() -> Response:
    """Ask the browser for basic-auth credentials."""
    return Response(
        "Authentication required",
        401,
        {"WWW-Authenticate": 'Basic realm="MasterMap Cleaner"'},
    )


def missing_auth_config_response() -> Response:
    """Fail closed on Hugging Face if password protection was not configured."""
    return Response(
        "App login credentials are not configured.",
        503,
    )


def _constant_time_equals(supplied: str, expected: str) -> bool:
    """Compare two strings in constant time, tolerating non-ASCII input.

    secrets.compare_digest() raises TypeError when given str values that
    contain non-ASCII characters, so both sides are compared as UTF-8 bytes.
    "surrogatepass" keeps the encoding step from raising on lone surrogates.
    """
    return secrets.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


@app.before_request
def require_basic_auth():
    """Protect every app route with a shared password when configured.

    Returns None to let the request proceed, a 401 response when credentials
    are missing or wrong, or a 503 response when SPACE_ID is set but no
    credentials are configured.
    """
    if not APP_PASSWORD or not APP_USERNAME:
        if SPACE_ID:
            return missing_auth_config_response()
        return None

    auth = request.authorization
    if not auth:
        return auth_required_response()

    # Evaluate both comparisons unconditionally so the check does not
    # short-circuit on the username.
    valid_username = _constant_time_equals(auth.username or "", APP_USERNAME)
    valid_password = _constant_time_equals(auth.password or "", APP_PASSWORD)
    if valid_username and valid_password:
        return None
    return auth_required_response()


@app.after_request
def prevent_browser_cache(response):
    """Add no-cache headers to every response before it is sent."""
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    response.headers["Pragma"] = "no-cache"
    response.headers["Expires"] = "0"
    return response


def default_models() -> str:
    """Prefer configured Groq models, falling back to the curated production list.

    Returns a comma-separated string of model ids.
    """
    preferred_model_ids = {model.lower() for model in PREFERRED_PRODUCTION_CHAT_MODELS}
    env_preferred_models = [
        model for model in AVAILABLE_MODELS if model.lower() in preferred_model_ids
    ]
    return ",".join(env_preferred_models or PREFERRED_PRODUCTION_CHAT_MODELS)


def read_readme_guide() -> str:
    """Render README.md content for the in-app collapsible guide.

    A leading "---" delimited front-matter section is dropped. When the
    optional ``markdown`` package is missing, the HTML-escaped raw text is
    returned instead.
    """
    readme_path = APP_ROOT / "README.md"
    if not readme_path.is_file():
        # NOTE(review): any markup that wrapped this message may have been
        # lost; confirm the intended HTML against the template.
        return "\n\nUser guide is not available.\n\n"

    text = readme_path.read_text(encoding="utf-8")
    if text.startswith("---"):
        parts = text.split("---", 2)
        if len(parts) == 3:
            text = parts[2].lstrip()

    try:
        import markdown
    except ImportError:
        # NOTE(review): same caveat as above regarding wrapper markup.
        return f"\n{escape(text)}\n"

    return markdown.markdown(text, extensions=["extra", "sane_lists"])


def render_page(message: str = "", error: str = ""):
    """Render the app with state scoped to the current browser session.

    Sheet selections are re-validated against the known sheet lists before
    the state is passed to the template.
    """
    state = get_state()
    if state["clean_sheets"]:
        state["clean_selected_sheet"] = pick_sheet(
            state["clean_sheets"], state["clean_selected_sheet"], state
        )
    if state["apply_sheets"]:
        state["apply_selected_sheet"] = pick_sheet(
            state["apply_sheets"], state["apply_selected_sheet"], state
        )
    mark_state_changed()
    return render_template(
        "index.html",
        **state,
        default_output_sheet=DEFAULT_OUTPUT_SHEET_NAME,
        default_models=default_models(),
        readme_guide_html=read_readme_guide(),
        can_apply=can_apply_blueprint(),
        message=message,
        error=error,
    )


def can_apply_blueprint() -> bool:
    """The Apply button requires workbook, blueprint, and target sheet."""
    state = get_state()
    return bool(
        state["apply_workbook_path"]
        and state["apply_blueprint_path"]
        and state["apply_sheets"]
        and state["apply_selected_sheet"]
    )


def wants_json_response() -> bool:
    """AJAX upload routes ask for JSON; normal form posts render the page."""
    return "application/json" in request.headers.get("Accept", "")


def ui_state_payload(message: str = "", error: str = ""):
    """Return just enough state for the frontend to update without a reload."""
    state = get_state()
    return {
        "message": message,
        "error": error,
        "apply_workbook_path": state["apply_workbook_path"],
        "apply_workbook_filename": state["apply_workbook_filename"],
        "apply_sheets": state["apply_sheets"],
        "apply_selected_sheet": state["apply_selected_sheet"],
        "apply_blueprint_path": state["apply_blueprint_path"],
        "apply_blueprint_filename": state["apply_blueprint_filename"],
        "can_apply": can_apply_blueprint(),
    }


def pick_sheet(sheets, preferred_sheet=None, state=None):
    """Choose a stable sheet selection when workbooks are uploaded/refreshed.

    Preference order: ``preferred_sheet`` if present in ``sheets``, then the
    state's output sheet, then the first sheet, else an empty string.
    """
    state = state or get_state()
    if preferred_sheet and preferred_sheet in sheets:
        return preferred_sheet
    if state["output_sheet"] in sheets:
        return state["output_sheet"]
    return sheets[0] if sheets else ""


def update_ui_state_from_form(form):
    """Preserve current UI selections while a file upload request is processed.

    Empty or missing form fields leave the stored value untouched.
    """
    state = get_state()
    state["clean_selected_sheet"] = form.get("clean_selected_sheet") or state["clean_selected_sheet"]
    state["output_sheet"] = (
        form.get("output_sheet") or state["output_sheet"] or DEFAULT_OUTPUT_SHEET_NAME
    )
    state["models"] = form.get("models") or state["models"]
    state["apply_selected_sheet"] = form.get("apply_selected_sheet") or state["apply_selected_sheet"]
    mark_state_changed()


@app.route("/")
def index():
    """Render the main page for the current session."""
    return render_page()


@app.route("/prepare-clean", methods=["POST"])
def prepare_clean():
    """Store an uploaded workbook and register it as the cleaning input."""
    try:
        update_ui_state_from_form(request.form)
        state = get_state()
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
        sheets = read_workbook_sheets(path)
    except Exception as exc:
        # Route boundary: surface any upload/parse failure in the page.
        return render_page(error=str(exc))

    # The uploaded workbook becomes both the cleaning input and default apply target.
    state["clean_path"] = str(path)
    state["clean_filename"] = filename
    state["clean_sheets"] = sheets
    state["clean_selected_sheet"] = pick_sheet(
        sheets, request.form.get("clean_selected_sheet"), state
    )
    state["apply_workbook_path"] = str(path)
    state["apply_workbook_filename"] = filename
    state["apply_sheets"] = sheets
    state["apply_selected_sheet"] = pick_sheet(
        sheets, request.form.get("apply_selected_sheet") or state["output_sheet"], state
    )
    mark_state_changed()
    return render_page(message=f"Loaded {filename}.")


@app.route("/remove-clean", methods=["POST"])
def remove_clean():
    """Clear the current session's cleaning workbook without touching other sessions."""
    update_ui_state_from_form(request.form)
    state = get_state()
    old_path = state["clean_path"]
    state["clean_path"] = ""
    state["clean_filename"] = ""
    state["clean_sheets"] = []
    state["clean_selected_sheet"] = ""
    # Only reset the apply target when it still points at the removed workbook.
    if state["apply_workbook_path"] == old_path:
        state["apply_workbook_path"] = ""
        state["apply_workbook_filename"] = ""
        state["apply_sheets"] = []
        state["apply_selected_sheet"] = ""
    mark_state_changed()
    return render_page(message="File removed.")


@app.route("/prepare-apply-workbook", methods=["POST"])
def prepare_apply_workbook():
    """AJAX upload for the workbook that will receive Blueprint corrections."""
    try:
        update_ui_state_from_form(request.form)
        state = get_state()
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
        sheets = read_workbook_sheets(path)
    except Exception as exc:
        # Route boundary: report the failure in whichever format was requested.
        if wants_json_response():
            return jsonify(ui_state_payload(error=str(exc))), 400
        return render_page(error=str(exc))

    state["apply_workbook_path"] = str(path)
    state["apply_workbook_filename"] = filename
    state["apply_sheets"] = sheets
    state["apply_selected_sheet"] = pick_sheet(
        sheets, request.form.get("apply_selected_sheet"), state
    )
    mark_state_changed()
    if wants_json_response():
        return jsonify(ui_state_payload(message=f"Loaded apply workbook {filename}."))
    return render_page(message=f"Loaded apply workbook {filename}.")
@app.route("/prepare-apply-blueprint", methods=["POST"])
def prepare_apply_blueprint():
    """AJAX upload for an externally reviewed Blueprint workbook."""
    try:
        update_ui_state_from_form(request.form)
        state = get_state()
        # Refresh the apply-sheet list first, in case the workbook changed on disk.
        if state["apply_workbook_path"] and Path(state["apply_workbook_path"]).is_file():
            state["apply_sheets"] = read_workbook_sheets(Path(state["apply_workbook_path"]))
            state["apply_selected_sheet"] = pick_sheet(
                state["apply_sheets"], request.form.get("apply_selected_sheet"), state
            )
        filename, path = save_uploaded_excel(request.files.get("file"), UPLOAD_DIR)
    except Exception as exc:
        # Route boundary: report the failure in whichever format was requested.
        if wants_json_response():
            return jsonify(ui_state_payload(error=str(exc))), 400
        return render_page(error=str(exc))

    state["apply_blueprint_path"] = str(path)
    state["apply_blueprint_filename"] = filename
    mark_state_changed()
    if wants_json_response():
        return jsonify(ui_state_payload(message=f"Loaded blueprint {filename}."))
    return render_page(message=f"Loaded blueprint {filename}.")


@app.route("/models")
def models_endpoint():
    """Fetch the currently usable Groq fallback model list for the UI."""
    try:
        models = select_groq_chat_models(limit=len(PREFERRED_PRODUCTION_CHAT_MODELS))
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"models": models})


@app.route("/references/status")
def references_status():
    """Tell the UI whether Hugging Face reference sync can be used."""
    return jsonify(reference_sync_status())


@app.route("/references/save", methods=["POST"])
def save_references():
    """Commit manual references back to the Space repo when HF sync is configured."""
    try:
        result = save_manual_references_to_hub(APP_ROOT)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 400
    return jsonify({"message": "Manual references saved to Hugging Face.", **result})


@app.route("/sheets")
def sheets_endpoint():
    """Return workbook sheet names for dynamic apply-sheet selection.

    Responds 400 for a rejected path, 404 when the file is missing, and 500
    for any other failure while reading the workbook.
    """
    try:
        workbook_path = resolve_allowed_path(
            request.args.get("path", ""), APP_ROOT, ALLOWED_FILE_ROOTS
        )
        if not workbook_path.is_file():
            return jsonify({"error": "Workbook is not available."}), 404
        return jsonify({"sheets": read_workbook_sheets(workbook_path)})
    except ValueError as exc:
        # A rejected path is a client error, matching /run and /apply.
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500


@app.route("/download-blueprint")
def download_blueprint():
    """Download either the session Blueprint or an explicitly requested run file."""
    state = get_state()
    requested_path = request.args.get("path") or state["apply_blueprint_path"]
    if not requested_path:
        return jsonify({"error": "Blueprint has not been generated yet."}), 404
    try:
        blueprint_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        # Rejected paths are reported as 400, as in /run and /apply.
        return jsonify({"error": str(exc)}), 400
    # is_file() (not exists()) so a directory is never handed to send_file.
    if not blueprint_path.is_file():
        return jsonify({"error": "Blueprint has not been generated yet."}), 404
    return send_file(blueprint_path, as_attachment=True, download_name="Blueprint.xlsx")


@app.route("/download-cleaned-workbook")
def download_cleaned_workbook():
    """Download the cleaned workbook copy for this session/run."""
    state = get_state()
    requested_path = request.args.get("path") or state["clean_path"]
    if not requested_path:
        return jsonify({"error": "Cleaned workbook is not available."}), 404
    try:
        workbook_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    if not workbook_path.is_file():
        return jsonify({"error": "Cleaned workbook is not available."}), 404
    download_name = request.args.get("filename") or state["clean_filename"] or workbook_path.name
    return send_file(
        workbook_path,
        as_attachment=True,
        download_name=f"cleaned_{download_name}",
    )


@app.route("/download-applied-workbook")
def download_applied_workbook():
    """Download the workbook after Blueprint corrections have been applied."""
    state = get_state()
    requested_path = request.args.get("path") or state["apply_workbook_path"]
    if not requested_path:
        return jsonify({"error": "Applied workbook is not available."}), 404
    try:
        workbook_path = resolve_allowed_path(requested_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    if not workbook_path.is_file():
        return jsonify({"error": "Applied workbook is not available."}), 404
    download_name = (
        request.args.get("filename") or state["apply_workbook_filename"] or workbook_path.name
    )
    return send_file(
        workbook_path,
        as_attachment=True,
        download_name=f"cleaned_{download_name}",
    )


def _is_safe_job_id(job_id: str) -> bool:
    """Return True when job_id is safe to embed in a file name.

    Only ASCII letters, digits, "-", "_" and "." are accepted (max 128
    characters), so the id can never contain a path separator.
    """
    if not job_id or len(job_id) > 128:
        return False
    return all((ch.isascii() and ch.isalnum()) or ch in "-_." for ch in job_id)


@app.route("/run")
def run():
    """Start the cleaning subprocess and stream its logs as server-sent events.

    Query parameters: ``input`` and ``sheet`` (required), ``job_id``,
    ``output_sheet`` and ``models`` (optional). Responds 400 for missing
    parameters, an invalid job id, or a rejected input path.
    """
    # A missing or empty job_id gets a generated one so runs never share a Blueprint.
    job_id = request.args.get("job_id") or uuid.uuid4().hex
    input_path = request.args.get("input", "")
    sheet = request.args.get("sheet", "")
    output_sheet = request.args.get("output_sheet", DEFAULT_OUTPUT_SHEET_NAME)
    model_list = request.args.get("models", "")

    if not input_path or not sheet:
        return jsonify({"error": "Input file and source sheet are required."}), 400
    # job_id is client-controlled and ends up in a file name below.
    if not _is_safe_job_id(job_id):
        return jsonify({"error": "Invalid job id."}), 400

    try:
        workbook_path = resolve_allowed_path(input_path, APP_ROOT, ALLOWED_FILE_ROOTS)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    # Each run gets its own Blueprint so simultaneous users cannot overwrite it.
    blueprint_path = UPLOAD_DIR / f"Blueprint_{job_id}.xlsx"
    state = get_state()
    state["apply_blueprint_path"] = str(blueprint_path)
    state["apply_blueprint_filename"] = blueprint_path.name
    mark_state_changed()

    command = [
        sys.executable,
        "-u",
        str(APP_ROOT / "main.py"),
        "--input",
        str(workbook_path),
        "--sheet",
        sheet,
        "--output_sheet",
        output_sheet,
        "--blueprint",
        str(blueprint_path),
    ]
    if model_list:
        command.extend(["--models", model_list])

    return Response(
        stream_process(command, cwd=APP_ROOT, job_id=job_id),
        mimetype="text/event-stream",
    )


@app.route("/stop", methods=["POST"])
def stop():
    """Stop a running cleaning subprocess for the given frontend job id."""
    job_id = request.args.get("job_id", "")
    if not stop_process(job_id):
        return jsonify({"stopped": False, "message": "No active run found."}), 404
    return jsonify({"stopped": True})


@app.route("/apply")
def apply_blueprint():
    """Start the Blueprint-apply subprocess and stream its logs.

    Query parameters: ``input`` and ``blueprint`` (required) and ``sheet``
    (defaults to DEFAULT_OUTPUT_SHEET_NAME). Responds 400 for missing
    parameters or a rejected path.
    """
    input_path = request.args.get("input", "")
    blueprint_path = request.args.get("blueprint", "")
    sheet = request.args.get("sheet", DEFAULT_OUTPUT_SHEET_NAME)

    if not input_path or not blueprint_path or not sheet:
        return jsonify({"error": "Workbook, blueprint, and target sheet are required."}), 400

    try:
        workbook_path = resolve_allowed_path(input_path, APP_ROOT, ALLOWED_FILE_ROOTS)
        resolved_blueprint_path = resolve_allowed_path(
            blueprint_path, APP_ROOT, ALLOWED_FILE_ROOTS
        )
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    command = [
        sys.executable,
        "-u",
        str(APP_ROOT / "apply_blueprint.py"),
        "--input",
        str(workbook_path),
        "--blueprint",
        str(resolved_blueprint_path),
        "--sheet",
        sheet,
    ]
    return Response(stream_process(command, cwd=APP_ROOT), mimetype="text/event-stream")


if __name__ == "__main__":
    app.run(debug=False, threaded=True)