# Header captured from the web file viewer this source was copied from:
#   author: BabakScrapes
#   commit: f042f29 (verified) - "Upgraded used packages"
#   size:   6.53 kB
from __future__ import annotations
import csv
import hashlib
import os
import tempfile
from pathlib import Path
from flask import Flask, after_this_request, render_template, request, send_file
from werkzeug.utils import secure_filename
from pipeline import labels2attrs, run_pipeline
# Flask application object. Templates and static assets are both looked up
# in the "templates" directory.
app = Flask(__name__, template_folder="templates", static_folder="templates")

# Directory (relative to the current working directory) where uploaded files
# are stored while they are processed; created at import time if missing.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Lower-case file extensions, without the leading dot, that allowed_file()
# accepts.
ALLOWED_EXTENSIONS = {"txt", "csv"}
def allowed_file(filename: str) -> bool:
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is always rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def iter_texts_from_file(file_path: Path):
    """Lazily yield the non-empty input texts stored in *file_path*.

    * ``.txt`` files: one text per line, stripped; blank lines are skipped.
    * ``.csv`` files: the stripped first column of each row; rows that are
      empty or whose first cell is blank are skipped.
    * any other suffix: nothing is yielded.

    Files are decoded as UTF-8 (a leading BOM is dropped) and undecodable
    bytes are ignored.
    """
    kind = file_path.suffix.lower()
    if kind not in (".txt", ".csv"):
        return

    if kind == ".csv":
        # newline="" lets the csv module handle embedded line breaks itself.
        with file_path.open(
            "r", encoding="utf-8-sig", errors="ignore", newline=""
        ) as handle:
            for record in csv.reader(handle):
                first_cell = record[0].strip() if record else ""
                if first_cell:
                    yield first_cell
        return

    with file_path.open("r", encoding="utf-8-sig", errors="ignore") as handle:
        # filter(None, ...) drops lines that are empty after stripping.
        yield from filter(None, map(str.strip, handle))
def process_file(file_path: Path):
    """Run the pipeline on every text found in *file_path*.

    Returns a list with one dict per text, holding the text under ``"input"``
    and the value returned by ``run_pipeline`` under ``"output"``.
    """
    return [
        {"input": text, "output": run_pipeline(text)}
        for text in iter_texts_from_file(file_path)
    ]
def summarize_result(result_rows):
    """Aggregate the clause labels of each result row.

    Each row's ``row["output"][1]`` is iterated as ``(clause_text, label)``
    pairs. Every label is looked up in ``labels2attrs`` to get its attribute
    values; a value containing ``"NA"`` is counted in the NA bucket for its
    position (0: genericity, 1: eventivity, anything later: boundedness),
    any other value is counted under its own name.

    Returns a ``(counts, individual_labels, proportions)`` tuple of dicts,
    all keyed by the row's index in *result_rows*:

    * ``counts[i]``: attribute name -> number of occurrences.
    * ``individual_labels[i]``: the labels in clause order.
    * ``proportions[i]``: seven ratios in the order generic, specific,
      stative, dynamic, static, episodic, habitual. Each is taken over its
      dimension's total (NA bucket included) and is 0.0 when that total is 0.
    """
    feature_names = (
        "generic",
        "specific",
        "stative",
        "dynamic",
        "static",
        "episodic",
        "habitual",
    )
    na_names = ("NA genericity", "NA eventivity", "NA boundedness")

    counts = {}
    individual_labels = {}
    proportions = {}

    for row_no, row in enumerate(result_rows):
        labels = []
        tally = dict.fromkeys(feature_names + na_names, 0)
        individual_labels[row_no] = labels
        counts[row_no] = tally

        for _clause_text, label in row["output"][1]:
            labels.append(label)
            for position, value in enumerate(labels2attrs[label]):
                if "NA" in value:
                    # Positions past the second all fall into the last bucket.
                    tally[na_names[min(position, 2)]] += 1
                else:
                    tally[value] += 1

        genericity = tally["generic"] + tally["specific"] + tally["NA genericity"]
        eventivity = tally["stative"] + tally["dynamic"] + tally["NA eventivity"]
        boundedness = (
            tally["static"]
            + tally["episodic"]
            + tally["habitual"]
            + tally["NA boundedness"]
        )

        # Denominator for each entry of feature_names, in the same order.
        denominators = (
            genericity,
            genericity,
            eventivity,
            eventivity,
            boundedness,
            boundedness,
            boundedness,
        )
        proportions[row_no] = [
            tally[name] / total if total else 0.0
            for name, total in zip(feature_names, denominators)
        ]

    return counts, individual_labels, proportions
