"""Flask front end that runs the clause-labelling pipeline on an uploaded file.

A ``.txt`` upload is processed line by line and a ``.csv`` upload by the first
cell of every row.  Each text is passed to ``run_pipeline`` and the per-text
label statistics are returned to the client as a downloadable CSV.
"""

from __future__ import annotations

import csv
import hashlib
import os
import tempfile
import uuid
from collections.abc import Iterator
from pathlib import Path

from flask import Flask, after_this_request, render_template, request, send_file
from werkzeug.utils import secure_filename  # noqa: F401 -- no longer used here; kept in case other code imports it from this module

from pipeline import labels2attrs, run_pipeline

# NOTE(review): the static folder is the template folder, so raw template
# sources are reachable through the static route -- confirm this is intended.
app = Flask(__name__, template_folder="templates", static_folder="templates")

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

ALLOWED_EXTENSIONS = {"txt", "csv"}

# Tally keys used when a label attribute contains "NA", indexed by the
# attribute's position (0 = genericity, 1 = eventivity, 2 and up = boundedness).
_NA_KEYS = ("NA genericity", "NA eventivity", "NA boundedness")

# Every key of a per-input tally, in the order the tally dict is built.
_COUNT_KEYS = (
    "generic",
    "specific",
    "stative",
    "dynamic",
    "static",
    "episodic",
    "habitual",
    *_NA_KEYS,
)

# Header of the result CSV.  The last seven columns follow the order of the
# proportion list produced by ``summarize_result``.
_CSV_HEADER = [
    "input",
    "clauses",
    "individual labels",
    "genericity: generic count",
    "genericity: specific count",
    "eventivity: stative count",
    "eventivity: dynamic count",
    "boundedness: static count",
    "boundedness: episodic count",
    "habitual count",
    "genericity: proportion generic",
    "genericity: proportion specific",
    "eventivity: proportion stative",
    "eventivity: proportion dynamic",
    "boundedness: proportion static",
    "boundedness: proportion episodic",
    "proportion habitual",
]


def allowed_file(filename: str) -> bool:
    """Return True if *filename* ends in one of ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared case-insensitively.
    """
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def iter_texts_from_file(file_path: Path) -> Iterator[str]:
    """Yield the non-empty, stripped input texts stored in *file_path*.

    ``.txt`` files contribute one text per line; ``.csv`` files contribute the
    first cell of each row.  Files are decoded as UTF-8 (a leading BOM is
    dropped) and undecodable bytes are ignored.  Any other suffix yields
    nothing.
    """
    suffix = file_path.suffix.lower()
    if suffix == ".txt":
        with file_path.open("r", encoding="utf-8-sig", errors="ignore") as f:
            for line in f:
                text = line.strip()
                if text:
                    yield text
    elif suffix == ".csv":
        # newline="" lets the csv module handle line endings inside quoted cells.
        with file_path.open("r", encoding="utf-8-sig", errors="ignore", newline="") as f:
            for row in csv.reader(f):
                if row and row[0].strip():
                    yield row[0].strip()


def process_file(file_path: Path) -> list[dict]:
    """Run the pipeline on every text in *file_path*.

    Returns a list of ``{"input": text, "output": run_pipeline(text)}`` dicts,
    one per text, in file order.
    """
    return [
        {"input": text, "output": run_pipeline(text)}
        for text in iter_texts_from_file(file_path)
    ]


def _share(part: int, whole: int) -> float:
    """Return ``part / whole``, or 0.0 when *whole* is zero."""
    return part / whole if whole else 0.0


