Lior-0618 committed on
Commit
cc028c9
Β·
1 Parent(s): a01a078

fix: async job polling for timeout + FER ONNX ort>=1.19.0

Browse files
Files changed (3) hide show
  1. api/requirements.txt +2 -2
  2. proxy/index.js +70 -49
  3. web/src/app/studio/page.tsx +42 -11
api/requirements.txt CHANGED
@@ -13,6 +13,6 @@ accelerate>=1.0.0
13
  mistral-common
14
  safetensors
15
  sentencepiece
16
- # FER inference
17
- onnxruntime>=1.16.0
18
  opencv-python-headless>=4.8.0
 
13
  mistral-common
14
  safetensors
15
  sentencepiece
16
+ # FER inference β€” model uses ONNX IR v10, requires ort>=1.19.0
17
+ onnxruntime>=1.19.0
18
  opencv-python-headless>=4.8.0
proxy/index.js CHANGED
@@ -1,6 +1,10 @@
1
  /**
2
  * Server layer: proxy client requests to Model layer (voxtral-server).
3
  * Port default 3000, Model layer default http://127.0.0.1:8000
 
 
 
 
4
  */
5
  import express from "express";
6
  import multer from "multer";
@@ -9,8 +13,8 @@ import cors from "cors";
9
  const PORT = Number(process.env.PORT) || 3000;
10
  const MODEL_URL = (process.env.MODEL_URL || "http://127.0.0.1:8000").replace(/\/$/, "");
11
  const MAX_UPLOAD_BYTES = 100 * 1024 * 1024; // 100 MB
12
- const TRANSCRIBE_TIMEOUT_MS = 30 * 60 * 1000; // 30 min (CPU inference is slow)
13
- const DIARIZE_TIMEOUT_MS = 60 * 60 * 1000; // 60 min (CPU: ~50s audio/min)
14
 
