"""Serialize SongFormer analysis results to downloadable files. Pure, UI-agnostic helpers used by app.py. No model or Gradio imports, so these can be unit-tested without loading any checkpoint. """ import csv import io import json import os import shutil import tempfile import time import zipfile # Per-run export directories older than this (seconds) are swept at the start # of each analysis. Recent runs are kept so their download files stay servable. DEFAULT_EXPORT_TTL_SECONDS = 3600 def format_time(t: float) -> str: """Render seconds as mm:ss.mmm (e.g. 61.5 -> '01:01.500').""" minutes = int(t // 60) seconds = t % 60 return f"{minutes:02d}:{seconds:06.3f}" def stem_of(audio_path: str) -> str: """Return the audio filename without directory or extension.""" return os.path.splitext(os.path.basename(audio_path))[0] def segments_to_table(segments) -> list: """Build display table rows: [start "(mm:ss.mmm)", end "(mm:ss.mmm)", label].""" rows = [] for seg in segments: start = float(seg["start"]) end = float(seg["end"]) rows.append( [ f"{start:.2f} ({format_time(start)})", f"{end:.2f} ({format_time(end)})", seg["label"], ] ) return rows def segments_to_csv(segments) -> str: """Build CSV text from segment dicts. Each segment is {"start": str|float, "end": str|float, "label": str}. Columns: start_sec, start_mmss, end_sec, end_mmss, label. """ buf = io.StringIO() writer = csv.writer(buf, lineterminator="\n") writer.writerow(["start_sec", "start_mmss", "end_sec", "end_mmss", "label"]) for seg in segments: start = float(seg["start"]) end = float(seg["end"]) writer.writerow( [ f"{start:.2f}", format_time(start), f"{end:.2f}", format_time(end), seg["label"], ] ) return buf.getvalue() def segments_to_audacity(segments) -> str: """Build an Audacity label-track file from segment dicts. One line per segment: startendlabel, seconds with six decimals (Audacity's File > Import > Labels format). """ lines = [] for seg in segments: start = float(seg["start"]) end = float(seg["end"]) lines.append(f"{start:.6f}\t{end:.6f}\t{seg['label']}") return "\n".join(lines) + ("\n" if lines else "") def segments_to_combined_csv(named) -> str: """Build a combined CSV across files. `named` is a list of (filename, segments). Columns: filename, start_sec, start_mmss, end_sec, end_mmss, label. """ buf = io.StringIO() writer = csv.writer(buf, lineterminator="\n") writer.writerow( ["filename", "start_sec", "start_mmss", "end_sec", "end_mmss", "label"] ) for filename, segments in named: for seg in segments: start = float(seg["start"]) end = float(seg["end"]) writer.writerow( [ filename, f"{start:.2f}", format_time(start), f"{end:.2f}", format_time(end), seg["label"], ] ) return buf.getvalue() def combined_json(named) -> str: """Build a combined JSON mapping {filename: segments} across files.""" return json.dumps( {filename: segments for filename, segments in named}, indent=2, ensure_ascii=False, ) def write_exports(audio_path, segments, json_str, msa_str, fig, out_dir, stem=None) -> dict: """Write json/msa/csv/audacity/png into out_dir; return {format: path}. Reuses the already-built json_str/msa_str from app.py rather than re-serializing. Saves the matplotlib figure as PNG. `stem` overrides the filename stem (used by batch to keep de-duplicated folder and file names consistent); defaults to the audio filename's stem. """ if stem is None: stem = stem_of(audio_path) paths = { "json": os.path.join(out_dir, f"{stem}.json"), "msa": os.path.join(out_dir, f"{stem}.msa.txt"), "csv": os.path.join(out_dir, f"{stem}.csv"), "audacity": os.path.join(out_dir, f"{stem}.audacity.txt"), "png": os.path.join(out_dir, f"{stem}.png"), } with open(paths["json"], "w", encoding="utf-8") as f: f.write(json_str) with open(paths["msa"], "w", encoding="utf-8") as f: f.write(msa_str) with open(paths["csv"], "w", encoding="utf-8", newline="") as f: f.write(segments_to_csv(segments)) with open(paths["audacity"], "w", encoding="utf-8") as f: f.write(segments_to_audacity(segments)) fig.savefig(paths["png"], dpi=150, bbox_inches="tight") return paths def make_zip(paths, zip_path) -> str: """Bundle the given files into zip_path using their basenames. Returns zip_path. """ with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for p in paths: zf.write(p, arcname=os.path.basename(p)) return zip_path # File types that are already compressed: deflating them again wastes CPU # (which matters because the batch ZIP is rebuilt incrementally per file). _STORED_EXTENSIONS = {".png", ".jpg", ".jpeg", ".zip", ".mp3", ".flac", ".ogg"} def zip_dir(src_dir, zip_path) -> str: """Zip the contents of src_dir into zip_path. Arcnames are relative to src_dir, preserving subfolders. Files that are already compressed (see _STORED_EXTENSIONS) are stored uncompressed. Returns zip_path. """ with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for root, _dirs, files in os.walk(src_dir): for name in files: full = os.path.join(root, name) arcname = os.path.relpath(full, src_dir) compress = ( zipfile.ZIP_STORED if os.path.splitext(name)[1].lower() in _STORED_EXTENSIONS else zipfile.ZIP_DEFLATED ) zf.write(full, arcname=arcname, compress_type=compress) return zip_path def cleanup_old_exports(parent_dir, max_age_seconds, now=None) -> list: """Remove run subdirectories of parent_dir older than max_age_seconds. Only directories are swept (stray files are left alone). A missing parent_dir is a no-op. Recent runs are preserved so their download files remain servable. Returns the list of removed directory paths. """ if now is None: now = time.time() removed = [] if not os.path.isdir(parent_dir): return removed cutoff = now - max_age_seconds for name in sorted(os.listdir(parent_dir)): path = os.path.join(parent_dir, name) if not os.path.isdir(path): continue if os.path.getmtime(path) < cutoff: shutil.rmtree(path, ignore_errors=True) removed.append(path) return removed def new_run_dir(parent_dir=None, ttl_seconds=DEFAULT_EXPORT_TTL_SECONDS) -> str: """Create a fresh run directory for export files, sweeping stale runs. Shared bootstrap for the single-file and batch handlers. parent_dir defaults to /songformer_exports. """ if parent_dir is None: parent_dir = os.path.join(tempfile.gettempdir(), "songformer_exports") os.makedirs(parent_dir, exist_ok=True) cleanup_old_exports(parent_dir, ttl_seconds) return tempfile.mkdtemp(prefix="run_", dir=parent_dir)