bichnhan2701 commited on
Commit
7158b5e
·
1 Parent(s): 557b80a

Update Phowhisper services logic

Browse files
.dockerignore CHANGED
@@ -1,6 +1,7 @@
1
  test/
2
  *.md
3
  .myvenv
 
4
  __pycache__
5
  *.pyc
6
  .DS_Store
 
1
  test/
2
  *.md
3
  .myvenv
4
+ .myvenv1
5
  __pycache__
6
  *.pyc
7
  .DS_Store
.gitignore CHANGED
@@ -4,6 +4,5 @@ __pycache__/
4
  *.pyc
5
  .env
6
  *.md
7
- docker-compose.yml
8
- examples/
9
- docs/
 
4
  *.pyc
5
  .env
6
  *.md
7
+ docs/
8
+ *.json
 
Dockerfile CHANGED
@@ -3,34 +3,27 @@ FROM python:3.11-slim
3
  ENV PYTHONDONTWRITEBYTECODE=1 \
4
  PYTHONUNBUFFERED=1 \
5
  TMP_DIR=/tmp/uploads \
6
- PORT=7860
7
- ENV HF_HOME=/tmp/huggingface
8
- ENV TRANSFORMERS_CACHE=/tmp/huggingface
9
 
10
- # system deps (single RUN to minimize layers)
11
  RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
