SongFormer / export_utils.py
SidSaxena's picture
Deploy hf-space @ 164d9c7: Audacity label export
329ea23 verified
Raw
History Blame Contribute Delete
7.63 kB
"""Serialize SongFormer analysis results to downloadable files.
Pure, UI-agnostic helpers used by app.py. No model or Gradio imports, so
these can be unit-tested without loading any checkpoint.
"""
import csv
import io
import json
import os
import shutil
import tempfile
import time
import zipfile
# Age threshold, in seconds, past which a per-run export directory is swept
# when a new analysis starts. Younger runs are left in place so the files they
# offer for download can still be served.
DEFAULT_EXPORT_TTL_SECONDS = 60 * 60
def format_time(t: float) -> str:
    """Render seconds as mm:ss.mmm (e.g. 61.5 -> '01:01.500').

    The seconds remainder is rounded to milliseconds before it is formatted,
    so a value that rounds up to a full minute carries into the minutes field
    (59.9996 -> '01:00.000') instead of being printed as '00:60.000'.

    Args:
        t: Time offset in seconds.

    Returns:
        Zero-padded minutes, a colon, then seconds with three decimals.
    """
    minutes, seconds = divmod(t, 60)
    # Round first: formatting alone would turn 59.9996 into "60.000" without
    # bumping the minute count.
    seconds = round(seconds, 3)
    if seconds >= 60:
        minutes += 1
        seconds = 0.0
    return f"{int(minutes):02d}:{seconds:06.3f}"
def stem_of(audio_path: str) -> str:
    """Strip the directory part and the final extension from an audio path."""
    filename = os.path.basename(audio_path)
    stem, _extension = os.path.splitext(filename)
    return stem
def segments_to_table(segments) -> list:
    """Turn segment dicts into rows for the on-screen table.

    Every row is [start, end, label], where start and end are rendered as
    "<seconds with two decimals> (<mm:ss.mmm>)".
    """

    def _cell(raw) -> str:
        # Boundaries may arrive as strings or numbers; normalise to float.
        secs = float(raw)
        return f"{secs:.2f} ({format_time(secs)})"

    return [
        [_cell(seg["start"]), _cell(seg["end"]), seg["label"]]
        for seg in segments
    ]
def segments_to_csv(segments) -> str:
    """Serialize segment dicts to CSV text.

    A segment looks like {"start": str|float, "end": str|float, "label": str}.
    The output starts with a header row and uses the columns
    start_sec, start_mmss, end_sec, end_mmss, label, with "\\n" line endings.
    """

    def _record(seg) -> list:
        begin = float(seg["start"])
        finish = float(seg["end"])
        return [
            f"{begin:.2f}",
            format_time(begin),
            f"{finish:.2f}",
            format_time(finish),
            seg["label"],
        ]

    sink = io.StringIO()
    out = csv.writer(sink, lineterminator="\n")
    out.writerow(["start_sec", "start_mmss", "end_sec", "end_mmss", "label"])
    out.writerows(_record(seg) for seg in segments)
    return sink.getvalue()
def segments_to_audacity(segments) -> str:
    """Serialize segment dicts as an Audacity label track.

    Each segment becomes one newline-terminated line of the form
    start<TAB>end<TAB>label, with both times in seconds printed to six
    decimals (the format read by Audacity's File > Import > Labels).
    An empty segment list yields an empty string.
    """
    return "".join(
        f"{float(seg['start']):.6f}\t{float(seg['end']):.6f}\t{seg['label']}\n"
        for seg in segments
    )
def segments_to_combined_csv(named) -> str:
    """Serialize the segments of several files into one CSV text.

    `named` is a list of (filename, segments) pairs. After the header row,
    every segment contributes one row with the columns
    filename, start_sec, start_mmss, end_sec, end_mmss, label.
    """

    def _record(filename, seg) -> list:
        begin = float(seg["start"])
        finish = float(seg["end"])
        return [
            filename,
            f"{begin:.2f}",
            format_time(begin),
            f"{finish:.2f}",
            format_time(finish),
            seg["label"],
        ]

    sink = io.StringIO()
    out = csv.writer(sink, lineterminator="\n")
    out.writerow(
        ["filename", "start_sec", "start_mmss", "end_sec", "end_mmss", "label"]
    )
    out.writerows(
        _record(filename, seg)
        for filename, segments in named
        for seg in segments
    )
    return sink.getvalue()
def combined_json(named) -> str:
    """Serialize (filename, segments) pairs as one JSON object keyed by filename.

    The output is indented by two spaces and keeps non-ASCII characters
    unescaped. If a filename occurs more than once, the last pair wins.
    """
    by_file = {}
    for filename, segments in named:
        by_file[filename] = segments
    return json.dumps(by_file, ensure_ascii=False, indent=2)
