| from __future__ import annotations |
|
|
| import csv |
| import hashlib |
| import os |
| import tempfile |
| from pathlib import Path |
|
|
| from flask import Flask, after_this_request, render_template, request, send_file |
| from werkzeug.utils import secure_filename |
|
|
| from pipeline import labels2attrs, run_pipeline |
|
|
# Flask application object. The "templates" directory is used both for
# Jinja templates and as the static-file folder.
app = Flask(
    __name__,
    template_folder="templates",
    static_folder="templates",
)

# Directory (relative to the working directory) where uploads are stored;
# created at import time if it does not exist yet.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Lower-case file extensions, without the dot, accepted by allowed_file().
ALLOWED_EXTENSIONS = {"txt", "csv"}
|
|
|
|
def allowed_file(filename: str) -> bool:
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared
    case-insensitively. A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        # No "." anywhere in the name, so there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
def iter_texts_from_file(file_path: Path):
    """Yield the non-empty, stripped texts contained in *file_path*.

    The file type is chosen by the (case-insensitive) suffix:

    * ``.txt`` - one text per line.
    * ``.csv`` - one text per row, taken from the first column only.

    Files are decoded as UTF-8 (a leading BOM is skipped) and undecodable
    bytes are dropped. Any other suffix yields nothing.
    """
    kind = file_path.suffix.lower()
    decode_opts = {"encoding": "utf-8-sig", "errors": "ignore"}

    if kind == ".csv":
        # newline="" lets the csv module handle line endings itself.
        with file_path.open("r", newline="", **decode_opts) as handle:
            for record in csv.reader(handle):
                first_cell = record[0].strip() if record else ""
                if first_cell:
                    yield first_cell
        return

    if kind == ".txt":
        with file_path.open("r", **decode_opts) as handle:
            # filter(None, ...) discards lines that are empty after stripping.
            yield from filter(None, map(str.strip, handle))
|
|
|
|
def process_file(file_path: Path):
    """Run the pipeline on every text found in *file_path*.

    Returns a list, in file order, of dicts with the keys ``"input"`` (the
    text) and ``"output"`` (whatever ``run_pipeline`` returned for it).
    """
    return [
        {"input": snippet, "output": run_pipeline(snippet)}
        for snippet in iter_texts_from_file(file_path)
    ]
|
|
|
|
def summarize_result(result_rows):
    """Tally label attributes per input and turn them into proportions.

    Each item of *result_rows* must provide ``item["output"][1]`` as an
    iterable of ``(clause_text, label_name)`` pairs. Every label is looked
    up in ``labels2attrs``; the attributes it maps to are counted by name,
    except that an attribute containing ``"NA"`` is counted under the NA
    bucket for its position (0: genericity, 1: eventivity, otherwise
    boundedness).

    Returns a tuple ``(counts, individual_labels, proportions)`` of dicts
    keyed by the row index:

    * ``counts[idx]`` - attribute name -> number of occurrences.
    * ``individual_labels[idx]`` - label names in clause order.
    * ``proportions[idx]`` - seven floats: generic, specific, stative,
      dynamic, static, episodic, habitual, each divided by the total of its
      own dimension (NA bucket included), or 0.0 when that total is zero.
    """
    # Each dimension: the attributes reported as proportions, then its NA bucket.
    dimensions = (
        (("generic", "specific"), "NA genericity"),
        (("stative", "dynamic"), "NA eventivity"),
        (("static", "episodic", "habitual"), "NA boundedness"),
    )
    # Key order of every per-row counter: all attributes first, NA buckets last.
    counter_keys = [name for members, _ in dimensions for name in members]
    counter_keys += [na_key for _, na_key in dimensions]
    na_by_position = ("NA genericity", "NA eventivity")

    counts = {}
    individual_labels = {}
    proportions = {}

    for idx, item in enumerate(result_rows):
        tally = dict.fromkeys(counter_keys, 0)
        labels = []
        counts[idx] = tally
        individual_labels[idx] = labels

        for _clause, label in item["output"][1]:
            labels.append(label)
            for position, feature in enumerate(labels2attrs[label]):
                if "NA" in feature:
                    # Positions past the first two all fall into boundedness.
                    if position < len(na_by_position):
                        bucket = na_by_position[position]
                    else:
                        bucket = "NA boundedness"
                else:
                    bucket = feature
                tally[bucket] += 1

        ratios = []
        for members, na_key in dimensions:
            total = sum(tally[name] for name in members) + tally[na_key]
            for name in members:
                ratios.append(tally[name] / total if total else 0.0)
        proportions[idx] = ratios

    return counts, individual_labels, proportions