def write_results_csv(result_rows) -> str:
    """Write the pipeline results to a temporary CSV file and return its path.

    One row is written per entry of *result_rows*: the input text, its
    clauses (``item["output"][0]``, iterated as ``(clause_text, clause_id)``
    pairs and rendered one per line as ``"<clause_id>: <clause_text>"``), its
    labels one per line, then the counts and proportions computed by
    ``summarize_result``.

    The file is created with ``delete=False``, so the caller owns it and must
    remove it when done. If writing fails the partially written file is
    removed before the exception propagates.
    """
    counts, individual_labels, proportions = summarize_result(result_rows)

    header = [
        "input",
        "clauses",
        "individual labels",
        "genericity: generic count",
        "genericity: specific count",
        "eventivity: stative count",
        "eventivity: dynamic count",
        "boundedness: static count",
        "boundedness: episodic count",
        "habitual count",
        "genericity: proportion generic",
        "genericity: proportion specific",
        "eventivity: proportion stative",
        "eventivity: proportion dynamic",
        "boundedness: proportion static",
        "boundedness: proportion episodic",
        "proportion habitual",
    ]
    # Count columns, in the same order as the "... count" headers above.
    count_keys = (
        "generic",
        "specific",
        "stative",
        "dynamic",
        "static",
        "episodic",
        "habitual",
    )

    # Open the temp file directly in text mode instead of creating, closing
    # and reopening it. newline="" is what the csv module expects.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", newline="", suffix=".csv", delete=False
    )
    tmp_path = tmp.name
    try:
        with tmp:
            writer = csv.writer(tmp)
            writer.writerow(header)
            for idx, item in enumerate(result_rows):
                clauses = "\n".join(
                    f"{clause_id}: {clause_text}"
                    for clause_text, clause_id in item["output"][0]
                )
                writer.writerow(
                    [
                        item["input"],
                        clauses,
                        "\n".join(individual_labels[idx]),
                        *(counts[idx][key] for key in count_keys),
                        *proportions[idx],
                    ]
                )
    except Exception:
        # Nobody else knows this path yet, so remove the half-written file
        # here. Removal is best-effort: the original error is what matters.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
@app.route("/", methods=["GET"])
def index():
    """Render the ``index.html`` template for GET requests to the site root."""
    template_name = "index.html"
    return render_template(template_name)
@app.route("/", methods=["POST"])
def upload_file():
    """Process an uploaded ``.txt``/``.csv`` file and return the results as CSV.

    Expects a multipart form field named ``file``. Responds with HTTP 400 and
    a short message when the field is missing, no file was chosen, or the
    extension is not allowed. Otherwise the upload is stored under
    ``UPLOAD_DIR``, run through ``process_file``, and the CSV produced by
    ``write_results_csv`` is sent back as the attachment ``result.csv``.
    Both the stored upload and the generated CSV are removed once the
    response has been built.
    """
    if "file" not in request.files:
        return "No file selected", 400

    file = request.files["file"]
    if not file or file.filename == "":
        return "No file selected", 400
    if not allowed_file(file.filename):
        return "File type not allowed", 400

    # Take the extension from the name that allowed_file() just validated.
    # secure_filename() can drop it (".txt" or a non-ASCII stem becomes
    # "txt"), which used to store the upload without a suffix so that no
    # text was read from it. The value is one of ALLOWED_EXTENSIONS, so it
    # is safe to use in a path.
    suffix = "." + file.filename.rsplit(".", 1)[1].lower()
    file_hash = hashlib.md5(file.read()).hexdigest()

    # The content hash alone is not unique per request: two concurrent
    # uploads of the same file would share one path and one request's
    # cleanup could delete the file the other is still reading. mkstemp
    # reserves a unique name that still carries the hash.
    fd, reserved_name = tempfile.mkstemp(
        prefix=f"{file_hash}-", suffix=suffix, dir=UPLOAD_DIR
    )
    os.close(fd)
    saved_path = Path(reserved_name)

    try:
        file.seek(0)  # rewind after hashing
        file.save(saved_path)
        result_rows = process_file(saved_path)
        output_csv = write_results_csv(result_rows)
    except Exception:
        # Do not leave the upload behind when processing fails.
        try:
            saved_path.unlink()
        except OSError:
            app.logger.exception("Failed to clean up temp files")
        raise

    @after_this_request
    def cleanup(response):
        try:
            if saved_path.exists():
                saved_path.unlink()
            if os.path.exists(output_csv):
                os.remove(output_csv)
        except Exception:
            # Best effort: a cleanup failure must not break the download.
            app.logger.exception("Failed to clean up temp files")
        return response

    return send_file(output_csv, as_attachment=True, download_name="result.csv")
# Start Flask's built-in server only when this module is run as a script.
if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        port=7860,
        debug=False,
    )