def write_exports(audio_path, segments, json_str, msa_str, fig, out_dir, stem=None) -> dict:
    """Write the json/msa/csv/audacity/png exports into out_dir.

    json_str and msa_str are written verbatim (they were already built by the
    caller), the CSV and Audacity files are generated from `segments`, and
    `fig` is saved as a PNG through its `savefig` method. When `stem` is None
    the file names are based on the audio file's stem; batch callers pass
    `stem` so folder and file names stay consistent after de-duplication.

    Returns a dict mapping each format name to the path that was written.
    """
    base = stem_of(audio_path) if stem is None else stem

    def _target(suffix: str) -> str:
        return os.path.join(out_dir, f"{base}{suffix}")

    paths = {
        "json": _target(".json"),
        "msa": _target(".msa.txt"),
        "csv": _target(".csv"),
        "audacity": _target(".audacity.txt"),
        "png": _target(".png"),
    }

    with open(paths["json"], "w", encoding="utf-8") as handle:
        handle.write(json_str)

    with open(paths["msa"], "w", encoding="utf-8") as handle:
        handle.write(msa_str)

    # newline="" keeps the csv text's own "\n" terminators untouched.
    with open(paths["csv"], "w", encoding="utf-8", newline="") as handle:
        handle.write(segments_to_csv(segments))

    with open(paths["audacity"], "w", encoding="utf-8") as handle:
        handle.write(segments_to_audacity(segments))

    fig.savefig(paths["png"], dpi=150, bbox_inches="tight")
    return paths
def make_zip(paths, zip_path) -> str:
    """Create a deflate-compressed archive at zip_path from the given files.

    Every file is stored flat under its basename, so directory parts of the
    source paths are dropped. Returns zip_path.
    """
    archive = zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_DEFLATED)
    with archive:
        for source in paths:
            flat_name = os.path.basename(source)
            archive.write(source, arcname=flat_name)
    return zip_path
# File types that are already compressed: deflating them again wastes CPU
# (which matters because the batch ZIP is rebuilt incrementally per file).
_STORED_EXTENSIONS = {".png", ".jpg", ".jpeg", ".zip", ".mp3", ".flac", ".ogg"}


def zip_dir(src_dir, zip_path) -> str:
    """Zip the contents of src_dir into zip_path.

    Arcnames are relative to src_dir, preserving subfolders. Files whose
    extension is in _STORED_EXTENSIONS are stored uncompressed; everything
    else is deflated. Entries are added in sorted order so repeated runs over
    the same tree produce the same member order.

    If zip_path itself lies inside src_dir, the archive file is skipped so it
    is never added to itself.

    Args:
        src_dir: Directory whose files are archived (recursively).
        zip_path: Destination archive path; overwritten if it exists.

    Returns:
        zip_path, unchanged.
    """
    # The archive is created as soon as it is opened, so a walk over a tree
    # that contains it would otherwise pick up the half-written file.
    archive_abs = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(src_dir):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()
            for name in sorted(files):
                full = os.path.join(root, name)
                if os.path.abspath(full) == archive_abs:
                    continue
                arcname = os.path.relpath(full, src_dir)
                extension = os.path.splitext(name)[1].lower()
                compress = (
                    zipfile.ZIP_STORED
                    if extension in _STORED_EXTENSIONS
                    else zipfile.ZIP_DEFLATED
                )
                zf.write(full, arcname=arcname, compress_type=compress)
    return zip_path
def cleanup_old_exports(parent_dir, max_age_seconds, now=None) -> list:
    """Remove run subdirectories of parent_dir older than max_age_seconds.

    Only real directories are swept: stray files and symlinks are left alone.
    A missing parent_dir is a no-op. Recent runs are preserved so their
    download files remain servable.

    The sweep is best-effort. An entry that disappears while it is being
    inspected (for example because another sweep removed it) is skipped, and
    a directory is reported only if it is actually gone afterwards.

    Args:
        parent_dir: Directory holding the per-run subdirectories.
        max_age_seconds: Age threshold; a directory whose modification time
            is earlier than ``now - max_age_seconds`` is removed.
        now: Reference timestamp in seconds; defaults to ``time.time()``.

    Returns:
        The list of directory paths that no longer exist after the sweep.
    """
    if now is None:
        now = time.time()
    removed = []
    if not os.path.isdir(parent_dir):
        return removed
    cutoff = now - max_age_seconds
    for name in sorted(os.listdir(parent_dir)):
        path = os.path.join(parent_dir, name)
        # rmtree refuses symlinks, so a link to a directory could never be
        # removed here; skip it instead of reporting a removal that did not
        # happen.
        if os.path.islink(path) or not os.path.isdir(path):
            continue
        try:
            stale = os.path.getmtime(path) < cutoff
        except OSError:
            # The entry vanished between listing and stat; nothing to sweep.
            continue
        if not stale:
            continue
        shutil.rmtree(path, ignore_errors=True)
        # ignore_errors hides failures, so confirm the directory is gone.
        if not os.path.exists(path):
            removed.append(path)
    return removed
def new_run_dir(parent_dir=None, ttl_seconds=DEFAULT_EXPORT_TTL_SECONDS) -> str:
    """Sweep stale runs, then create and return a new export directory.

    Used as the common setup step by the single-file and batch handlers.
    When parent_dir is None, runs live under
    <system tempdir>/songformer_exports. The parent is created if needed,
    run directories older than ttl_seconds are removed, and a new directory
    with the "run_" prefix is made inside the parent.
    """
    exports_root = (
        os.path.join(tempfile.gettempdir(), "songformer_exports")
        if parent_dir is None
        else parent_dir
    )
    os.makedirs(exports_root, exist_ok=True)
    cleanup_old_exports(exports_root, ttl_seconds)
    return tempfile.mkdtemp(dir=exports_root, prefix="run_")