Spaces:
Running on Zero
Running on Zero
File size: 5,966 Bytes
import argparse
import math
import os
import struct
import sys
import time
import wave
from datetime import datetime, timezone
# ROOT is the directory one level above the folder that holds this script.
# It is put at the front of sys.path (once) so that the `from app import ...`
# statement below can be resolved when the script is launched directly.
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)
from app import (
INFERENCE_MODE_HYBRID,
INFERENCE_MODE_MF_ONLY,
INFERENCE_MODE_QWEN_ONLY,
process_audio,
)
def _slug(text):
    """Return *text* lowercased with spaces turned into underscores.

    Every occurrence of ``" + "`` becomes ``"_plus_"`` first; any space that
    is still left afterwards becomes ``"_"``.
    """
    slug = text.lower()
    # Order matters: " + " has to be rewritten before its surrounding spaces
    # are consumed by the plain space-to-underscore substitution.
    for needle, replacement in ((" + ", "_plus_"), (" ", "_")):
        slug = slug.replace(needle, replacement)
    return slug
def _ensure_synthetic_sample(audio_path, *, sample_rate=22050, segment_sec=2.0):
    """Write the built-in synthetic test clip to *audio_path* if it is missing.

    The function is a no-op when *audio_path* already exists, or when its base
    name is anything other than ``public_domain_tone.wav`` (so it never
    fabricates a file the caller asked for under a different name).

    Otherwise it writes a mono, 16-bit PCM WAV made of four three-note chords
    played back to back. Each chord lasts ``segment_sec`` seconds and is shaped
    by a 50 ms linear fade-in and fade-out to avoid clicks at the boundaries.

    Args:
        audio_path: Destination path of the WAV file.
        sample_rate: Frames per second of the generated audio.
        segment_sec: Duration of each chord, in seconds.
    """
    if os.path.exists(audio_path):
        return
    if os.path.basename(audio_path) != "public_domain_tone.wav":
        return

    # dirname() is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    parent_dir = os.path.dirname(audio_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Note frequencies in Hz, three per chord (roughly C, D, F and G major).
    progression = (
        (261.63, 329.63, 392.00),
        (293.66, 369.99, 440.00),
        (349.23, 440.00, 523.25),
        (392.00, 493.88, 587.33),
    )

    total = int(segment_sec * sample_rate)  # frames per chord
    fade_frames = 0.05 * sample_rate  # length of each linear fade, in frames
    two_pi = 2.0 * math.pi

    pcm = bytearray()
    for chord in progression:
        samples = []
        for i in range(total):
            t = i / sample_rate
            attack = min(1.0, i / fade_frames)
            release = min(1.0, (total - i) / fade_frames)
            mixed = 0.0
            for freq in chord:
                mixed += math.sin(two_pi * freq * t)
            # Average the notes, then scale to 35% of full scale for headroom.
            mixed = (mixed / len(chord)) * 0.35 * (attack * release)
            samples.append(int(max(-1.0, min(1.0, mixed)) * 32767))
        # One pack call per chord instead of one bytes object per sample;
        # "<h" keeps the little-endian signed 16-bit layout WAV expects.
        pcm += struct.pack(f"<{total}h", *samples)

    with wave.open(audio_path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(bytes(pcm))
def run_mode(audio_path, extraction_mode, inference_mode, out_dir):
    """Run ``process_audio`` once for *inference_mode* and save its results.

    Artifacts go into a sub-directory of *out_dir* named after the slugged
    mode. On success that directory receives ``summary.txt``,
    ``raw_timeline.txt``, ``music_flamingo.txt`` and ``final_output.md``; if
    anything inside the attempt raises, ``error.txt`` with the exception text
    is written instead.

    Args:
        audio_path: Audio file handed to ``process_audio``.
        extraction_mode: Extraction mode handed to ``process_audio``.
        inference_mode: Inference mode handed to ``process_audio``; also
            determines the artifact sub-directory name.
        out_dir: Directory under which the per-mode directory is created.

    Returns:
        A ``(ok, elapsed_seconds, error)`` tuple; ``error`` is ``None`` on
        success and the exception text on failure.
    """
    t0 = time.time()
    print(f"[start] mode={inference_mode}", flush=True)

    def _artifact_dir():
        # Resolve and create the per-mode output directory on demand.
        target = os.path.join(out_dir, _slug(inference_mode))
        os.makedirs(target, exist_ok=True)
        return target

    def _dump(folder, name, content):
        # Write one UTF-8 text artifact into the per-mode directory.
        with open(os.path.join(folder, name), "w", encoding="utf-8") as handle:
            handle.write(content)

    try:
        raw_timeline, mf_analysis, final_output, file_path = process_audio(
            audio_path,
            extraction_mode,
            inference_mode,
            progress=None,
        )
        # Stop the clock before any artifact I/O so it only covers inference.
        elapsed = time.time() - t0
        folder = _artifact_dir()

        summary_lines = (
            f"inference_mode: {inference_mode}\n",
            f"extraction_mode: {extraction_mode}\n",
            f"elapsed_seconds: {elapsed:.2f}\n",
            f"output_file: {file_path}\n",
        )
        _dump(folder, "summary.txt", "".join(summary_lines))

        outputs = (
            ("raw_timeline.txt", raw_timeline),
            ("music_flamingo.txt", mf_analysis),
            ("final_output.md", final_output),
        )
        for name, content in outputs:
            _dump(folder, name, content)

        print(f"[done] mode={inference_mode} elapsed={elapsed:.2f}s", flush=True)
        return True, elapsed, None
    except Exception as exc:
        # Deliberately broad: one failing mode must be recorded, not abort
        # the remaining modes of the smoke run.
        elapsed = time.time() - t0
        folder = _artifact_dir()
        _dump(folder, "error.txt", str(exc))
        print(f"[fail] mode={inference_mode} elapsed={elapsed:.2f}s error={exc}", flush=True)
        return False, elapsed, str(exc)
def main():
    """Command-line entry point: run the selected inference modes on one file.

    Parses the CLI arguments, resolves the audio path (a relative path is
    taken relative to the repository root when it exists there), generates the
    synthetic sample if applicable, then runs every requested mode through
    ``run_mode``. Results are written to ``<output-dir>/<UTC timestamp>/`` and
    summarized in ``report.txt`` and on stdout.

    Raises:
        SystemExit: With a message when the audio file cannot be found, or
            with status 1 when at least one mode failed, so that shells and
            CI can detect an unsuccessful smoke run.
    """
    parser = argparse.ArgumentParser(description="Run one-file E2E smoke tests across inference runtime modes.")
    parser.add_argument("--audio", required=True, help="Path to test audio file.")
    parser.add_argument(
        "--extraction-mode",
        default="Chords & Lyrics (Combined)",
        choices=["Chords & Lyrics (Combined)", "Chords Only", "Lyrics Only"],
        help="Extraction mode passed to process_audio.",
    )
    parser.add_argument(
        "--output-dir",
        default="verification_outputs",
        help="Directory to write test artifacts.",
    )
    parser.add_argument(
        "--modes",
        nargs="+",
        choices=[
            INFERENCE_MODE_QWEN_ONLY,
            INFERENCE_MODE_MF_ONLY,
            INFERENCE_MODE_HYBRID,
        ],
        default=[
            INFERENCE_MODE_QWEN_ONLY,
            INFERENCE_MODE_MF_ONLY,
            INFERENCE_MODE_HYBRID,
        ],
        help="Subset of runtime modes to execute.",
    )
    args = parser.parse_args()

    # A relative path that exists under the repository root wins; otherwise
    # the path is used as given (i.e. relative to the current directory).
    audio_path = args.audio
    if not os.path.isabs(audio_path):
        repo_relative = os.path.join(ROOT, audio_path)
        if os.path.exists(repo_relative):
            audio_path = repo_relative
    if not os.path.exists(audio_path):
        # Only produces a file for the known synthetic sample name.
        _ensure_synthetic_sample(audio_path)
    if not os.path.exists(audio_path):
        raise SystemExit(f"Audio file not found: {audio_path}")

    print(f"[env] python={sys.version.split()[0]}", flush=True)
    print(f"[env] script={os.path.abspath(__file__)}", flush=True)
    print(f"[env] audio={os.path.abspath(audio_path)}", flush=True)

    # One directory per invocation, named by the UTC start time.
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    run_dir = os.path.join(args.output_dir, ts)
    os.makedirs(run_dir, exist_ok=True)

    rows = []
    for mode in args.modes:
        ok, elapsed, error = run_mode(audio_path, args.extraction_mode, mode, run_dir)
        rows.append((mode, ok, elapsed, error))

    with open(os.path.join(run_dir, "report.txt"), "w", encoding="utf-8") as f:
        for mode, ok, elapsed, error in rows:
            f.write(f"{mode}\tstatus={'OK' if ok else 'FAIL'}\telapsed={elapsed:.2f}s\n")
            if error:
                f.write(f" error: {error}\n")

    print(f"Wrote artifacts to: {run_dir}")
    for mode, ok, elapsed, error in rows:
        print(f"{mode}: {'OK' if ok else 'FAIL'} ({elapsed:.2f}s)")
        if error:
            print(f" error: {error}")

    # Report failure through the exit status; previously the script exited 0
    # even when every mode had failed.
    if not all(ok for _, ok, _, _ in rows):
        raise SystemExit(1)
# Run the smoke tests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()