12
- ffmpeg libsndfile1 git build-essential wget curl && \
13
  rm -rf /var/lib/apt/lists/*
14
 
15
  WORKDIR /app
16
 
17
- # install python deps using cached layer
18
- COPY requirements.txt /app/requirements.txt
19
  RUN pip install --upgrade pip && \
20
- pip install --no-cache-dir -r /app/requirements.txt
21
 
22
- # copy app code
23
- COPY . /app
24
-
25
- # create tmp dir and non-root user
26
- RUN mkdir -p ${TMP_DIR} && groupadd -r app && useradd -r -g app app && \
27
- chown -R app:app /app ${TMP_DIR}
28
-
29
- USER app
30
 
31
  EXPOSE ${PORT}
32
 
33
- HEALTHCHECK --interval=30s --timeout=3s --start-period=10s \
34
  CMD curl -f http://localhost:${PORT}/health || exit 1
35
 
36
- CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860", "--workers", "1"]
 
3
  ENV PYTHONDONTWRITEBYTECODE=1 \
4
  PYTHONUNBUFFERED=1 \
5
  TMP_DIR=/tmp/uploads \
6
+ PORT=7860 \
7
+ HF_HOME=/tmp/huggingface \
8
+ TRANSFORMERS_CACHE=/tmp/huggingface
9
 
 
10
  RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
11
+ ffmpeg libsndfile1 git build-essential wget curl redis-server && \
12
  rm -rf /var/lib/apt/lists/*
13
 
14
  WORKDIR /app
15
 
16
+ COPY requirements.txt .
 
17
  RUN pip install --upgrade pip && \
18
+ pip install --no-cache-dir -r requirements.txt rq
19
 
20
+ COPY . .
21
+ COPY start.sh /start.sh
22
+ RUN chmod +x /start.sh && mkdir -p ${TMP_DIR}
 
 
 
 
 
23
 
24
  EXPOSE ${PORT}
25
 
26
+ HEALTHCHECK --interval=30s --timeout=3s --start-period=15s \
27
  CMD curl -f http://localhost:${PORT}/health || exit 1
28
 
29
+ CMD ["/start.sh"]
app/api/transcribe.py CHANGED
@@ -7,11 +7,19 @@ from fastapi.responses import JSONResponse
7
  from pathlib import Path
8
  from typing import Optional
9
  import time
10
- from app.core.audio_utils import save_upload_file, get_audio_info, ensure_wav_16k_mono, make_temp_path, download_file_from_url
11
- from app.core.asr_engine import load_model, transcribe_file, transcribe_file_chunks
 
 
 
 
 
 
 
 
 
 
12
  from app.config import settings
13
- from app.services.nlp_postprocess import normalize_and_extract
14
- # Summary and mindmap generation moved to Note Service; do not import here
15
  from app.services.note_client import NoteServiceClient
16
  from rq import Queue
17
  from app.infra.redis_client import redis_client
@@ -21,14 +29,9 @@ from app.infra.metrics import (
21
  REQUEST_COUNT,
22
  REQUEST_LATENCY,
23
  ASR_DURATION,
24
- NORMALIZE_DURATION,
25
- ERROR_COUNT,
26
  )
27
 
28
  router = APIRouter()
29
-
30
- # load model on import/startup to avoid repeated initialization
31
- # you may prefer to call load_model in FastAPI startup event
32
  ASR_MODEL = None
33
 
34
  @router.on_event("startup")
@@ -100,36 +103,39 @@ async def transcribe(file: UploadFile = File(...)):
100
  model = ASR_MODEL or await asyncio.to_thread(load_model, 30)
101
  with ASR_DURATION.labels(endpoint).time():
102
  text = await asyncio.to_thread(transcribe_file, model, tmp_wav, 30.0, 5.0)
103
- chunks = await asyncio.to_thread(transcribe_file_chunks, model, tmp_wav, 30.0, 5.0)
104
-
105
- # normalize via Gemini (already async safe in your service)
106
- with NORMALIZE_DURATION.labels(endpoint).time():
107
- # normalized_text = await normalize_text(text)
108
- nlp = await normalize_and_extract(text)
109
- normalized_text = nlp["normalized_text"]
110
- keywords = nlp["keywords"]
111
-
112
- # Summary / mindmap are generated by the Note Service; omit local generation
113
- summary = None
114
- mindmap = None
115
 
116
  info2 = get_audio_info(tmp_wav) or {}
117
  # persist to Note Service (async HTTP)
118
  payload = {
119
  "note_id": note_id,
 
 
120
  "raw_text": text,
121
- "normalized_text": normalized_text,
122
- "keywords": keywords,
123
- "summary": summary,
124
- "mindmap": mindmap,
125
- "duration": info2.get("duration"),
126
- "sample_rate": info2.get("samplerate"),
127
- "chunks": chunks,
128
- "asr_model": "PhoWhisper-base",
129
- "normalization_model": "gemini-1.5",
130
- "generate": ["summary", "mindmap"], # <-- thêm dòng này
131
  }
132
- await note_service.save_transcript(payload)
 
 
 
 
 
 
 
133
 
134
  duration = time.perf_counter() - start_time
135
  logging.info(f"/transcribe success note_id={note_id} duration={duration:.2f}s audio_dur={info2.get('duration')}")
@@ -138,22 +144,11 @@ async def transcribe(file: UploadFile = File(...)):
138
  status_code=200,
139
  content={
140
  "note_id": note_id,
141
- "status": "transcribed",
142
  "duration": info2.get("duration"),
143
  },
144
  )
145
 
146
- except HTTPException:
147
- status_label = "http_error"
148
- ERROR_COUNT.labels(endpoint, status_label).inc()
149
- raise
150
-
151
- except Exception as e:
152
- status_label = "error"
153
- ERROR_COUNT.labels(endpoint, status_label).inc()
154
- logging.exception(f"/transcribe failed note_id={note_id}")
155
- raise HTTPException(status_code=500, detail=f"Transcription failed: {e}")
156
-
157
  finally:
158
  # cleanup
159
  for p in [tmp_in, tmp_wav]:
@@ -235,32 +230,39 @@ async def transcribe_url(payload: dict):
235
  chunks = await asyncio.to_thread(
236
  transcribe_file_chunks, model, tmp_wav, 30.0, 5.0
237
  )
 
 
 
 
 
238
 
239
- with NORMALIZE_DURATION.labels(endpoint).time():
240
- nlp = await normalize_and_extract(text)
241
- normalized_text = nlp["normalized_text"]
242
- keywords = nlp["keywords"]
243
-
244
- # Summary / mindmap are generated by the Note Service; omit local generation
245
- summary = None
246
- mindmap = None
247
 
248
  # 5. Persist to Note Service
249
  payload = {
250
  "note_id": note_id,
 
 
251
  "raw_text": text,
252
- "normalized_text": normalized_text,
253
- "keywords": keywords,
254
- "summary": summary,
255
- "mindmap": mindmap,
256
- "duration": info.get("duration"),
257
- "sample_rate": info.get("samplerate"),
258
- "chunks": chunks,
259
- "asr_model": "PhoWhisper-base",
260
- "normalization_model": "gemini-1.5",
261
- "generate": ["summary", "mindmap"], # <-- thêm dòng này
262
  }
263
- await note_service.save_transcript(payload)
 
 
 
 
 
 
 
 
264
 
265
  duration = time.perf_counter() - start_time
266
  logging.info(
@@ -273,98 +275,15 @@ async def transcribe_url(payload: dict):
273
  status_code=200,
274
  content={
275
  "note_id": note_id,
276
- "status": "transcribed",
277
  "duration": info.get("duration"),
278
  },
279
  )
280
 
281
- except HTTPException:
282
- status_label = "http_error"
283
- ERROR_COUNT.labels(endpoint, status_label).inc()
284
- raise
285
-
286
- except Exception as e:
287
- status_label = "error"
288
- ERROR_COUNT.labels(endpoint, status_label).inc()
289
- logging.exception(f"/transcribe-url failed note_id={note_id}")
290
- raise HTTPException(status_code=500, detail=str(e))
291
-
292
  finally:
293
  for p in [tmp_in, tmp_wav]:
294
  try:
295
  if p and os.path.exists(p):
296
  os.remove(p)
297
  except Exception:
298
- pass
299
-
300
- # @router.post("/transcribe-url", response_model=TranscribeResponse)
301
- # async def transcribe_url(payload: dict):
302
- # audio_url = payload.get("audio_url")
303
- # user_id = payload.get("user_id")
304
- # if not audio_url:
305
- # raise HTTPException(status_code=400, detail="audio_url required")
306
- # if not user_id:
307
- # raise HTTPException(status_code=400, detail="user_id required")
308
-
309
- # tmp_in = make_temp_path(suffix=Path(audio_url).suffix or ".tmp")
310
- # tmp_wav = None
311
- # note_service = NoteServiceClient()
312
- # note_id = str(uuid.uuid4())
313
-
314
- # start_time = time.perf_counter()
315
- # try:
316
- # # download blocking -> thread
317
- # await asyncio.to_thread(download_file_from_url, audio_url, tmp_in)
318
-
319
- # _ensure_file_limits(tmp_in)
320
-
321
- # tmp_wav = make_temp_path(suffix=".wav")
322
- # await asyncio.to_thread(ensure_wav_16k_mono, tmp_in, tmp_wav)
323
-
324
- # model = ASR_MODEL or await asyncio.to_thread(load_model, 30)
325
- # text = await asyncio.to_thread(transcribe_file, model, tmp_wav, 30.0, 5.0)
326
- # chunks = await asyncio.to_thread(transcribe_file_chunks, model, tmp_wav, 30.0, 5.0)
327
-
328
- # # NLP pipeline: normalize, extract keywords, then summary and mindmap
329
- # nlp = await normalize_and_extract(text)
330
- # normalized_text = nlp.get("normalized_text", text)
331
- # keywords = nlp.get("keywords", [])
332
-
333
- # summary = await generate_summary(normalized_text)
334
- # mindmap = await generate_mindmap(normalized_text)
335
-
336
- # info2 = get_audio_info(tmp_wav) or {}
337
-
338
- # await note_service.save_transcript(
339
- # note_id=note_id,
340
- # raw_text=text,
341
- # normalized_text=normalized_text,
342
- # keywords=keywords,
343
- # summary=summary,
344
- # mindmap=mindmap,
345
- # duration=info2.get("duration"),
346
- # sample_rate=info2.get("samplerate"),
347
- # chunks=chunks,
348
- # asr_model="PhoWhisper-base",
349
- # normalization_model="gemini-1.5"
350
- # )
351
-
352
- # duration = time.perf_counter() - start_time
353
- # logging.info(f"/transcribe-url success note_id={note_id} duration={duration:.2f}s audio_dur={info2.get('duration')}")
354
- # return JSONResponse(status_code=200, content={
355
- # "note_id": note_id,
356
- # "status": "transcribed",
357
- # "duration": info2.get("duration")
358
- # })
359
- # except HTTPException:
360
- # raise
361
- # except Exception as e:
362
- # logging.exception(f"/transcribe-url failed note_id={note_id}")
363
- # raise HTTPException(status_code=500, detail=f"Transcription failed: {e}")
364
- # finally:
365
- # for p in [tmp_in, tmp_wav]:
366
- # try:
367
- # if p and os.path.exists(p):
368
- # os.remove(p)
369
- # except Exception:
370
- # pass
 
7
  from pathlib import Path
8
  from typing import Optional
9
  import time
10
+ from app.core.audio_utils import (
11
+ save_upload_file,
12
+ get_audio_info,
13
+ ensure_wav_16k_mono,
14
+ make_temp_path,
15
+ download_file_from_url
16
+ )
17
+ from app.core.asr_engine import (
18
+ load_model,
19
+ transcribe_file,
20
+ transcribe_file_chunks
21
+ )
22
  from app.config import settings
 
 
23
  from app.services.note_client import NoteServiceClient
24
  from rq import Queue
25
  from app.infra.redis_client import redis_client
 
29
  REQUEST_COUNT,
30
  REQUEST_LATENCY,
31
  ASR_DURATION,
 
 
32
  )
33
 
34
  router = APIRouter()
 
 
 
35
  ASR_MODEL = None
36
 
37
  @router.on_event("startup")
 
103
  model = ASR_MODEL or await asyncio.to_thread(load_model, 30)
104
  with ASR_DURATION.labels(endpoint).time():
105
  text = await asyncio.to_thread(transcribe_file, model, tmp_wav, 30.0, 5.0)
106
+ chunks = await asyncio.to_thread(transcribe_file_chunks, model, tmp_wav, 30.0, 5.0)
107
+ # 🔥 DROP invalid chunks
108
+ chunks = [
109
+ c for c in chunks
110
+ if c.get("text", "").strip() and c.get("end", 0) > c.get("start", 0)
111
+ ]
112
+ note_status = "transcribed" if chunks and any(c.get("text", "").strip() for c in chunks) else "error"
 
 
 
 
 
113
 
114
  info2 = get_audio_info(tmp_wav) or {}
115
  # persist to Note Service (async HTTP)
116
  payload = {
117
  "note_id": note_id,
118
+ "type": "audio",
119
+ "status": note_status,
120
  "raw_text": text,
121
+ "metadata": {
122
+ "audio": {
123
+ "duration": info2.get("duration"),
124
+ "sample_rate": info2.get("samplerate"),
125
+ "chunks": chunks,
126
+ "asr_model": "PhoWhisper-base"
127
+ }
128
+ },
129
+ "generate": ["normalize", "keywords", "summary", "mindmap"]
 
130
  }
131
+ logging.info(
132
+ "Create audio note note_id=%s status=%s chunks=%d text_len=%d",
133
+ note_id,
134
+ note_status,
135
+ len(chunks) if chunks else 0,
136
+ len(text or ""),
137
+ )
138
+ await note_service.create_audio_note(payload)
139
 
140
  duration = time.perf_counter() - start_time
141
  logging.info(f"/transcribe success note_id={note_id} duration={duration:.2f}s audio_dur={info2.get('duration')}")
 
144
  status_code=200,
145
  content={
146
  "note_id": note_id,
147
+ "status": note_status,
148
  "duration": info2.get("duration"),
149
  },
150
  )
151
 
 
 
 
 
 
 
 
 
 
 
 
152
  finally:
153
  # cleanup
154
  for p in [tmp_in, tmp_wav]:
 
230
  chunks = await asyncio.to_thread(
231
  transcribe_file_chunks, model, tmp_wav, 30.0, 5.0
232
  )
233
+ # 🔥 DROP invalid chunks
234
+ chunks = [
235
+ c for c in chunks
236
+ if c.get("text", "").strip() and c.get("end", 0) > c.get("start", 0)
237
+ ]
238
 
239
+ note_status = "transcribed" if chunks and any(c.get("text", "").strip() for c in chunks) else "error"
 
 
 
 
 
 
 
240
 
241
  # 5. Persist to Note Service
242
  payload = {
243
  "note_id": note_id,
244
+ "type": "audio",
245
+ "status": note_status,
246
  "raw_text": text,
247
+ "metadata": {
248
+ "audio": {
249
+ "duration": info.get("duration"),
250
+ "sample_rate": info.get("samplerate"),
251
+ "chunks": chunks,
252
+ "asr_model": "PhoWhisper-base"
253
+ }
254
+ },
255
+ "generate": ["normalize", "keywords", "summary", "mindmap"]
 
256
  }
257
+
258
+ logging.info(
259
+ "Create audio note note_id=%s status=%s chunks=%d text_len=%d",
260
+ note_id,
261
+ note_status,
262
+ len(chunks) if chunks else 0,
263
+ len(text or ""),
264
+ )
265
+ await note_service.create_audio_note(payload)
266
 
267
  duration = time.perf_counter() - start_time
268
  logging.info(
 
275
  status_code=200,
276
  content={
277
  "note_id": note_id,
278
+ "status": note_status,
279
  "duration": info.get("duration"),
280
  },
281
  )
282
 
 
 
 
 
 
 
 
 
 
 
 
283
  finally:
284
  for p in [tmp_in, tmp_wav]:
285
  try:
286
  if p and os.path.exists(p):
287
  os.remove(p)
288
  except Exception:
289
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/config/settings.py CHANGED
@@ -1,34 +1,25 @@
1
- # App settings and configuration
2
-
3
  import os
4
 
5
- # Limits & model setting
6
- MAX_UPLOAD_BYTES = int(os.getenv("MAX_UPLOAD_BYTES", 100 * 1024 * 1024)) # 100 MB
7
- MAX_DURATION_SECS = int(os.getenv("MAX_DURATION_SECS", 60 * 60)) # 1 hour
8
- MODEL_NAME = os.getenv("MODEL_NAME", "vinai/PhoWhisper-base") # change if desired
9
 
10
- # Temporary storage
11
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/uploads")
12
  os.makedirs(TMP_DIR, exist_ok=True)
13
 
14
- # Cloud credentials (set as HF Spaces secrets or env)
15
- # FIREBASE_SERVICE_ACCOUNT = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON") # optional
16
- # CLOUDINARY_URL = os.getenv("CLOUDINARY_URL") # optional
17
-
18
- # Gemini API Key (for text normalization)
19
- GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
20
  GEMINI_MODEL = os.getenv("GEMINI_MODEL", "")
21
 
22
- # External services
23
- NOTE_SERVICE_URL = os.getenv(
24
- "NOTE_SERVICE_URL",
25
- "https://bichnhan2701-NoteServicesAPI.hf.space"
26
- )
27
- # HTTP timeouts
28
- HTTPX_TIMEOUT = float(os.getenv("HTTPX_TIMEOUT", "10.0"))
29
 
30
- # Redis URL
31
- REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
32
- REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
33
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
34
  REDIS_DB = int(os.getenv("REDIS_DB", "0"))
 
 
 
 
 
 
 
 
 
 
1
  import os
2
 
3
+ MAX_UPLOAD_BYTES = int(os.getenv("MAX_UPLOAD_BYTES", 100 * 1024 * 1024))
4
+ MAX_DURATION_SECS = int(os.getenv("MAX_DURATION_SECS", 60 * 60))
5
+ MODEL_NAME = os.getenv("MODEL_NAME", "vinai/PhoWhisper-base")
 
6
 
 
7
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/uploads")
8
  os.makedirs(TMP_DIR, exist_ok=True)
9
 
 
 
 
 
 
 
10
  GEMINI_MODEL = os.getenv("GEMINI_MODEL", "")
11
 
12
+ NOTE_SERVICE_URL = os.getenv("NOTE_SERVICE_URL")
13
+ if not NOTE_SERVICE_URL:
14
+ raise RuntimeError("NOTE_SERVICE_URL must be set")
 
 
 
 
15
 
16
+ REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")
 
 
17
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
18
  REDIS_DB = int(os.getenv("REDIS_DB", "0"))
19
+
20
+ REDIS_URL = os.getenv(
21
+ "REDIS_URL",
22
+ f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
23
+ )
24
+
25
+ HTTPX_TIMEOUT = float(os.getenv("HTTPX_TIMEOUT", "10.0"))
app/core/asr_engine.py CHANGED
@@ -3,6 +3,11 @@
3
  import logging
4
  from transformers import pipeline
5
  from app.config.settings import MODEL_NAME
 
 
 
 
 
6
 
7
  _model = None
8
 
@@ -17,6 +22,51 @@ def load_model(chunk_length_s: int = None):
17
  logging.info("Model loaded")
18
  return _model
19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  # Heuristic merge for chunked transcripts
21
  def merge_transcripts(prev_text: str, new_text: str, max_overlap_words: int = 8) -> str:
22
  if not prev_text:
@@ -41,14 +91,10 @@ def merge_transcripts(prev_text: str, new_text: str, max_overlap_words: int = 8)
41
  return prev_text.rstrip() + " " + new_text.lstrip()
42
 
43
  def transcribe_long_audio(model, wav_path: str, chunk_length_s: float = 30.0, overlap_s: float = 5.0, parallel: bool = False) -> str:
44
- from app.core.chunking import split_audio_to_chunks
45
- from app.core.audio_utils import make_temp_path
46
- import os
47
  chunks = split_audio_to_chunks(wav_path, chunk_length_s=chunk_length_s, overlap_s=overlap_s)
48
  logging.info(f"Split into {len(chunks)} chunks")
49
  texts = []
50
  if parallel:
51
- from concurrent.futures import ThreadPoolExecutor, as_completed
52
  def process_chunk(path):
53
  try:
54
  out = model(path)
@@ -80,7 +126,6 @@ def transcribe_long_audio(model, wav_path: str, chunk_length_s: float = 30.0, ov
80
  return merged
81
 
82
  def transcribe_file(model, wav_path: str, max_chunk_length: float = 30.0, overlap_s: float = 5.0):
83
- from app.core.audio_utils import get_audio_info
84
  info = get_audio_info(wav_path) or {}
85
  duration = info.get("duration", 0.0)
86
  if duration and duration > max_chunk_length * 1.1:
@@ -91,33 +136,54 @@ def transcribe_file(model, wav_path: str, max_chunk_length: float = 30.0, overla
91
  return out.get("text") or ""
92
  return str(out)
93
 
94
- def transcribe_file_chunks(model, wav_path: str, max_chunk_length: float = 30.0, overlap_s: float = 5.0):
95
- from app.core.audio_utils import get_audio_info, make_temp_path
96
- from app.core.chunking import ffmpeg_extract_segment
97
- import os
 
 
98
  info = get_audio_info(wav_path) or {}
99
  duration = info.get("duration", 0.0)
 
100
  step = max_chunk_length - overlap_s
101
  if step <= 0:
102
  raise ValueError("max_chunk_length must be > overlap_s")
 
103
  starts = []
104
  t = 0.0
105
  while t < duration:
106
  starts.append(t)
107
  t += step
108
- results = []
 
 
109
  for i, s in enumerate(starts):
110
  chunk_end = min(s + max_chunk_length, duration)
111
  dst = make_temp_path(suffix=f".chunk{i}.wav")
 
112
  ffmpeg_extract_segment(wav_path, s, chunk_end - s, dst)
 
113
  out = model(dst)
114
- if isinstance(out, dict):
115
- text = out.get("text", "")
116
- else:
117
- text = str(out)
118
- results.append({"start": s, "end": chunk_end, "text": text})
 
 
 
119
  try:
120
  os.remove(dst)
121
  except Exception:
122
  pass
123
- return results
 
 
 
 
 
 
 
 
 
 
 
3
  import logging
4
  from transformers import pipeline
5
  from app.config.settings import MODEL_NAME
6
+ from app.core.chunking import split_audio_to_chunks, ffmpeg_extract_segment
7
+ from app.core.audio_utils import make_temp_path
8
+ import os
9
+ from concurrent.futures import ThreadPoolExecutor, as_completed
10
+ from app.core.audio_utils import get_audio_info, make_temp_path
11
 
12
  _model = None
13
 
 
22
  logging.info("Model loaded")
23
  return _model
24
 
25
+ def merge_chunks(chunks, max_overlap_words=12):
26
+ merged = []
27
+
28
+ for ch in chunks:
29
+ if not merged:
30
+ merged.append(ch)
31
+ continue
32
+
33
+ prev = merged[-1]
34
+ merged_text = merge_transcripts(
35
+ prev["text"],
36
+ ch["text"],
37
+ max_overlap_words=max_overlap_words
38
+ )
39
+
40
+ if merged_text != prev["text"]:
41
+ prev["text"] = merged_text
42
+ prev["end"] = ch["end"]
43
+ else:
44
+ merged.append(ch)
45
+
46
+ return merged
47
+
48
+ def normalize_chunks(chunks):
49
+ normalized = []
50
+ last_end = 0.0
51
+
52
+ for ch in chunks:
53
+ start = max(ch["start"], last_end)
54
+ end = max(start, ch["end"])
55
+
56
+ text = ch["text"].strip()
57
+ if not text:
58
+ continue
59
+
60
+ normalized.append({
61
+ "start": round(start, 3),
62
+ "end": round(end, 3),
63
+ "text": text
64
+ })
65
+
66
+ last_end = end
67
+
68
+ return normalized
69
+
70
  # Heuristic merge for chunked transcripts
71
  def merge_transcripts(prev_text: str, new_text: str, max_overlap_words: int = 8) -> str:
72
  if not prev_text:
 
91
  return prev_text.rstrip() + " " + new_text.lstrip()
92
 
93
  def transcribe_long_audio(model, wav_path: str, chunk_length_s: float = 30.0, overlap_s: float = 5.0, parallel: bool = False) -> str:
 
 
 
94
  chunks = split_audio_to_chunks(wav_path, chunk_length_s=chunk_length_s, overlap_s=overlap_s)
95
  logging.info(f"Split into {len(chunks)} chunks")
96
  texts = []
97
  if parallel:
 
98
  def process_chunk(path):
99
  try:
100
  out = model(path)
 
126
  return merged
127
 
128
  def transcribe_file(model, wav_path: str, max_chunk_length: float = 30.0, overlap_s: float = 5.0):
 
129
  info = get_audio_info(wav_path) or {}
130
  duration = info.get("duration", 0.0)
131
  if duration and duration > max_chunk_length * 1.1:
 
136
  return out.get("text") or ""
137
  return str(out)
138
 
139
+ def transcribe_file_chunks(
140
+ model,
141
+ wav_path: str,
142
+ max_chunk_length: float = 30.0,
143
+ overlap_s: float = 5.0,
144
+ ):
145
  info = get_audio_info(wav_path) or {}
146
  duration = info.get("duration", 0.0)
147
+
148
  step = max_chunk_length - overlap_s
149
  if step <= 0:
150
  raise ValueError("max_chunk_length must be > overlap_s")
151
+
152
  starts = []
153
  t = 0.0
154
  while t < duration:
155
  starts.append(t)
156
  t += step
157
+
158
+ raw_chunks = []
159
+
160
  for i, s in enumerate(starts):
161
  chunk_end = min(s + max_chunk_length, duration)
162
  dst = make_temp_path(suffix=f".chunk{i}.wav")
163
+
164
  ffmpeg_extract_segment(wav_path, s, chunk_end - s, dst)
165
+
166
  out = model(dst)
167
+ text = out.get("text", "") if isinstance(out, dict) else str(out)
168
+
169
+ raw_chunks.append({
170
+ "start": s,
171
+ "end": chunk_end,
172
+ "text": text
173
+ })
174
+
175
  try:
176
  os.remove(dst)
177
  except Exception:
178
  pass
179
+
180
+ # 🔽 CHUỖI XỬ LÝ CHUẨN
181
+ merged = merge_chunks(raw_chunks)
182
+ normalized = normalize_chunks(merged)
183
+ logging.info(
184
+ "ASR result: raw=%d merged=%d normalized=%d",
185
+ len(raw_chunks),
186
+ len(merged),
187
+ len(normalized),
188
+ )
189
+ return normalized
app/infra/metrics.py CHANGED
@@ -1,4 +1,6 @@
1
- from prometheus_client import Counter, Histogram
 
 
2
 
3
  REQUEST_COUNT = Counter(
4
  "asr_requests_total",
@@ -6,7 +8,6 @@ REQUEST_COUNT = Counter(
6
  ["endpoint", "status"]
7
  )
8
 
9
-
10
  REQUEST_LATENCY = Histogram(
11
  "asr_request_latency_seconds",
12
  "ASR request latency",
@@ -19,14 +20,7 @@ ASR_DURATION = Histogram(
19
  ["endpoint"]
20
  )
21
 
22
- NORMALIZE_DURATION = Histogram(
23
- "normalize_duration_seconds",
24
- "Text normalization duration",
25
- ["endpoint"]
26
- )
27
-
28
- ERROR_COUNT = Counter(
29
- "asr_error_total",
30
- "Total ASR errors",
31
- ["endpoint", "error_type"]
32
- )
 
1
+ from prometheus_client import Counter, Histogram, generate_latest
2
+ from fastapi import FastAPI
3
+ from fastapi.responses import Response
4
 
5
  REQUEST_COUNT = Counter(
6
  "asr_requests_total",
 
8
  ["endpoint", "status"]
9
  )
10
 
 
11
  REQUEST_LATENCY = Histogram(
12
  "asr_request_latency_seconds",
13
  "ASR request latency",
 
20
  ["endpoint"]
21
  )
22
 
23
+ def setup_metrics(app: FastAPI):
24
+ @app.get("/metrics")
25
+ def metrics():
26
+ return Response(generate_latest(), media_type="text/plain")
 
 
 
 
 
 
 
app/infra/redis_client.py CHANGED
@@ -1,22 +1,7 @@
1
- # import os
2
- # import redis
3
- # from app.config.settings import REDIS_URL
4
-
5
- # redis_client = redis.Redis.from_url(
6
- # REDIS_URL,
7
- # decode_responses=True
8
- # )
9
-
10
- import os
11
  import redis
12
- from app.config.settings import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_URL
13
-
14
 
15
- if REDIS_URL:
16
- redis_client = redis.from_url(REDIS_URL)
17
- else:
18
- redis_client = redis.Redis(
19
- host=REDIS_HOST,
20
- port=REDIS_PORT,
21
- db=REDIS_DB,
22
- )
 
 
 
 
 
 
 
 
 
 
 
1
  import redis
2
+ from app.config.settings import REDIS_URL
 
3
 
4
+ redis_client = redis.from_url(
5
+ REDIS_URL,
6
+ decode_responses=True,
7
+ )
 
 
 
 
app/jobs/transcribe_job.py CHANGED
@@ -1,65 +1,41 @@
1
  import asyncio
2
- from app.core.asr_engine import load_model, transcribe_file
3
  from app.services.note_client import NoteServiceClient
4
- from app.services.nlp_postprocess import normalize_and_extract
5
 
6
- def transcribe_job(tmp_wav: str, note_id: str):
7
  model = load_model()
8
- raw_text = transcribe_file(model, tmp_wav, 30.0, 5.0)
9
 
10
- nlp = asyncio.run(normalize_and_extract(raw_text))
 
 
11
 
12
- payload = {
13
- "note_id": note_id,
14
- "raw_text": raw_text,
15
- "normalized_text": nlp["normalized_text"],
16
- "keywords": nlp["keywords"],
17
- "chunks": [],
18
- "duration": None,
19
- "sample_rate": None,
20
- "asr_model": "PhoWhisper-base",
21
- "normalization_model": "gemini-1.5",
22
- "generate": ["summary", "mindmap"]
23
- }
24
-
25
- note_service = NoteServiceClient()
26
- asyncio.run(note_service.save_transcript(payload))
27
- return True
28
 
29
- # from app.core.asr_engine import load_model, transcribe_file
30
- # from app.services.note_client import NoteServiceClient
31
- # from app.services.nlp_postprocess import normalize_and_extract
32
- # from app.services.summary_service import generate_summary
33
- # from app.services.mindmap_service import generate_mindmap
34
 
35
- # # This function will be run by RQ worker
36
- # def transcribe_job(tmp_wav: str, note_id: str):
37
- # model = load_model()
38
- # raw_text = transcribe_file(model, tmp_wav, 30.0, 5.0)
39
- # nlp = asyncio.run(normalize_and_extract(raw_text))
40
- # normalized = nlp["normalized_text"]
41
- # keywords = nlp["keywords"]
42
 
43
- # summary = asyncio.run(generate_summary(normalized))
44
- # mindmap = asyncio.run(generate_mindmap(normalized))
45
-
46
- # note_service = NoteServiceClient()
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
- # # normalize_text có thể là async, nhưng RQ chỉ chạy sync nên cần chạy event loop nếu cần
49
- # import asyncio
50
- # asyncio.run(
51
- # note_service.save_transcript(
52
- # note_id=note_id,
53
- # raw_text=raw_text,
54
- # normalized_text=normalized,
55
- # keywords=keywords,
56
- # summary=summary,
57
- # mindmap=mindmap,
58
- # duration=None,
59
- # sample_rate=None,
60
- # chunks=None,
61
- # asr_model="PhoWhisper-base",
62
- # normalization_model="gemini-1.5",
63
- # )
64
- # )
65
- # return True
 
1
  import asyncio
2
+ from app.core.asr_engine import load_model, transcribe_file, transcribe_file_chunks
3
  from app.services.note_client import NoteServiceClient
4
+ from app.core.audio_utils import get_audio_info
5
 
6
+ def transcribe_job(wav_path: str, note_id: str, user_id: str | None = None):
7
  model = load_model()
 
8
 
9
+ # 🔥 ASR giống hệt API sync
10
+ text = transcribe_file(model, wav_path, 30.0, 5.0)
11
+ chunks = transcribe_file_chunks(model, wav_path, 30.0, 5.0)
12
 
13
+ # drop invalid chunks (defensive)
14
+ chunks = [
15
+ c for c in chunks
16
+ if c.get("text", "").strip() and c.get("end", 0) > c.get("start", 0)
17
+ ]
 
 
 
 
 
 
 
 
 
 
 
18
 
19
+ note_status = "transcribed" if chunks else "error"
 
 
 
 
20
 
21
+ info = get_audio_info(wav_path) or {}
 
 
 
 
 
 
22
 
23
+ payload = {
24
+ "note_id": note_id,
25
+ "type": "audio",
26
+ "status": note_status,
27
+ "raw_text": text,
28
+ "metadata": {
29
+ "audio": {
30
+ "duration": info.get("duration"),
31
+ "sample_rate": info.get("samplerate"),
32
+ "chunks": chunks,
33
+ "asr_model": "PhoWhisper-base",
34
+ },
35
+ "client": {"user_id": user_id},
36
+ },
37
+ "generate": ["normalize", "keywords", "summary", "mindmap"],
38
+ }
39
 
40
+ client = NoteServiceClient()
41
+ asyncio.run(client.create_audio_note(payload))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/main.py CHANGED
@@ -1,48 +1,12 @@
1
-
2
- from fastapi import FastAPI, Response
3
- from prometheus_client import generate_latest
4
- import asyncio
5
- import logging
6
- from fastapi.middleware.cors import CORSMiddleware
7
  from app.api.transcribe import router as transcribe_router
8
- from app.core.asr_engine import load_model
9
-
10
-
11
- app = FastAPI(title="PhoWhisper ASR API")
12
-
13
- # Preload ASR model at startup
14
- @app.on_event("startup")
15
- async def preload_asr_model():
16
- # Load model in thread to avoid blocking event loop
17
- logging.info("Preloading ASR model at startup...")
18
- await asyncio.to_thread(load_model, 30)
19
- logging.info("ASR model preloaded.")
20
-
21
-
22
- # CORS — tighten in prod
23
- app.add_middleware(
24
- CORSMiddleware,
25
- allow_origins=["*"],
26
- allow_methods=["GET","POST","OPTIONS"],
27
- allow_headers=["*"],
28
- )
29
 
 
30
 
31
- # --- OLD LOGIC: Đã chuyển sang app/api/transcribe.py ---
32
- # - Định nghĩa endpoint trực tiếp
33
- # - Chứa toàn bộ logic xử lý
34
- # - Đã refactor thành router riêng và tách core/service
35
 
36
- # Health check (có thể giữ lại nếu muốn)
37
  @app.get("/health")
38
  def health():
39
  return {"status": "ok"}
40
-
41
-
42
- # Expose /metrics endpoint for Prometheus
43
- @app.get("/metrics")
44
- def metrics():
45
- return Response(generate_latest(), media_type="text/plain")
46
-
47
- # Include API routers
48
- app.include_router(transcribe_router)
 
1
+ from fastapi import FastAPI
 
 
 
 
 
2
  from app.api.transcribe import router as transcribe_router
3
+ from app.infra.metrics import setup_metrics
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
 
5
+ app = FastAPI(title="PhoWhisper ASR Service")
6
 
7
+ setup_metrics(app)
8
+ app.include_router(transcribe_router)
 
 
9
 
 
10
  @app.get("/health")
11
  def health():
12
  return {"status": "ok"}
 
 
 
 
 
 
 
 
 
app/schemas/transcribe.py CHANGED
@@ -1,12 +1,5 @@
1
- # Request/Response models for transcription
2
-
3
  from pydantic import BaseModel
4
- from typing import List, Optional
5
-
6
- class Chunk(BaseModel):
7
- start: float
8
- end: float
9
- text: str
10
 
11
  class TranscribeResponse(BaseModel):
12
  note_id: str
 
 
 
1
  from pydantic import BaseModel
2
+ from typing import Optional
 
 
 
 
 
3
 
4
  class TranscribeResponse(BaseModel):
5
  note_id: str
app/services/mindmap_service.py DELETED
@@ -1,56 +0,0 @@
1
- # import asyncio, json
2
- # from app.config.settings import GEMINI_API_KEY
3
- # import google.generativeai as genai
4
-
5
- # if GEMINI_API_KEY:
6
- # genai.configure(api_key=GEMINI_API_KEY)
7
- # _model = genai.GenerativeModel("gemini-pro")
8
- # else:
9
- # _model = None
10
-
11
-
12
- # async def generate_mindmap(text: str) -> dict:
13
- # if not _model:
14
- # return {}
15
-
16
- # prompt = f"""
17
- # Bạn là chuyên gia tạo Sơ đồ tư duy. Hãy phân tích văn bản sau và tạo cấu trúc JSON Mindmap.
18
- # Yêu cầu:
19
- # 1. Xác định Ý chính làm Root.
20
- # 2. Phân tách ý phụ thành nhánh con (tối đa 3 cấp).
21
- # 3. Nhãn (label) ngắn gọn (< 7 từ).
22
- # 4. Màu sắc (colorHex): Root="#6200EE", Con="#F59E2B", "#2ECF9A", "#2F9BFF".
23
-
24
- # Cấu trúc JSON bắt buộc (Chỉ trả về JSON):
25
- # {{
26
- # "root": {{
27
- # "label": "Chủ đề",
28
- # "colorHex": "#6200EE",
29
- # "children": [
30
- # {{
31
- # "label": "Ý 1",
32
- # "colorHex": "#F59E2B",
33
- # "children": []
34
- # }}
35
- # ]
36
- # }}
37
- # }}
38
-
39
- # Văn bản:
40
- # {text}
41
- # """
42
-
43
- # loop = asyncio.get_event_loop()
44
-
45
- # def call():
46
- # r = _model.generate_content(prompt)
47
- # return r.text
48
-
49
- # raw = await loop.run_in_executor(None, call)
50
-
51
- # start = raw.find("{")
52
- # end = raw.rfind("}")
53
- # if start != -1 and end != -1:
54
- # return json.loads(raw[start:end+1])
55
-
56
- # return {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/services/nlp_postprocess.py DELETED
@@ -1,156 +0,0 @@
1
- import asyncio
2
- import json
3
- import logging
4
- import random
5
- import time
6
-
7
- from app.infra.redis_client import redis_client
8
- from app.utils.hashing import sha256
9
- from app.config.settings import GEMINI_API_KEY, GEMINI_MODEL
10
-
11
- # New official client
12
- try:
13
- import google.genai as genai
14
- from google.api_core.exceptions import GoogleAPIError # optional but useful
15
- except Exception:
16
- genai = None
17
- # fallback exception type so except GoogleAPIError still works
18
- class GoogleAPIError(Exception):
19
- pass
20
-
21
- CACHE_TTL = 60 * 60 * 24 * 3 # 3 days
22
- # Retry settings for transient model errors (503 / UNAVAILABLE)
23
- RETRY_MAX_ATTEMPTS = 3
24
- RETRY_BASE_BACKOFF = 1.0
25
-
26
- # Tạo client Gemini nếu có API key
27
- _gemini_client = None
28
- _GEMINI_MODEL = GEMINI_MODEL
29
-
30
- if GEMINI_API_KEY and genai is not None:
31
- try:
32
- _gemini_client = genai.Client(api_key=GEMINI_API_KEY)
33
- logging.info(f"[nlp_postprocess] Initialized google.genai client with model={_GEMINI_MODEL}")
34
- except Exception as e:
35
- logging.exception(f"[nlp_postprocess] Failed to init google.genai client: {e}")
36
- _gemini_client = None
37
- elif GEMINI_API_KEY and genai is None:
38
- logging.warning("[nlp_postprocess] google.genai package not installed; GEMINI API disabled")
39
- else:
40
- logging.warning("[nlp_postprocess] GEMINI_API_KEY is not set, using raw_text as normalization fallback")
41
-
42
-
43
- async def normalize_and_extract(raw_text: str) -> dict:
44
- """
45
- return {
46
- "normalized_text": "...",
47
- "keywords": [...]
48
- }
49
- """
50
- cache_key = f"nlp:{sha256(raw_text)}"
51
-
52
- # 1) Try get from Redis cache (best effort)
53
- try:
54
- cached = redis_client.get(cache_key)
55
- if cached:
56
- return json.loads(cached)
57
- except Exception as e:
58
- logging.warning(f"[nlp_postprocess] Redis GET failed, skip cache: {e}")
59
-
60
- # 2) Default fallback result (if no model or error)
61
- result = {
62
- "normalized_text": raw_text,
63
- "keywords": [],
64
- }
65
-
66
- # 3) Call Gemini if available
67
- if _gemini_client:
68
- prompt = f"""
69
- Bạn là một hệ thống Xử lý Hậu kỳ NLP (NLP Post-Processing) Tiếng Việt.
70
- Đầu vào là văn bản thô (raw transcript), có thể thiếu dấu câu và sai chính tả do nhận dạng giọng nói.
71
-
72
- Nhiệm vụ (Trả về JSON duy nhất):
73
- 1. Sửa lỗi chính tả ASR, thêm dấu câu, viết hoa chuẩn xác, loại bỏ các từ bị lặp lại vô nghĩa.
74
- 2. Trích xuất danh sách từ khóa quan trọng (keywords) liên quan đến chủ đề, độ dài từ 1-4 từ.
75
-
76
- Văn bản đầu vào:
77
- \"\"\"{raw_text}\"\"\"
78
-
79
- Cấu trúc JSON bắt buộc (chỉ trả JSON, không giải thích thêm):
80
- {{
81
- "normalizedText": "Văn bản đã sửa hoàn chỉnh...",
82
- "keywords": ["Từ khóa 1", "Từ khóa 2", "..."]
83
- }}
84
- """
85
-
86
- loop = asyncio.get_event_loop()
87
-
88
- def call():
89
- # Nếu lỗi từ API, để try/except bên ngoài handle
90
- resp = _gemini_client.models.generate_content(
91
- model=_GEMINI_MODEL,
92
- contents=prompt,
93
- )
94
- # resp.text là chuỗi model trả (có thể chứa code block)
95
- return resp.text
96
-
97
- # Try with a small exponential backoff for transient server errors
98
- text = None
99
- attempt = 0
100
- while attempt < RETRY_MAX_ATTEMPTS:
101
- attempt += 1
102
- try:
103
- text = await loop.run_in_executor(None, call)
104
- break
105
- except Exception as e:
106
- # Try to detect transient server-side/genai errors (503 / UNAVAILABLE)
107
- is_transient = False
108
- try:
109
- # try to import genai-specific ServerError if available
110
- from google.genai import errors as _genai_errors # type: ignore
111
- ServerError = getattr(_genai_errors, "ServerError", None)
112
- except Exception:
113
- ServerError = None
114
-
115
- if ServerError is not None and isinstance(e, ServerError):
116
- is_transient = True
117
- else:
118
- msg = str(e)
119
- if "503" in msg or "UNAVAILABLE" in msg.upper() or "model is overloaded" in msg.lower():
120
- is_transient = True
121
-
122
- if is_transient and attempt < RETRY_MAX_ATTEMPTS:
123
- backoff = RETRY_BASE_BACKOFF * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
124
- logging.warning(f"[nlp_postprocess] Gemini transient error (attempt {attempt}): {e}; retrying in {backoff:.1f}s")
125
- # use asyncio.sleep to not block event loop
126
- await asyncio.sleep(backoff)
127
- continue
128
- else:
129
- logging.exception(f"[nlp_postprocess] Gemini call failed, fallback to raw_text: {e}")
130
- text = None
131
- break
132
-
133
- if text:
134
- if text:
135
- # clean JSON
136
- start = text.find("{")
137
- end = text.rfind("}")
138
- if start != -1 and end != -1:
139
- try:
140
- data = json.loads(text[start:end + 1])
141
- result = {
142
- "normalized_text": data.get("normalizedText", raw_text),
143
- "keywords": data.get("keywords", []) or [],
144
- }
145
- except Exception as e:
146
- logging.warning(f"[nlp_postprocess] Failed to parse Gemini JSON, fallback to raw_text: {e}")
147
- else:
148
- logging.warning("[nlp_postprocess] Gemini response has no JSON block, fallback to raw_text")
149
-
150
- # 4) Try write back to Redis (best effort)
151
- try:
152
- redis_client.setex(cache_key, CACHE_TTL, json.dumps(result))
153
- except Exception as e:
154
- logging.warning(f"[nlp_postprocess] Redis SETEX failed, skip cache: {e}")
155
-
156
- return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/services/note_client.py CHANGED
@@ -1,64 +1,38 @@
 
1
  import httpx
2
  from app.config.settings import NOTE_SERVICE_URL
3
 
4
- class NoteServiceClient:
5
- async def save_transcript(self, payload: dict):
6
- async with httpx.AsyncClient(timeout=30) as client:
7
- r = await client.post(f"{NOTE_SERVICE_URL}/notes", json=payload)
8
- r.raise_for_status()
9
- return r.json()
10
-
11
 
 
 
 
12
 
13
- # import httpx
14
- # from app.config.settings import NOTE_SERVICE_URL, HTTPX_TIMEOUT
15
- # from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
16
-
17
- # class NoteServiceClient:
18
- # def __init__(self, base_url: str = None):
19
- # self.base_url = (base_url or NOTE_SERVICE_URL).rstrip("/")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
- # @retry(
22
- # stop=stop_after_attempt(3),
23
- # wait=wait_exponential(multiplier=1, min=1, max=8),
24
- # reraise=True,
25
- # retry=retry_if_exception(
26
- # lambda e: (
27
- # isinstance(e, httpx.RequestError) or
28
- # (isinstance(e, httpx.HTTPStatusError) and 500 <= e.response.status_code < 600)
29
- # )
30
- # )
31
- # )
32
- # async def save_transcript(self, note_id: str, raw_text: str, normalized_text: str,
33
- # keywords: list, summary: str, mindmap: dict,
34
- # duration: float, sample_rate: int, chunks: list,
35
- # asr_model: str = "PhoWhisper-base",
36
- # normalization_model: str = "gemini-1.5"):
37
- # url = f"{self.base_url}/notes/{note_id}/transcript"
38
- # payload = {
39
- # "raw_text": raw_text,
40
- # "normalized_text": normalized_text,
41
- # "keywords": keywords,
42
- # "summary": summary,
43
- # "mindmap": mindmap,
44
- # "duration": duration,
45
- # "sample_rate": sample_rate,
46
- # "chunks": chunks,
47
- # "asr_model": asr_model,
48
- # "normalization_model": normalization_model
49
- # }
50
- # timeout = httpx.Timeout(HTTPX_TIMEOUT)
51
- # async with httpx.AsyncClient(timeout=timeout) as client:
52
- # try:
53
- # resp = await client.post(url, json=payload)
54
- # resp.raise_for_status()
55
- # return resp.json()
56
- # except httpx.HTTPStatusError as e:
57
- # # Chỉ retry nếu là 5xx
58
- # if 500 <= e.response.status_code < 600:
59
- # raise
60
- # else:
61
- # raise
62
- # except httpx.RequestError as e:
63
- # # Retry network errors
64
- # raise
 
1
+ import logging
2
  import httpx
3
  from app.config.settings import NOTE_SERVICE_URL
4
 
 
 
 
 
 
 
 
5
 
6
+ class NoteServiceClient:
7
+ async def create_audio_note(self, payload: dict):
8
+ """Call the Note Service to create an audio note.
9
 
10
+ This method catches HTTP errors and logs them instead of raising,
11
+ to avoid making transcription endpoints return 500 when the
12
+ Note Service is unavailable or returns 4xx/5xx.
13
+ Returns parsed JSON on success or None on failure.
14
+ """
15
+ try:
16
+ async with httpx.AsyncClient(timeout=30) as client:
17
+ r = await client.post(
18
+ f"{NOTE_SERVICE_URL}/internal/notes/audio",
19
+ json=payload,
20
+ )
21
+ r.raise_for_status()
22
+ return r.json()
23
+ except httpx.HTTPStatusError as exc:
24
+ status = getattr(exc.response, "status_code", "?")
25
+ logging.warning(
26
+ "NoteService returned HTTP %s for %s: %s",
27
+ status,
28
+ f"{NOTE_SERVICE_URL}/internal/notes/audio",
29
+ exc,
30
+ )
31
+ return None
32
+ except Exception as exc: # network errors, timeouts, etc.
33
+ logging.exception("Failed to call NoteService: %s", exc)
34
+ return None
35
 
36
+ async def save_transcript(self, payload: dict):
37
+ # alias used elsewhere in the codebase
38
+ return await self.create_audio_note(payload)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/services/summary_service.py DELETED
@@ -1,35 +0,0 @@
1
- # import asyncio
2
- # from app.config.settings import GEMINI_API_KEY
3
- # import google.generativeai as genai
4
-
5
- # if GEMINI_API_KEY:
6
- # genai.configure(api_key=GEMINI_API_KEY)
7
- # _model = genai.GenerativeModel("gemini-pro")
8
- # else:
9
- # _model = None
10
-
11
-
12
- # async def generate_summary(text: str) -> str:
13
- # if not _model:
14
- # return ""
15
-
16
- # prompt = f"""
17
- # Bạn là chuyên gia tóm tắt. Hãy tóm tắt văn bản sau thành **một đoạn văn duy nhất**.
18
- # Yêu cầu:
19
- # 1. Viết khoảng 3-5 câu, tổng hợp đầy đủ chủ đề và các ý chính.
20
- # 2. Viết liền mạch, KHÔNG xuống dòng, KHÔNG dùng gạch đầu dòng hay đánh số.
21
- # 3. Chỉ dựa trên thông tin được cung cấp, tuyệt đối KHÔNG tự thêm thông tin bên ngoài.
22
- # 4. Trả về văn bản thuần (plain text).
23
-
24
- # Văn bản:
25
- # \"\"\"{text}\"\"\"
26
- # """
27
-
28
- # loop = asyncio.get_event_loop()
29
-
30
- # def call():
31
- # r = _model.generate_content(prompt)
32
- # return r.text.strip()
33
-
34
- # result = await loop.run_in_executor(None, call)
35
- # return result.replace("```", "").strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/services/text_normalizer.py DELETED
@@ -1,74 +0,0 @@
1
- from app.infra.redis_client import redis_client
2
- from app.utils.hashing import sha256
3
-
4
- CACHE_TTL = 60 * 60 * 24 * 3 # 3 days
5
-
6
- # Simple in-memory cache (có thể thay bằng Redis, v.v. sau này)
7
- # _normalize_cache = {}
8
-
9
- # --- Gemini client (use new `google.genai` if available) ---
10
- try:
11
- import google.genai as genai
12
- from google.api_core.exceptions import GoogleAPIError # optional but useful
13
- except Exception:
14
- genai = None
15
- class GoogleAPIError(Exception):
16
- pass
17
-
18
- from app.config.settings import GEMINI_API_KEY, GEMINI_MODEL
19
-
20
- _gemini_client = None
21
- _GEMINI_MODEL = GEMINI_MODEL
22
-
23
- if GEMINI_API_KEY and genai is not None:
24
- try:
25
- _gemini_client = genai.Client(api_key=GEMINI_API_KEY)
26
- except Exception:
27
- _gemini_client = None
28
- elif GEMINI_API_KEY and genai is None:
29
- # package not installed
30
- _gemini_client = None
31
- else:
32
- _gemini_client = None
33
-
34
- async def normalize_text(raw_text: str) -> str:
35
- cache_key = f"normalize:{sha256(raw_text)}"
36
- cached = redis_client.get(cache_key)
37
- if cached:
38
- return cached
39
-
40
- prompt = f"""
41
- Bạn là hệ thống chuẩn hóa transcript tiếng Việt.
42
- - KHÔNG thêm ý mới
43
- - Giữ nguyên nội dung
44
- - Chỉ sửa chính tả, dấu câu, xuống dòng hợp lý
45
-
46
- Văn bản:
47
- {raw_text}
48
- """
49
- result = raw_text
50
- if _gemini_client:
51
- import asyncio
52
- loop = asyncio.get_event_loop()
53
-
54
- def call_gemini():
55
- resp = _gemini_client.models.generate_content(
56
- model=_GEMINI_MODEL,
57
- contents=prompt,
58
- )
59
- return resp.text if hasattr(resp, 'text') else str(resp)
60
-
61
- try:
62
- result = await loop.run_in_executor(None, call_gemini)
63
- if isinstance(result, str):
64
- result = result.strip()
65
- except GoogleAPIError:
66
- result = raw_text
67
- except Exception:
68
- result = raw_text
69
- else:
70
- # Nếu chưa cấu hình Gemini, trả về text gốc
71
- result = raw_text
72
- result = result.strip()
73
- redis_client.setex(cache_key, CACHE_TTL, result)
74
- return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/utils/hashing.py DELETED
@@ -1,7 +0,0 @@
1
-
2
- # Hashing utilities for cache keys, helpers
3
-
4
- import hashlib
5
-
6
- def sha256(text: str) -> str:
7
- return hashlib.sha256(text.encode('utf-8')).hexdigest()
 
 
 
 
 
 
 
 
start.sh ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/bash
2
+ set -e
3
+
4
+ echo "Starting Redis..."
5
+ redis-server --daemonize yes --save "" --appendonly no
6
+
7
+ sleep 2
8
+
9
+ echo "Starting RQ worker..."
10
+ rq worker asr --url redis://127.0.0.1:6379 &
11
+ WORKER_PID=$!
12
+
13
+ echo "Starting FastAPI..."
14
+ uvicorn app.main:app --host 0.0.0.0 --port ${PORT} &
15
+
16
+ wait $WORKER_PID
test/conftest.py DELETED
@@ -1,11 +0,0 @@
1
- import pytest
2
- import tempfile
3
- import os
4
-
5
- @pytest.fixture(autouse=True)
6
- def mock_env(monkeypatch):
7
- monkeypatch.setenv("TMP_DIR", tempfile.gettempdir())
8
- monkeypatch.setenv("MAX_UPLOAD_BYTES", "1048576")
9
- monkeypatch.setenv("MAX_DURATION_SECS", "3600")
10
- monkeypatch.setenv("NOTE_SERVICE_URL", "http://note")
11
- monkeypatch.setenv("REDIS_URL", "redis://localhost:6379/0")