def summarize_result(result_rows):
    """Aggregate the clause labels of every processed input.

    Each item's ``item["output"][1]`` is iterated as ``(clause_text, label)``
    pairs and every label is expanded through ``labels2attrs`` into its
    attributes.  An attribute containing ``"NA"`` is tallied under the NA key
    for its position; any other attribute is tallied under its own name.

    Returns:
        A ``(counts, individual_labels, proportions)`` tuple of dicts keyed by
        the item's index in *result_rows*:

        * ``counts[idx]`` -- tally per feature name, including the NA keys.
        * ``individual_labels[idx]`` -- the labels in clause order.
        * ``proportions[idx]`` -- seven floats: generic, specific, stative,
          dynamic, static, episodic, habitual.  Each is divided by the total
          of its own dimension (NA included), or is 0.0 if that total is zero.

    Raises:
        KeyError: if a label is missing from ``labels2attrs`` or a non-NA
            attribute is not one of the known feature names.
    """
    counts = {}
    individual_labels = {}
    proportions = {}

    for idx, item in enumerate(result_rows):
        tally = dict.fromkeys(_COUNT_KEYS, 0)
        labels = []
        counts[idx] = tally
        individual_labels[idx] = labels

        for _clause_text, label_name in item["output"][1]:
            labels.append(label_name)
            for attr_index, feature in enumerate(labels2attrs[label_name]):
                if "NA" in feature:
                    # Positions past the second all count as boundedness.
                    tally[_NA_KEYS[min(attr_index, 2)]] += 1
                else:
                    tally[feature] += 1

        gen_total = tally["generic"] + tally["specific"] + tally["NA genericity"]
        evt_total = tally["stative"] + tally["dynamic"] + tally["NA eventivity"]
        bnd_total = (
            tally["static"]
            + tally["episodic"]
            + tally["habitual"]
            + tally["NA boundedness"]
        )

        proportions[idx] = [
            _share(tally["generic"], gen_total),
            _share(tally["specific"], gen_total),
            _share(tally["stative"], evt_total),
            _share(tally["dynamic"], evt_total),
            _share(tally["static"], bnd_total),
            _share(tally["episodic"], bnd_total),
            _share(tally["habitual"], bnd_total),
        ]

    return counts, individual_labels, proportions


def _remove_file(path: str | os.PathLike[str]) -> None:
    """Delete *path*; a missing file is fine, other OS errors are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        app.logger.exception("Failed to clean up temp files")


def write_results_csv(result_rows) -> str:
    """Write the summary of *result_rows* to a new temporary CSV file.

    One row is written per input: the input text, its clauses (one
    ``"<clause_id>: <clause_text>"`` line per pair in ``item["output"][0]``),
    its labels (one per line), the seven feature counts and the seven
    proportions.

    Returns:
        The path of the CSV file.  The caller is responsible for deleting it.
        If writing fails, the partial file is removed before the exception
        propagates.
    """
    counts, individual_labels, proportions = summarize_result(result_rows)

    # delete=False: the file has to outlive this function so it can be served.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", newline="", suffix=".csv", delete=False
    )
    try:
        with tmp:
            writer = csv.writer(tmp)
            writer.writerow(_CSV_HEADER)
            for idx, item in enumerate(result_rows):
                clauses = "\n".join(
                    f"{clause_id}: {clause_text}"
                    for clause_text, clause_id in item["output"][0]
                )
                tally = counts[idx]
                writer.writerow([
                    item["input"],
                    clauses,
                    "\n".join(individual_labels[idx]),
                    tally["generic"],
                    tally["specific"],
                    tally["stative"],
                    tally["dynamic"],
                    tally["static"],
                    tally["episodic"],
                    tally["habitual"],
                    *proportions[idx],
                ])
    except Exception:
        # Do not leave a half-written file behind in the temp directory.
        _remove_file(tmp.name)
        raise
    return tmp.name


@app.route("/", methods=["GET"])
def index():
    """Serve the upload form."""
    return render_template("index.html")


@app.route("/", methods=["POST"])
def upload_file():
    """Process the uploaded file and return the results as ``result.csv``.

    Responds with 400 when no file was sent or its extension is not allowed.
    The stored upload and the generated CSV are both deleted once the
    response has been produced; the upload is also deleted if processing
    fails.
    """
    upload = request.files.get("file")
    if upload is None or not upload.filename:
        return "No file selected", 400
    if not allowed_file(upload.filename):
        return "File type not allowed", 400

    # Use the extension that allowed_file() just validated.  Deriving it from
    # secure_filename() loses it when the stem is stripped entirely (e.g.
    # "файл.txt" becomes "txt"), and the file would then be read as empty.
    suffix = "." + upload.filename.rsplit(".", 1)[1].lower()

    payload = upload.read()
    digest = hashlib.md5(payload).hexdigest()
    # The random token keeps concurrent uploads of identical content from
    # sharing a path, where one request's cleanup would delete the other's input.
    saved_path = UPLOAD_DIR / f"{digest}-{uuid.uuid4().hex}{suffix}"

    try:
        saved_path.write_bytes(payload)
        result_rows = process_file(saved_path)
        output_csv = write_results_csv(result_rows)
    except Exception:
        # The cleanup hook below is not registered yet, so tidy up here.
        _remove_file(saved_path)
        raise

    @after_this_request
    def cleanup(response):
        # Each path is attempted independently so one failure cannot skip the other.
        _remove_file(saved_path)
        _remove_file(output_csv)
        return response

    return send_file(output_csv, as_attachment=True, download_name="result.csv")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False)