|
|
|
|
def write_results_csv(result_rows) -> str:
    """Write one summary row per processed input to a temporary CSV file.

    Each item of *result_rows* must provide ``item["input"]`` and
    ``item["output"][0]``, an iterable of ``(clause_text, clause_id)``
    pairs. Counts, labels and proportions come from ``summarize_result``.

    Returns the path of the CSV file. The file is created with
    ``delete=False``, so the caller is responsible for removing it.

    Raises whatever the summarising or writing step raises. If writing
    fails, the partially written temp file is removed first so that failed
    requests do not pile up files in the temp directory.
    """
    counts, individual_labels, proportions = summarize_result(result_rows)

    header = [
        "input",
        "clauses",
        "individual labels",
        "genericity: generic count",
        "genericity: specific count",
        "eventivity: stative count",
        "eventivity: dynamic count",
        "boundedness: static count",
        "boundedness: episodic count",
        "habitual count",
        "genericity: proportion generic",
        "genericity: proportion specific",
        "eventivity: proportion stative",
        "eventivity: proportion dynamic",
        "boundedness: proportion static",
        "boundedness: proportion episodic",
        "proportion habitual",
    ]
    # Count columns, in the same order as the "... count" headers above.
    count_keys = (
        "generic",
        "specific",
        "stative",
        "dynamic",
        "static",
        "episodic",
        "habitual",
    )

    # Open the temp file directly in text mode instead of creating it,
    # closing it and reopening it by name. newline="" is required by csv.
    tmp = tempfile.NamedTemporaryFile(
        "w", delete=False, suffix=".csv", encoding="utf-8", newline=""
    )
    tmp_path = tmp.name
    try:
        with tmp:
            writer = csv.writer(tmp)
            writer.writerow(header)

            for idx, item in enumerate(result_rows):
                clauses = "\n".join(
                    f"{clause_id}: {clause_text}"
                    for clause_text, clause_id in item["output"][0]
                )
                label_lines = "\n".join(individual_labels[idx])

                writer.writerow([
                    item["input"],
                    clauses,
                    label_lines,
                    *(counts[idx][key] for key in count_keys),
                    *proportions[idx],
                ])
    except BaseException:
        # Nobody will ever learn this path, so remove the file before
        # re-raising; a failure to remove it must not mask the real error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    return tmp_path
|
|
|
|
@app.route("/", methods=["GET"])
def index():
    """Render the ``index.html`` template for GET requests to ``/``."""
    page = render_template("index.html")
    return page
|
|
|
|
def _remove_quietly(path) -> None:
    """Delete *path*; log instead of raising if the deletion fails."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Already gone - nothing to clean up.
        pass
    except OSError:
        app.logger.exception("Failed to clean up temp files")


@app.route("/", methods=["POST"])
def upload_file():
    """Process an uploaded ``.txt``/``.csv`` file and return ``result.csv``.

    Expects the upload in the multipart field ``file``. Responds with a
    plain-text message and status 400 when no file was sent or its
    extension is not allowed. Otherwise the upload is stored under
    ``UPLOAD_DIR``, run through ``process_file`` and ``write_results_csv``,
    and the resulting CSV is sent as an attachment. Exceptions raised while
    processing propagate to Flask's error handling.
    """
    file = request.files.get("file")
    if file is None or not file.filename:
        return "No file selected", 400

    if not allowed_file(file.filename):
        return "File type not allowed", 400

    # Take the suffix from the extension allowed_file() has just vetted.
    # Deriving it from secure_filename() loses the extension when the base
    # name has no ASCII characters (e.g. "файл.txt" becomes "txt"), and the
    # file would then be read as an unknown type and yield no rows.
    suffix = "." + file.filename.rsplit(".", 1)[1].lower()

    # A unique name per request: naming the file after a hash of its
    # content made concurrent uploads of identical files share one path,
    # so one request's cleanup could delete the other's input.
    fd, tmp_name = tempfile.mkstemp(dir=UPLOAD_DIR, suffix=suffix)
    os.close(fd)
    saved_path = Path(tmp_name)

    try:
        file.save(saved_path)
        # process_file() returns a fully built list, so the upload is no
        # longer needed once it returns.
        result_rows = process_file(saved_path)
    finally:
        # Runs on failure too, so failed requests do not leave uploads behind.
        _remove_quietly(saved_path)

    output_csv = write_results_csv(result_rows)

    @after_this_request
    def cleanup(response):
        # NOTE(review): relies on send_file() having opened the file before
        # this hook runs. On platforms that refuse to delete open files the
        # removal fails and is only logged - confirm for the deployment
        # target.
        _remove_quietly(output_csv)
        return response

    return send_file(output_csv, as_attachment=True, download_name="result.csv")
|
|
|
|
if __name__ == "__main__":
    # Run the built-in server on every network interface, port 7860, with
    # debug mode switched off.
    app.run(
        debug=False,
        port=7860,
        host="0.0.0.0",
    )