15
  const upload = multer({
16
  storage: multer.memoryStorage(),
@@ -20,12 +24,7 @@ const upload = multer({
20
  const app = express();
21
 
22
  app.use(cors({
23
- origin: [
24
- "http://localhost:3030",
25
- "http://127.0.0.1:3030",
26
- "http://localhost:3000",
27
- "http://127.0.0.1:3000",
28
- ],
29
  methods: ["GET", "POST", "OPTIONS"],
30
  allowedHeaders: ["Content-Type"],
31
  }));
@@ -38,6 +37,18 @@ app.use((req, res, next) => {
38
  next();
39
  });
40
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  // ─── /health ──────────────────────────────────────────────────────────────────
42
  app.get("/health", async (req, res) => {
43
  try {
@@ -57,36 +68,25 @@ app.get("/health", async (req, res) => {
57
  }
58
  });
59
 
60
- // ─── shared proxy helper ──────────────────────────────────────────────────────
61
- async function proxyToModel(req, res, modelPath, timeoutMs) {
62
  const reqId = `req-${Date.now().toString(36)}`;
63
  const start = Date.now();
64
-
65
- if (!req.file) {
66
- return res.status(400).json({ error: "Upload an audio file (form field: audio)" });
67
- }
68
-
69
- const { buffer, size, originalname } = req.file;
70
- if (size > MAX_UPLOAD_BYTES) {
71
- return res.status(400).json({
72
- error: `File size exceeds ${MAX_UPLOAD_BYTES / 1024 / 1024}MB limit`,
73
- });
74
- }
75
 
76
  const form = new FormData();
77
  form.append("audio", new Blob([buffer]), originalname || "audio");
78
 
79
- // Forward num_speakers query param if present
80
- const numSpeakers = req.query.num_speakers;
81
  const url = numSpeakers
82
- ? `${MODEL_URL}${modelPath}?num_speakers=${encodeURIComponent(numSpeakers)}`
83
- : `${MODEL_URL}${modelPath}`;
84
 
85
  const controller = new AbortController();
86
- const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
87
 
88
  try {
89
- console.log(`[server] ${reqId} β†’ ${url} file=${originalname} size=${size}`);
90
  const r = await fetch(url, { method: "POST", body: form, signal: controller.signal });
91
  clearTimeout(timeoutId);
92
 
@@ -96,27 +96,58 @@ async function proxyToModel(req, res, modelPath, timeoutMs) {
96
 
97
  if (!r.ok) {
98
  const errMsg = data.detail || data.error || "Failed";
99
- console.error(`[server] ${reqId} model error ${r.status}: ${errMsg} | raw=${rawText.slice(0, 300)}`);
100
- return res.status(r.status >= 500 ? 502 : r.status).json({
101
- error: typeof errMsg === "string" ? errMsg : "Model error",
102
- });
103
  }
104
 
105
- console.log(`[server] ${reqId} ok in ${Date.now() - start}ms`);
106
- res.json(data);
107
  } catch (err) {
108
  clearTimeout(timeoutId);
109
  const isAbort = err.name === "AbortError";
110
- console.error(`[server] ${reqId} ${isAbort ? "timeout" : "error"} after ${Date.now() - start}ms:`, err.message);
111
- res.status(isAbort ? 504 : 502).json({
 
112
  error: isAbort
113
- ? `Request timeout (>${timeoutMs / 60000} min); try shorter audio`
114
  : "Cannot reach Model layer; ensure voxtral-server is running",
 
115
  });
116
  }
117
  }
118
 
119
- // ─── /api/debug-inference (proxies to model /debug-inference) ────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  app.get("/api/debug-inference", async (req, res) => {
121
  try {
122
  const r = await fetch(`${MODEL_URL}/debug-inference`, { signal: AbortSignal.timeout(60000) });
@@ -127,20 +158,10 @@ app.get("/api/debug-inference", async (req, res) => {
127
  }
128
  });
129
 
130
- // ─── /api/speech-to-text ──────────────────────────────────────────────────────
131
- app.post("/api/speech-to-text", upload.single("audio"), (req, res) => {
132
- return proxyToModel(req, res, "/transcribe", TRANSCRIBE_TIMEOUT_MS);
133
- });
134
-
135
- // ─── /api/transcribe-diarize ──────────────────────────────────────────────────
136
- app.post("/api/transcribe-diarize", upload.single("audio"), (req, res) => {
137
- return proxyToModel(req, res, "/transcribe-diarize", DIARIZE_TIMEOUT_MS);
138
- });
139
-
140
  // ─── start ────────────────────────────────────────────────────────────────────
141
  app.listen(PORT, () => {
142
  console.log(`[server] Server layer listening on http://0.0.0.0:${PORT}`);
143
  console.log("[server] Model layer URL:", MODEL_URL);
144
- console.log("[server] POST /api/speech-to-text β†’ batch transcription");
145
- console.log("[server] POST /api/transcribe-diarize β†’ transcription + speaker diarization");
146
  });
 
1
  /**
2
  * Server layer: proxy client requests to Model layer (voxtral-server).
3
  * Port default 3000, Model layer default http://127.0.0.1:8000
4
+ *
5
+ * POST /api/transcribe-diarize β†’ returns {job_id} immediately (202)
6
+ * GET /api/job/:id β†’ returns {status, data?, error?}
7
+ * Polling avoids HF Spaces ~3 min proxy timeout during long CPU inference.
8
  */
9
  import express from "express";
10
  import multer from "multer";
 
13
  const PORT = Number(process.env.PORT) || 3000;
14
  const MODEL_URL = (process.env.MODEL_URL || "http://127.0.0.1:8000").replace(/\/$/, "");
15
  const MAX_UPLOAD_BYTES = 100 * 1024 * 1024; // 100 MB
16
+ const DIARIZE_TIMEOUT_MS = 60 * 60 * 1000; // 60 min (CPU: ~50s/min of audio)
17
+ const JOB_TTL_MS = 30 * 60 * 1000; // keep completed jobs 30 min then evict
18
 
19
  const upload = multer({
20
  storage: multer.memoryStorage(),
 
24
  const app = express();
25
 
26
  app.use(cors({
27
+ origin: "*",
 
 
 
 
 
28
  methods: ["GET", "POST", "OPTIONS"],
29
  allowedHeaders: ["Content-Type"],
30
  }));
 
37
  next();
38
  });
39
 
40
// ─── Job store ────────────────────────────────────────────────────────────────
/** @type {Map<string, {status:"pending"|"done"|"error", data?:object, error?:string, ts:number}>} */
const jobs = new Map();

/**
 * Evict finished jobs older than JOB_TTL_MS so the in-memory store cannot
 * grow without bound. Jobs still "pending" are never evicted here — they
 * terminate on their own when the upstream fetch completes or times out.
 */
function evictOldJobs() {
  const expiry = Date.now() - JOB_TTL_MS;
  jobs.forEach((job, id) => {
    const finished = job.status !== "pending";
    if (finished && job.ts < expiry) jobs.delete(id);
  });
}
setInterval(evictOldJobs, 5 * 60 * 1000);
+ setInterval(evictOldJobs, 5 * 60 * 1000);
51
+
52
  // ─── /health ──────────────────────────────────────────────────────────────────
53
  app.get("/health", async (req, res) => {
54
  try {
 
68
  }
69
  });
70
 
71
+ // ─── Background job processor ─────────────────────────────────────────────────
72
+ async function runDiarizeJob(jobId, file, query) {
73
  const reqId = `req-${Date.now().toString(36)}`;
74
  const start = Date.now();
75
+ const { buffer, size, originalname } = file;
 
 
 
 
 
 
 
 
 
 
76
 
77
  const form = new FormData();
78
  form.append("audio", new Blob([buffer]), originalname || "audio");
79
 
80
+ const numSpeakers = query?.num_speakers;
 
81
  const url = numSpeakers
82
+ ? `${MODEL_URL}/transcribe-diarize?num_speakers=${encodeURIComponent(numSpeakers)}`
83
+ : `${MODEL_URL}/transcribe-diarize`;
84
 
85
  const controller = new AbortController();
86
+ const timeoutId = setTimeout(() => controller.abort(), DIARIZE_TIMEOUT_MS);
87
 
88
  try {
89
+ console.log(`[server] ${reqId} job=${jobId} β†’ ${url} file=${originalname} size=${size}`);
90
  const r = await fetch(url, { method: "POST", body: form, signal: controller.signal });
91
  clearTimeout(timeoutId);
92
 
 
96
 
97
  if (!r.ok) {
98
  const errMsg = data.detail || data.error || "Failed";
99
+ console.error(`[server] ${reqId} model error ${r.status}: ${errMsg}`);
100
+ jobs.set(jobId, { status: "error", error: typeof errMsg === "string" ? errMsg : "Model error", ts: Date.now() });
101
+ return;
 
102
  }
103
 
104
+ console.log(`[server] ${reqId} job=${jobId} done in ${Date.now() - start}ms`);
105
+ jobs.set(jobId, { status: "done", data, ts: Date.now() });
106
  } catch (err) {
107
  clearTimeout(timeoutId);
108
  const isAbort = err.name === "AbortError";
109
+ console.error(`[server] ${reqId} job=${jobId} ${isAbort ? "timeout" : "error"} after ${Date.now() - start}ms:`, err.message);
110
+ jobs.set(jobId, {
111
+ status: "error",
112
  error: isAbort
113
+ ? `Request timeout (>60 min); try shorter audio`
114
  : "Cannot reach Model layer; ensure voxtral-server is running",
115
+ ts: Date.now(),
116
  });
117
  }
118
  }
119
 
120
+ // ─── /api/job/:id β€” poll for job result ───────────────────────────────────────
121
+ app.get("/api/job/:id", (req, res) => {
122
+ const job = jobs.get(req.params.id);
123
+ if (!job) return res.status(404).json({ error: "Job not found or expired" });
124
+ if (job.status === "pending") return res.json({ status: "pending" });
125
+ if (job.status === "error") return res.status(200).json({ status: "error", error: job.error });
126
+ return res.json({ status: "done", data: job.data });
127
+ });
128
+
129
+ // ─── /api/transcribe-diarize β€” submit job, return immediately ─────────────────
130
+ app.post("/api/transcribe-diarize", upload.single("audio"), (req, res) => {
131
+ if (!req.file) {
132
+ return res.status(400).json({ error: "Upload an audio file (form field: audio)" });
133
+ }
134
+ if (req.file.size > MAX_UPLOAD_BYTES) {
135
+ return res.status(400).json({ error: `File size exceeds ${MAX_UPLOAD_BYTES / 1024 / 1024}MB limit` });
136
+ }
137
+
138
+ const jobId = `job-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 6)}`;
139
+ jobs.set(jobId, { status: "pending", ts: Date.now() });
140
+
141
+ // Respond immediately β€” don't await
142
+ res.status(202).json({ job_id: jobId });
143
+
144
+ // Kick off background processing
145
+ runDiarizeJob(jobId, req.file, req.query).catch(err => {
146
+ jobs.set(jobId, { status: "error", error: err.message, ts: Date.now() });
147
+ });
148
+ });
149
+
150
+ // ─── /api/debug-inference ─────────────────────────────────────────────────────
151
  app.get("/api/debug-inference", async (req, res) => {
152
  try {
153
  const r = await fetch(`${MODEL_URL}/debug-inference`, { signal: AbortSignal.timeout(60000) });
 
158
  }
159
  });
160
 
 
 
 
 
 
 
 
 
 
 
161
// ─── start ────────────────────────────────────────────────────────────────────
/** Log the listening address and the public route map once the server is up. */
function announceStartup() {
  console.log(`[server] Server layer listening on http://0.0.0.0:${PORT}`);
  console.log("[server] Model layer URL:", MODEL_URL);
  console.log("[server] POST /api/transcribe-diarize → submit async job (202 + job_id)");
  console.log("[server] GET /api/job/:id → poll job status");
}
app.listen(PORT, announceStartup);
web/src/app/studio/page.tsx CHANGED
@@ -687,41 +687,72 @@ function StudioContent() {
687
  }
688
  }, [sessionId])
689
 
690
- // Automatic processing for pending sessions
 
 
691
  useEffect(() => {
692
  if (!session || processingRef.current || processError) return
693
 
694
- // If we have a file but no segments, it's a pending session
695
  if (session.file && session.data.segments.length === 0) {
696
- processingRef.current = true // synchronous guard β€” prevents re-entry before state update commits
697
  const process = async () => {
698
  setIsProcessing(true)
699
  setProcessError(null)
700
  try {
 
701
  const formData = new FormData()
702
  formData.append("audio", session.file!, session.filename)
703
 
704
- const res = await fetch(`${API_BASE}/api/transcribe-diarize`, {
705
  method: "POST",
706
  body: formData,
707
  })
708
 
709
- if (!res.ok) {
710
- const errData = await res.json().catch(() => ({}))
711
- throw new Error(errData.error ?? "Processing failed")
712
  }
713
 
714
- const data = await res.json() as DiarizeResult
715
- updateSession(session.id, data)
 
 
 
 
716
 
717
- // Re-fetch to update local state and trigger re-render
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
718
  const updated = getSession(session.id)
719
  setSession(updated)
720
  if (updated?.data.segments && updated.data.segments.length > 0) {
721
  setActiveId(updated.data.segments[0].id)
722
  }
723
  } catch (e) {
724
- processingRef.current = false // allow retry on error
725
  setProcessError(e instanceof Error ? e.message : "Request failed")
726
  } finally {
727
  setIsProcessing(false)
 
687
  }
688
  }, [sessionId])
689
 
690
+ // Automatic processing for pending sessions.
691
+ // Uses async job polling: POST returns {job_id} immediately, then GET /api/job/:id
692
+ // until done β€” avoids HF Spaces ~3 min proxy timeout during long CPU inference.
693
  useEffect(() => {
694
  if (!session || processingRef.current || processError) return
695
 
 
696
  if (session.file && session.data.segments.length === 0) {
697
+ processingRef.current = true
698
  const process = async () => {
699
  setIsProcessing(true)
700
  setProcessError(null)
701
  try {
702
+ // 1. Submit job β€” server responds immediately with job_id (202)
703
  const formData = new FormData()
704
  formData.append("audio", session.file!, session.filename)
705
 
706
+ const submitRes = await fetch(`${API_BASE}/api/transcribe-diarize`, {
707
  method: "POST",
708
  body: formData,
709
  })
710
 
711
+ if (!submitRes.ok) {
712
+ const errData = await submitRes.json().catch(() => ({}))
713
+ throw new Error(errData.error ?? "Submit failed")
714
  }
715
 
716
+ const { job_id } = await submitRes.json() as { job_id: string }
717
+
718
+ // 2. Poll until done (every 3s)
719
+ const POLL_INTERVAL = 3000
720
+ const MAX_POLLS = 60 * 20 // 60 min max
721
+ let polls = 0
722
 
723
+ const data = await new Promise<DiarizeResult>((resolve, reject) => {
724
+ const tick = async () => {
725
+ polls++
726
+ if (polls > MAX_POLLS) {
727
+ reject(new Error("Processing timed out after 60 minutes"))
728
+ return
729
+ }
730
+ try {
731
+ const pollRes = await fetch(`${API_BASE}/api/job/${job_id}`)
732
+ const pollData = await pollRes.json()
733
+ if (pollData.status === "done") {
734
+ resolve(pollData.data as DiarizeResult)
735
+ } else if (pollData.status === "error") {
736
+ reject(new Error(pollData.error ?? "Processing failed"))
737
+ } else {
738
+ // still pending β€” keep polling
739
+ setTimeout(tick, POLL_INTERVAL)
740
+ }
741
+ } catch (e) {
742
+ reject(e)
743
+ }
744
+ }
745
+ setTimeout(tick, POLL_INTERVAL)
746
+ })
747
+
748
+ updateSession(session.id, data)
749
  const updated = getSession(session.id)
750
  setSession(updated)
751
  if (updated?.data.segments && updated.data.segments.length > 0) {
752
  setActiveId(updated.data.segments[0].id)
753
  }
754
  } catch (e) {
755
+ processingRef.current = false
756
  setProcessError(e instanceof Error ? e.message : "Request failed")
757
  } finally {
758
  setIsProcessing(false)